
Conversation

@carlosgjs (Collaborator) commented Feb 3, 2026

Summary

Performs clean-up of NATS and Redis resources used by async jobs

Related Issues

Closes #1083

Testing

NATS dashboard with the job running:

[screenshot: NATS dashboard, job in progress]

After the job finished:

[screenshot: NATS dashboard, job finished]

Redis keys with the job running:

```
127.0.0.1:6379> KEYS *job*
1) ":1:job:66:pending_images:results"
2) ":1:job:66:pending_images:process"
3) ":1:job:66:pending_images_total"
```

With the job complete:

```
127.0.0.1:6379> KEYS *job*
(empty array)
```
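
For anyone reproducing this, an equivalent spot-check from a Django shell might look like the sketch below. It assumes these keys are written through Django's cache (the cache backend adds the ":1:" version prefix seen in the redis-cli output above); the job ID 66 is just the one from this run.

```python
# Hedged sketch: verify from Django that the job's cache entries are gone.
# Key names are taken from the redis-cli output above; the ":1:" prefix is
# added by the cache backend and is not part of the logical key.
from django.core.cache import cache

job_id = 66  # the job shown in the output above
keys = [
    f"job:{job_id}:pending_images:results",
    f"job:{job_id}:pending_images:process",
    f"job:{job_id}:pending_images_total",
]
for key in keys:
    print(key, "->", cache.get(key))  # all None once cleanup has run
```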

Job logs:

```
[2026-02-03 21:58:56] INFO Cleaned up NATS resources for job 66
[2026-02-03 21:58:56] INFO Cleaned up Redis state for job 66
```

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

Release Notes

  • Bug Fixes

    • Improved resource cleanup for ML pipeline jobs that complete, fail, or are revoked, preventing resource leaks and improving system stability.
  • Tests

    • Added comprehensive tests for job resource cleanup across various lifecycle scenarios.

netlify bot commented Feb 3, 2026

👷 Deploy request for antenna-ssec pending review.

Visit the deploys page to approve it

🔨 Latest commit: e2849dc

netlify bot commented Feb 3, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: e2849dc
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/698280850b7e3a000811e295

coderabbitai bot (Contributor) commented Feb 3, 2026

📝 Walkthrough

This PR implements automatic cleanup of async job resources (NATS queues and Redis state) upon job completion, failure, and revocation. A formerly NATS-only cleanup function is renamed and extended to handle both Redis and NATS resource cleanup, with invocation points added at key job lifecycle hooks.

Changes

  • Job Task Orchestration (ami/jobs/tasks.py): introduces a _cleanup_job_if_needed() helper that conditionally invokes async resource cleanup based on job type and feature flags. It is integrated into three job lifecycle paths: progress completion (when stage="results" and progress ≥ 1.0), status updates (when state=REVOKED), and job failures. A sketch of this hook follows the sequence diagram below.
  • Async Resource Cleanup (ami/ml/orchestration/jobs.py): renames cleanup_nats_resources() to cleanup_async_job_resources() and extends it to clean up both Redis state (via TaskStateManager.cleanup()) and NATS JetStream resources (via TaskQueueManager). It returns a boolean success indicator that requires both cleanups to succeed; see the sketch after this list.
  • Cleanup Integration Tests (ami/ml/orchestration/test_cleanup.py): adds a test suite covering three job cleanup scenarios (completion, failure, and revocation), validating cleanup by inspecting Redis keys and NATS streams/consumers after each lifecycle event.

Sequence Diagram

```mermaid
sequenceDiagram
    actor Job Lifecycle
    participant tasks.py as Task Handler
    participant jobs.py as Cleanup Orchestrator
    participant TaskStateManager as Redis
    participant TaskQueueManager as NATS

    Job Lifecycle->>tasks.py: Job completed/failed/revoked
    tasks.py->>tasks.py: _cleanup_job_if_needed()
    activate tasks.py
    tasks.py->>jobs.py: cleanup_async_job_resources(job)
    activate jobs.py

    jobs.py->>TaskStateManager: cleanup()
    activate TaskStateManager
    TaskStateManager-->>jobs.py: redis_success
    deactivate TaskStateManager

    jobs.py->>TaskQueueManager: cleanup NATS resources
    activate TaskQueueManager
    TaskQueueManager-->>jobs.py: nats_success
    deactivate TaskQueueManager

    jobs.py-->>tasks.py: redis_success AND nats_success
    deactivate jobs.py
    deactivate tasks.py
```
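
Concretely, the tasks.py-side hook could be wired roughly as in the sketch below. The function names come from this PR; the guard condition and attribute names are placeholders, not the project's actual checks.

```python
from ami.ml.orchestration.jobs import cleanup_async_job_resources  # renamed helper from this PR


def _is_async_ml_job(job) -> bool:
    # Placeholder guard: the real helper checks the job type and a feature flag.
    return getattr(job, "uses_async_pipeline", False)


def _cleanup_job_if_needed(job) -> None:
    """Release NATS/Redis resources once an async ML job stops running."""
    if not _is_async_ml_job(job):
        return
    if not cleanup_async_job_resources(job):
        job.logger.warning(f"Async resource cleanup for job {job.pk} did not fully succeed")


# Lifecycle integration points described in the walkthrough (pseudocode):
#   progress update:  if stage == "results" and progress >= 1.0 -> _cleanup_job_if_needed(job)
#   status update:    if new state == REVOKED                   -> _cleanup_job_if_needed(job)
#   failure handler:  always                                    -> _cleanup_job_if_needed(job)
```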

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • mihow

Poem

🐰 In queues both Redis, NATS so grand,
We cleanup what we've planned,
When jobs complete their hoppy race,
Resources vanish without a trace! 🌟

🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: docstring coverage is 55.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (4 passed)
  • Title check ✅ Passed: the title accurately and concisely describes the main change: implementing queue cleanup upon job completion in PSv2.
  • Description check ✅ Passed: the PR description includes a summary, related issues, testing evidence with screenshots/logs, and a completed checklist. All required template sections are present or adequately addressed.
  • Linked Issues check ✅ Passed: the code changes fully address issue #1083: cleanup functions are implemented, NATS queues/streams are deleted, Redis resources are removed, and comprehensive tests validate cleanup across job completion/failure/revocation scenarios.
  • Out of Scope Changes check ✅ Passed: all changes are directly aligned with the PR objective to implement queue cleanup for async jobs. No unrelated modifications to other systems or features are present.


@carlosgjs carlosgjs marked this pull request as ready for review February 4, 2026 03:03
Copilot AI review requested due to automatic review settings February 4, 2026 03:03
Copilot AI (Contributor) left a comment

Pull request overview

This PR implements automatic cleanup of NATS JetStream and Redis resources when async ML jobs complete, fail, or are cancelled. This addresses issue #1083 by ensuring that temporary resources used for job orchestration are properly removed after jobs finish.

Changes:

  • Renamed cleanup_nats_resources to cleanup_async_job_resources to handle both NATS and Redis cleanup
  • Integrated cleanup into job lifecycle at three points: completion, failure, and revocation
  • Added comprehensive integration tests covering all three cleanup scenarios

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

  • ami/ml/orchestration/jobs.py: enhanced the cleanup function to handle both Redis and NATS resources, with proper error handling and logging
  • ami/jobs/tasks.py: integrated cleanup calls into the job completion, failure, and revocation handlers, guarded by feature flag checks
  • ami/ml/orchestration/test_cleanup.py: added comprehensive integration tests verifying cleanup works correctly in all three scenarios


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/test_cleanup.py`:
- Around line 118-159: In _verify_resources_cleaned, change the broad exception
handling inside the async check_nats_resources (which calls
manager.js.stream_info and manager.js.consumer_info via TaskQueueManager) to
only treat nats.js.errors.NotFoundError as "not found" (set
stream_exists/consumer_exists = False) and re-raise any other exceptions so
connection/infra errors fail the test; import or reference NotFoundError from
nats.js.errors and use it in the except clauses for the respective stream and
consumer checks.
🧹 Nitpick comments (1)
ami/ml/orchestration/jobs.py (1)

33-53: Capture stack traces on cleanup failures for easier diagnosis.

job.logger.error drops the traceback; job.logger.exception preserves context without changing behavior.

🔧 Suggested update

```diff
-    except Exception as e:
-        job.logger.error(f"Error cleaning up Redis state for job {job.pk}: {e}")
+    except Exception:
+        job.logger.exception(f"Error cleaning up Redis state for job {job.pk}")
...
-    except Exception as e:
-        job.logger.error(f"Error cleaning up NATS resources for job {job.pk}: {e}")
+    except Exception:
+        job.logger.exception(f"Error cleaning up NATS resources for job {job.pk}")
```
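
For reference, a minimal standalone illustration of the difference (hypothetical logger name and error; not project code): logging.Logger.exception records the message at ERROR level and appends the active traceback, so the failure context is preserved.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cleanup-demo")  # hypothetical logger name

try:
    raise RuntimeError("simulated NATS outage")
except Exception:
    # Unlike logger.error(...), logger.exception(...) appends the traceback
    # of the exception currently being handled.
    logger.exception("Error cleaning up NATS resources for job 66")
```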

Comment on lines +118 to +159
```python
    def _verify_resources_cleaned(self, job_id: int):
        """
        Verify that both Redis and NATS resources are cleaned up.

        Args:
            job_id: The job ID to check
        """
        # Verify Redis keys are deleted
        state_manager = TaskStateManager(job_id)
        for stage in state_manager.STAGES:
            pending_key = state_manager._get_pending_key(stage)
            self.assertIsNone(cache.get(pending_key), f"Redis key {pending_key} should be deleted")
        total_key = state_manager._total_key
        self.assertIsNone(cache.get(total_key), f"Redis key {total_key} should be deleted")

        # Verify NATS stream and consumer are deleted
        async def check_nats_resources():
            async with TaskQueueManager() as manager:
                stream_name = manager._get_stream_name(job_id)
                consumer_name = manager._get_consumer_name(job_id)

                # Try to get stream info - should fail if deleted
                stream_exists = True
                try:
                    await manager.js.stream_info(stream_name)
                except Exception:
                    stream_exists = False

                # Try to get consumer info - should fail if deleted
                consumer_exists = True
                try:
                    await manager.js.consumer_info(stream_name, consumer_name)
                except Exception:
                    consumer_exists = False

                return stream_exists, consumer_exists

        stream_exists, consumer_exists = async_to_sync(check_nats_resources)()

        self.assertFalse(stream_exists, f"NATS stream for job {job_id} should be deleted")
        self.assertFalse(consumer_exists, f"NATS consumer for job {job_id} should be deleted")
```


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's explore the codebase structure and find related files
cd /tmp/codebase 2>/dev/null || pwd
find . -type f -name "*.py" | grep -E "(nats|queue|manager)" | head -20

Repository: RolnickLab/antenna

Length of output: 218


🏁 Script executed:

# Check the test file in question
cat -n ami/ml/orchestration/test_cleanup.py | sed -n '118,159p'

Repository: RolnickLab/antenna

Length of output: 2140


🏁 Script executed:

# Look for TaskQueueManager implementation
rg "class TaskQueueManager" -A 20 --max-count 1

Repository: RolnickLab/antenna

Length of output: 1556


🏁 Script executed:

# Search for existing NATS error handling patterns in the codebase
rg "nats\.js" -B 2 -A 2 | head -50

Repository: RolnickLab/antenna

Length of output: 469


🌐 Web query:

nats.py JetStream exceptions NotFoundError NoStreamError NoConsumerError

💡 Result:

In nats.py (nats-py) JetStream there aren’t separate exceptions named NoStreamError / NoConsumerError. Missing resources are reported as a JetStream API 404 which the client maps to nats.js.errors.NotFoundError (a subclass of APIError). (nats-io.github.io)

To tell what was not found, inspect the JetStream err_code on the exception:

Example:

```python
from nats.js.errors import NotFoundError

try:
    await js.consumer_info("MY_STREAM", "MY_DURABLE")
except NotFoundError as e:
    if e.err_code == 10059:
        # stream does not exist
        ...
    elif e.err_code == 10014:
        # consumer does not exist
        ...
    else:
        # some other JetStream 404
        ...
```

Related: when publishing, if the client receives a 503 “no response”, nats.py raises NoStreamResponseError (not NoStreamError). (nats-io.github.io)



🏁 Script executed:

# Check for imports of NATS errors in the codebase
rg "from nats.*error" -i

Repository: RolnickLab/antenna

Length of output: 44


🏁 Script executed:

# Look for existing error handling patterns in nats_queue.py
cat -n ami/ml/orchestration/nats_queue.py | head -100

Repository: RolnickLab/antenna

Length of output: 4138


🏁 Script executed:

# Check for any try-except blocks in nats_queue.py
rg "try:" -A 3 ami/ml/orchestration/nats_queue.py | head -40

Repository: RolnickLab/antenna

Length of output: 1290


🌐 Web query:

nats.py JetStream NotFoundError APIError exception classes documentation

💡 Result:

In nats.py, JetStream-specific exceptions are documented under “JetStream Errors” in the generated docs and implemented in nats.js.errors. (nats-io.github.io)

Key exception classes (JetStream)

  • nats.js.errors.Error: Base class for JetStream-related errors (description: Optional[str]). (nats-io.github.io)
  • nats.js.errors.APIError: Raised for JetStream API (JS API) error responses; it carries structured fields such as err_code. (nats-io.github.io)
  • nats.js.errors.NotFoundError: A subclass of APIError representing an HTTP-like 404 condition. (nats-io.github.io)

Where to find the documentation/source

  • JetStream Errors section (exception list + constructor signatures): (nats-io.github.io)
  • Source for the exception classes: nats.js.errors module page (nats-io.github.io)

Typical imports / catching

```python
from nats.js.errors import APIError, NotFoundError

try:
    ...
except NotFoundError as e:
    ...
except APIError as e:
    ...
```

([nats-io.github.io](https://nats-io.github.io/nats.py/modules.html))

Citations:

- 1: https://nats-io.github.io/nats.py/modules.html
- 2: https://nats-io.github.io/nats.py/modules.html
- 3: https://nats-io.github.io/nats.py/modules.html
- 4: https://nats-io.github.io/nats.py/modules.html
- 5: https://nats-io.github.io/nats.py/modules.html
- 6: https://nats-io.github.io/nats.py/_modules/nats/js/errors.html
- 7: https://nats-io.github.io/nats.py/modules.html

---




**Catch only NATS "not found" errors; let other exceptions fail the test.**

In `_verify_resources_cleaned`, catching all exceptions masks infrastructure issues like connection failures. The suggested exception classes in the diff (`NoStreamError`, `NoConsumerError`) don't exist in nats.py. Catch only `NotFoundError` from `nats.js.errors` (which indicates a 404/not found response) and re-raise all other exceptions so the test fails on NATS issues.

🛠️ Suggested fix (use the correct NATS exception class)

```diff
+from nats.js.errors import NotFoundError
...
-                try:
-                    await manager.js.stream_info(stream_name)
-                except Exception:
-                    stream_exists = False
+                try:
+                    await manager.js.stream_info(stream_name)
+                except NotFoundError:
+                    stream_exists = False
+                except Exception:
+                    raise
...
-                try:
-                    await manager.js.consumer_info(stream_name, consumer_name)
-                except Exception:
-                    consumer_exists = False
+                try:
+                    await manager.js.consumer_info(stream_name, consumer_name)
+                except NotFoundError:
+                    consumer_exists = False
+                except Exception:
+                    raise
```
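
For context, a self-contained sketch of the same pattern (it only assumes the nats-py client and the stream_info/consumer_info calls already used in the test; the helper names are illustrative):

```python
from nats.js.errors import NotFoundError


async def stream_exists(js, stream_name: str) -> bool:
    """Return True if the stream exists; re-raise anything that is not a JetStream 404."""
    try:
        await js.stream_info(stream_name)
        return True
    except NotFoundError:
        # JetStream reports a missing stream or consumer as a 404-style NotFoundError.
        return False


async def consumer_exists(js, stream_name: str, consumer_name: str) -> bool:
    """Return True if the consumer exists on the given stream."""
    try:
        await js.consumer_info(stream_name, consumer_name)
        return True
    except NotFoundError:
        return False
```
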
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 143-143: Do not catch blind exception: Exception

(BLE001)


[warning] 150-150: Do not catch blind exception: Exception

(BLE001)




Development

Successfully merging this pull request may close these issues.

PSv2: Implement queue clean-up upon job completion
