From 863dcc0f315c970b32a2f13b5ac88038b3d3ac9b Mon Sep 17 00:00:00 2001 From: Marcos Amorim Date: Sat, 9 May 2026 11:57:40 -0400 Subject: [PATCH 1/3] Retry 422 errors on ResourceClaim status patch When multiple AnarchySubjects in a ResourceClaim change state concurrently, the JSON Patch to update RC status can fail with 422 because concurrent callers hold independent RC instances with stale state. Previously, 422 was silently dropped with only a warning log, leaving the RC status out of sync until the next periodic reconciliation. - Remove 422 suppression in update_status_from_handle (resourceclaim.py) - Add retry-with-refetch at both call sites in resourcehandle.py (update_status and manage_resource) - Follow existing codebase pattern from resourcepoolscaling.py: while True, attempt > 10 bail-out, refetch() without sleep - Add unit tests for retry behavior (success, transient 422, exhausted retries, 404, unexpected exceptions) Fixes: 20 occurrences of "Failed to apply patch" observed in 42h on babylon-west production cluster across handler and watch pods. --- operator/resourceclaim.py | 2 - operator/resourcehandle.py | 46 +++++-- test/unittest-resourceclaim_status_retry.py | 126 ++++++++++++++++++++ 3 files changed, 162 insertions(+), 12 deletions(-) create mode 100644 test/unittest-resourceclaim_status_retry.py diff --git a/operator/resourceclaim.py b/operator/resourceclaim.py index 655d502..781310e 100644 --- a/operator/resourceclaim.py +++ b/operator/resourceclaim.py @@ -660,8 +660,6 @@ async def update_status_from_handle(self, logger.info( f"Attempt to update status from {resource_handle} on deleted {self}" ) - elif exception.status == 422: - logger.warning(f"Failed to apply patch {patch} to {self}") else: raise diff --git a/operator/resourcehandle.py b/operator/resourcehandle.py index ad14abd..b35c6bf 100644 --- a/operator/resourcehandle.py +++ b/operator/resourcehandle.py @@ -1503,11 +1503,24 @@ async def manage(self, logger: kopf.ObjectLogger) -> None: raise if resource_claim: - await resource_claim.update_status_from_handle( - logger=logger, - resource_handle=self, - resource_states=resource_states, - ) + attempt = 0 + while True: + try: + await resource_claim.update_status_from_handle( + logger=logger, + resource_handle=self, + resource_states=resource_states, + ) + break + except K8sApiException as exception: + if exception.status == 404: + logger.info("Ignoring update on deleted %s", resource_claim) + break + elif exception.status == 422 and attempt <= 10: + await resource_claim.refetch() + attempt += 1 + else: + raise async def update_status(self, logger: kopf.ObjectLogger, @@ -1680,8 +1693,21 @@ async def update_status(self, await asyncio.sleep(0.2) if resource_claim: - await resource_claim.update_status_from_handle( - logger=logger, - resource_handle=self, - resource_states=resource_states, - ) + attempt = 0 + while True: + try: + await resource_claim.update_status_from_handle( + logger=logger, + resource_handle=self, + resource_states=resource_states, + ) + break + except K8sApiException as exception: + if exception.status == 404: + logger.info("Ignoring update on deleted %s", resource_claim) + break + elif exception.status == 422 and attempt <= 10: + await resource_claim.refetch() + attempt += 1 + else: + raise diff --git a/test/unittest-resourceclaim_status_retry.py b/test/unittest-resourceclaim_status_retry.py new file mode 100644 index 0000000..859f4a1 --- /dev/null +++ b/test/unittest-resourceclaim_status_retry.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python + +import asyncio +import logging +import sys +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +sys.path.append('../operator') + +import kubernetes_asyncio.client.exceptions + + +class FakeApiException(kubernetes_asyncio.client.exceptions.ApiException): + def __init__(self, status): + self.status = status + + +class FakeResourceClaim: + """Minimal ResourceClaim stub for testing retry behavior.""" + + def __init__(self, update_side_effects): + self.update_status_from_handle = AsyncMock(side_effect=update_side_effects) + self.refetch = AsyncMock() + self.refetch_count = 0 + + original_refetch = self.refetch + async def counting_refetch(): + self.refetch_count += 1 + await original_refetch() + self.refetch = counting_refetch + + def __str__(self): + return "ResourceClaim test-claim" + + +K8sApiException = kubernetes_asyncio.client.exceptions.ApiException + + +async def retry_update_status_from_handle(logger, resource_claim, resource_handle, resource_states): + """Extracted retry logic matching the pattern in resourcehandle.py.""" + attempt = 0 + while True: + try: + await resource_claim.update_status_from_handle( + logger=logger, + resource_handle=resource_handle, + resource_states=resource_states, + ) + break + except K8sApiException as exception: + if exception.status == 404: + logger.info("Ignoring update on deleted %s", resource_claim) + break + elif exception.status == 422 and attempt <= 10: + await resource_claim.refetch() + attempt += 1 + else: + raise + + +class TestResourceClaimStatusRetry(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + self.logger = logging.getLogger('test') + self.resource_handle = MagicMock() + self.resource_states = [{'kind': 'AnarchySubject', 'metadata': {'name': 'test-as'}}] + + async def test_success_no_retry(self): + """Successful update should not trigger any retry.""" + rc = FakeResourceClaim(update_side_effects=[None]) + await retry_update_status_from_handle( + self.logger, rc, self.resource_handle, self.resource_states, + ) + rc.update_status_from_handle.assert_called_once() + self.assertEqual(rc.refetch_count, 0) + + async def test_422_retries_and_succeeds(self): + """422 on first call should refetch and retry, then succeed.""" + rc = FakeResourceClaim(update_side_effects=[ + FakeApiException(status=422), + FakeApiException(status=422), + None, + ]) + await retry_update_status_from_handle( + self.logger, rc, self.resource_handle, self.resource_states, + ) + self.assertEqual(rc.update_status_from_handle.call_count, 3) + self.assertEqual(rc.refetch_count, 2) + + async def test_422_exhausts_retries(self): + """More than 10 consecutive 422s should raise the exception.""" + rc = FakeResourceClaim( + update_side_effects=[FakeApiException(status=422)] * 12 + ) + with self.assertRaises(K8sApiException) as ctx: + await retry_update_status_from_handle( + self.logger, rc, self.resource_handle, self.resource_states, + ) + self.assertEqual(ctx.exception.status, 422) + self.assertEqual(rc.update_status_from_handle.call_count, 12) + self.assertEqual(rc.refetch_count, 11) + + async def test_404_breaks_without_retry(self): + """404 should break immediately without retry.""" + rc = FakeResourceClaim(update_side_effects=[FakeApiException(status=404)]) + await retry_update_status_from_handle( + self.logger, rc, self.resource_handle, self.resource_states, + ) + rc.update_status_from_handle.assert_called_once() + self.assertEqual(rc.refetch_count, 0) + + async def test_other_exception_raises_immediately(self): + """Non-422/404 exceptions should raise immediately.""" + rc = FakeResourceClaim(update_side_effects=[FakeApiException(status=500)]) + with self.assertRaises(K8sApiException) as ctx: + await retry_update_status_from_handle( + self.logger, rc, self.resource_handle, self.resource_states, + ) + self.assertEqual(ctx.exception.status, 500) + rc.update_status_from_handle.assert_called_once() + self.assertEqual(rc.refetch_count, 0) + + +if __name__ == '__main__': + unittest.main() From ec6575d7f5c8095cd0eabe8fdd2910562c76e55a Mon Sep 17 00:00:00 2001 From: Marcos Amorim Date: Sat, 9 May 2026 12:06:01 -0400 Subject: [PATCH 2/3] Remove unused imports from retry test Remove unused asyncio and patch imports flagged by linter. --- test/unittest-resourceclaim_status_retry.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/unittest-resourceclaim_status_retry.py b/test/unittest-resourceclaim_status_retry.py index 859f4a1..4c92c9f 100644 --- a/test/unittest-resourceclaim_status_retry.py +++ b/test/unittest-resourceclaim_status_retry.py @@ -1,10 +1,9 @@ #!/usr/bin/env python -import asyncio import logging import sys import unittest -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock sys.path.append('../operator') From 18d63cf5b7be737a490ad8d3ba94e37918b9ea8b Mon Sep 17 00:00:00 2001 From: Marcos Amorim Date: Mon, 18 May 2026 11:28:25 -0400 Subject: [PATCH 3/3] Move retry logic into update_status_from_handle wrapper --- operator/resourceclaim.py | 21 +++++ operator/resourcehandle.py | 46 +++-------- test/unittest-resourceclaim_status_retry.py | 88 ++++++++++----------- 3 files changed, 74 insertions(+), 81 deletions(-) diff --git a/operator/resourceclaim.py b/operator/resourceclaim.py index 781310e..b1f513f 100644 --- a/operator/resourceclaim.py +++ b/operator/resourceclaim.py @@ -503,6 +503,27 @@ async def update_status_from_handle(self, logger: kopf.ObjectLogger, resource_handle: ResourceHandleT, resource_states: List[Mapping]|None=None, + ) -> None: + attempt = 0 + while True: + try: + await self.__update_status_from_handle( + logger=logger, + resource_handle=resource_handle, + resource_states=resource_states, + ) + break + except kubernetes_asyncio.client.exceptions.ApiException as exception: + if exception.status == 422 and attempt <= 10: + await self.refetch() + attempt += 1 + else: + raise + + async def __update_status_from_handle(self, + logger: kopf.ObjectLogger, + resource_handle: ResourceHandleT, + resource_states: List[Mapping]|None=None, ) -> None: async with self.lock: logger.debug(f"Updating {self} from {resource_handle}") diff --git a/operator/resourcehandle.py b/operator/resourcehandle.py index b35c6bf..ad14abd 100644 --- a/operator/resourcehandle.py +++ b/operator/resourcehandle.py @@ -1503,24 +1503,11 @@ async def manage(self, logger: kopf.ObjectLogger) -> None: raise if resource_claim: - attempt = 0 - while True: - try: - await resource_claim.update_status_from_handle( - logger=logger, - resource_handle=self, - resource_states=resource_states, - ) - break - except K8sApiException as exception: - if exception.status == 404: - logger.info("Ignoring update on deleted %s", resource_claim) - break - elif exception.status == 422 and attempt <= 10: - await resource_claim.refetch() - attempt += 1 - else: - raise + await resource_claim.update_status_from_handle( + logger=logger, + resource_handle=self, + resource_states=resource_states, + ) async def update_status(self, logger: kopf.ObjectLogger, @@ -1693,21 +1680,8 @@ async def update_status(self, await asyncio.sleep(0.2) if resource_claim: - attempt = 0 - while True: - try: - await resource_claim.update_status_from_handle( - logger=logger, - resource_handle=self, - resource_states=resource_states, - ) - break - except K8sApiException as exception: - if exception.status == 404: - logger.info("Ignoring update on deleted %s", resource_claim) - break - elif exception.status == 422 and attempt <= 10: - await resource_claim.refetch() - attempt += 1 - else: - raise + await resource_claim.update_status_from_handle( + logger=logger, + resource_handle=self, + resource_states=resource_states, + ) diff --git a/test/unittest-resourceclaim_status_retry.py b/test/unittest-resourceclaim_status_retry.py index 4c92c9f..6b7bf96 100644 --- a/test/unittest-resourceclaim_status_retry.py +++ b/test/unittest-resourceclaim_status_retry.py @@ -15,11 +15,14 @@ def __init__(self, status): self.status = status +K8sApiException = kubernetes_asyncio.client.exceptions.ApiException + + class FakeResourceClaim: - """Minimal ResourceClaim stub for testing retry behavior.""" + """Minimal ResourceClaim stub matching the new wrapper pattern in resourceclaim.py.""" def __init__(self, update_side_effects): - self.update_status_from_handle = AsyncMock(side_effect=update_side_effects) + self._update_impl = AsyncMock(side_effect=update_side_effects) self.refetch = AsyncMock() self.refetch_count = 0 @@ -29,35 +32,28 @@ async def counting_refetch(): await original_refetch() self.refetch = counting_refetch + async def update_status_from_handle(self, logger, resource_handle, resource_states): + """Public wrapper with retry — mirrors resourceclaim.py.""" + attempt = 0 + while True: + try: + await self._update_impl( + logger=logger, + resource_handle=resource_handle, + resource_states=resource_states, + ) + break + except K8sApiException as exception: + if exception.status == 422 and attempt <= 10: + await self.refetch() + attempt += 1 + else: + raise + def __str__(self): return "ResourceClaim test-claim" -K8sApiException = kubernetes_asyncio.client.exceptions.ApiException - - -async def retry_update_status_from_handle(logger, resource_claim, resource_handle, resource_states): - """Extracted retry logic matching the pattern in resourcehandle.py.""" - attempt = 0 - while True: - try: - await resource_claim.update_status_from_handle( - logger=logger, - resource_handle=resource_handle, - resource_states=resource_states, - ) - break - except K8sApiException as exception: - if exception.status == 404: - logger.info("Ignoring update on deleted %s", resource_claim) - break - elif exception.status == 422 and attempt <= 10: - await resource_claim.refetch() - attempt += 1 - else: - raise - - class TestResourceClaimStatusRetry(unittest.IsolatedAsyncioTestCase): def setUp(self): @@ -68,10 +64,10 @@ def setUp(self): async def test_success_no_retry(self): """Successful update should not trigger any retry.""" rc = FakeResourceClaim(update_side_effects=[None]) - await retry_update_status_from_handle( - self.logger, rc, self.resource_handle, self.resource_states, + await rc.update_status_from_handle( + self.logger, self.resource_handle, self.resource_states, ) - rc.update_status_from_handle.assert_called_once() + rc._update_impl.assert_called_once() self.assertEqual(rc.refetch_count, 0) async def test_422_retries_and_succeeds(self): @@ -81,10 +77,10 @@ async def test_422_retries_and_succeeds(self): FakeApiException(status=422), None, ]) - await retry_update_status_from_handle( - self.logger, rc, self.resource_handle, self.resource_states, + await rc.update_status_from_handle( + self.logger, self.resource_handle, self.resource_states, ) - self.assertEqual(rc.update_status_from_handle.call_count, 3) + self.assertEqual(rc._update_impl.call_count, 3) self.assertEqual(rc.refetch_count, 2) async def test_422_exhausts_retries(self): @@ -93,31 +89,33 @@ async def test_422_exhausts_retries(self): update_side_effects=[FakeApiException(status=422)] * 12 ) with self.assertRaises(K8sApiException) as ctx: - await retry_update_status_from_handle( - self.logger, rc, self.resource_handle, self.resource_states, + await rc.update_status_from_handle( + self.logger, self.resource_handle, self.resource_states, ) self.assertEqual(ctx.exception.status, 422) - self.assertEqual(rc.update_status_from_handle.call_count, 12) + self.assertEqual(rc._update_impl.call_count, 12) self.assertEqual(rc.refetch_count, 11) - async def test_404_breaks_without_retry(self): - """404 should break immediately without retry.""" + async def test_404_raises_without_retry(self): + """404 is not retried — it propagates immediately (in production, caught inside __update_status_from_handle).""" rc = FakeResourceClaim(update_side_effects=[FakeApiException(status=404)]) - await retry_update_status_from_handle( - self.logger, rc, self.resource_handle, self.resource_states, - ) - rc.update_status_from_handle.assert_called_once() + with self.assertRaises(K8sApiException) as ctx: + await rc.update_status_from_handle( + self.logger, self.resource_handle, self.resource_states, + ) + self.assertEqual(ctx.exception.status, 404) + rc._update_impl.assert_called_once() self.assertEqual(rc.refetch_count, 0) async def test_other_exception_raises_immediately(self): """Non-422/404 exceptions should raise immediately.""" rc = FakeResourceClaim(update_side_effects=[FakeApiException(status=500)]) with self.assertRaises(K8sApiException) as ctx: - await retry_update_status_from_handle( - self.logger, rc, self.resource_handle, self.resource_states, + await rc.update_status_from_handle( + self.logger, self.resource_handle, self.resource_states, ) self.assertEqual(ctx.exception.status, 500) - rc.update_status_from_handle.assert_called_once() + rc._update_impl.assert_called_once() self.assertEqual(rc.refetch_count, 0)