PSv2: Implement queue clean-up upon job completion #1113
base: main
Conversation
👷 Deploy request for antenna-ssec pending review. Visit the deploys page to approve it.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough
This PR implements automatic cleanup of async job resources (NATS queues and Redis state) upon job completion, failure, and revocation. A formerly NATS-only cleanup function is renamed and extended to handle both Redis and NATS resource cleanup, with invocation points added at key job lifecycle hooks.
Sequence Diagram
sequenceDiagram
actor Job Lifecycle
participant tasks.py as Task Handler
participant jobs.py as Cleanup Orchestrator
participant TaskStateManager as Redis
participant TaskQueueManager as NATS
Job Lifecycle->>tasks.py: Job completed/failed/revoked
tasks.py->>tasks.py: _cleanup_job_if_needed()
activate tasks.py
tasks.py->>jobs.py: cleanup_async_job_resources(job)
activate jobs.py
jobs.py->>TaskStateManager: cleanup()
activate TaskStateManager
TaskStateManager-->>jobs.py: redis_success
deactivate TaskStateManager
jobs.py->>TaskQueueManager: cleanup NATS resources
activate TaskQueueManager
TaskQueueManager-->>jobs.py: nats_success
deactivate TaskQueueManager
jobs.py-->>tasks.py: redis_success AND nats_success
deactivate jobs.py
deactivate tasks.py
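The flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the code from `ami/ml/orchestration/jobs.py`: `FakeStateManager`, `FakeQueueManager`, `cleanup_job_resources`, and `cleanup_async_job_resources_sketch` are hypothetical stand-ins, and the real `TaskStateManager` and `TaskQueueManager` interfaces may differ. The point it shows is that each backend is cleaned independently, so a Redis failure does not prevent the NATS cleanup from being attempted, and the combined result is `redis_success and nats_success`.

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger("cleanup_sketch")


@dataclass
class FakeStateManager:
    """Hypothetical stand-in for the Redis-backed TaskStateManager."""

    job_id: int
    fail: bool = False

    def cleanup(self) -> None:
        if self.fail:
            raise ConnectionError("redis unavailable")


@dataclass
class FakeQueueManager:
    """Hypothetical stand-in for the NATS-backed TaskQueueManager."""

    fail: bool = False
    cleaned: list = field(default_factory=list)

    def cleanup_job_resources(self, job_id: int) -> bool:
        if self.fail:
            raise ConnectionError("nats unavailable")
        self.cleaned.append(job_id)
        return True


def cleanup_async_job_resources_sketch(job_id, state_manager, queue_manager) -> bool:
    """Clean Redis state, then NATS resources; report combined success."""
    redis_success = False
    nats_success = False

    try:
        state_manager.cleanup()
        redis_success = True
    except Exception:
        # Log with traceback, but keep going so NATS cleanup still runs.
        logger.exception("Error cleaning up Redis state for job %s", job_id)

    try:
        nats_success = bool(queue_manager.cleanup_job_resources(job_id))
    except Exception:
        logger.exception("Error cleaning up NATS resources for job %s", job_id)

    return redis_success and nats_success
```

In this sketch a failed Redis cleanup makes the overall result `False` while the queue manager still records the job as cleaned, which mirrors the two independent arrows in the diagram.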
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Pull request overview
This PR implements automatic cleanup of NATS JetStream and Redis resources when async ML jobs complete, fail, or are cancelled. This addresses issue #1083 by ensuring that temporary resources used for job orchestration are properly removed after jobs finish.
Changes:
- Renamed `cleanup_nats_resources` to `cleanup_async_job_resources` to handle both NATS and Redis cleanup
- Integrated cleanup into job lifecycle at three points: completion, failure, and revocation
- Added comprehensive integration tests covering all three cleanup scenarios
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| ami/ml/orchestration/jobs.py | Enhanced cleanup function to handle both Redis and NATS resources, with proper error handling and logging |
| ami/jobs/tasks.py | Integrated cleanup calls in job completion, failure, and revocation handlers with feature flag checks |
| ami/ml/orchestration/test_cleanup.py | Added comprehensive integration tests verifying cleanup works correctly in all three scenarios |
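The table mentions that `ami/jobs/tasks.py` calls cleanup from the completion, failure, and revocation handlers behind a feature flag check. A guard of that kind might look like the sketch below. Every name here (`JobState`, `TERMINAL_STATES`, `cleanup_job_if_needed`, the `async_enabled` flag) is an assumption made for illustration; the actual helper in the PR is `_cleanup_job_if_needed()` and its signature is not shown in this page.

```python
import logging
from enum import Enum
from typing import Callable

logger = logging.getLogger("cleanup_guard_sketch")


class JobState(str, Enum):
    """Hypothetical job states; the real project may use different names."""

    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    REVOKED = "REVOKED"


# The three lifecycle points named in the review: completion, failure, revocation.
TERMINAL_STATES = {JobState.SUCCESS, JobState.FAILURE, JobState.REVOKED}


def cleanup_job_if_needed(
    job_id: int,
    state: JobState,
    async_enabled: bool,
    cleanup: Callable[[int], object],
) -> bool:
    """Run cleanup only for async jobs in a terminal state.

    Returns True when cleanup ran without raising, False otherwise.
    """
    if not async_enabled:
        return False  # feature flag off: nothing was allocated in NATS/Redis
    if state not in TERMINAL_STATES:
        return False  # job still running: resources are still in use
    try:
        cleanup(job_id)
    except Exception:
        # A cleanup failure should not break the lifecycle hook itself.
        logger.exception("Cleanup failed for job %s", job_id)
        return False
    return True
```

Keeping the guard in one helper means each of the three handlers can call it unconditionally and let the helper decide whether any work is needed.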
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/test_cleanup.py`:
- Around line 118-159: In _verify_resources_cleaned, change the broad exception
handling inside the async check_nats_resources (which calls
manager.js.stream_info and manager.js.consumer_info via TaskQueueManager) to
only treat nats.js.errors.NotFoundError as "not found" (set
stream_exists/consumer_exists = False) and re-raise any other exceptions so
connection/infra errors fail the test; import or reference NotFoundError from
nats.js.errors and use it in the except clauses for the respective stream and
consumer checks.
🧹 Nitpick comments (1)
ami/ml/orchestration/jobs.py (1)
33-53: Capture stack traces on cleanup failures for easier diagnosis.
`job.logger.error` drops the traceback; `job.logger.exception` preserves context without changing behavior.
🔧 Suggested update
-    except Exception as e:
-        job.logger.error(f"Error cleaning up Redis state for job {job.pk}: {e}")
+    except Exception:
+        job.logger.exception(f"Error cleaning up Redis state for job {job.pk}")
...
-    except Exception as e:
-        job.logger.error(f"Error cleaning up NATS resources for job {job.pk}: {e}")
+    except Exception:
+        job.logger.exception(f"Error cleaning up NATS resources for job {job.pk}")
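The difference the nitpick describes can be checked with the standard library alone. The sketch below uses a plain `logging.Logger` writing to an in-memory stream; `capture` and the logger names are made up for the demonstration, and it assumes `job.logger` behaves like a standard `logging.Logger`.

```python
import io
import logging


def capture(method: str) -> str:
    """Log a caught exception with the given method and return the output."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    log = logging.getLogger(f"demo.{method}")
    log.addHandler(handler)
    log.propagate = False
    log.setLevel(logging.ERROR)
    try:
        try:
            raise RuntimeError("redis unavailable")
        except Exception as e:
            if method == "error":
                # Only the message text is recorded.
                log.error(f"Error cleaning up Redis state: {e}")
            else:
                # Same ERROR level, but the traceback is appended.
                log.exception("Error cleaning up Redis state")
    finally:
        log.removeHandler(handler)
    return stream.getvalue()


error_output = capture("error")
exception_output = capture("exception")
```

Both calls log at the ERROR level; only the `exception` variant includes the `Traceback (most recent call last):` section, which is what makes later diagnosis easier.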
    def _verify_resources_cleaned(self, job_id: int):
        """
        Verify that both Redis and NATS resources are cleaned up.

        Args:
            job_id: The job ID to check
        """
        # Verify Redis keys are deleted
        state_manager = TaskStateManager(job_id)
        for stage in state_manager.STAGES:
            pending_key = state_manager._get_pending_key(stage)
            self.assertIsNone(cache.get(pending_key), f"Redis key {pending_key} should be deleted")
        total_key = state_manager._total_key
        self.assertIsNone(cache.get(total_key), f"Redis key {total_key} should be deleted")

        # Verify NATS stream and consumer are deleted
        async def check_nats_resources():
            async with TaskQueueManager() as manager:
                stream_name = manager._get_stream_name(job_id)
                consumer_name = manager._get_consumer_name(job_id)

                # Try to get stream info - should fail if deleted
                stream_exists = True
                try:
                    await manager.js.stream_info(stream_name)
                except Exception:
                    stream_exists = False

                # Try to get consumer info - should fail if deleted
                consumer_exists = True
                try:
                    await manager.js.consumer_info(stream_name, consumer_name)
                except Exception:
                    consumer_exists = False

                return stream_exists, consumer_exists

        stream_exists, consumer_exists = async_to_sync(check_nats_resources)()

        self.assertFalse(stream_exists, f"NATS stream for job {job_id} should be deleted")
        self.assertFalse(consumer_exists, f"NATS consumer for job {job_id} should be deleted")
🧩 Analysis chain
🏁 Script executed:
# First, let's explore the codebase structure and find related files
cd /tmp/codebase 2>/dev/null || pwd
find . -type f -name "*.py" | grep -E "(nats|queue|manager)" | head -20
Repository: RolnickLab/antenna
Length of output: 218
🏁 Script executed:
# Check the test file in question
cat -n ami/ml/orchestration/test_cleanup.py | sed -n '118,159p'
Repository: RolnickLab/antenna
Length of output: 2140
🏁 Script executed:
# Look for TaskQueueManager implementation
rg "class TaskQueueManager" -A 20 --max-count 1
Repository: RolnickLab/antenna
Length of output: 1556
🏁 Script executed:
# Search for existing NATS error handling patterns in the codebase
rg "nats\.js" -B 2 -A 2 | head -50
Repository: RolnickLab/antenna
Length of output: 469
🌐 Web query:
nats.py JetStream exceptions NotFoundError NoStreamError NoConsumerError
💡 Result:
In nats.py (nats-py) JetStream there aren’t separate exceptions named NoStreamError / NoConsumerError. Missing resources are reported as a JetStream API 404 which the client maps to nats.js.errors.NotFoundError (a subclass of APIError). (nats-io.github.io)
To tell what was not found, inspect the JetStream err_code on the exception:
- Stream not found → err_code == 10059 (beta-docs.nats.io)
- Consumer not found → err_code == 10014 (beta-docs.nats.io)
Example:
from nats.js.errors import NotFoundError

try:
    await js.consumer_info("MY_STREAM", "MY_DURABLE")
except NotFoundError as e:
    if e.err_code == 10059:
        # stream does not exist
        ...
    elif e.err_code == 10014:
        # consumer does not exist
        ...
    else:
        # some other JetStream 404
        ...

Related: when publishing, if the client receives a 503 “no response”, nats.py raises NoStreamResponseError (not NoStreamError). (nats-io.github.io)
Citations:
- 1: https://nats-io.github.io/nats.py/_modules/nats/js/errors.html
- 2: https://beta-docs.nats.io/ref/protocols/jetstream?utm_source=openai
- 3: https://beta-docs.nats.io/ref/protocols/jetstream?utm_source=openai
- 4: https://nats-io.github.io/nats.py/_modules/nats/js/errors.html
🏁 Script executed:
# Check for imports of NATS errors in the codebase
rg "from nats.*error" -i
Repository: RolnickLab/antenna
Length of output: 44
🏁 Script executed:
# Look for existing error handling patterns in nats_queue.py
cat -n ami/ml/orchestration/nats_queue.py | head -100
Repository: RolnickLab/antenna
Length of output: 4138
🏁 Script executed:
# Check for any try-except blocks in nats_queue.py
rg "try:" -A 3 ami/ml/orchestration/nats_queue.py | head -40
Repository: RolnickLab/antenna
Length of output: 1290
🌐 Web query:
nats.py JetStream NotFoundError APIError exception classes documentation
💡 Result:
In nats.py, JetStream-specific exceptions are documented under “JetStream Errors” in the generated docs and implemented in nats.js.errors. (nats-io.github.io)
Key exception classes (JetStream)
- `nats.js.errors.Error`: Base class for JetStream-related errors (description: Optional[str]). (nats-io.github.io)
- `nats.js.errors.APIError`: Raised for JetStream API (JS API) error responses; it carries structured fields: `code`, `err_code`, `description`, `stream`, `seq` (nats-io.github.io)
- `nats.js.errors.NotFoundError`: A subclass of `APIError` representing an HTTP-like 404 condition. (nats-io.github.io)
Where to find the documentation/source
- JetStream Errors section (exception list + constructor signatures): (nats-io.github.io)
- Source for the exception classes: `nats.js.errors` module page (nats-io.github.io)
Typical imports / catching
```python
from nats.js.errors import APIError, NotFoundError

try:
    ...
except NotFoundError as e:
    ...
except APIError as e:
    ...
```
([nats-io.github.io](https://nats-io.github.io/nats.py/modules.html))
Citations:
- 1: https://nats-io.github.io/nats.py/modules.html
- 2: https://nats-io.github.io/nats.py/modules.html
- 3: https://nats-io.github.io/nats.py/modules.html
- 4: https://nats-io.github.io/nats.py/modules.html
- 5: https://nats-io.github.io/nats.py/modules.html
- 6: https://nats-io.github.io/nats.py/_modules/nats/js/errors.html
- 7: https://nats-io.github.io/nats.py/modules.html
---
</details>
**Catch only NATS "not found" errors; let other exceptions fail the test.**
In `_verify_resources_cleaned`, catching all exceptions masks infrastructure issues like connection failures. The suggested exception classes in the diff (`NoStreamError`, `NoConsumerError`) don't exist in nats.py. Catch only `NotFoundError` from `nats.js.errors` (which indicates a 404/not found response) and re-raise all other exceptions so the test fails on NATS issues.
<details>
<summary>🛠️ Suggested fix (use the correct NATS exception class)</summary>
```diff
+from nats.js.errors import NotFoundError
...
-                try:
-                    await manager.js.stream_info(stream_name)
-                except Exception:
-                    stream_exists = False
+                try:
+                    await manager.js.stream_info(stream_name)
+                except NotFoundError:
+                    stream_exists = False
+                except Exception:
+                    raise
...
-                try:
-                    await manager.js.consumer_info(stream_name, consumer_name)
-                except Exception:
-                    consumer_exists = False
+                try:
+                    await manager.js.consumer_info(stream_name, consumer_name)
+                except NotFoundError:
+                    consumer_exists = False
+                except Exception:
+                    raise
```
</details>
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 143-143: Do not catch blind exception: Exception
(BLE001)
[warning] 150-150: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In `@ami/ml/orchestration/test_cleanup.py` around lines 118 - 159, In
_verify_resources_cleaned, change the broad exception handling inside the async
check_nats_resources (which calls manager.js.stream_info and
manager.js.consumer_info via TaskQueueManager) to only treat
nats.js.errors.NotFoundError as "not found" (set stream_exists/consumer_exists =
False) and re-raise any other exceptions so connection/infra errors fail the
test; import or reference NotFoundError from nats.js.errors and use it in the
except clauses for the respective stream and consumer checks.
Summary
Performs clean-up of NATS and Redis resources used by async jobs
Related Issues
Closes #1083
Testing
NATS Dashboard With job running:
After job finished:
Redis with job running:
With job complete:
Job logs:
Checklist
Summary by CodeRabbit
Release Notes
Bug Fixes
Tests