Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .github/actions/e2e-ready/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ runs:
/home/runner/.kube/config > backend/kubeconfig.yaml
chmod 644 backend/kubeconfig.yaml

- name: Start cert-generator (background)
shell: bash
env:
IMAGE_TAG: ${{ inputs.image-tag }}
run: |
nohup docker compose up -d --no-build cert-generator \
> /tmp/cert-gen.log 2>&1 &

- name: Use test environment config
shell: bash
run: |
Expand All @@ -51,7 +43,6 @@ runs:
fi
fi
cat /tmp/infra-pull.log 2>/dev/null || true
cat /tmp/cert-gen.log 2>/dev/null || true
if [ -f /tmp/infra-pull.exit ]; then
EXIT_CODE=$(cat /tmp/infra-pull.exit)
if [ "$EXIT_CODE" != "0" ]; then
Expand Down
2 changes: 2 additions & 0 deletions backend/app/core/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,5 @@ def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
EventProvider(),
context={Settings: settings},
)


17 changes: 8 additions & 9 deletions backend/app/core/dishka_lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from app.db.docs import ALL_DOCUMENTS
from app.events.event_store_consumer import EventStoreConsumer
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
from app.services.notification_scheduler import NotificationScheduler
from app.services.notification_service import NotificationService
from app.settings import Settings

Expand Down Expand Up @@ -76,21 +77,23 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
)

# Phase 1: Resolve all DI dependencies in parallel
# SSE bus + consumers start automatically via NotificationService's dependency on SSERedisBus
# Consumers and the notification scheduler (APScheduler) start automatically via their DI providers
(
schema_registry,
database,
redis_client,
rate_limit_metrics,
event_store_consumer,
notification_service,
_notification_service,
_notification_scheduler,
) = await asyncio.gather(
container.get(SchemaRegistryManager),
container.get(Database),
container.get(redis.Redis),
container.get(RateLimitMetrics),
container.get(EventStoreConsumer),
container.get(NotificationService),
container.get(NotificationScheduler),
)

# Phase 2: Initialize infrastructure in parallel (independent subsystems)
Expand All @@ -102,13 +105,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")

# Phase 3: Start lifecycle-managed services
# SSE bridge consumers are started by the DI provider — no lifecycle to manage here
# EventStoreConsumer requires explicit __aenter__; all other services are managed by DI providers
async with AsyncExitStack() as stack:
stack.push_async_callback(event_store_consumer.aclose)
stack.push_async_callback(notification_service.aclose)
await asyncio.gather(
event_store_consumer.__aenter__(),
notification_service.__aenter__(),
)
logger.info("EventStoreConsumer and NotificationService started")
await event_store_consumer.__aenter__()
logger.info("EventStoreConsumer started")
yield
70 changes: 68 additions & 2 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from app.services.idempotency.redis_repository import RedisIdempotencyRepository
from app.services.k8s_worker import KubernetesWorker
from app.services.kafka_event_service import KafkaEventService
from app.services.notification_scheduler import NotificationScheduler
from app.services.notification_service import NotificationService
from app.services.pod_monitor.config import PodMonitorConfig
from app.services.pod_monitor.event_mapper import PodEventMapper
Expand Down Expand Up @@ -552,15 +553,78 @@ async def get_notification_service(
service = NotificationService(
notification_repository=notification_repository,
event_service=kafka_event_service,
schema_registry_manager=schema_registry,
sse_bus=sse_redis_bus,
settings=settings,
logger=logger,
notification_metrics=notification_metrics,
)

dispatcher = EventDispatcher(logger=logger)
dispatcher.register_handler(EventType.EXECUTION_COMPLETED, service._handle_execution_event)
dispatcher.register_handler(EventType.EXECUTION_FAILED, service._handle_execution_event)
dispatcher.register_handler(EventType.EXECUTION_TIMEOUT, service._handle_execution_event)

consumer_config = ConsumerConfig(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=GroupId.NOTIFICATION_SERVICE,
max_poll_records=10,
enable_auto_commit=False,
auto_offset_reset="latest",
session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS,
request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
)
consumer = UnifiedConsumer(
consumer_config,
event_dispatcher=dispatcher,
schema_registry=schema_registry,
settings=settings,
logger=logger,
event_metrics=event_metrics,
)
async with service:
await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.NOTIFICATION_SERVICE]))

