diff --git a/.github/actions/e2e-ready/action.yml b/.github/actions/e2e-ready/action.yml index fb794382..43073d27 100644 --- a/.github/actions/e2e-ready/action.yml +++ b/.github/actions/e2e-ready/action.yml @@ -26,14 +26,6 @@ runs: /home/runner/.kube/config > backend/kubeconfig.yaml chmod 644 backend/kubeconfig.yaml - - name: Start cert-generator (background) - shell: bash - env: - IMAGE_TAG: ${{ inputs.image-tag }} - run: | - nohup docker compose up -d --no-build cert-generator \ - > /tmp/cert-gen.log 2>&1 & - - name: Use test environment config shell: bash run: | @@ -51,7 +43,6 @@ runs: fi fi cat /tmp/infra-pull.log 2>/dev/null || true - cat /tmp/cert-gen.log 2>/dev/null || true if [ -f /tmp/infra-pull.exit ]; then EXIT_CODE=$(cat /tmp/infra-pull.exit) if [ "$EXIT_CODE" != "0" ]; then diff --git a/backend/app/core/container.py b/backend/app/core/container.py index 284b447d..20f6b101 100644 --- a/backend/app/core/container.py +++ b/backend/app/core/container.py @@ -186,3 +186,5 @@ def create_dlq_processor_container(settings: Settings) -> AsyncContainer: EventProvider(), context={Settings: settings}, ) + + diff --git a/backend/app/core/dishka_lifespan.py b/backend/app/core/dishka_lifespan.py index 916c7b03..28b08b42 100644 --- a/backend/app/core/dishka_lifespan.py +++ b/backend/app/core/dishka_lifespan.py @@ -17,6 +17,7 @@ from app.db.docs import ALL_DOCUMENTS from app.events.event_store_consumer import EventStoreConsumer from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas +from app.services.notification_scheduler import NotificationScheduler from app.services.notification_service import NotificationService from app.settings import Settings @@ -76,14 +77,15 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: ) # Phase 1: Resolve all DI dependencies in parallel - # SSE bus + consumers start automatically via NotificationService's dependency on SSERedisBus + # Consumers and the notification scheduler (APScheduler) start automatically via their DI providers ( schema_registry, database, redis_client, rate_limit_metrics, event_store_consumer, - notification_service, + _notification_service, + _notification_scheduler, ) = await asyncio.gather( container.get(SchemaRegistryManager), container.get(Database), @@ -91,6 +93,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: container.get(RateLimitMetrics), container.get(EventStoreConsumer), container.get(NotificationService), + container.get(NotificationScheduler), ) # Phase 2: Initialize infrastructure in parallel (independent subsystems) @@ -102,13 +105,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: logger.info("Infrastructure initialized (schemas, beanie, rate limits)") # Phase 3: Start lifecycle-managed services - # SSE bridge consumers are started by the DI provider — no lifecycle to manage here + # EventStoreConsumer requires explicit __aenter__; all other services are managed by DI providers async with AsyncExitStack() as stack: stack.push_async_callback(event_store_consumer.aclose) - stack.push_async_callback(notification_service.aclose) - await asyncio.gather( - event_store_consumer.__aenter__(), - notification_service.__aenter__(), - ) - logger.info("EventStoreConsumer and NotificationService started") + await event_store_consumer.__aenter__() + logger.info("EventStoreConsumer started") yield diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index 7f4a7129..095ea351 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -67,6 +67,7 @@ from app.services.idempotency.redis_repository import RedisIdempotencyRepository from app.services.k8s_worker import KubernetesWorker from app.services.kafka_event_service import KafkaEventService +from app.services.notification_scheduler import NotificationScheduler from app.services.notification_service import NotificationService from app.services.pod_monitor.config import PodMonitorConfig from app.services.pod_monitor.event_mapper import PodEventMapper @@ -552,15 +553,78 @@ async def get_notification_service( service = NotificationService( notification_repository=notification_repository, event_service=kafka_event_service, - schema_registry_manager=schema_registry, sse_bus=sse_redis_bus, settings=settings, logger=logger, notification_metrics=notification_metrics, + ) + + dispatcher = EventDispatcher(logger=logger) + dispatcher.register_handler(EventType.EXECUTION_COMPLETED, service._handle_execution_event) + dispatcher.register_handler(EventType.EXECUTION_FAILED, service._handle_execution_event) + dispatcher.register_handler(EventType.EXECUTION_TIMEOUT, service._handle_execution_event) + + consumer_config = ConsumerConfig( + bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, + group_id=GroupId.NOTIFICATION_SERVICE, + max_poll_records=10, + enable_auto_commit=False, + auto_offset_reset="latest", + session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS, + heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS, + max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS, + request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS, + ) + consumer = UnifiedConsumer( + consumer_config, + event_dispatcher=dispatcher, + schema_registry=schema_registry, + settings=settings, + logger=logger, event_metrics=event_metrics, ) - async with service: + await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.NOTIFICATION_SERVICE])) + + logger.info("NotificationService started") + + try: yield service + finally: + await consumer.stop() + logger.info("NotificationService stopped") + + @provide + async def get_notification_scheduler( + self, + notification_repository: NotificationRepository, + notification_service: NotificationService, + logger: logging.Logger, + ) -> AsyncIterator[NotificationScheduler]: + from apscheduler.schedulers.asyncio import AsyncIOScheduler + + scheduler_service = NotificationScheduler( + notification_repository=notification_repository, + notification_service=notification_service, + logger=logger, + ) + + apscheduler = AsyncIOScheduler() + apscheduler.add_job( + scheduler_service.process_due_notifications, + trigger="interval", + seconds=15, + id="process_due_notifications", + max_instances=1, + misfire_grace_time=60, + ) + apscheduler.start() + logger.info("NotificationScheduler started (APScheduler interval=15s)") + + try: + yield scheduler_service + finally: + apscheduler.shutdown(wait=False) + logger.info("NotificationScheduler stopped") @provide def get_grafana_alert_processor( @@ -988,3 +1052,5 @@ def get_event_replay_service( replay_metrics=replay_metrics, logger=logger, ) + + diff --git a/backend/app/db/docs/notification.py b/backend/app/db/docs/notification.py index 70944d79..e44ef4ed 100644 --- a/backend/app/db/docs/notification.py +++ b/backend/app/db/docs/notification.py @@ -67,6 +67,7 @@ class Settings: indexes = [ IndexModel([("user_id", ASCENDING), ("created_at", DESCENDING)], name="idx_notif_user_created_desc"), IndexModel([("status", ASCENDING), ("scheduled_for", ASCENDING)], name="idx_notif_status_sched"), + IndexModel([("created_at", ASCENDING)], expireAfterSeconds=30 * 86400, name="idx_notif_ttl"), ] diff --git a/backend/app/db/repositories/notification_repository.py b/backend/app/db/repositories/notification_repository.py index 4cd94e81..4202d7c7 100644 --- a/backend/app/db/repositories/notification_repository.py +++ b/backend/app/db/repositories/notification_repository.py @@ -2,7 +2,7 @@ from datetime import UTC, datetime, timedelta from beanie.odm.enums import SortDirection -from beanie.operators import GTE, LT, LTE, ElemMatch, In, NotIn, Or +from beanie.operators import GTE, LTE, ElemMatch, In, NotIn, Or from app.db.docs import NotificationDocument, NotificationSubscriptionDocument, UserDocument from app.domain.enums.notification import NotificationChannel, NotificationStatus @@ -126,6 +126,21 @@ async def get_unread_count(self, user_id: str) -> int: In(NotificationDocument.status, [NotificationStatus.DELIVERED]), ).count() + async def find_due_notifications(self, limit: int = 50) -> list[DomainNotification]: + """Find PENDING notifications whose scheduled_for time has passed.""" + now = datetime.now(UTC) + docs = ( + await NotificationDocument.find( + NotificationDocument.status == NotificationStatus.PENDING, + NotificationDocument.scheduled_for != None, # noqa: E711 + LTE(NotificationDocument.scheduled_for, now), + ) + .sort([("scheduled_for", SortDirection.ASCENDING)]) + .limit(limit) + .to_list() + ) + return [DomainNotification.model_validate(doc, from_attributes=True) for doc in docs] + async def try_claim_pending(self, notification_id: str) -> bool: now = datetime.now(UTC) doc = await NotificationDocument.find_one( @@ -141,41 +156,6 @@ async def try_claim_pending(self, notification_id: str) -> bool: await doc.set({"status": NotificationStatus.SENDING, "sent_at": now}) return True - async def find_pending_notifications(self, batch_size: int = 10) -> list[DomainNotification]: - now = datetime.now(UTC) - docs = ( - await NotificationDocument.find( - NotificationDocument.status == NotificationStatus.PENDING, - Or( - NotificationDocument.scheduled_for == None, # noqa: E711 - LTE(NotificationDocument.scheduled_for, now), - ), - ) - .limit(batch_size) - .to_list() - ) - return [DomainNotification.model_validate(doc, from_attributes=True) for doc in docs] - - async def find_scheduled_notifications(self, batch_size: int = 10) -> list[DomainNotification]: - now = datetime.now(UTC) - docs = ( - await NotificationDocument.find( - NotificationDocument.status == NotificationStatus.PENDING, - LTE(NotificationDocument.scheduled_for, now), - NotificationDocument.scheduled_for != None, # noqa: E711 - ) - .limit(batch_size) - .to_list() - ) - return [DomainNotification.model_validate(doc, from_attributes=True) for doc in docs] - - async def cleanup_old_notifications(self, days: int = 30) -> int: - cutoff = datetime.now(UTC) - timedelta(days=days) - result = await NotificationDocument.find( - LT(NotificationDocument.created_at, cutoff), - ).delete() - return result.deleted_count if result else 0 - # Subscriptions async def get_subscription( self, user_id: str, channel: NotificationChannel diff --git a/backend/app/domain/enums/kafka.py b/backend/app/domain/enums/kafka.py index 86b24164..e1eceeb7 100644 --- a/backend/app/domain/enums/kafka.py +++ b/backend/app/domain/enums/kafka.py @@ -96,7 +96,7 @@ class GroupId(StringEnum): }, GroupId.NOTIFICATION_SERVICE: { KafkaTopic.NOTIFICATION_EVENTS, - KafkaTopic.EXECUTION_RESULTS, + KafkaTopic.EXECUTION_EVENTS, }, GroupId.DLQ_PROCESSOR: { KafkaTopic.DEAD_LETTER_QUEUE, diff --git a/backend/app/domain/events/typed.py b/backend/app/domain/events/typed.py index 02be84c2..6212fd6c 100644 --- a/backend/app/domain/events/typed.py +++ b/backend/app/domain/events/typed.py @@ -122,7 +122,7 @@ class ExecutionFailedEvent(BaseEvent): event_type: Literal[EventType.EXECUTION_FAILED] = EventType.EXECUTION_FAILED execution_id: str exit_code: int - error_type: ExecutionErrorType | None = None + error_type: ExecutionErrorType error_message: str = "" resource_usage: ResourceUsageDomain | None = None stdout: str = "" diff --git a/backend/app/domain/execution/models.py b/backend/app/domain/execution/models.py index dfa684ef..df8e38d7 100644 --- a/backend/app/domain/execution/models.py +++ b/backend/app/domain/execution/models.py @@ -29,7 +29,7 @@ class DomainExecution(BaseModel): class ExecutionResultDomain(BaseModel): - model_config = ConfigDict(from_attributes=True) + model_config = ConfigDict(from_attributes=True, extra="ignore") execution_id: str status: ExecutionStatus diff --git a/backend/app/services/notification_scheduler.py b/backend/app/services/notification_scheduler.py new file mode 100644 index 00000000..4e16e9ce --- /dev/null +++ b/backend/app/services/notification_scheduler.py @@ -0,0 +1,51 @@ +import logging + +from app.db.repositories.notification_repository import NotificationRepository +from app.services.notification_service import NotificationService + + +class NotificationScheduler: + """Stateless scheduler service that processes due notifications. + + APScheduler manages the timer (interval trigger) in the DI provider. + This class contains only the business logic for finding and delivering + due notifications — no loops, no lifecycle, no state. + """ + + def __init__( + self, + notification_repository: NotificationRepository, + notification_service: NotificationService, + logger: logging.Logger, + ) -> None: + self.repository = notification_repository + self.service = notification_service + self.logger = logger + + async def process_due_notifications(self, batch_size: int = 50) -> int: + """Find and deliver all notifications whose scheduled_for <= now. + + Called by APScheduler on a fixed interval. Each invocation is + a single-shot batch: query DB for due items, deliver each, return. + + Returns the number of notifications successfully delivered. + """ + due = await self.repository.find_due_notifications(limit=batch_size) + if not due: + return 0 + + self.logger.info(f"Found {len(due)} due scheduled notifications") + + delivered = 0 + for notification in due: + try: + if await self.service._deliver_notification(notification): + delivered += 1 + except Exception as e: + self.logger.error( + f"Failed to deliver scheduled notification {notification.notification_id}: {e}", + exc_info=True, + ) + + self.logger.info(f"Delivered {delivered}/{len(due)} scheduled notifications") + return delivered diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py index 3980f0d4..1ace24bb 100644 --- a/backend/app/services/notification_service.py +++ b/backend/app/services/notification_service.py @@ -6,12 +6,9 @@ import httpx -from app.core.lifecycle import LifecycleEnabled -from app.core.metrics import EventMetrics, NotificationMetrics +from app.core.metrics import NotificationMetrics from app.core.tracing.utils import add_span_attributes from app.db.repositories.notification_repository import NotificationRepository -from app.domain.enums.events import EventType -from app.domain.enums.kafka import GroupId from app.domain.enums.notification import ( NotificationChannel, NotificationSeverity, @@ -35,9 +32,6 @@ NotificationThrottledError, NotificationValidationError, ) -from app.events.core import ConsumerConfig, EventDispatcher, UnifiedConsumer -from app.events.schema.schema_registry import SchemaRegistryManager -from app.infrastructure.kafka.mappings import get_topic_for_event from app.schemas_pydantic.sse import RedisNotificationMessage from app.services.kafka_event_service import KafkaEventService from app.services.sse.redis_bus import SSERedisBus @@ -100,47 +94,25 @@ class SystemConfig: throttle_exempt: bool -class NotificationService(LifecycleEnabled): +class NotificationService: def __init__( self, notification_repository: NotificationRepository, event_service: KafkaEventService, - schema_registry_manager: SchemaRegistryManager, sse_bus: SSERedisBus, settings: Settings, logger: logging.Logger, notification_metrics: NotificationMetrics, - event_metrics: EventMetrics, ) -> None: - super().__init__() self.repository = notification_repository self.event_service = event_service self.metrics = notification_metrics - self._event_metrics = event_metrics self.settings = settings - self.schema_registry_manager = schema_registry_manager self.sse_bus = sse_bus self.logger = logger - # State self._throttle_cache = ThrottleCache() - # Tasks - self._tasks: set[asyncio.Task[None]] = set() - - self._consumer: UnifiedConsumer | None = None - self._dispatcher: EventDispatcher | None = None - self._consumer_task: asyncio.Task[None] | None = None - - self.logger.info( - "NotificationService initialized", - extra={ - "repository": type(notification_repository).__name__, - "event_service": type(event_service).__name__, - "schema_registry": type(schema_registry_manager).__name__, - }, - ) - # Channel handlers mapping self._channel_handlers: dict[NotificationChannel, ChannelHandler] = { NotificationChannel.IN_APP: self._send_in_app, @@ -148,92 +120,6 @@ def __init__( NotificationChannel.SLACK: self._send_slack, } - async def _on_start(self) -> None: - """Start the notification service with Kafka consumer.""" - self.logger.info("Starting notification service...") - self._start_background_tasks() - await self._subscribe_to_events() - self.logger.info("Notification service started with Kafka consumer") - - async def _on_stop(self) -> None: - """Stop the notification service.""" - self.logger.info("Stopping notification service...") - - # Cancel all tasks - for task in self._tasks: - task.cancel() - - # Wait for cancellation - if self._tasks: - await asyncio.gather(*self._tasks, return_exceptions=True) - - # Stop consumer - if self._consumer: - await self._consumer.stop() - - # Clear cache - await self._throttle_cache.clear() - - self.logger.info("Notification service stopped") - - def _start_background_tasks(self) -> None: - """Start background processing tasks.""" - tasks = [ - asyncio.create_task(self._process_pending_notifications()), - asyncio.create_task(self._cleanup_old_notifications()), - ] - - for task in tasks: - self._tasks.add(task) - task.add_done_callback(self._tasks.discard) - - async def _subscribe_to_events(self) -> None: - """Subscribe to relevant events for notifications.""" - # Configure consumer for notification-relevant events - consumer_config = ConsumerConfig( - bootstrap_servers=self.settings.KAFKA_BOOTSTRAP_SERVERS, - group_id=GroupId.NOTIFICATION_SERVICE, - max_poll_records=10, - enable_auto_commit=True, - auto_offset_reset="latest", - session_timeout_ms=self.settings.KAFKA_SESSION_TIMEOUT_MS, - heartbeat_interval_ms=self.settings.KAFKA_HEARTBEAT_INTERVAL_MS, - max_poll_interval_ms=self.settings.KAFKA_MAX_POLL_INTERVAL_MS, - request_timeout_ms=self.settings.KAFKA_REQUEST_TIMEOUT_MS, - ) - - execution_results_topic = get_topic_for_event(EventType.EXECUTION_COMPLETED) - - # Log topics for debugging - self.logger.info(f"Notification service will subscribe to topics: {execution_results_topic}") - - # Create dispatcher and register handlers for specific event types - self._dispatcher = EventDispatcher(logger=self.logger) - # Use a single handler for execution result events (simpler and less brittle) - self._dispatcher.register_handler(EventType.EXECUTION_COMPLETED, self._handle_execution_event) - self._dispatcher.register_handler(EventType.EXECUTION_FAILED, self._handle_execution_event) - self._dispatcher.register_handler(EventType.EXECUTION_TIMEOUT, self._handle_execution_event) - - # Create consumer with dispatcher - self._consumer = UnifiedConsumer( - consumer_config, - event_dispatcher=self._dispatcher, - schema_registry=self.schema_registry_manager, - settings=self.settings, - logger=self.logger, - event_metrics=self._event_metrics, - ) - - # Start consumer - await self._consumer.start([execution_results_topic]) - - # Start consumer task - self._consumer_task = asyncio.create_task(self._run_consumer()) - self._tasks.add(self._consumer_task) - self._consumer_task.add_done_callback(self._tasks.discard) - - self.logger.info("Notification service subscribed to execution events") - async def create_notification( self, user_id: str, @@ -248,6 +134,13 @@ async def create_notification( ) -> DomainNotification: if not tags: raise NotificationValidationError("tags must be a non-empty list") + if scheduled_for is not None: + max_days = self.settings.NOTIF_MAX_SCHEDULE_DAYS + max_schedule = datetime.now(UTC) + timedelta(days=max_days) + if scheduled_for > max_schedule: + raise NotificationValidationError( + f"scheduled_for cannot exceed {max_days} days from now" + ) self.logger.info( f"Creating notification for user {user_id}", extra={ @@ -294,7 +187,10 @@ async def create_notification( # Save to database notification = await self.repository.create_notification(create_data) - await self._deliver_notification(notification) + # Deliver immediately if not scheduled; scheduled notifications are + # picked up by the NotificationScheduler worker. + if scheduled_for is None: + await self._deliver_notification(notification) return notification @@ -514,59 +410,6 @@ def _get_slack_color(self, priority: NotificationSeverity) -> str: NotificationSeverity.URGENT: "#990000", # Dark Red }.get(priority, "#808080") # Default gray - async def _process_pending_notifications(self) -> None: - """Process pending notifications in background.""" - while self.is_running: - try: - # Find pending notifications - notifications = await self.repository.find_pending_notifications( - batch_size=self.settings.NOTIF_PENDING_BATCH_SIZE - ) - - # Process each notification - for notification in notifications: - if not self.is_running: - break - await self._deliver_notification(notification) - - # Sleep between batches - await asyncio.sleep(5) - - except Exception as e: - self.logger.error(f"Error processing pending notifications: {e}") - await asyncio.sleep(10) - - async def _cleanup_old_notifications(self) -> None: - """Cleanup old notifications periodically.""" - while self.is_running: - try: - # Run cleanup once per day - await asyncio.sleep(86400) # 24 hours - - if not self.is_running: - break - - # Delete old notifications - deleted_count = await self.repository.cleanup_old_notifications(self.settings.NOTIF_OLD_DAYS) - - self.logger.info(f"Cleaned up {deleted_count} old notifications") - - except Exception as e: - self.logger.error(f"Error cleaning up old notifications: {e}") - - async def _run_consumer(self) -> None: - """Run the event consumer loop.""" - while self.is_running: - try: - # Consumer handles polling internally - await asyncio.sleep(1) - except asyncio.CancelledError: - self.logger.info("Notification consumer task cancelled") - break - except Exception as e: - self.logger.error(f"Error in notification consumer loop: {e}") - await asyncio.sleep(5) - async def _handle_execution_timeout_typed(self, event: ExecutionTimeoutEvent) -> None: """Handle typed execution timeout event.""" user_id = event.metadata.user_id @@ -785,12 +628,14 @@ async def _should_skip_notification( return None - async def _deliver_notification(self, notification: DomainNotification) -> None: - """Deliver notification through configured channel using safe state transitions.""" - # Attempt to claim this notification for sending + async def _deliver_notification(self, notification: DomainNotification) -> bool: + """Deliver notification through configured channel with inline retry. + + Returns True only when the notification reaches DELIVERED status. + """ claimed = await self.repository.try_claim_pending(notification.notification_id) if not claimed: - return + return False self.logger.info( f"Delivering notification {notification.notification_id}", @@ -803,10 +648,8 @@ async def _deliver_notification(self, notification: DomainNotification) -> None: }, ) - # Check user subscription for the channel subscription = await self.repository.get_subscription(notification.user_id, notification.channel) - # Check if notification should be skipped skip_reason = await self._should_skip_notification(notification, subscription) if skip_reason: self.logger.info(skip_reason) @@ -815,91 +658,72 @@ async def _deliver_notification(self, notification: DomainNotification) -> None: notification.user_id, DomainNotificationUpdate(status=NotificationStatus.SKIPPED, error_message=skip_reason), ) - return - - # Send through channel - start_time = asyncio.get_running_loop().time() - try: - handler = self._channel_handlers.get(notification.channel) - if handler is None: - raise ValueError( - f"No handler configured for notification channel: {notification.channel}. " - f"Available channels: {list(self._channel_handlers.keys())}" - ) - - self.logger.debug(f"Using handler {handler.__name__} for channel {notification.channel}") - await handler(notification, subscription) - delivery_time = asyncio.get_running_loop().time() - start_time + return False - # Mark delivered + handler = self._channel_handlers.get(notification.channel) + if handler is None: await self.repository.update_notification( notification.notification_id, notification.user_id, - DomainNotificationUpdate(status=NotificationStatus.DELIVERED, delivered_at=datetime.now(UTC)), - ) - - self.logger.info( - f"Successfully delivered notification {notification.notification_id}", - extra={ - "notification_id": str(notification.notification_id), - "channel": notification.channel, - "delivery_time_ms": int(delivery_time * 1000), - }, + DomainNotificationUpdate( + status=NotificationStatus.FAILED, + failed_at=datetime.now(UTC), + error_message=f"No handler for channel: {notification.channel}", + ), ) + return False - # Metrics (use tag string or severity) - self.metrics.record_notification_sent( - notification.severity, channel=notification.channel, severity=notification.severity - ) - self.metrics.record_notification_delivery_time(delivery_time, notification.severity) - - except Exception as e: - error_details = { - "notification_id": str(notification.notification_id), - "channel": notification.channel, - "error_type": type(e).__name__, - "error_message": str(e), - "retry_count": notification.retry_count, - "max_retries": notification.max_retries, - } - - self.logger.error( - f"Failed to deliver notification {notification.notification_id}: {str(e)}", - extra=error_details, - exc_info=True, - ) + last_error: Exception | None = None + start_time = asyncio.get_running_loop().time() - new_retry_count = notification.retry_count + 1 - error_message = f"Delivery failed via {notification.channel}: {str(e)}" - failed_at = datetime.now(UTC) + for attempt in range(notification.max_retries): + try: + await handler(notification, subscription) - # Schedule retry if under limit - if new_retry_count < notification.max_retries: - retry_time = datetime.now(UTC) + timedelta(minutes=self.settings.NOTIF_RETRY_DELAY_MINUTES) - self.logger.info( - f"Scheduled retry {new_retry_count}/{notification.max_retries} for {notification.notification_id}", - extra={"retry_at": retry_time.isoformat()}, - ) - # Will be retried - keep as PENDING but with scheduled_for - # Note: scheduled_for not in DomainNotificationUpdate, so we update status fields only + delivery_time = asyncio.get_running_loop().time() - start_time await self.repository.update_notification( notification.notification_id, notification.user_id, - DomainNotificationUpdate( - status=NotificationStatus.PENDING, - failed_at=failed_at, - error_message=error_message, - retry_count=new_retry_count, - ), + DomainNotificationUpdate(status=NotificationStatus.DELIVERED, delivered_at=datetime.now(UTC)), ) - else: - await self.repository.update_notification( - notification.notification_id, - notification.user_id, - DomainNotificationUpdate( - status=NotificationStatus.FAILED, - failed_at=failed_at, - error_message=error_message, - retry_count=new_retry_count, - ), + self.logger.info( + f"Delivered notification {notification.notification_id}", + extra={ + "notification_id": str(notification.notification_id), + "channel": notification.channel, + "delivery_time_ms": int(delivery_time * 1000), + "attempt": attempt + 1, + }, ) + self.metrics.record_notification_sent( + notification.severity, channel=notification.channel, severity=notification.severity + ) + self.metrics.record_notification_delivery_time( + delivery_time, notification.severity, channel=notification.channel + ) + return True + + except Exception as e: + last_error = e + self.logger.warning( + f"Delivery attempt {attempt + 1}/{notification.max_retries} failed " + f"for {notification.notification_id}: {e}", + ) + if attempt + 1 < notification.max_retries: + await asyncio.sleep(min(2 ** attempt, 30)) + + await self.repository.update_notification( + notification.notification_id, + notification.user_id, + DomainNotificationUpdate( + status=NotificationStatus.FAILED, + failed_at=datetime.now(UTC), + error_message=f"Delivery failed via {notification.channel}: {last_error}", + retry_count=notification.max_retries, + ), + ) + self.logger.error( + f"All delivery attempts exhausted for {notification.notification_id}: {last_error}", + exc_info=last_error, + ) + return False diff --git a/backend/app/services/result_processor/processor.py b/backend/app/services/result_processor/processor.py index 0f149c44..85ef1967 100644 --- a/backend/app/services/result_processor/processor.py +++ b/backend/app/services/result_processor/processor.py @@ -65,15 +65,7 @@ async def handle_execution_completed(self, event: DomainEvent) -> None: memory_percent, attributes={"lang_and_version": lang_and_version} ) - result = ExecutionResultDomain( - execution_id=event.execution_id, - status=ExecutionStatus.COMPLETED, - exit_code=event.exit_code, - stdout=event.stdout, - stderr=event.stderr, - resource_usage=event.resource_usage, - metadata=event.metadata, - ) + result = ExecutionResultDomain(**event.model_dump(), status=ExecutionStatus.COMPLETED) try: await self._execution_repo.write_terminal_result(result) @@ -91,20 +83,11 @@ async def handle_execution_failed(self, event: DomainEvent) -> None: if exec_obj is None: raise ExecutionNotFoundError(event.execution_id) - self._metrics.record_error(str(event.error_type) if event.error_type else "unknown") + self._metrics.record_error(event.error_type) lang_and_version = f"{exec_obj.lang}-{exec_obj.lang_version}" self._metrics.record_script_execution(ExecutionStatus.FAILED, lang_and_version) - result = ExecutionResultDomain( - execution_id=event.execution_id, - status=ExecutionStatus.FAILED, - exit_code=event.exit_code or -1, - stdout=event.stdout, - stderr=event.stderr, - resource_usage=event.resource_usage, - metadata=event.metadata, - error_type=event.error_type, - ) + result = ExecutionResultDomain(**event.model_dump(), status=ExecutionStatus.FAILED) try: await self._execution_repo.write_terminal_result(result) await self._publish_result_stored(result) @@ -128,14 +111,7 @@ async def handle_execution_timeout(self, event: DomainEvent) -> None: self._metrics.record_execution_duration(event.timeout_seconds, lang_and_version) result = ExecutionResultDomain( - execution_id=event.execution_id, - status=ExecutionStatus.TIMEOUT, - exit_code=-1, - stdout=event.stdout, - stderr=event.stderr, - resource_usage=event.resource_usage, - metadata=event.metadata, - error_type=ExecutionErrorType.TIMEOUT, + **event.model_dump(), status=ExecutionStatus.TIMEOUT, exit_code=-1, error_type=ExecutionErrorType.TIMEOUT, ) try: await self._execution_repo.write_terminal_result(result) diff --git a/backend/app/settings.py b/backend/app/settings.py index d487f6da..4de6729c 100644 --- a/backend/app/settings.py +++ b/backend/app/settings.py @@ -115,9 +115,7 @@ def __init__( # Notification configuration NOTIF_THROTTLE_WINDOW_HOURS: int = 1 NOTIF_THROTTLE_MAX_PER_HOUR: int = 5 - NOTIF_PENDING_BATCH_SIZE: int = 10 - NOTIF_OLD_DAYS: int = 30 - NOTIF_RETRY_DELAY_MINUTES: int = 5 + NOTIF_MAX_SCHEDULE_DAYS: int = 25 # Max days ahead a notification can be scheduled (must be < TTL) # Schema Configuration SCHEMA_BASE_PATH: str = "app/schemas_avro" diff --git a/backend/pyproject.toml b/backend/pyproject.toml index d23a3c1b..f0396c26 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -124,6 +124,7 @@ dependencies = [ "zipp==3.20.2", "monggregate==0.22.1", "aiofiles==25.1.0", + "APScheduler==3.10.4", ] [build-system] diff --git a/backend/tests/unit/services/result_processor/test_processor.py b/backend/tests/unit/services/result_processor/test_processor.py index 3ea2cfa5..7199bd30 100644 --- a/backend/tests/unit/services/result_processor/test_processor.py +++ b/backend/tests/unit/services/result_processor/test_processor.py @@ -4,6 +4,7 @@ import pytest from app.core.metrics import ExecutionMetrics from app.domain.enums.execution import ExecutionStatus +from app.domain.enums.storage import ExecutionErrorType from app.domain.events.typed import ( EventMetadata, ExecutionCompletedEvent, @@ -43,7 +44,7 @@ class TestHandlerTypeGuards: async def test_completed_rejects_wrong_type(self, execution_metrics: ExecutionMetrics) -> None: processor = _make_processor(execution_metrics) - wrong = ExecutionFailedEvent(execution_id="e1", exit_code=1, metadata=_METADATA) + wrong = ExecutionFailedEvent(execution_id="e1", exit_code=1, error_type=ExecutionErrorType.SCRIPT_ERROR, metadata=_METADATA) with pytest.raises(TypeError, match="Expected ExecutionCompletedEvent"): await processor.handle_execution_completed(wrong) @@ -104,7 +105,8 @@ async def test_stores_result_and_publishes(self, execution_metrics: ExecutionMet processor = _make_processor(execution_metrics, exec_repo=repo, producer=producer) event = ExecutionFailedEvent( - execution_id="e2", exit_code=1, stderr="error", metadata=_METADATA + execution_id="e2", exit_code=1, error_type=ExecutionErrorType.SCRIPT_ERROR, stderr="error", + metadata=_METADATA, ) await processor.handle_execution_failed(event) diff --git a/backend/uv.lock b/backend/uv.lock index 750c082e..2a9a72dc 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -194,6 +194,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a1/ee/48ca1a7c89ffec8b6a0c5d02b89c305671d5ffd8d3c94acf8b8c408575bb/anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c", size = 100916, upload-time = "2025-03-17T00:02:52.713Z" }, ] +[[package]] +name = "apscheduler" +version = "3.10.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytz" }, + { name = "six" }, + { name = "tzlocal" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5e/34/5dcb368cf89f93132d9a31bd3747962a9dc874480e54333b0c09fa7d56ac/APScheduler-3.10.4.tar.gz", hash = "sha256:e6df071b27d9be898e486bc7940a7be50b4af2e9da7c08f0744a96d4bd4cef4a", size = 100832, upload-time = "2023-08-19T16:44:58.293Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/13/b5/7af0cb920a476dccd612fbc9a21a3745fb29b1fcd74636078db8f7ba294c/APScheduler-3.10.4-py3-none-any.whl", hash = "sha256:fb91e8a768632a4756a585f79ec834e0e27aad5860bac7eaa523d9ccefd87661", size = 59303, upload-time = "2023-08-19T16:44:56.814Z" }, +] + [[package]] name = "asgiref" version = "3.11.0" @@ -1043,6 +1057,7 @@ dependencies = [ { name = "annotated-doc" }, { name = "annotated-types" }, { name = "anyio" }, + { name = "apscheduler" }, { name = "asgiref" }, { name = "async-timeout" }, { name = "attrs" }, @@ -1187,6 +1202,7 @@ requires-dist = [ { name = "annotated-doc", specifier = "==0.0.4" }, { name = "annotated-types", specifier = "==0.7.0" }, { name = "anyio", specifier = "==4.9.0" }, + { name = "apscheduler", specifier = "==3.10.4" }, { name = "asgiref", specifier = "==3.11.0" }, { name = "async-timeout", specifier = "==5.0.1" }, { name = "attrs", specifier = "==25.3.0" }, @@ -2670,6 +2686,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/41/c1/abd18fc3c23dbe09321fcd812091320d4dc954046f95cb431ef2926cb11c/python_schema_registry_client-2.6.1-py3-none-any.whl", hash = "sha256:05950ca8f9a3409247514bef3fdb421839d6e1ae544b32dfd3b7b16237673303", size = 23095, upload-time = "2025-04-04T15:07:49.592Z" }, ] +[[package]] +name = "pytz" +version = "2025.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f8/bf/abbd3cdfb8fbc7fb3d4d38d320f2441b1e7cbe29be4f23797b4a2b5d8aac/pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3", size = 320884, upload-time = "2025-03-25T02:25:00.538Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" }, +] + [[package]] name = "pyyaml" version = "6.0.2" @@ -3089,6 +3114,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/26/9f/ad63fc0248c5379346306f8668cda6e2e2e9c95e01216d2b8ffd9ff037d0/typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d", size = 37438, upload-time = "2024-06-07T18:52:13.582Z" }, ] +[[package]] +name = "tzdata" +version = "2025.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/5e/a7/c202b344c5ca7daf398f3b8a477eeb205cf3b6f32e7ec3a6bac0629ca975/tzdata-2025.3.tar.gz", hash = "sha256:de39c2ca5dc7b0344f2eba86f49d614019d29f060fc4ebc8a417896a620b56a7", size = 196772, upload-time = "2025-12-13T17:45:35.667Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" }, +] + +[[package]] +name = "tzlocal" +version = "5.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzdata", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8b/2e/c14812d3d4d9cd1773c6be938f89e5735a1f11a9f184ac3639b93cef35d5/tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd", size = 30761, upload-time = "2025-03-05T21:17:41.549Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c2/14/e2a54fabd4f08cd7af1c07030603c3356b74da07f7cc056e600436edfa17/tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d", size = 18026, upload-time = "2025-03-05T21:17:39.857Z" }, +] + [[package]] name = "urllib3" version = "2.6.3"