fix: removal of lifecycle enabled from notif service #131
Conversation
📝 Walkthrough
Adds an APScheduler-backed NotificationScheduler and wires it into DI/providers; moves consumer and dispatcher lifecycle into providers; removes consumer/dispatcher lifecycle from NotificationService in favor of inline delivery with retries; adds a 30-day TTL index and replaces pending/scheduled/cleanup repository methods with find_due_notifications.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant APS as APScheduler
    participant SCHED as NotificationScheduler
    participant REPO as NotificationRepository
    participant SVC as NotificationService
    participant CH as ChannelHandler
    APS->>SCHED: trigger process_due_notifications()
    SCHED->>REPO: find_due_notifications(limit)
    REPO-->>SCHED: due notifications list
    loop per notification
        SCHED->>SVC: _deliver_notification(notification)
        SVC->>CH: deliver via channel (SSE/Webhook/Slack)
        CH-->>SVC: success / error
        SVC-->>SCHED: return delivered (true/false)
    end
    SCHED-->>APS: delivered_count
```

```mermaid
sequenceDiagram
    participant CON as UnifiedConsumer
    participant DISP as Dispatcher
    participant SVC as NotificationService
    CON->>DISP: received event (EXECUTION_*)
    DISP->>SVC: invoke handler (execution -> create/notify)
    SVC-->>DISP: handler result
    Note right of CON: consumer lifecycle started/stopped by providers
```
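A minimal sketch of how this scheduler could be wired, assuming APScheduler's `AsyncIOScheduler` and the `find_due_notifications` / `_deliver_notification` names from the diagram above; the constructor arguments, `poll_seconds`, and `batch_limit` are illustrative, not taken from the PR.

```python
# Hypothetical sketch of an APScheduler-backed NotificationScheduler.
# Only the method names from the diagram are taken from the PR; the rest is assumed.
from apscheduler.schedulers.asyncio import AsyncIOScheduler


class NotificationScheduler:
    def __init__(self, repository, service, poll_seconds: int = 30, batch_limit: int = 100):
        self._repository = repository
        self._service = service
        self._batch_limit = batch_limit
        self._scheduler = AsyncIOScheduler()
        self._scheduler.add_job(self.process_due_notifications, trigger="interval", seconds=poll_seconds)

    def start(self) -> None:
        # Started/stopped by the DI provider, not by NotificationService.
        self._scheduler.start()

    def shutdown(self) -> None:
        self._scheduler.shutdown(wait=False)

    async def process_due_notifications(self) -> int:
        delivered = 0
        for notification in await self._repository.find_due_notifications(limit=self._batch_limit):
            # Counting assumes _deliver_notification reports success; see the review notes below.
            if await self._service._deliver_notification(notification):
                delivered += 1
        return delivered
```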
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
No issues found across 6 files
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@backend/app/db/docs/notification.py`:
- Around line 68-70: The TTL index "idx_notif_ttl" on created_at will remove
notifications scheduled >30 days ahead before delivery; either enforce a max
scheduling window or move TTL to a post-delivery timestamp: update the
notification creation/validation logic (where notifications are
created/scheduled) to reject or clamp scheduled_for values beyond created_at +
30 days (e.g., in the function/method that inserts Notification documents), or
instead create a TTL index on a delivered_at (or completed_at) field and ensure
the send/worker code sets delivered_at when the notification is sent (and
remove/rename the existing IndexModel([("created_at", ASCENDING)],
name="idx_notif_ttl")). Ensure references to IndexModel and the index name
"idx_notif_ttl", as well as the fields "created_at", "scheduled_for", and the
code path that marks delivery, are updated accordingly.
In `@backend/app/services/notification_service.py`:
- Around line 685-689: The delivery-time metric call in notification_service.py
is missing the channel argument and thus defaults to "in_app", misattributing
non-in_app deliveries; update the call to
metrics.record_notification_delivery_time in the same block where
metrics.record_notification_sent is called to pass channel=notification.channel
(and keep the existing delivery_time and notification.severity args) so
delivery-time metrics are attributed to the correct channel.
- Around line 621-625: The _deliver_notification flow uses
repository.try_claim_pending which only claims notifications whose scheduled_for
<= now, so without a scheduler future-scheduled notifications will never be
retried; update _deliver_notification (or the higher-level enqueue path) to
explicitly handle future schedules: either (A) reject/fail fast by checking
notification.scheduled_for against current time and returning/logging an error
if scheduled_for > now (e.g., raise or mark as unsupported), or (B) reintroduce
scheduling by enqueuing a background job/queue to re-invoke
_deliver_notification at notification.scheduled_for (ensure the enqueue logic
records notification_id and scheduled_for). Reference the _deliver_notification
method and repository.try_claim_pending when adding the time check or enqueue
logic so future notifications are not left pending indefinitely.
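The metrics item above is a one-argument change (pass `channel=notification.channel` in the block that already calls `metrics.record_notification_sent`). For the TTL and scheduling-window items, here is a minimal sketch of one possible shape, assuming pymongo-style index declarations and a creation-time check; `NOTIF_TTL`, `validate_scheduled_for`, and `max_schedule_days` are illustrative names (the related setting `NOTIF_MAX_SCHEDULE_DAYS` appears later in the review).

```python
# Sketch only: the real model/repository code in the PR may differ.
from datetime import UTC, datetime, timedelta

from pymongo import ASCENDING, IndexModel

NOTIF_TTL = timedelta(days=30)

# TTL keyed on a post-delivery timestamp, so notifications scheduled far in the
# future are never expired before they are delivered.
ttl_index = IndexModel(
    [("delivered_at", ASCENDING)],
    name="idx_notif_ttl",
    expireAfterSeconds=int(NOTIF_TTL.total_seconds()),
)


def validate_scheduled_for(scheduled_for: datetime | None, max_schedule_days: int) -> None:
    """Reject schedules beyond the allowed window at creation time."""
    if scheduled_for is not None and scheduled_for > datetime.now(UTC) + timedelta(days=max_schedule_days):
        raise ValueError(f"scheduled_for must be within {max_schedule_days} days of creation")
```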
1 issue found across 9 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/notification_scheduler.py">
<violation number="1" location="backend/app/services/notification_scheduler.py:43">
P2: `delivered` is incremented even when delivery fails, because `_deliver_notification` handles failures internally without raising. The returned count/log message will report successes that may include FAILED deliveries. Consider returning a success flag from the service or raising on failure and only incrementing on true success.</violation>
</file>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/core/dishka_lifespan.py`:
- Around line 79-81: Update the lifecycle comment around the "Phase 1" block in
dishka_lifespan.py to reflect the new scheduler-based flow: remove the reference
to NotificationService background tasks and instead note that the scheduler
(started via its DI provider) handles background jobs; keep the note about
resolving DI dependencies in parallel and that consumers/providers start
automatically via DI where applicable (refer to the "Phase 1" comment and any
mentions of NotificationService or scheduler in the surrounding block).
In `@backend/app/services/notification_scheduler.py`:
- Around line 25-50: The method process_due_notifications in
notification_scheduler.py increments delivered based solely on calls to
service._deliver_notification which doesn't return success, so the counter/log
message is misleading; either update service._deliver_notification to return a
boolean success flag and use that to increment delivered (e.g., call success =
await self.service._deliver_notification(...) and increment only when success is
True), or rename variables/log messages to reflect "processed" instead of
"delivered" (change delivered -> processed and the info logs accordingly) and
keep current behavior; modify the logger lines and the delivered/processed
counter consistently and reference process_due_notifications and
_deliver_notification when making the change.
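A hedged sketch of the first option: have `_deliver_notification` return an explicit success flag so the scheduler only counts real deliveries. The status updates reuse the `update_notification` call shown in the nitpick below; `_send_via_channel` and `max_retries` are hypothetical stand-ins for the service's channel dispatch and retry setting.

```python
# Method of NotificationService (sketch). Assumes logger, repository,
# DomainNotificationUpdate and NotificationStatus are available in the module.
async def _deliver_notification(self, notification) -> bool:
    for attempt in range(self.max_retries + 1):
        try:
            await self._send_via_channel(notification)  # hypothetical channel dispatch
        except Exception:
            logger.warning("Delivery attempt %d failed", attempt, exc_info=True)
            continue
        await self.repository.update_notification(
            notification.notification_id,
            notification.user_id,
            DomainNotificationUpdate(
                status=NotificationStatus.DELIVERED,
                delivered_at=datetime.now(UTC),
                retry_count=attempt,
            ),
        )
        return True
    await self.repository.update_notification(
        notification.notification_id,
        notification.user_id,
        DomainNotificationUpdate(status=NotificationStatus.FAILED),
    )
    return False
```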
🧹 Nitpick comments (3)
backend/app/settings.py (1)
118-118: Guard the schedule window against TTL drift.
The comment says this must be < TTL, but there’s no enforcement. A misconfig could let scheduled notifications expire before delivery. Consider validating this at settings load (or centralizing the TTL constant) to prevent silent data loss.
🛠️ Optional guard at settings load
```diff
@@
         if override_path:
             with open(override_path, "rb") as f:
                 data |= tomllib.load(f)
         super().__init__(**data)
+        # Keep scheduling window inside notification TTL (currently 30 days).
+        if self.NOTIF_MAX_SCHEDULE_DAYS >= 30:
+            raise ValueError("NOTIF_MAX_SCHEDULE_DAYS must be < 30 days (notification TTL)")
```
backend/app/services/notification_service.py (1)
676-685: Persist retry_count when delivery succeeds after retries.
Right now retry_count stays at 0 even if the first attempt failed and a later attempt succeeded. Persisting the actual retry usage makes diagnostics clearer.
♻️ Suggested tweak
```diff
-        await self.repository.update_notification(
-            notification.notification_id,
-            notification.user_id,
-            DomainNotificationUpdate(status=NotificationStatus.DELIVERED, delivered_at=datetime.now(UTC)),
-        )
+        await self.repository.update_notification(
+            notification.notification_id,
+            notification.user_id,
+            DomainNotificationUpdate(
+                status=NotificationStatus.DELIVERED,
+                delivered_at=datetime.now(UTC),
+                retry_count=attempt,
+            ),
+        )
```
backend/app/core/dishka_lifespan.py (1)
109-112: Use `enter_async_context` for clearer, more idiomatic lifecycle management.
The current manual `__aenter__` + `push_async_callback(aclose)` pattern works correctly; `__aexit__` and `aclose` are functionally identical in `LifecycleEnabled`. However, `enter_async_context` is clearer and more idiomatic, avoiding manual context manager calls and letting `AsyncExitStack` manage the full lifecycle.
♻️ Proposed change
```diff
     async with AsyncExitStack() as stack:
-        stack.push_async_callback(event_store_consumer.aclose)
-        await event_store_consumer.__aenter__()
+        await stack.enter_async_context(event_store_consumer)
         logger.info("EventStoreConsumer started")
         yield
```
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/app/services/result_processor/processor.py`:
- Around line 37-38: You're currently reusing a single EventMetadata instance
stored in self._result_metadata (created with
EventMetadata(service_name=GroupId.RESULT_PROCESSOR, service_version="1.0.0")),
which causes all ResultStored/ResultFailed events to share the same
correlation_id; change the code to construct a fresh EventMetadata for each
publish (in the ResultStored/ResultFailed publishing paths around the code
referenced at 124-142) instead of reusing self._result_metadata, and when
available derive fields (especially correlation_id) from the result.metadata
source; ensure the new EventMetadata uses service_name=GroupId.RESULT_PROCESSOR
and service_version="1.0.0" so each event gets a unique or source-aligned
correlation_id.
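A hedged sketch of building a fresh `EventMetadata` per publish instead of reusing `self._result_metadata`; only `EventMetadata`, `GroupId.RESULT_PROCESSOR`, `service_version="1.0.0"`, and `correlation_id` come from the comment above, while the helper name and the uuid fallback are assumptions.

```python
from uuid import uuid4

# EventMetadata and GroupId come from the project's domain/event modules.


def metadata_for_result(result) -> EventMetadata:
    # Prefer the correlation_id from the source event's metadata when present,
    # so ResultStored/ResultFailed stay traceable back to the originating execution.
    source = getattr(result, "metadata", None)
    correlation_id = getattr(source, "correlation_id", None) or str(uuid4())
    return EventMetadata(
        service_name=GroupId.RESULT_PROCESSOR,
        service_version="1.0.0",
        correlation_id=correlation_id,
    )
```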
1 issue found across 2 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/result_processor/processor.py">
<violation number="1" location="backend/app/services/result_processor/processor.py:132">
P2: Avoid reusing a single EventMetadata instance across events; it fixes correlation_id to one value for all published results. Create a fresh EventMetadata per event so each ResultStored/ResultFailed gets a unique correlation_id for tracing.</violation>
</file>
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/domain/events/typed.py (1)
121-126: ⚠️ Potential issue | 🟠 Major
Ensure `error_type` has a default or validate all old events include it before deploying.
Line 125 requires `error_type` with no default, which will cause validation failures when deserializing any `ExecutionFailedEvent` payloads that lack this field. While all current producers explicitly set it, any persisted events from older versions or external systems could fail deserialization. Add a safe default (e.g., `ExecutionErrorType.SYSTEM_ERROR`) or verify through a backfill/data migration that all stored events include this field.
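A minimal sketch of the safe-default option, assuming a Pydantic-style event model; the base class and surrounding fields are placeholders, and `ExecutionErrorType` is the project's existing enum.

```python
from pydantic import BaseModel


class ExecutionFailedEvent(BaseModel):  # the real base class in typed.py is assumed to differ
    # Default keeps old persisted payloads (without error_type) deserializable;
    # SYSTEM_ERROR is the value suggested in the comment above.
    error_type: ExecutionErrorType = ExecutionErrorType.SYSTEM_ERROR
```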



Summary by cubic
Decoupled NotificationService from app lifecycle and moved Kafka consumer management to the DI provider. Introduced a stateless NotificationScheduler (APScheduler) for scheduled sends; switched to inline retries and a 30‑day TTL index.
Written for commit f2da711. Summary will update on new commits.