logger.info("NotificationService started")

try:
yield service
finally:
await consumer.stop()
logger.info("NotificationService stopped")

@provide
async def get_notification_scheduler(
    self,
    notification_repository: NotificationRepository,
    notification_service: NotificationService,
    logger: logging.Logger,
) -> AsyncIterator[NotificationScheduler]:
    """Yield a NotificationScheduler whose periodic tick is owned by APScheduler.

    The scheduler object itself is stateless; this provider owns the
    APScheduler lifecycle: start before yield, shutdown on DI teardown.
    """
    # Imported lazily so importing this module does not require APScheduler.
    from apscheduler.schedulers.asyncio import AsyncIOScheduler

    service = NotificationScheduler(
        notification_repository=notification_repository,
        notification_service=notification_service,
        logger=logger,
    )

    ticker = AsyncIOScheduler()
    ticker.add_job(
        service.process_due_notifications,
        trigger="interval",
        seconds=15,
        id="process_due_notifications",
        max_instances=1,
        misfire_grace_time=60,
    )
    ticker.start()
    logger.info("NotificationScheduler started (APScheduler interval=15s)")

    try:
        yield service
    finally:
        # wait=False: don't block application teardown on an in-flight batch.
        ticker.shutdown(wait=False)
        logger.info("NotificationScheduler stopped")

@provide
def get_grafana_alert_processor(
Expand Down Expand Up @@ -988,3 +1052,5 @@ def get_event_replay_service(
replay_metrics=replay_metrics,
logger=logger,
)


1 change: 1 addition & 0 deletions backend/app/db/docs/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Settings:
indexes = [
IndexModel([("user_id", ASCENDING), ("created_at", DESCENDING)], name="idx_notif_user_created_desc"),
IndexModel([("status", ASCENDING), ("scheduled_for", ASCENDING)], name="idx_notif_status_sched"),
IndexModel([("created_at", ASCENDING)], expireAfterSeconds=30 * 86400, name="idx_notif_ttl"),
]


Expand Down
52 changes: 16 additions & 36 deletions backend/app/db/repositories/notification_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import UTC, datetime, timedelta

from beanie.odm.enums import SortDirection
from beanie.operators import GTE, LT, LTE, ElemMatch, In, NotIn, Or
from beanie.operators import GTE, LTE, ElemMatch, In, NotIn, Or

from app.db.docs import NotificationDocument, NotificationSubscriptionDocument, UserDocument
from app.domain.enums.notification import NotificationChannel, NotificationStatus
Expand Down Expand Up @@ -126,6 +126,21 @@ async def get_unread_count(self, user_id: str) -> int:
In(NotificationDocument.status, [NotificationStatus.DELIVERED]),
).count()

async def find_due_notifications(self, limit: int = 50) -> list[DomainNotification]:
    """Find PENDING notifications whose scheduled_for time has passed.

    Args:
        limit: Maximum number of documents fetched per call (one batch).

    Returns:
        Due notifications as domain models, oldest scheduled first.
    """
    now = datetime.now(UTC)
    docs = (
        await NotificationDocument.find(
            NotificationDocument.status == NotificationStatus.PENDING,
            # Beanie needs `!= None` (not `is not None`) to build the Mongo
            # $ne filter — hence the E711 suppression below.
            NotificationDocument.scheduled_for != None,  # noqa: E711
            LTE(NotificationDocument.scheduled_for, now),
        )
        # Oldest due first so a backlog drains in scheduled order.
        .sort([("scheduled_for", SortDirection.ASCENDING)])
        .limit(limit)
        .to_list()
    )
    # Convert ODM documents to plain domain models at the repository boundary.
    return [DomainNotification.model_validate(doc, from_attributes=True) for doc in docs]

