
Conversation


@HardMax71 HardMax71 commented Feb 2, 2026


Summary by cubic

Decoupled NotificationService from app lifecycle and moved Kafka consumer management to the DI provider. Introduced a stateless NotificationScheduler (APScheduler) for scheduled sends, switched to inline retries, and added a 30‑day TTL index.

  • Refactors

    • NotificationService no longer extends LifecycleEnabled; internal loops removed and scheduling moved to NotificationScheduler.
    • DI provider starts/stops a UnifiedConsumer and registers EXECUTION_* handlers; lifespan no longer manages NotificationService.
    • Inline exponential backoff for delivery; removed scheduled retries. Scheduled notifications that are due are processed by NotificationScheduler every 15s.
    • Added a 30‑day TTL index; enforced NOTIF_MAX_SCHEDULE_DAYS=25; replaced legacy pending/cleanup queries with find_due_notifications (see the sketch after this list).
  • Bug Fixes

    • ExecutionFailedEvent now requires an explicit error_type; processor and tests updated.
    • Corrected Kafka subscription to EXECUTION_EVENTS for NotificationService.
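
For concreteness, a minimal sketch of how the 30‑day TTL and the 25‑day scheduling cap could fit together, assuming pymongo's IndexModel and the names from this PR (created_at, idx_notif_ttl, NOTIF_MAX_SCHEDULE_DAYS); the validation helper is hypothetical and not part of the diff:

```python
from datetime import UTC, datetime, timedelta

from pymongo import ASCENDING, IndexModel

NOTIF_TTL_DAYS = 30           # documents expire this long after created_at
NOTIF_MAX_SCHEDULE_DAYS = 25  # kept strictly below the TTL so scheduled sends cannot expire first

# TTL index: MongoDB deletes notifications roughly 30 days after created_at.
ttl_index = IndexModel(
    [("created_at", ASCENDING)],
    name="idx_notif_ttl",
    expireAfterSeconds=NOTIF_TTL_DAYS * 24 * 3600,
)


def validate_scheduled_for(scheduled_for: datetime | None) -> None:
    """Hypothetical guard: reject schedules beyond the allowed window."""
    if scheduled_for is None:
        return
    latest = datetime.now(UTC) + timedelta(days=NOTIF_MAX_SCHEDULE_DAYS)
    if scheduled_for > latest:
        raise ValueError(
            f"scheduled_for may be at most {NOTIF_MAX_SCHEDULE_DAYS} days in the future"
        )
```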

Written for commit f2da711. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Added a scheduler to process and deliver due notifications automatically.
    • Notifications now auto-expire after 30 days.
  • Refactor

    • Simplified notification delivery with an inline retry flow and centralized status handling.
    • Scheduled deliveries moved to the scheduler; background consumer lifecycle adjusted.
  • Chores

    • Notification config consolidated into a single max-schedule-days setting.
  • Bug Fixes

    • Execution-failed events now require an explicit error type.


coderabbitai bot commented Feb 2, 2026

📝 Walkthrough


Adds an APScheduler-backed NotificationScheduler and wires it into DI/providers; moves consumer and dispatcher lifecycle into providers; removes consumer/dispatcher lifecycle from NotificationService in favor of inline delivery with retries; adds a 30-day TTL index and replaces pending/scheduled/cleanup repository methods with find_due_notifications. (37 words)
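
A minimal sketch of the scheduler loop described above, assuming the interfaces named in this PR (find_due_notifications on the repository, a _deliver_notification on the service that returns bool); the constructor, batch size, and logging are illustrative:

```python
import logging

logger = logging.getLogger(__name__)


class NotificationScheduler:
    def __init__(self, repository, service) -> None:
        self.repository = repository
        self.service = service

    async def process_due_notifications(self, batch_size: int = 100) -> int:
        """Deliver due notifications; return how many were actually delivered."""
        due = await self.repository.find_due_notifications(limit=batch_size)
        delivered = 0
        for notification in due:
            try:
                # _deliver_notification returns bool in this PR; count only true successes.
                if await self.service._deliver_notification(notification):
                    delivered += 1
            except Exception:
                # Per-item isolation: one failure must not stop the rest of the batch.
                logger.exception("Failed to deliver notification %s", notification.notification_id)
        if due:
            logger.info("Processed %d due notifications, delivered %d", len(due), delivered)
        return delivered
```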

Changes

Cohort / File(s), with summary:
  • DI & Lifespan Orchestration (backend/app/core/dishka_lifespan.py, backend/app/core/providers.py): Providers now construct/start/stop a UnifiedConsumer and a new NotificationScheduler; consumer/dispatcher wiring and lifecycle have been moved out of NotificationService into providers; the Dishka lifespan no longer manually enters NotificationService.
  • NotificationService Refactor (backend/app/services/notification_service.py): Removed LifecycleEnabled and all consumer/dispatcher lifecycle code. Delivery is now inline with a multi-attempt retry loop (exponential backoff), _deliver_notification returns bool, scheduling validation added, and schema/metrics/lifecycle state removed.
  • Notification Scheduler (backend/app/services/notification_scheduler.py): New NotificationScheduler class with async process_due_notifications(batch_size) that queries due notifications and calls NotificationService._deliver_notification per item, with per-item error handling and delivered-count reporting.
  • Repository & DB Schema (backend/app/db/repositories/notification_repository.py, backend/app/db/docs/notification.py): Added TTL index on created_at (30 days). Removed find_pending, find_scheduled, and cleanup_old_notifications; added find_due_notifications and tightened try_claim_pending to require scheduled_for <= now.
  • Providers: DI additions (backend/app/core/providers.py): Added get_notification_scheduler provider, wired APScheduler job (15s interval) to process_due_notifications, and adjusted the NotificationService provider to start/stop its consumer before yield (see the provider sketch after this list).
  • Configuration & Dependencies (backend/app/settings.py, backend/pyproject.toml): Removed NOTIF_PENDING_BATCH_SIZE/NOTIF_OLD_DAYS/NOTIF_RETRY_DELAY_MINUTES; added NOTIF_MAX_SCHEDULE_DAYS. Added dependency APScheduler==3.10.4.
  • Result/Event small changes & Tests (backend/app/services/result_processor/processor.py, backend/app/domain/events/typed.py, backend/app/domain/execution/models.py, backend/tests/...): Result processor now builds ExecutionResultDomain via model_dump; ExecutionFailedEvent.error_type made required and tests updated; ExecutionResultDomain now ignores extra fields.
  • Misc / Non-functional (backend/app/core/container.py, .github/actions/e2e-ready/action.yml): Whitespace-only change in the container file; removed the background cert-generator step and its logs from the E2E-ready GitHub Action.
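
A sketch of the provider wiring summarized above (see the "Providers: DI additions" entry); only the 15-second interval, the provider name, and process_due_notifications come from the PR, while the Dishka decorator usage, constructor arguments, and shutdown handling are assumptions:

```python
from collections.abc import AsyncIterator

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dishka import Provider, Scope, provide

from app.db.repositories.notification_repository import NotificationRepository
from app.services.notification_scheduler import NotificationScheduler
from app.services.notification_service import NotificationService


class NotificationSchedulerProvider(Provider):  # hypothetical provider class name
    @provide(scope=Scope.APP)
    async def get_notification_scheduler(
        self, repository: NotificationRepository, service: NotificationService
    ) -> AsyncIterator[NotificationScheduler]:
        notification_scheduler = NotificationScheduler(repository, service)
        aps = AsyncIOScheduler()
        # Poll for due notifications on a fixed 15-second interval.
        aps.add_job(notification_scheduler.process_due_notifications, "interval", seconds=15)
        aps.start()
        try:
            yield notification_scheduler
        finally:
            aps.shutdown(wait=False)
```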

Sequence Diagram(s)

sequenceDiagram
    participant APS as APScheduler
    participant SCHED as NotificationScheduler
    participant REPO as NotificationRepository
    participant SVC as NotificationService
    participant CH as ChannelHandler

    APS->>SCHED: trigger process_due_notifications()
    SCHED->>REPO: find_due_notifications(limit)
    REPO-->>SCHED: due notifications list
    loop per notification
      SCHED->>SVC: _deliver_notification(notification)
      SVC->>CH: deliver via channel (SSE/Webhook/Slack)
      CH-->>SVC: success / error
      SVC-->>SCHED: return delivered (true/false)
    end
    SCHED-->>APS: delivered_count
sequenceDiagram
    participant CON as UnifiedConsumer
    participant DISP as Dispatcher
    participant SVC as NotificationService

    CON->>DISP: received event (EXECUTION_*)
    DISP->>SVC: invoke handler (execution -> create/notify)
    SVC-->>DISP: handler result
    note right of CON: consumer lifecycle started/stopped by providers

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐇 I hop through queues at break of dawn,

I nibble TTLs till old notes’re gone,
I stitch retries with patient cheer,
The scheduler hums — deliveries near,
A rabbit's wink: the inbox clear.

🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 52.17%, which is insufficient; the required threshold is 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
  • Title check (✅ Passed): The title directly addresses the main architectural change: removing LifecycleEnabled from NotificationService, which is the primary refactoring across the entire changeset.
  • Description check (✅ Passed): Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@cubic-dev-ai cubic-dev-ai bot left a comment

No issues found across 6 files

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@backend/app/db/docs/notification.py`:
- Around line 68-70: The TTL index "idx_notif_ttl" on created_at will remove
notifications scheduled >30 days ahead before delivery; either enforce a max
scheduling window or move TTL to a post-delivery timestamp: update the
notification creation/validation logic (where notifications are
created/scheduled) to reject or clamp scheduled_for values beyond created_at +
30 days (e.g., in the function/method that inserts Notification documents), or
instead create a TTL index on a delivered_at (or completed_at) field and ensure
the send/worker code sets delivered_at when the notification is sent (and
remove/rename the existing IndexModel([("created_at", ASCENDING)],
name="idx_notif_ttl")). Ensure references to IndexModel and the index name
"idx_notif_ttl", as well as the fields "created_at", "scheduled_for", and the
code path that marks delivery, are updated accordingly.

In `@backend/app/services/notification_service.py`:
- Around line 685-689: The delivery-time metric call in notification_service.py
is missing the channel argument and thus defaults to "in_app", misattributing
non-in_app deliveries; update the call to
metrics.record_notification_delivery_time in the same block where
metrics.record_notification_sent is called to pass channel=notification.channel
(and keep the existing delivery_time and notification.severity args) so
delivery-time metrics are attributed to the correct channel.
- Around line 621-625: The _deliver_notification flow uses
repository.try_claim_pending which only claims notifications whose scheduled_for
<= now, so without a scheduler future-scheduled notifications will never be
retried; update _deliver_notification (or the higher-level enqueue path) to
explicitly handle future schedules: either (A) reject/fail fast by checking
notification.scheduled_for against current time and returning/logging an error
if scheduled_for > now (e.g., raise or mark as unsupported), or (B) reintroduce
scheduling by enqueuing a background job/queue to re-invoke
_deliver_notification at notification.scheduled_for (ensure the enqueue logic
records notification_id and scheduled_for). Reference the _deliver_notification
method and repository.try_claim_pending when adding the time check or enqueue
logic so future notifications are not left pending indefinitely.
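
A small sketch of option (A): a due-time guard the inline path could run before try_claim_pending. Only the field and method names come from the PR; the helper itself is hypothetical:

```python
from datetime import UTC, datetime


def is_due(scheduled_for: datetime | None) -> bool:
    """True when a notification may be delivered now (no schedule, or the schedule has passed)."""
    return scheduled_for is None or scheduled_for <= datetime.now(UTC)


# Inside _deliver_notification (illustrative):
#     if not is_due(notification.scheduled_for):
#         logger.warning("Notification %s not due yet; leaving it for NotificationScheduler",
#                        notification.notification_id)
#         return False
```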

@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 9 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/notification_scheduler.py">

<violation number="1" location="backend/app/services/notification_scheduler.py:43">
P2: `delivered` is incremented even when delivery fails, because `_deliver_notification` handles failures internally without raising. The returned count/log message will report successes that may include FAILED deliveries. Consider returning a success flag from the service or raising on failure and only incrementing on true success.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@backend/app/core/dishka_lifespan.py`:
- Around line 79-81: Update the lifecycle comment around the "Phase 1" block in
dishka_lifespan.py to reflect the new scheduler-based flow: remove the reference
to NotificationService background tasks and instead note that the scheduler
(started via its DI provider) handles background jobs; keep the note about
resolving DI dependencies in parallel and that consumers/providers start
automatically via DI where applicable (refer to the "Phase 1" comment and any
mentions of NotificationService or scheduler in the surrounding block).

In `@backend/app/services/notification_scheduler.py`:
- Around line 25-50: The method process_due_notifications in
notification_scheduler.py increments delivered based solely on calls to
service._deliver_notification which doesn't return success, so the counter/log
message is misleading; either update service._deliver_notification to return a
boolean success flag and use that to increment delivered (e.g., call success =
await self.service._deliver_notification(...) and increment only when success is
True), or rename variables/log messages to reflect "processed" instead of
"delivered" (change delivered -> processed and the info logs accordingly) and
keep current behavior; modify the logger lines and the delivered/processed
counter consistently and reference process_due_notifications and
_deliver_notification when making the change.
🧹 Nitpick comments (3)
backend/app/settings.py (1)

118-118: Guard the schedule window against TTL drift.
The comment says this must be < TTL, but there’s no enforcement. A misconfig could let scheduled notifications expire before delivery. Consider validating this at settings load (or centralizing the TTL constant) to prevent silent data loss.

🛠️ Optional guard at settings load
@@
         if override_path:
             with open(override_path, "rb") as f:
                 data |= tomllib.load(f)
         super().__init__(**data)
+        # Keep scheduling window inside notification TTL (currently 30 days).
+        if self.NOTIF_MAX_SCHEDULE_DAYS >= 30:
+            raise ValueError("NOTIF_MAX_SCHEDULE_DAYS must be < 30 days (notification TTL)")
backend/app/services/notification_service.py (1)

676-685: Persist retry_count when delivery succeeds after retries.
Right now retry_count stays at 0 even if the first attempt failed and a later attempt succeeded. Persisting the actual retry usage makes diagnostics clearer.

♻️ Suggested tweak
-                await self.repository.update_notification(
-                    notification.notification_id,
-                    notification.user_id,
-                    DomainNotificationUpdate(status=NotificationStatus.DELIVERED, delivered_at=datetime.now(UTC)),
-                )
+                await self.repository.update_notification(
+                    notification.notification_id,
+                    notification.user_id,
+                    DomainNotificationUpdate(
+                        status=NotificationStatus.DELIVERED,
+                        delivered_at=datetime.now(UTC),
+                        retry_count=attempt,
+                    ),
+                )
backend/app/core/dishka_lifespan.py (1)

109-112: Use enter_async_context for clearer, more idiomatic lifecycle management.

The current manual __aenter__ + push_async_callback(aclose) pattern works correctly—__aexit__ and aclose are functionally identical in LifecycleEnabled. However, enter_async_context is clearer and more idiomatic, avoiding manual context manager calls and letting AsyncExitStack manage the full lifecycle.

♻️ Proposed change
     async with AsyncExitStack() as stack:
-        stack.push_async_callback(event_store_consumer.aclose)
-        await event_store_consumer.__aenter__()
+        await stack.enter_async_context(event_store_consumer)
         logger.info("EventStoreConsumer started")
         yield

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@backend/app/services/result_processor/processor.py`:
- Around line 37-38: You're currently reusing a single EventMetadata instance
stored in self._result_metadata (created with
EventMetadata(service_name=GroupId.RESULT_PROCESSOR, service_version="1.0.0")),
which causes all ResultStored/ResultFailed events to share the same
correlation_id; change the code to construct a fresh EventMetadata for each
publish (in the ResultStored/ResultFailed publishing paths around the code
referenced at 124-142) instead of reusing self._result_metadata, and when
available derive fields (especially correlation_id) from the result.metadata
source; ensure the new EventMetadata uses service_name=GroupId.RESULT_PROCESSOR
and service_version="1.0.0" so each event gets a unique or source-aligned
correlation_id.
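
A sketch of the suggested fix, using stand-in types so the snippet is self-contained; only the names EventMetadata, GroupId.RESULT_PROCESSOR, service_version="1.0.0", and correlation_id come from the comment above:

```python
# Stand-ins for the project's own types, just to make the sketch runnable.
from dataclasses import dataclass
from enum import Enum
from uuid import uuid4


class GroupId(str, Enum):
    RESULT_PROCESSOR = "result-processor"


@dataclass
class EventMetadata:
    service_name: GroupId
    service_version: str
    correlation_id: str


def build_result_metadata(source: EventMetadata | None) -> EventMetadata:
    """Create a fresh EventMetadata per published event, reusing the source correlation_id when available."""
    return EventMetadata(
        service_name=GroupId.RESULT_PROCESSOR,
        service_version="1.0.0",
        correlation_id=source.correlation_id if source else str(uuid4()),
    )
```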

@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 2 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/result_processor/processor.py">

<violation number="1" location="backend/app/services/result_processor/processor.py:132">
P2: Avoid reusing a single EventMetadata instance across events; it fixes correlation_id to one value for all published results. Create a fresh EventMetadata per event so each ResultStored/ResultFailed gets a unique correlation_id for tracing.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/domain/events/typed.py (1)

121-126: ⚠️ Potential issue | 🟠 Major

Ensure error_type has a default, or verify that all old events include it before deploying.

Line 125 requires error_type with no default, which will cause validation failures when deserializing any ExecutionFailedEvent payloads that lack this field. While all current producers explicitly set it, any persisted events from older versions or external systems could fail deserialization. Add a safe default (e.g., ExecutionErrorType.SYSTEM_ERROR) or verify through a backfill/data migration that all stored events include this field.
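
One way to apply the suggested default, sketched with stand-ins for the project's event model; only ExecutionFailedEvent, ExecutionErrorType, and SYSTEM_ERROR come from the comment above:

```python
from enum import Enum

from pydantic import BaseModel


class ExecutionErrorType(str, Enum):
    SYSTEM_ERROR = "system_error"  # stand-in; the real enum has more members


class ExecutionFailedEvent(BaseModel):  # stand-in for the real event model
    # A safe default keeps deserialization of older persisted payloads working,
    # while current producers continue to set an explicit value.
    error_type: ExecutionErrorType = ExecutionErrorType.SYSTEM_ERROR
```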


sonarqubecloud bot commented Feb 3, 2026

@HardMax71 HardMax71 merged commit 8ccc390 into main Feb 3, 2026
15 checks passed
@HardMax71 HardMax71 deleted the fix/not-service branch February 3, 2026 00:42
