75 changes: 46 additions & 29 deletions backend/app/core/container.py
@@ -1,5 +1,6 @@
from dishka import AsyncContainer, make_async_container
from dishka.integrations.fastapi import FastapiProvider
from faststream.kafka import KafkaBroker

from app.core.providers import (
AdminServicesProvider,
@@ -8,8 +9,11 @@
CoordinatorProvider,
CoreServicesProvider,
DatabaseProvider,
DLQProvider,
DLQWorkerProvider,
EventProvider,
EventReplayProvider,
EventReplayWorkerProvider,
K8sWorkerProvider,
KafkaServicesProvider,
KubernetesProvider,
@@ -22,19 +26,21 @@
ResourceCleanerProvider,
ResultProcessorProvider,
SagaOrchestratorProvider,
SagaWorkerProvider,
SettingsProvider,
SSEProvider,
UserServicesProvider,
)
from app.settings import Settings


def create_app_container(settings: Settings) -> AsyncContainer:
def create_app_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""
Create the application DI container.

Args:
settings: Application settings (injected via from_context).
broker: KafkaBroker instance (injected via from_context for MessagingProvider).
"""
return make_async_container(
SettingsProvider(),
@@ -45,6 +51,7 @@ def create_app_container(settings: Settings) -> AsyncContainer:
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
EventProvider(),
SagaOrchestratorProvider(),
KafkaServicesProvider(),
@@ -58,17 +65,12 @@ def create_app_container(settings: Settings) -> AsyncContainer:
KubernetesProvider(),
ResourceCleanerProvider(),
FastapiProvider(),
context={Settings: settings},
context={Settings: settings, KafkaBroker: broker},
)
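
The KafkaBroker entry in the container context above implies the broker is created by the caller and only resolved inside the container. A minimal sketch of how MessagingProvider might expose it via dishka's from_context (the provider name comes from this diff; its body here is an assumption, not code from this PR):

from dishka import Provider, Scope, from_context
from faststream.kafka import KafkaBroker


class MessagingProvider(Provider):
    # Illustrative sketch: resolve the KafkaBroker from the values passed
    # to make_async_container(context=...) instead of constructing one
    # inside the container.
    broker = from_context(provides=KafkaBroker, scope=Scope.APP)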


def create_result_processor_container(settings: Settings) -> AsyncContainer:
"""
Create a minimal DI container for the ResultProcessor worker.

Args:
settings: Application settings (injected via from_context).
"""
def create_result_processor_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""Create a minimal DI container for the ResultProcessor worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
@@ -79,12 +81,13 @@ def create_result_processor_container(settings: Settings) -> AsyncContainer:
RepositoryProvider(),
EventProvider(),
MessagingProvider(),
DLQProvider(),
ResultProcessorProvider(),
context={Settings: settings},
context={Settings: settings, KafkaBroker: broker},
)


def create_coordinator_container(settings: Settings) -> AsyncContainer:
def create_coordinator_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""Create DI container for the ExecutionCoordinator worker."""
return make_async_container(
SettingsProvider(),
@@ -95,13 +98,14 @@
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
EventProvider(),
CoordinatorProvider(),
context={Settings: settings},
context={Settings: settings, KafkaBroker: broker},
)


def create_k8s_worker_container(settings: Settings) -> AsyncContainer:
def create_k8s_worker_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""Create DI container for the KubernetesWorker."""
return make_async_container(
SettingsProvider(),
@@ -112,14 +116,15 @@
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
EventProvider(),
KubernetesProvider(),
K8sWorkerProvider(),
context={Settings: settings},
context={Settings: settings, KafkaBroker: broker},
)


def create_pod_monitor_container(settings: Settings) -> AsyncContainer:
def create_pod_monitor_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""Create DI container for the PodMonitor worker."""
return make_async_container(
SettingsProvider(),
@@ -130,16 +135,20 @@
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
EventProvider(),
KafkaServicesProvider(),
KubernetesProvider(),
PodMonitorProvider(),
context={Settings: settings},
context={Settings: settings, KafkaBroker: broker},
)


def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the SagaOrchestrator worker."""
def create_saga_orchestrator_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""Create DI container for the SagaOrchestrator worker.

Uses SagaWorkerProvider which adds APScheduler-managed timeout checking.
"""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
@@ -149,14 +158,18 @@
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
EventProvider(),
SagaOrchestratorProvider(),
context={Settings: settings},
SagaWorkerProvider(),
context={Settings: settings, KafkaBroker: broker},
)
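
Per the docstring, SagaWorkerProvider adds APScheduler-managed timeout checking; EventReplayWorkerProvider and DLQWorkerProvider below follow the same pattern. A hedged sketch of the shape such a provider could take, using dishka's async-generator finalization (the job function and interval are assumptions):

from collections.abc import AsyncIterator

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dishka import Provider, Scope, provide


async def check_saga_timeouts() -> None:
    """Placeholder for the timeout sweep the real provider schedules."""


class SagaWorkerProvider(Provider):
    @provide(scope=Scope.APP)
    async def scheduler(self) -> AsyncIterator[AsyncIOScheduler]:
        scheduler = AsyncIOScheduler()
        scheduler.add_job(check_saga_timeouts, "interval", seconds=30)
        scheduler.start()
        yield scheduler  # alive until the container is closed
        scheduler.shutdown(wait=False)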


def create_event_replay_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the EventReplay worker."""
def create_event_replay_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""Create DI container for the EventReplay worker.

Uses EventReplayWorkerProvider which adds APScheduler-managed session cleanup.
"""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
@@ -166,14 +179,19 @@
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQProvider(),
EventProvider(),
EventReplayProvider(),
context={Settings: settings},
EventReplayWorkerProvider(),
context={Settings: settings, KafkaBroker: broker},
)


def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the DLQ processor worker."""
def create_dlq_processor_container(settings: Settings, broker: KafkaBroker) -> AsyncContainer:
"""Create DI container for the DLQ processor worker.

Uses DLQWorkerProvider which adds APScheduler-managed retry monitoring
and configures retry policies and filters.
"""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
@@ -183,8 +201,7 @@
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
DLQWorkerProvider(),
EventProvider(),
context={Settings: settings},
context={Settings: settings, KafkaBroker: broker},
)
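
Every factory now takes the broker as a second argument, so each worker entrypoint owns the broker lifecycle. A minimal sketch of a caller under those assumptions (module paths match the diff; the settings field name and run loop are hypothetical):

import asyncio

from faststream.kafka import KafkaBroker

from app.core.container import create_coordinator_container
from app.settings import Settings


async def main() -> None:
    settings = Settings()
    # Hypothetical settings field; the real bootstrap config may differ.
    broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS)
    container = create_coordinator_container(settings, broker)
    try:
        await broker.start()
        ...  # resolve the coordinator from the container and run it
    finally:
        await broker.stop()
        await container.close()


if __name__ == "__main__":
    asyncio.run(main())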


64 changes: 19 additions & 45 deletions backend/app/core/dishka_lifespan.py
@@ -1,24 +1,15 @@
from __future__ import annotations

import asyncio
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator

import redis.asyncio as redis
from beanie import init_beanie
from dishka import AsyncContainer
from fastapi import FastAPI
from faststream.kafka import KafkaBroker

from app.core.database_context import Database
from app.core.metrics import RateLimitMetrics
from app.core.startup import initialize_rate_limits
from app.core.tracing import init_tracing
from app.db.docs import ALL_DOCUMENTS
from app.events.event_store import EventStore
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
from app.services.notification_scheduler import NotificationScheduler
from app.services.notification_service import NotificationService
from app.settings import Settings


@@ -27,13 +18,14 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Application lifespan with dishka dependency injection.

This is much cleaner than the old lifespan.py:
- No dependency_overrides
- No manual service management
- Dishka handles all lifecycle automatically
Infrastructure init (Beanie, schemas, rate limits) is handled inside
DI providers. Resolving NotificationScheduler cascades through the
dependency graph and triggers all required initialisation.

Kafka broker lifecycle is managed here (start/stop).
"""
# Get settings and logger from DI container (uses test settings in tests)
container: AsyncContainer = app.state.dishka_container
broker: KafkaBroker = app.state.kafka_broker
settings = await container.get(Settings)
logger = await container.get(logging.Logger)

@@ -45,9 +37,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
},
)

# Metrics setup moved to app creation to allow middleware registration
logger.info("Lifespan start: tracing and services initialization")

# Initialize tracing only when enabled (avoid exporter retries in tests)
if settings.ENABLE_TRACING and not settings.TESTING:
instrumentation_report = init_tracing(
@@ -76,32 +65,17 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
)

# Phase 1: Resolve all DI dependencies in parallel
# Consumers and the notification scheduler (APScheduler) start automatically via their DI providers
(
schema_registry,
database,
redis_client,
rate_limit_metrics,
_event_store,
_notification_service,
_notification_scheduler,
) = await asyncio.gather(
container.get(SchemaRegistryManager),
container.get(Database),
container.get(redis.Redis),
container.get(RateLimitMetrics),
container.get(EventStore),
container.get(NotificationService),
container.get(NotificationScheduler),
)
# Resolve NotificationScheduler — cascades init_beanie, schema registration,
# and starts APScheduler via the DI provider graph.
await container.get(NotificationScheduler)
logger.info("Infrastructure initialized via DI providers")

# Phase 2: Initialize infrastructure in parallel (independent subsystems)
await asyncio.gather(
initialize_event_schemas(schema_registry),
init_beanie(database=database, document_models=ALL_DOCUMENTS),
initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
)
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")
# Start Kafka broker (subscribers begin consuming)
await broker.start()
logger.info("Kafka broker started — consumers active")

yield
try:
yield
finally:
await broker.stop()
logger.info("Kafka broker stopped")