async def try_claim_pending(self, notification_id: str) -> bool:
now = datetime.now(UTC)
doc = await NotificationDocument.find_one(
Expand All @@ -141,41 +156,6 @@ async def try_claim_pending(self, notification_id: str) -> bool:
await doc.set({"status": NotificationStatus.SENDING, "sent_at": now})
return True

async def find_pending_notifications(self, batch_size: int = 10) -> list[DomainNotification]:
now = datetime.now(UTC)
docs = (
await NotificationDocument.find(
NotificationDocument.status == NotificationStatus.PENDING,
Or(
NotificationDocument.scheduled_for == None, # noqa: E711
LTE(NotificationDocument.scheduled_for, now),
),
)
.limit(batch_size)
.to_list()
)
return [DomainNotification.model_validate(doc, from_attributes=True) for doc in docs]

async def find_scheduled_notifications(self, batch_size: int = 10) -> list[DomainNotification]:
now = datetime.now(UTC)
docs = (
await NotificationDocument.find(
NotificationDocument.status == NotificationStatus.PENDING,
LTE(NotificationDocument.scheduled_for, now),
NotificationDocument.scheduled_for != None, # noqa: E711
)
.limit(batch_size)
.to_list()
)
return [DomainNotification.model_validate(doc, from_attributes=True) for doc in docs]

async def cleanup_old_notifications(self, days: int = 30) -> int:
cutoff = datetime.now(UTC) - timedelta(days=days)
result = await NotificationDocument.find(
LT(NotificationDocument.created_at, cutoff),
).delete()
return result.deleted_count if result else 0

# Subscriptions
async def get_subscription(
self, user_id: str, channel: NotificationChannel
Expand Down
2 changes: 1 addition & 1 deletion backend/app/domain/enums/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class GroupId(StringEnum):
},
GroupId.NOTIFICATION_SERVICE: {
KafkaTopic.NOTIFICATION_EVENTS,
KafkaTopic.EXECUTION_RESULTS,
KafkaTopic.EXECUTION_EVENTS,
},
GroupId.DLQ_PROCESSOR: {
KafkaTopic.DEAD_LETTER_QUEUE,
Expand Down
2 changes: 1 addition & 1 deletion backend/app/domain/events/typed.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class ExecutionFailedEvent(BaseEvent):
event_type: Literal[EventType.EXECUTION_FAILED] = EventType.EXECUTION_FAILED
execution_id: str
exit_code: int
error_type: ExecutionErrorType | None = None
error_type: ExecutionErrorType
error_message: str = ""
resource_usage: ResourceUsageDomain | None = None
stdout: str = ""
Expand Down
2 changes: 1 addition & 1 deletion backend/app/domain/execution/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DomainExecution(BaseModel):


class ExecutionResultDomain(BaseModel):
model_config = ConfigDict(from_attributes=True)
model_config = ConfigDict(from_attributes=True, extra="ignore")

execution_id: str
status: ExecutionStatus
Expand Down
51 changes: 51 additions & 0 deletions backend/app/services/notification_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging

from app.db.repositories.notification_repository import NotificationRepository
from app.services.notification_service import NotificationService


class NotificationScheduler:
    """Stateless scheduler service that processes due notifications.

    APScheduler manages the timer (interval trigger) in the DI provider.
    This class contains only the business logic for finding and delivering
    due notifications — no loops, no lifecycle, no state.
    """

    def __init__(
        self,
        notification_repository: NotificationRepository,
        notification_service: NotificationService,
        logger: logging.Logger,
    ) -> None:
        self.repository = notification_repository
        self.service = notification_service
        self.logger = logger

    async def process_due_notifications(self, batch_size: int = 50) -> int:
        """Find and deliver all notifications whose scheduled_for <= now.

        Called by APScheduler on a fixed interval. Each invocation is
        a single-shot batch: query DB for due items, deliver each, return.

        Args:
            batch_size: Maximum number of due notifications fetched per run.

        Returns:
            The number of notifications successfully delivered.
        """
        due = await self.repository.find_due_notifications(limit=batch_size)
        if not due:
            return 0

        # Lazy %-style args: the message is only formatted if the level is enabled.
        self.logger.info("Found %d due scheduled notifications", len(due))

        delivered = 0
        for notification in due:
            try:
                # NOTE(review): relies on NotificationService's private
                # _deliver_notification — consider exposing a public method.
                if await self.service._deliver_notification(notification):
                    delivered += 1
            except Exception:
                # One failed delivery must not abort the rest of the batch.
                # logger.exception logs at ERROR and records the traceback.
                self.logger.exception(
                    "Failed to deliver scheduled notification %s",
                    notification.notification_id,
                )

        self.logger.info("Delivered %d/%d scheduled notifications", delivered, len(due))
        return delivered
Loading
Loading