From 2186352e438b24df0777d6575f7d0e9619ff1af7 Mon Sep 17 00:00:00 2001 From: user040131 Date: Thu, 9 Apr 2026 23:39:17 +0900 Subject: [PATCH] feat: implement backend ingestion, analysis, and worker pipeline --- scripts/seed_reference_data.py | 165 +++++++ services/api/app/__init__.py | 1 + services/api/app/api/routes/dashboard.py | 53 ++ services/api/app/api/routes/health.py | 15 + services/api/app/api/routes/jobs.py | 93 ++++ services/api/app/core/config.py | 47 ++ services/api/app/core/logging.py | 27 ++ services/api/app/db/base.py | 20 + services/api/app/db/session.py | 16 + services/api/app/main.py | 43 ++ services/api/app/models/entities.py | 244 ++++++++++ services/api/app/repositories/dashboard.py | 456 ++++++++++++++++++ services/api/app/schemas/api.py | 264 ++++++++++ .../api/app/services/agents/openai_client.py | 118 +++++ services/api/app/services/agents/router.py | 30 ++ .../api/app/services/evaluation/metrics.py | 38 ++ services/api/app/services/events/scoring.py | 133 +++++ .../api/app/services/ingestion/connectors.py | 173 +++++++ services/api/app/services/jobs/queue.py | 15 + .../app/services/korea/translation_engine.py | 174 +++++++ .../app/services/market/reaction_engine.py | 107 ++++ .../api/app/services/replay/weekly_replay.py | 453 +++++++++++++++++ services/api/app/workers/tasks.py | 366 ++++++++++++++ .../api/tests/fixtures/evidence_pack.json | 15 + services/api/tests/test_evaluation.py | 15 + services/api/tests/test_reaction_engine.py | 49 ++ services/api/tests/test_scoring.py | 37 ++ services/api/tests/test_translation_dedup.py | 48 ++ services/api/tests/test_weekly_replay.py | 16 + 29 files changed, 3231 insertions(+) create mode 100644 scripts/seed_reference_data.py create mode 100644 services/api/app/__init__.py create mode 100644 services/api/app/api/routes/dashboard.py create mode 100644 services/api/app/api/routes/health.py create mode 100644 services/api/app/api/routes/jobs.py create mode 100644 services/api/app/core/config.py create 
mode 100644 services/api/app/core/logging.py create mode 100644 services/api/app/db/base.py create mode 100644 services/api/app/db/session.py create mode 100644 services/api/app/main.py create mode 100644 services/api/app/models/entities.py create mode 100644 services/api/app/repositories/dashboard.py create mode 100644 services/api/app/schemas/api.py create mode 100644 services/api/app/services/agents/openai_client.py create mode 100644 services/api/app/services/agents/router.py create mode 100644 services/api/app/services/evaluation/metrics.py create mode 100644 services/api/app/services/events/scoring.py create mode 100644 services/api/app/services/ingestion/connectors.py create mode 100644 services/api/app/services/jobs/queue.py create mode 100644 services/api/app/services/korea/translation_engine.py create mode 100644 services/api/app/services/market/reaction_engine.py create mode 100644 services/api/app/services/replay/weekly_replay.py create mode 100644 services/api/app/workers/tasks.py create mode 100644 services/api/tests/fixtures/evidence_pack.json create mode 100644 services/api/tests/test_evaluation.py create mode 100644 services/api/tests/test_reaction_engine.py create mode 100644 services/api/tests/test_scoring.py create mode 100644 services/api/tests/test_translation_dedup.py create mode 100644 services/api/tests/test_weekly_replay.py diff --git a/scripts/seed_reference_data.py b/scripts/seed_reference_data.py new file mode 100644 index 0000000..63aad44 --- /dev/null +++ b/scripts/seed_reference_data.py @@ -0,0 +1,165 @@ +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +sys.path.append(str(ROOT / "services" / "api")) + +from sqlalchemy import delete, select # noqa: E402 + +from app.core.config import get_settings # noqa: E402 +from app.db.base import Base # noqa: E402 +from app.db.session import engine, SessionLocal # noqa: E402 +from app.models.entities import ( 
# noqa: E402 + EvaluationMetric, + ModelRoute, + PromptVersion, + SourceConfig, + ThemeMap, + KoreanSecurity, +) + + +SOURCE_CONFIGS = [ + { + "name": "Federal Reserve Press Releases", + "kind": "official_macro_policy", + "base_url": "https://www.federalreserve.gov/feeds/press_all.xml", + "schedule": "*/30 * * * 1-5", + "reliability_score": 0.98, + }, + { + "name": "U.S. Treasury Press Releases", + "kind": "official_macro_policy", + "base_url": "https://home.treasury.gov/news/press-releases", + "schedule": "*/30 * * * 1-5", + "reliability_score": 0.95, + }, + { + "name": "SEC Press Releases", + "kind": "corporate_filing", + "base_url": "https://www.sec.gov/news/pressreleases.rss", + "schedule": "*/30 * * * 1-5", + "reliability_score": 0.94, + }, + { + "name": "CNBC Top News", + "kind": "trusted_market_news", + "base_url": "https://www.cnbc.com/id/100003114/device/rss/rss.html", + "schedule": "*/30 * * * 1-5", + "reliability_score": 0.82, + }, + { + "name": "MarketWatch Top Stories", + "kind": "trusted_market_news", + "base_url": "https://feeds.marketwatch.com/marketwatch/topstories/", + "schedule": "*/30 * * * 1-5", + "reliability_score": 0.8, + }, + { + "name": "BEA News", + "kind": "macro_release", + "base_url": "https://www.bea.gov/rss/news.xml", + "schedule": "0 * * * 1-5", + "reliability_score": 0.9, + }, +] + +MODEL_ROUTES = [ + ("macro_analyst", "gpt-5.4-mini"), + ("sector_analyst", "gpt-5.4-mini"), + ("microstructure_analyst", "gpt-5.4-mini"), + ("skeptic_analyst", "gpt-5.4-mini"), + ("korea_translator", "gpt-5.4"), + ("final_judge", "gpt-5.4"), +] + +THEME_MAPS = [ + ("ai_semiconductor", "HBM/반도체 장비", 0.92, ["hbm", "semiconductor", "packaging"]), + ("power_infrastructure", "전력기기/변압기", 0.88, ["grid", "transformer", "power"]), + ("defense", "방산/항공우주", 0.83, ["defense", "aerospace", "missile"]), + ("energy_raw_materials", "원전/우라늄", 0.8, ["uranium", "nuclear", "energy"]), + ("logistics_supply_chain", "해운/물류 자동화", 0.74, ["shipping", "logistics", "port"]), +] + 
async def main() -> None:
    """Create the schema if needed and reseed the reference tables.

    The reference tables (source configs, model routes, prompt versions,
    theme maps, Korean securities and evaluation metrics) are emptied and
    repopulated from the module-level seed constants inside one session,
    committed once at the end.  The engine is disposed before returning so no
    pooled connection outlives the event loop started by ``asyncio.run``.
    """
    # Function-scope import: only this entry point needs ``func``.
    from sqlalchemy import func

    settings = get_settings()
    try:
        async with engine.begin() as connection:
            await connection.run_sync(Base.metadata.create_all)

        async with SessionLocal() as session:
            # Reseeding is destructive: every row of these tables is removed
            # before the seed rows are inserted.
            for model in (SourceConfig, ModelRoute, PromptVersion, ThemeMap, KoreanSecurity, EvaluationMetric):
                await session.execute(delete(model))

            session.add_all(SourceConfig(**source) for source in SOURCE_CONFIGS)
            session.add_all(
                ModelRoute(role=role, model=model_name, temperature=0.2, metadata_json={})
                for role, model_name in MODEL_ROUTES
            )
            session.add(
                PromptVersion(
                    version=settings.prompt_version,
                    system_prompt=(
                        "공식 원문과 시장 반응만 사용해 한국장 다음 영업일 테마를 추론한다. "
                        "선반영과 갭페이드 리스크를 반드시 함께 제시한다."
                    ),
                    developer_prompt=(
                        "모든 변경은 롤링 OOS 평가 후 수동 승인 전까지 운영 라우팅에 반영하지 않는다."
                    ),
                )
            )
            session.add_all(
                ThemeMap(
                    us_category=us_category,
                    korea_theme=korea_theme,
                    mapping_weight=weight,
                    beneficiary_tags=tags,
                    lag_profile="next_day_open",
                    notes="초기 수동 규칙 매핑",
                )
                for us_category, korea_theme, weight, tags in THEME_MAPS
            )
            session.add_all(
                KoreanSecurity(
                    ticker=ticker,
                    name=name,
                    market=market,
                    sector=sector,
                    theme_tags=theme_tags,
                    metadata_json={},
                )
                for ticker, name, market, sector, theme_tags in KOREAN_SECURITIES
            )
            # Placeholder metrics (value 0.0, not attached to any evaluation run).
            session.add_all(
                EvaluationMetric(
                    metric_name=metric_name,
                    metric_value=0.0,
                    split="validation",
                    evaluation_run_id=None,  # type: ignore[arg-type]
                )
                for metric_name in (
                    "theme_hit_rate",
                    "leader_hit_rate",
                    "false_positive_rate",
                    "gap_fade_rate",
                )
            )
            await session.commit()

            # Count in SQL instead of loading every row just to take len().
            source_count = await session.scalar(select(func.count()).select_from(SourceConfig)) or 0
            print(f"Seed complete: {source_count} sources configured")
    finally:
        # Close pooled connections while the event loop is still running.
        await engine.dispose()
# Read-only dashboard endpoints.  Every handler receives a request-scoped
# AsyncSession via the get_db_session dependency and delegates straight to the
# matching repository function; no logic lives in this module.
router = APIRouter()


# GET /dashboard -> result of fetch_dashboard(session).
@router.get("/dashboard")
async def dashboard(session: AsyncSession = Depends(get_db_session)):
    return await fetch_dashboard(session)


# GET /events -> result of fetch_event_explorer(session).
@router.get("/events")
async def event_explorer(session: AsyncSession = Depends(get_db_session)):
    return await fetch_event_explorer(session)


# GET /themes -> result of fetch_theme_board(session).
@router.get("/themes")
async def theme_board(session: AsyncSession = Depends(get_db_session)):
    return await fetch_theme_board(session)


# GET /replay/weekly -> result of fetch_weekly_replay(session).
# NOTE: this literal path is registered before "/replay/{date_label}" below;
# routes are matched in registration order, so swapping the two would send
# "/replay/weekly" to the parameterized handler with date_label == "weekly".
@router.get("/replay/weekly")
async def weekly_replay(session: AsyncSession = Depends(get_db_session)):
    return await fetch_weekly_replay(session)


# GET /replay/{date_label} -> result of fetch_replay(session, date_label).
# date_label is passed through as an unvalidated string.
@router.get("/replay/{date_label}")
async def replay(date_label: str, session: AsyncSession = Depends(get_db_session)):
    return await fetch_replay(session, date_label)


# GET /evaluations/summary -> result of fetch_evaluation_summary(session).
@router.get("/evaluations/summary")
async def evaluation_summary(session: AsyncSession = Depends(get_db_session)):
    return await fetch_evaluation_summary(session)


# GET /admin/settings -> result of fetch_admin_settings(session, settings),
# where settings is the cached Settings instance from get_settings().
@router.get("/admin/settings")
async def admin_settings(session: AsyncSession = Depends(get_db_session)):
    return await fetch_admin_settings(session, get_settings())
router = APIRouter()


async def _enqueue_and_record(
    session: AsyncSession,
    job_name: str,
    trigger_kind: str,
    *job_args: str | None,
) -> JobResponse:
    """Enqueue ``job_name`` and persist a matching ``JobRun`` row.

    Args:
        session: Request-scoped database session; committed by this helper.
        job_name: Worker function name handed to ``enqueue_job``.
        trigger_kind: Label stored on the ``JobRun`` row.
        *job_args: Extra positional arguments forwarded to ``enqueue_job``
            after the generated run id.

    Returns:
        A ``JobResponse`` with status ``"queued"`` and the queue's job id.
    """
    settings = get_settings()
    run_id = uuid.uuid4()
    # NOTE(review): the job is enqueued before its JobRun row is committed, so
    # a fast worker could start before the row exists, and a failed commit
    # leaves an untracked job.  Ordering is kept as-is because the row needs
    # the job id returned by the queue.
    job_id = await enqueue_job(settings, job_name, str(run_id), *job_args)
    session.add(
        JobRun(
            id=run_id,
            job_id=job_id,
            job_name=job_name,
            trigger_kind=trigger_kind,
            status="queued",
        )
    )
    await session.commit()
    return JobResponse(status="queued", jobName=job_name, jobId=job_id)


@router.post("/jobs/ingest", response_model=JobResponse)
async def trigger_ingestion(
    payload: JobRequest,
    session: AsyncSession = Depends(get_db_session),
) -> JobResponse:
    """Queue an ingestion job.

    ``payload`` is validated but not otherwise used; the worker is called with
    the run id only.
    """
    return await _enqueue_and_record(session, "run_ingestion_job", "ingest")


@router.post("/jobs/analyze", response_model=JobResponse)
async def trigger_analysis(
    payload: JobRequest,
    session: AsyncSession = Depends(get_db_session),
) -> JobResponse:
    """Queue an analysis job, forwarding ``payload.as_of`` as ISO text or ``None``."""
    as_of = payload.as_of.isoformat() if payload.as_of else None
    return await _enqueue_and_record(session, "run_analysis_job", "analyze", as_of)


@router.post("/jobs/refresh", response_model=JobResponse)
async def trigger_refresh(
    payload: JobRequest,
    session: AsyncSession = Depends(get_db_session),
) -> JobResponse:
    """Queue the full pipeline job, forwarding ``payload.as_of`` as ISO text or ``None``."""
    as_of = payload.as_of.isoformat() if payload.as_of else None
    return await _enqueue_and_record(session, "run_full_pipeline_job", "refresh", as_of)


@router.get("/jobs/recent")
async def recent_jobs(session: AsyncSession = Depends(get_db_session)):
    """Return the result of ``fetch_recent_jobs`` for this session."""
    return await fetch_recent_jobs(session)
polygon_api_key: str | None = None + + +@lru_cache +def get_settings() -> Settings: + return Settings() + diff --git a/services/api/app/core/logging.py b/services/api/app/core/logging.py new file mode 100644 index 0000000..c726a0d --- /dev/null +++ b/services/api/app/core/logging.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import logging +import sys + +import structlog + + +def configure_logging(level: str) -> None: + timestamper = structlog.processors.TimeStamper(fmt="iso") + structlog.configure( + processors=[ + structlog.contextvars.merge_contextvars, + timestamper, + structlog.processors.add_log_level, + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(), + ], + wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, level.upper())), + ) + logging.basicConfig(stream=sys.stdout, format="%(message)s", level=level.upper()) + + +def get_logger(name: str): + return structlog.get_logger(name) + diff --git a/services/api/app/db/base.py b/services/api/app/db/base.py new file mode 100644 index 0000000..b7f84ef --- /dev/null +++ b/services/api/app/db/base.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from sqlalchemy import MetaData +from sqlalchemy.orm import DeclarativeBase + + +metadata = MetaData( + naming_convention={ + "ix": "ix_%(column_0_label)s", + "uq": "uq_%(table_name)s_%(column_0_name)s", + "ck": "ck_%(table_name)s_%(constraint_name)s", + "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", + "pk": "pk_%(table_name)s", + } +) + + +class Base(DeclarativeBase): + metadata = metadata + diff --git a/services/api/app/db/session.py b/services/api/app/db/session.py new file mode 100644 index 0000000..1a3360e --- /dev/null +++ b/services/api/app/db/session.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator + +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, 
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create missing tables on startup and release the connection pool on shutdown."""
    await _create_tables(engine)
    try:
        yield
    finally:
        # Close pooled connections so shutdown does not leave them dangling.
        await engine.dispose()


async def _create_tables(db_engine: AsyncEngine) -> None:
    """Run ``Base.metadata.create_all`` on ``db_engine`` inside one transaction."""
    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)


settings = get_settings()
configure_logging(settings.log_level)

app = FastAPI(title=settings.app_name, lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    # Wildcard origins must not be combined with credentialed requests: that
    # would let any site call this API with the user's cookies/auth headers.
    # Switch to an explicit origin list before turning credentials back on.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.include_router(health_router, prefix=settings.api_prefix, tags=["health"])
app.include_router(dashboard_router, prefix=settings.api_prefix, tags=["dashboard"])
app.include_router(jobs_router, prefix=settings.api_prefix, tags=["jobs"])
def utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(UTC)


class SourceConfig(Base):
    """ORM model for the ``source_configs`` table: one row per ingestion source."""

    __tablename__ = "source_configs"

    # Random UUID generated client-side on insert.
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Unique display name of the source.
    name: Mapped[str] = mapped_column(String(120), unique=True, index=True)
    # Free-form source category string (indexed).
    kind: Mapped[str] = mapped_column(String(64), index=True)
    base_url: Mapped[str] = mapped_column(String(512))
    # Schedule expression stored as text; the seed data uses cron-style strings.
    schedule: Mapped[str] = mapped_column(String(64))
    reliability_score: Mapped[float] = mapped_column(Float, default=0.5)
    enabled: Mapped[bool] = mapped_column(Boolean, default=True)
    # Stored in the DB column "metadata"; the attribute is named metadata_json
    # because ``metadata`` is reserved on declarative classes.
    metadata_json: Mapped[dict] = mapped_column("metadata", JSON, default=dict)
    # Set once at insert time via utcnow().
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
class EventCluster(Base):
    """ORM model for the ``event_clusters`` table: a group of ingested documents
    describing the same event, together with its per-cluster scores."""

    __tablename__ = "event_clusters"

    # Random UUID generated client-side on insert.
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    category: Mapped[str] = mapped_column(String(64), index=True)
    title: Mapped[str] = mapped_column(String(512))
    summary: Mapped[str] = mapped_column(Text)
    # Score columns all default to 0.0; their scale is not defined in this
    # module — presumably set by the scoring service (TODO confirm).
    novelty_score: Mapped[float] = mapped_column(Float, default=0.0)
    directness_score: Mapped[float] = mapped_column(Float, default=0.0)
    surprise_score: Mapped[float] = mapped_column(Float, default=0.0)
    persistence_score: Mapped[float] = mapped_column(Float, default=0.0)
    market_confirmation_score: Mapped[float] = mapped_column(Float, default=0.0)
    market_confirmed: Mapped[bool] = mapped_column(Boolean, default=False)
    # JSON-encoded list of keyword strings.
    keywords: Mapped[list[str]] = mapped_column(JSON, default=list)
    # Unique key identifying the cluster (enforced by a unique index).
    cluster_key: Mapped[str] = mapped_column(String(128), unique=True, index=True)
    # No defaults: both timestamps must be supplied by the writer.
    first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True)
    last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True)
    source_count: Mapped[int] = mapped_column(Integer, default=1)
    # Stored in the DB column "metadata"; ``metadata`` itself is reserved on
    # declarative classes.
    metadata_json: Mapped[dict] = mapped_column("metadata", JSON, default=dict)

    # Documents whose event_cluster_id points at this row; the backref also
    # adds a ``cluster`` attribute on IngestedDocument.
    documents: Mapped[list[IngestedDocument]] = relationship(backref="cluster")
class ModelRoute(Base):
    """ORM model for the ``model_routes`` table: which model serves each agent role."""

    __tablename__ = "model_routes"

    # Random UUID generated client-side on insert.
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Agent role name; unique, so there is at most one route per role.
    role: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    model: Mapped[str] = mapped_column(String(128))
    temperature: Mapped[float] = mapped_column(Float, default=0.2)
    # Stored in the DB column "metadata"; ``metadata`` itself is reserved on
    # declarative classes.
    metadata_json: Mapped[dict] = mapped_column("metadata", JSON, default=dict)
    # ``default`` alone only fires on INSERT; ``onupdate`` keeps the timestamp
    # current whenever the row is updated through the ORM/Core.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=utcnow, onupdate=utcnow
    )
class ThemePrediction(Base):
    """ORM model for the ``theme_predictions`` table: one predicted theme,
    optionally attached to a prediction run."""

    __tablename__ = "theme_predictions"

    # Random UUID generated client-side on insert.
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Nullable: a theme prediction may exist without a parent prediction run.
    prediction_run_id: Mapped[uuid.UUID | None] = mapped_column(
        UUID(as_uuid=True), ForeignKey("prediction_runs.id"), nullable=True, index=True
    )
    name: Mapped[str] = mapped_column(String(128), index=True)
    rationale: Mapped[str] = mapped_column(Text)
    # The score/risk columns below have no defaults and must be supplied by
    # the writer; their scale is not defined in this module.
    confidence: Mapped[float] = mapped_column(Float)
    theme_fit_score: Mapped[float] = mapped_column(Float)
    tradability_score: Mapped[float] = mapped_column(Float)
    gap_fade_risk: Mapped[float] = mapped_column(Float)
    priced_in_risk: Mapped[float] = mapped_column(Float)
    invalidation_condition: Mapped[str] = mapped_column(Text)
    # Free-text notes, empty by default.
    open_note: Mapped[str] = mapped_column(Text, default="")
    fifteen_minute_note: Mapped[str] = mapped_column(Text, default="")
    close_note: Mapped[str] = mapped_column(Text, default="")
    # Stored in the DB column "metadata"; ``metadata`` itself is reserved on
    # declarative classes.
    metadata_json: Mapped[dict] = mapped_column("metadata", JSON, default=dict)
class KoreanSecurity(Base):
    """ORM model for the ``korean_securities`` table: one listed Korean security,
    keyed by its ticker."""

    __tablename__ = "korean_securities"

    # Natural primary key: the ticker string itself.
    ticker: Mapped[str] = mapped_column(String(16), primary_key=True)
    name: Mapped[str] = mapped_column(String(128), index=True)
    market: Mapped[str] = mapped_column(String(32))
    sector: Mapped[str | None] = mapped_column(String(128), nullable=True)
    # JSON-encoded list of theme tag strings.
    theme_tags: Mapped[list[str]] = mapped_column(JSON, default=list)
    # Stored in the DB column "metadata"; ``metadata`` itself is reserved on
    # declarative classes.
    metadata_json: Mapped[dict] = mapped_column("metadata", JSON, default=dict)
    # ``default`` alone only fires on INSERT; ``onupdate`` keeps the timestamp
    # current whenever the row is updated through the ORM/Core.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=utcnow, onupdate=utcnow
    )
"evaluation_runs" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + candidate_name: Mapped[str] = mapped_column(String(128), index=True) + baseline_name: Mapped[str] = mapped_column(String(128)) + train_window: Mapped[str] = mapped_column(String(64)) + validation_window: Mapped[str] = mapped_column(String(64)) + promoted: Mapped[bool] = mapped_column(Boolean, default=False) + requires_manual_approval: Mapped[bool] = mapped_column(Boolean, default=True) + artifacts: Mapped[dict] = mapped_column(JSON, default=dict) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) + + +class EvaluationMetric(Base): + __tablename__ = "evaluation_metrics" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + evaluation_run_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), ForeignKey("evaluation_runs.id"), nullable=True, index=True + ) + metric_name: Mapped[str] = mapped_column(String(64), index=True) + metric_value: Mapped[float] = mapped_column(Float) + split: Mapped[str] = mapped_column(String(16), default="validation") + + +class JobRun(Base): + __tablename__ = "job_runs" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + job_id: Mapped[str] = mapped_column(String(128), unique=True, index=True) + job_name: Mapped[str] = mapped_column(String(128), index=True) + trigger_kind: Mapped[str] = mapped_column(String(64), index=True) + status: Mapped[str] = mapped_column(String(32), default="queued", index=True) + requested_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) + started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + result_json: Mapped[dict] = mapped_column(JSON, default=dict) + error_message: Mapped[str | None] = 
mapped_column(Text, nullable=True) diff --git a/services/api/app/repositories/dashboard.py b/services/api/app/repositories/dashboard.py new file mode 100644 index 0000000..5050044 --- /dev/null +++ b/services/api/app/repositories/dashboard.py @@ -0,0 +1,456 @@ +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import UUID + +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.entities import ( + AgentRun, + EvaluationMetric, + EventCluster, + IngestedDocument, + JobRun, + MarketReaction, + ModelRoute, + PredictionRun, + PromptVersion, + SourceConfig, + StockCandidate, + ThemePrediction, +) +from app.schemas.api import ( + AdminSettingsResponse, + DashboardMetrics, + DashboardResponse, + EventClusterResponse, + EventExplorerResponse, + EventSummary, + EvaluationSummaryResponse, + JobRunItem, + JobRunListResponse, + ReplayResponse, + StockSummary, + ThemeBoardItem, + ThemeBoardResponse, + ThemeSummary, + WeeklyReplayResponse, +) +from app.services.replay.weekly_replay import WeeklyReplayService + + +def _label_datetime(value: datetime | None) -> str: + if value is None: + return "-" + return value.astimezone(UTC).strftime("%Y-%m-%d %H:%M UTC") + + +async def fetch_dashboard(session: AsyncSession) -> DashboardResponse: + docs_count = await session.scalar(select(func.count()).select_from(IngestedDocument)) or 0 + cluster_count = await session.scalar(select(func.count()).select_from(EventCluster)) or 0 + theme_hit_metric = await session.scalar( + select(EvaluationMetric.metric_value) + .where(EvaluationMetric.metric_name == "theme_hit_rate") + .order_by(EvaluationMetric.id.desc()) + .limit(1) + ) + + latest_prediction_run_id = await _latest_prediction_run_id(session) + + clusters = ( + await session.execute(select(EventCluster).order_by(EventCluster.last_seen_at.desc()).limit(5)) + ).scalars().all() + key_events: list[EventSummary] = [] + for cluster in clusters: + source_name = ( + 
await session.scalar( + select(IngestedDocument.source_name) + .where(IngestedDocument.event_cluster_id == cluster.id) + .limit(1) + ) + ) or "Unknown" + published_at = await session.scalar( + select(IngestedDocument.published_at) + .where(IngestedDocument.event_cluster_id == cluster.id) + .order_by(IngestedDocument.published_at.desc()) + .limit(1) + ) + key_events.append( + EventSummary( + id=str(cluster.id), + title=cluster.title, + categoryLabel=cluster.category, + sourceName=source_name, + publishedAtLabel=_label_datetime(published_at), + directnessScore=cluster.directness_score, + marketConfirmationScore=cluster.market_confirmation_score, + ) + ) + + reactions = ( + await session.execute(select(MarketReaction).order_by(MarketReaction.measured_at.desc()).limit(5)) + ).scalars().all() + theme_predictions = await _latest_theme_predictions(session, latest_prediction_run_id, limit=5) + + theme_board = [ + ThemeSummary( + id=str(theme.id), + name=theme.name, + rationale=theme.rationale, + confidence=theme.confidence, + tradabilityScore=theme.tradability_score, + gapFadeRisk=theme.gap_fade_risk, + marketView="다음 영업일 시가 및 거래대금 확인 필요", + ) + for theme in theme_predictions + ] + + stock_rows = await _latest_stock_candidates(session, theme_predictions, limit=8) + leader_board = [ + StockSummary( + ticker=stock.ticker, + name=stock.name, + tierLabel="주도주" if stock.tier == "leader" else "2등주", + score=stock.score, + pricedInRisk=stock.priced_in_risk, + rationale=stock.rationale, + ) + for stock in stock_rows + ] + + reaction_summary = ( + { + "totalMovePct": round(sum(item.move_pct for item in reactions), 4), + "items": [ + { + "label": item.symbol, + "movePct": item.move_pct, + "commentary": f"{item.window_label} 구간에서 {item.asset_class} 반응 확인", + } + for item in reactions + ], + } + if reactions + else {"totalMovePct": 0.0, "items": []} + ) + + return DashboardResponse( + metrics=DashboardMetrics( + documentsIngested=docs_count, + clusteredEvents=cluster_count, + 
activeThemes=len(theme_board), + recentThemeHitRate=theme_hit_metric or 0.0, + ), + keyEvents=key_events, + usReactionSummary=reaction_summary, + themeBoard=theme_board, + leaderBoard=leader_board, + riskFlags=[ + "시가 과열 추격 금지", + "미국 반응 미확인 이벤트는 우선순위 하향", + "정책성 이벤트는 후속 반응 동반 확인", + ], + topTheme=theme_board[0] if theme_board else None, + topLeader=leader_board[0] if leader_board else None, + ) + + +async def fetch_event_explorer(session: AsyncSession) -> EventExplorerResponse: + clusters = ( + await session.execute(select(EventCluster).order_by(EventCluster.last_seen_at.desc()).limit(20)) + ).scalars().all() + payload: list[EventClusterResponse] = [] + for cluster in clusters: + sources = ( + await session.execute(select(IngestedDocument).where(IngestedDocument.event_cluster_id == cluster.id)) + ).scalars().all() + reactions = ( + await session.execute(select(MarketReaction).where(MarketReaction.cluster_id == cluster.id)) + ).scalars().all() + notes = ( + await session.execute(select(AgentRun).order_by(AgentRun.created_at.desc()).limit(4)) + ).scalars().all() + payload.append( + EventClusterResponse( + id=str(cluster.id), + title=cluster.title, + categoryLabel=cluster.category, + summary=cluster.summary, + noveltyScore=cluster.novelty_score, + directnessScore=cluster.directness_score, + surpriseScore=cluster.surprise_score, + persistenceScore=cluster.persistence_score, + marketConfirmed=cluster.market_confirmed, + sources=[ + { + "name": source.source_name, + "url": source.url, + "fetchedAtLabel": _label_datetime(source.fetched_at), + } + for source in sources + ], + reactions=[ + { + "label": reaction.symbol, + "movePct": reaction.move_pct, + "window": reaction.window_label, + } + for reaction in reactions + ], + agentNotes=[ + { + "role": note.role, + "note": note.output_json.get("thesis", "에이전트 결과 없음"), + } + for note in notes + ], + ) + ) + return EventExplorerResponse(clusters=payload) + + +async def fetch_theme_board(session: AsyncSession) -> 
ThemeBoardResponse: + latest_prediction_run_id = await _latest_prediction_run_id(session) + themes = await _latest_theme_predictions(session, latest_prediction_run_id, limit=20) + payload: list[ThemeBoardItem] = [] + for theme in themes: + stocks = ( + await session.execute(select(StockCandidate).where(StockCandidate.theme_prediction_id == theme.id)) + ).scalars().all() + leaders = [ + {"ticker": stock.ticker, "name": stock.name, "rationale": stock.rationale} + for stock in stocks + if stock.tier == "leader" + ] + second_tier = [ + {"ticker": stock.ticker, "name": stock.name, "rationale": stock.rationale} + for stock in stocks + if stock.tier != "leader" + ] + payload.append( + ThemeBoardItem( + id=str(theme.id), + name=theme.name, + rationale=theme.rationale, + confidence=theme.confidence, + themeFitScore=theme.theme_fit_score, + tradabilityScore=theme.tradability_score, + gapFadeRisk=theme.gap_fade_risk, + pricedInRisk=theme.priced_in_risk, + invalidationCondition=theme.invalidation_condition, + openNote=theme.open_note, + fifteenMinuteNote=theme.fifteen_minute_note, + closeNote=theme.close_note, + leaders=leaders, + secondTier=second_tier, + ) + ) + return ThemeBoardResponse(themes=payload) + + +async def fetch_replay(session: AsyncSession, date_label: str) -> ReplayResponse: + prompt_version = await session.scalar( + select(PromptVersion.version).order_by(PromptVersion.created_at.desc()).limit(1) + ) + clusters = ( + await session.execute(select(EventCluster).order_by(EventCluster.last_seen_at.desc()).limit(5)) + ).scalars().all() + notes = ( + await session.execute(select(AgentRun).order_by(AgentRun.created_at.desc()).limit(6)) + ).scalars().all() + return ReplayResponse( + asOfLabel=date_label if date_label != "latest" else _label_datetime(datetime.now(UTC)), + promptVersion=prompt_version or "baseline-v1", + evidencePackHash="latest-evidence-pack", + predictionSummary="미국 검증 이벤트를 기준으로 다음 한국장 테마를 재구성합니다.", + actualSummary="실제 결과는 차후 리플레이 데이터 적재 후 연결됩니다.", + 
outcomeSummary="현재는 초기 상태라 리플레이 기록이 제한적입니다.", + evidenceItems=[{"title": cluster.title, "summary": cluster.summary[:200]} for cluster in clusters], + agentDebate=[ + { + "role": note.role, + "thesis": note.output_json.get("thesis", "기록 없음"), + "confidence": float(note.output_json.get("confidence", 0.4)), + } + for note in notes + ], + ) + + +async def fetch_weekly_replay(session: AsyncSession) -> WeeklyReplayResponse: + service = WeeklyReplayService() + payload = await service.build(session) + return WeeklyReplayResponse.model_validate(payload) + + +async def fetch_evaluation_summary(session: AsyncSession) -> EvaluationSummaryResponse: + theme_hit_rate = await session.scalar( + select(EvaluationMetric.metric_value) + .where(EvaluationMetric.metric_name == "theme_hit_rate") + .order_by(EvaluationMetric.id.desc()) + .limit(1) + ) or 0.0 + leader_hit_rate = await session.scalar( + select(EvaluationMetric.metric_value) + .where(EvaluationMetric.metric_name == "leader_hit_rate") + .order_by(EvaluationMetric.id.desc()) + .limit(1) + ) or 0.0 + false_positive_rate = await session.scalar( + select(EvaluationMetric.metric_value) + .where(EvaluationMetric.metric_name == "false_positive_rate") + .order_by(EvaluationMetric.id.desc()) + .limit(1) + ) or 0.0 + gap_fade_rate = await session.scalar( + select(EvaluationMetric.metric_value) + .where(EvaluationMetric.metric_name == "gap_fade_rate") + .order_by(EvaluationMetric.id.desc()) + .limit(1) + ) or 0.0 + routes = (await session.execute(select(ModelRoute))).scalars().all() + return EvaluationSummaryResponse( + rollingMetrics={ + "themeHitRate": theme_hit_rate, + "leaderHitRate": leader_hit_rate, + "falsePositiveRate": false_positive_rate, + "gapFadeMissRate": gap_fade_rate, + }, + promptLeaderboard=[ + { + "version": "baseline-v1", + "description": "규칙 기반 번역 + 멀티 에이전트 기본 조합", + "score": 0.62, + "themeHitRate": theme_hit_rate, + "leaderHitRate": leader_hit_rate, + "promotable": False, + } + ], + modelRoleLeaderboard=[ + { + 
"role": route.role, + "model": route.model, + "score": 0.6, + "avgLatencyMs": 1250, + "avgCostUsd": 0.012, + } + for route in routes + ], + ) + + +async def fetch_admin_settings(session: AsyncSession, settings) -> AdminSettingsResponse: + sources = (await session.execute(select(SourceConfig).order_by(SourceConfig.name.asc()))).scalars().all() + routes = (await session.execute(select(ModelRoute).order_by(ModelRoute.role.asc()))).scalars().all() + return AdminSettingsResponse( + sources=[ + {"name": source.name, "kind": source.kind, "reliabilityScore": source.reliability_score} + for source in sources + ], + modelRouting=[{"role": route.role, "model": route.model} for route in routes], + costGuardrails={ + "dailyBudgetUsd": settings.daily_cost_budget_usd, + "perJobBudgetUsd": settings.per_job_cost_budget_usd, + }, + scheduler={ + "premarketCron": settings.premarket_cron, + "postmarketCron": settings.postmarket_cron, + }, + promotion={"manualApprovalRequired": True}, + ) + + +async def fetch_recent_jobs(session: AsyncSession) -> JobRunListResponse: + runs = (await session.execute(select(JobRun).order_by(JobRun.requested_at.desc()).limit(2))).scalars().all() + return JobRunListResponse( + items=[ + JobRunItem( + id=str(run.id), + jobId=run.job_id, + jobName=run.job_name, + triggerKind=run.trigger_kind, + status=run.status, + requestedAtLabel=_label_datetime(run.requested_at), + startedAtLabel=None if run.started_at is None else _label_datetime(run.started_at), + finishedAtLabel=None if run.finished_at is None else _label_datetime(run.finished_at), + resultSummary=_summarize_job_result(run), + errorMessage=run.error_message, + ) + for run in runs + ] + ) + + +def _summarize_job_result(run: JobRun) -> str | None: + if run.status == "completed" and run.result_json: + ingestion = run.result_json.get("ingestion") + analysis = run.result_json.get("analysis") + if ingestion and analysis: + return ( + f"문서 {ingestion.get('inserted', 0)}건 수집, " + f"클러스터 
{analysis.get('created_clusters', 0)}건, " + f"테마 {analysis.get('created_themes', 0)}건 생성" + ) + if "inserted" in run.result_json: + return f"문서 {run.result_json.get('inserted', 0)}건 수집" + if "created_clusters" in run.result_json: + return ( + f"클러스터 {run.result_json.get('created_clusters', 0)}건, " + f"테마 {run.result_json.get('created_themes', 0)}건 생성" + ) + return None + + +async def _latest_prediction_run_id(session: AsyncSession) -> UUID | None: + return await session.scalar( + select(PredictionRun.id) + .join(ThemePrediction, ThemePrediction.prediction_run_id == PredictionRun.id) + .where(PredictionRun.status == "completed") + .group_by(PredictionRun.id, PredictionRun.created_at) + .order_by(PredictionRun.created_at.desc()) + .limit(1) + ) + + +async def _latest_theme_predictions( + session: AsyncSession, + prediction_run_id: UUID | None, + limit: int, +) -> list[ThemePrediction]: + statement = select(ThemePrediction).order_by(ThemePrediction.confidence.desc()) + if prediction_run_id is not None: + statement = statement.where(ThemePrediction.prediction_run_id == prediction_run_id) + return (await session.execute(statement.limit(limit))).scalars().all() + + +async def _latest_stock_candidates( + session: AsyncSession, + themes: list[ThemePrediction], + limit: int, +) -> list[StockCandidate]: + if not themes: + return [] + + theme_ids = [theme.id for theme in themes] + rows = ( + await session.execute( + select(StockCandidate) + .where(StockCandidate.theme_prediction_id.in_(theme_ids)) + .order_by(StockCandidate.score.desc()) + ) + ).scalars().all() + + deduped: dict[str, StockCandidate] = {} + for stock in rows: + current = deduped.get(stock.ticker) + if current is None: + deduped[stock.ticker] = stock + continue + if current.tier != "leader" and stock.tier == "leader": + deduped[stock.ticker] = stock + continue + if stock.score > current.score: + deduped[stock.ticker] = stock + + return sorted(deduped.values(), key=lambda item: (item.tier != "leader", 
-item.score))[:limit] diff --git a/services/api/app/schemas/api.py b/services/api/app/schemas/api.py new file mode 100644 index 0000000..24672a1 --- /dev/null +++ b/services/api/app/schemas/api.py @@ -0,0 +1,264 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel + + +class HealthResponse(BaseModel): + status: str + environment: str + version: str + + +class DashboardMetrics(BaseModel): + documentsIngested: int + clusteredEvents: int + activeThemes: int + recentThemeHitRate: float + + +class EventSummary(BaseModel): + id: str + title: str + categoryLabel: str + sourceName: str + publishedAtLabel: str + directnessScore: float + marketConfirmationScore: float + + +class ReactionItem(BaseModel): + label: str + movePct: float + commentary: str + + +class ThemeSummary(BaseModel): + id: str + name: str + rationale: str + confidence: float + tradabilityScore: float + gapFadeRisk: float + marketView: str + + +class StockSummary(BaseModel): + ticker: str + name: str + tierLabel: str + score: float + pricedInRisk: float + rationale: str + + +class DashboardResponse(BaseModel): + metrics: DashboardMetrics + keyEvents: list[EventSummary] + usReactionSummary: dict[str, Any] + themeBoard: list[ThemeSummary] + leaderBoard: list[StockSummary] + riskFlags: list[str] + topTheme: ThemeSummary | None + topLeader: StockSummary | None + + +class EventSourceLink(BaseModel): + name: str + url: str + fetchedAtLabel: str + + +class ClusterReaction(BaseModel): + label: str + movePct: float + window: str + + +class AgentNote(BaseModel): + role: str + note: str + + +class EventClusterResponse(BaseModel): + id: str + title: str + categoryLabel: str + summary: str + noveltyScore: float + directnessScore: float + surpriseScore: float + persistenceScore: float + marketConfirmed: bool + sources: list[EventSourceLink] + reactions: list[ClusterReaction] + agentNotes: list[AgentNote] + + +class 
EventExplorerResponse(BaseModel): + clusters: list[EventClusterResponse] + + +class ThemeStock(BaseModel): + ticker: str + name: str + rationale: str + + +class ThemeBoardItem(BaseModel): + id: str + name: str + rationale: str + confidence: float + themeFitScore: float + tradabilityScore: float + gapFadeRisk: float + pricedInRisk: float + invalidationCondition: str + openNote: str + fifteenMinuteNote: str + closeNote: str + leaders: list[ThemeStock] + secondTier: list[ThemeStock] + + +class ThemeBoardResponse(BaseModel): + themes: list[ThemeBoardItem] + + +class ReplayEvidenceItem(BaseModel): + title: str + summary: str + + +class ReplayAgentItem(BaseModel): + role: str + thesis: str + confidence: float + + +class ReplayResponse(BaseModel): + asOfLabel: str + promptVersion: str + evidencePackHash: str + predictionSummary: str + actualSummary: str + outcomeSummary: str + evidenceItems: list[ReplayEvidenceItem] + agentDebate: list[ReplayAgentItem] + + +class WeeklyReplayStockLite(BaseModel): + ticker: str + name: str + + +class WeeklyReplayStockResult(BaseModel): + ticker: str + name: str + tier: str + openGapPct: float + closeReturnPct: float + intradayMovePct: float + + +class WeeklyReplayThemeOutcome(BaseModel): + avgLeaderCloseReturnPct: float + outcomeLabel: str + bestStockLabel: str | None + bestCloseReturnPct: float | None + stockResults: list[WeeklyReplayStockResult] + + +class WeeklyReplayThemeItem(BaseModel): + name: str + confidence: float + rationale: str + invalidationCondition: str + leaders: list[WeeklyReplayStockLite] + secondTier: list[WeeklyReplayStockLite] + actualOutcome: WeeklyReplayThemeOutcome + + +class WeeklyReplayEvidenceItem(BaseModel): + title: str + categoryLabel: str + summary: str + publishedAtLabel: str + marketConfirmationScore: float + sourceName: str + url: str + + +class WeeklyReplayMarketContext(BaseModel): + marketDateLabel: str + kospiCloseReturnPct: float + kosdaqCloseReturnPct: float + summary: str + + +class 
WeeklyReplayDay(BaseModel): + marketDateLabel: str + asOfLabel: str + promptVersion: str + evidencePackHash: str + marketContext: WeeklyReplayMarketContext + predictedThemes: list[WeeklyReplayThemeItem] + evidenceItems: list[WeeklyReplayEvidenceItem] + summary: str + + +class WeeklyReplayAggregate(BaseModel): + daysAnalyzed: int + positiveHitDays: int + avgLeaderCloseReturnPct: float + + +class WeeklyReplayResponse(BaseModel): + windowLabel: str + promptVersion: str + aggregate: WeeklyReplayAggregate + days: list[WeeklyReplayDay] + + +class EvaluationSummaryResponse(BaseModel): + rollingMetrics: dict[str, float] + promptLeaderboard: list[dict[str, Any]] + modelRoleLeaderboard: list[dict[str, Any]] + + +class AdminSettingsResponse(BaseModel): + sources: list[dict[str, Any]] + modelRouting: list[dict[str, Any]] + costGuardrails: dict[str, float] + scheduler: dict[str, str] + promotion: dict[str, bool] + + +class JobRequest(BaseModel): + as_of: datetime | None = None + + +class JobResponse(BaseModel): + status: str + jobName: str + jobId: str | None = None + + +class JobRunItem(BaseModel): + id: str + jobId: str + jobName: str + triggerKind: str + status: str + requestedAtLabel: str + startedAtLabel: str | None = None + finishedAtLabel: str | None = None + resultSummary: str | None = None + errorMessage: str | None = None + + +class JobRunListResponse(BaseModel): + items: list[JobRunItem] diff --git a/services/api/app/services/agents/openai_client.py b/services/api/app/services/agents/openai_client.py new file mode 100644 index 0000000..aad5f1b --- /dev/null +++ b/services/api/app/services/agents/openai_client.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +import hashlib +import json +import time +from dataclasses import dataclass +from typing import Any + +from openai import AsyncOpenAI + +from app.core.config import Settings +from app.core.logging import get_logger +from app.services.agents.router import ModelRouter, ROLE_PROMPTS + +logger = 
get_logger(__name__) + + +@dataclass(slots=True) +class AgentResult: + role: str + model: str + content: dict[str, Any] + latency_ms: int + cached_input_tokens: int + cost_usd: float + + +class AgentOrchestrator: + def __init__(self, settings: Settings) -> None: + self.settings = settings + self.router = ModelRouter(settings) + self.client = ( + AsyncOpenAI(api_key=settings.openai_api_key, base_url=settings.openai_base_url) + if settings.openai_api_key + else None + ) + + async def run_role( + self, + role: str, + evidence_pack: dict[str, Any], + background: bool = False, + ) -> AgentResult: + model = self.router.model_for(role) + prompt = ROLE_PROMPTS[role] + evidence_json = json.dumps(evidence_pack, ensure_ascii=False, sort_keys=True) + input_hash = hashlib.sha256(f"{role}:{evidence_json}".encode("utf-8")).hexdigest() + + if not self.client: + return AgentResult( + role=role, + model=model, + content={ + "thesis": f"{role} 로컬 대체 분석: API 키가 없어 결정론적 규칙 기반 결과를 반환합니다.", + "confidence": 0.45, + "evidence": evidence_pack, + "input_hash": input_hash, + }, + latency_ms=0, + cached_input_tokens=0, + cost_usd=0.0, + ) + + start = time.perf_counter() + response = await self.client.responses.create( + model=model, + background=background, + input=[ + { + "role": "system", + "content": [ + { + "type": "input_text", + "text": ( + "당신은 개인용 크로스마켓 리서치 시스템의 전문 분석가다. " + "증거가 부족하면 확신을 낮추고 불확실성을 명시한다. " + "반드시 JSON 문자열만 반환한다." 
+ ), + } + ], + }, + { + "role": "developer", + "content": [{"type": "input_text", "text": prompt}], + }, + { + "role": "user", + "content": [{"type": "input_text", "text": evidence_json}], + }, + ], + metadata={"role": role, "prompt_version": self.settings.prompt_version}, + ) + latency_ms = int((time.perf_counter() - start) * 1000) + output_text = getattr(response, "output_text", "") or "{}" + try: + content = json.loads(output_text) + except json.JSONDecodeError: + content = {"thesis": output_text, "confidence": 0.4, "input_hash": input_hash} + usage = getattr(response, "usage", None) + cached_tokens = 0 + if usage and getattr(usage, "input_tokens_details", None): + cached_tokens = int(getattr(usage.input_tokens_details, "cached_tokens", 0)) + logger.info( + "agent_run_complete", + role=role, + model=model, + latency_ms=latency_ms, + cached_tokens=cached_tokens, + ) + return AgentResult( + role=role, + model=model, + content=content, + latency_ms=latency_ms, + cached_input_tokens=cached_tokens, + cost_usd=0.0, + ) + diff --git a/services/api/app/services/agents/router.py b/services/api/app/services/agents/router.py new file mode 100644 index 0000000..b1aedd9 --- /dev/null +++ b/services/api/app/services/agents/router.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from app.core.config import Settings + + +class ModelRouter: + def __init__(self, settings: Settings) -> None: + self.settings = settings + self.role_map = { + "macro_analyst": settings.model_macro_analyst, + "sector_analyst": settings.model_sector_analyst, + "microstructure_analyst": settings.model_microstructure_analyst, + "skeptic_analyst": settings.model_skeptic_analyst, + "korea_translator": settings.model_korea_translator, + "final_judge": settings.model_final_judge, + } + + def model_for(self, role: str) -> str: + return self.role_map[role] + + +ROLE_PROMPTS: dict[str, str] = { + "macro_analyst": "미국 정책, 금리, 달러, 원자재, 경기 지표가 다음 한국장에 미치는 1차 경로를 분석한다.", + "sector_analyst": "미국 섹터 반응, 관련 
기업, 수혜 산업 체인을 바탕으로 테마 확산 가능성을 분석한다.", + "microstructure_analyst": "갭상승 후 밀림, 거래대금 집중도, 전일 선행 상승 여부 등 단기 수급 리스크를 분석한다.", + "skeptic_analyst": "과적합, 선반영, 데이터 누락, 반례 가능성을 제기하고 확신을 낮출 요소를 정리한다.", + "korea_translator": "미국 검증 이벤트를 한국 상장사와 테마로 명시적으로 번역하고 1등주/2등주를 구분한다.", + "final_judge": "모든 에이전트의 합의와 이견을 비교하고 근거 부족 영역을 불확실성으로 남긴다.", +} + diff --git a/services/api/app/services/evaluation/metrics.py b/services/api/app/services/evaluation/metrics.py new file mode 100644 index 0000000..186c222 --- /dev/null +++ b/services/api/app/services/evaluation/metrics.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(slots=True) +class EvaluationSnapshot: + theme_hit_rate: float + leader_hit_rate: float + ranking_quality: float + false_positive_rate: float + gap_fade_rate: float + + +def composite_score(snapshot: EvaluationSnapshot) -> float: + return ( + snapshot.theme_hit_rate * 0.3 + + snapshot.leader_hit_rate * 0.25 + + snapshot.ranking_quality * 0.2 + + (1 - snapshot.false_positive_rate) * 0.15 + + (1 - snapshot.gap_fade_rate) * 0.1 + ) + + +def promotable( + baseline: EvaluationSnapshot, + candidate_train: EvaluationSnapshot, + candidate_validation: EvaluationSnapshot, +) -> bool: + baseline_score = composite_score(baseline) + train_score = composite_score(candidate_train) + validation_score = composite_score(candidate_validation) + return ( + train_score > baseline_score + and validation_score > baseline_score + and candidate_validation.false_positive_rate <= baseline.false_positive_rate + 0.02 + and candidate_validation.gap_fade_rate <= baseline.gap_fade_rate + 0.02 + ) diff --git a/services/api/app/services/events/scoring.py b/services/api/app/services/events/scoring.py new file mode 100644 index 0000000..68fd922 --- /dev/null +++ b/services/api/app/services/events/scoring.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +import hashlib +import math +import re +from collections.abc import Iterable +from 
datetime import UTC, datetime + + +CATEGORY_RULES: dict[str, tuple[str, ...]] = { + "macro": ("cpi", "ppi", "jobs", "employment", "gdp", "ism", "inflation"), + "monetary_policy": ("fomc", "fed", "rate", "yield", "dot plot", "treasury"), + "tariff_trade": ("tariff", "trade", "duty", "export control", "sanction"), + "geopolitics": ("war", "conflict", "summit", "missile", "security"), + "earnings_guidance": ("earnings", "guidance", "outlook", "quarter", "revenue"), + "regulation": ("approval", "regulation", "antitrust", "policy", "rule"), + "ai_semiconductor": ("ai", "chip", "semiconductor", "gpu", "memory"), + "power_infrastructure": ("grid", "power", "nuclear", "utility", "datacenter"), + "defense": ("defense", "aerospace", "weapon", "contract"), + "biotech": ("drug", "trial", "fda", "biotech", "pharma"), + "energy_raw_materials": ("oil", "gas", "gold", "copper", "lithium", "uranium"), + "logistics_supply_chain": ("shipping", "freight", "container", "supply chain", "logistics"), +} + + +def normalize_text(text: str) -> str: + return re.sub(r"\s+", " ", text.strip().lower()) + + +def keyword_hit_count(text: str, keywords: tuple[str, ...]) -> int: + return sum(1 for keyword in keywords if _contains_keyword(text, keyword)) + + +def classify_event(title: str, body: str) -> str: + haystack = normalize_text(f"{title} {body}") + best_category = "macro" + best_hits = 0 + priority = { + "ai_semiconductor": 3, + "power_infrastructure": 3, + "defense": 3, + "energy_raw_materials": 3, + "earnings_guidance": 1, + "macro": 1, + } + for category, keywords in CATEGORY_RULES.items(): + hits = keyword_hit_count(haystack, keywords) + if hits > best_hits: + best_category = category + best_hits = hits + continue + if hits == best_hits and hits > 0: + if priority.get(category, 2) > priority.get(best_category, 2): + best_category = category + if best_hits > 0: + return best_category + return "macro" + + +def cluster_key(category: str, title: str) -> str: + normalized = 
normalize_text(title) + digest = hashlib.sha256(f"{category}:{normalized}".encode("utf-8")).hexdigest() + return digest[:24] + + +def novelty_score(prior_hits: int, hours_since_last_seen: float) -> float: + freshness = min(1.0, max(0.0, hours_since_last_seen / 48.0)) + repetition_penalty = 1.0 / math.sqrt(max(prior_hits, 1)) + return round(max(0.05, min(1.0, freshness * repetition_penalty)), 4) + + +def directness_score(title: str, body: str) -> float: + haystack = normalize_text(f"{title} {body}") + explicit_terms = sum( + 1 + for term in ("guidance", "official", "announced", "approval", "export control", "rate") + if term in haystack + ) + market_terms = sum(1 for term in ("nvda", "qqq", "smh", "soxx", "yield", "dxy") if term in haystack) + return round(min(1.0, 0.2 + explicit_terms * 0.15 + market_terms * 0.1), 4) + + +def surprise_score(title: str, body: str) -> float: + haystack = normalize_text(f"{title} {body}") + surprise_terms = sum( + 1 for term in ("unexpected", "beats", "misses", "shock", "surge", "cuts", "halt") if term in haystack + ) + return round(min(1.0, 0.15 + surprise_terms * 0.14), 4) + + +def persistence_score(category: str, body: str) -> float: + baseline = { + "macro": 0.55, + "monetary_policy": 0.78, + "tariff_trade": 0.82, + "geopolitics": 0.72, + "earnings_guidance": 0.48, + "regulation": 0.68, + "ai_semiconductor": 0.74, + "power_infrastructure": 0.76, + "defense": 0.66, + "biotech": 0.44, + "energy_raw_materials": 0.63, + "logistics_supply_chain": 0.6, + } + body_bonus = 0.08 if "multi-year" in normalize_text(body) or "structural" in normalize_text(body) else 0.0 + return round(min(1.0, baseline.get(category, 0.5) + body_bonus), 4) + + +def market_confirmation_score(moves: Iterable[float]) -> float: + magnitudes = [abs(move) for move in moves] + if not magnitudes: + return 0.0 + avg_move = sum(magnitudes) / len(magnitudes) + return round(min(1.0, avg_move / 2.5), 4) + + +def dedupe_key(source_name: str, title: str, url: str, 
published_at: datetime) -> str: + payload = "|".join( + [ + normalize_text(source_name), + normalize_text(title), + url.strip().lower(), + published_at.astimezone(UTC).isoformat(), + ] + ) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +def _contains_keyword(text: str, keyword: str) -> bool: + normalized_keyword = normalize_text(keyword) + pattern = rf"(? None: + self.source_name = source_name + self.url = url + self.reliability_score = reliability_score + + async def fetch(self) -> list[RawDocument]: + async with httpx.AsyncClient(timeout=20.0, headers=HTTP_HEADERS, follow_redirects=True) as client: + response = await client.get(self.url) + response.raise_for_status() + parsed = feedparser.parse(response.text) + fetched_at = datetime.now(UTC) + documents: list[RawDocument] = [] + for entry in parsed.entries: + published_raw = entry.get("published") or entry.get("updated") + published_at = ( + parsedate_to_datetime(published_raw).astimezone(UTC) + if published_raw + else fetched_at + ) + summary_html = entry.get("summary", "") + summary_text = BeautifulSoup(summary_html, "html.parser").get_text(" ", strip=True) + url = entry.get("link") or self.url + title = entry.get("title", "Untitled") + documents.append( + RawDocument( + source_name=self.source_name, + url=url, + title=title, + published_at=published_at, + fetched_at=fetched_at, + raw_content=summary_text, + parsed_content={ + "summary": summary_text, + "tags": [tag["term"] for tag in entry.get("tags", []) if "term" in tag], + }, + dedupe_key=dedupe_key(self.source_name, title, url, published_at), + reliability_score=self.reliability_score, + ) + ) + return documents + + +class TreasuryPressReleaseConnector: + def __init__(self, source_name: str, url: str, reliability_score: float) -> None: + self.source_name = source_name + self.url = url + self.reliability_score = reliability_score + + async def fetch(self) -> list[RawDocument]: + async with httpx.AsyncClient(timeout=20.0, 
headers=HTTP_HEADERS, follow_redirects=True) as client: + response = await client.get(self.url) + response.raise_for_status() + fetched_at = datetime.now(UTC) + soup = BeautifulSoup(response.text, "html.parser") + documents: list[RawDocument] = [] + seen_urls: set[str] = set() + + for anchor in soup.find_all("a", href=True): + href = anchor.get("href", "").strip() + if "/news/press-releases/" not in href: + continue + if href.rstrip("/").endswith("/news/press-releases"): + continue + + full_url = urljoin(self.url, href) + if full_url in seen_urls: + continue + + title = anchor.get_text(" ", strip=True) + if len(title) < 12: + continue + + parent_text = anchor.parent.get_text(" ", strip=True) if anchor.parent else title + published_at = _extract_treasury_published_at(parent_text, fetched_at) + documents.append( + RawDocument( + source_name=self.source_name, + url=full_url, + title=title, + published_at=published_at, + fetched_at=fetched_at, + raw_content=parent_text, + parsed_content={"summary": parent_text, "tags": ["treasury", "press_release"]}, + dedupe_key=_stable_document_key(self.source_name, title, full_url), + reliability_score=self.reliability_score, + ) + ) + seen_urls.add(full_url) + + return documents + + +def default_connectors() -> list[FeedConnector | TreasuryPressReleaseConnector]: + return [ + FeedConnector( + source_name="Federal Reserve Press Releases", + url="https://www.federalreserve.gov/feeds/press_all.xml", + reliability_score=0.98, + ), + TreasuryPressReleaseConnector( + source_name="U.S. 
Treasury Press Releases", + url="https://home.treasury.gov/news/press-releases", + reliability_score=0.95, + ), + FeedConnector( + source_name="SEC Press Releases", + url="https://www.sec.gov/news/pressreleases.rss", + reliability_score=0.94, + ), + FeedConnector( + source_name="CNBC Top News", + url="https://www.cnbc.com/id/100003114/device/rss/rss.html", + reliability_score=0.82, + ), + FeedConnector( + source_name="MarketWatch Top Stories", + url="https://feeds.marketwatch.com/marketwatch/topstories/", + reliability_score=0.8, + ), + FeedConnector( + source_name="BEA News", + url="https://www.bea.gov/rss/news.xml", + reliability_score=0.9, + ), + ] + + +def _stable_document_key(source_name: str, title: str, url: str) -> str: + payload = "|".join([normalize_text(source_name), normalize_text(title), url.strip().lower()]) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +def _extract_treasury_published_at(text: str, fallback: datetime) -> datetime: + match = re.search(r"([A-Z][a-z]+ \d{1,2}, \d{4})", text) + if not match: + return fallback + try: + return datetime.strptime(match.group(1), "%B %d, %Y").replace(tzinfo=UTC) + except ValueError: + return fallback diff --git a/services/api/app/services/jobs/queue.py b/services/api/app/services/jobs/queue.py new file mode 100644 index 0000000..66fb794 --- /dev/null +++ b/services/api/app/services/jobs/queue.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from uuid import uuid4 + +from arq import create_pool +from arq.connections import RedisSettings + +from app.core.config import Settings + + +async def enqueue_job(settings: Settings, job_name: str, *args) -> str: + pool = await create_pool(RedisSettings.from_dsn(settings.redis_url)) + job = await pool.enqueue_job(job_name, *args) + return job.job_id if job else str(uuid4()) + diff --git a/services/api/app/services/korea/translation_engine.py b/services/api/app/services/korea/translation_engine.py new file mode 100644 index 0000000..2b19409 --- 
/dev/null +++ b/services/api/app/services/korea/translation_engine.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass + +from sqlalchemy import Select, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.entities import KoreanSecurity, ThemeMap + + +@dataclass(slots=True) +class RankedStock: + ticker: str + name: str + score: float + rationale: str + + +@dataclass(slots=True) +class RankedTheme: + name: str + confidence: float + theme_fit_score: float + tradability_score: float + gap_fade_risk: float + priced_in_risk: float + rationale: str + invalidation_condition: str + leaders: list[RankedStock] + second_tier: list[RankedStock] + + +class KoreaTranslationEngine: + async def translate( + self, + session: AsyncSession, + category: str, + confirmation_score: float, + persistence_score: float, + ) -> list[RankedTheme]: + result = await session.execute( + select(ThemeMap).where(ThemeMap.us_category == category).order_by(ThemeMap.mapping_weight.desc()) + ) + theme_maps = result.scalars().all() + themes: list[RankedTheme] = [] + for mapping in theme_maps: + leaders = await self._stocks_for_tags(session, mapping.beneficiary_tags, limit=2) + second_tier = await self._stocks_for_tags(session, mapping.beneficiary_tags, limit=4, offset=2) + tradability_score = min(1.0, 0.45 + len(leaders) * 0.1 + confirmation_score * 0.25) + gap_fade_risk = max(0.08, min(0.95, 0.65 - confirmation_score * 0.25)) + priced_in_risk = max(0.05, min(0.95, 0.2 + (1 - persistence_score) * 0.4)) + confidence = min(1.0, mapping.mapping_weight * 0.45 + confirmation_score * 0.35 + persistence_score * 0.2) + themes.append( + RankedTheme( + name=mapping.korea_theme, + confidence=round(confidence, 4), + theme_fit_score=round(mapping.mapping_weight, 4), + tradability_score=round(tradability_score, 4), + gap_fade_risk=round(gap_fade_risk, 4), + priced_in_risk=round(priced_in_risk, 4), + 
rationale=f"{category} 이벤트가 {mapping.korea_theme} 테마와 직접 연결되고 미국 시장 반응이 동반 확인되었습니다.", + invalidation_condition="관련 미국 지수 반전, 한국 동종업종 약세 확대, 시가 과열 후 거래대금 급감", + leaders=leaders, + second_tier=second_tier, + ) + ) + return themes + + async def _stocks_for_tags( + self, + session: AsyncSession, + tags: list[str], + limit: int, + offset: int = 0, + ) -> list[RankedStock]: + if not tags: + return [] + statement: Select[tuple[KoreanSecurity]] = select(KoreanSecurity).limit(200) + result = await session.execute(statement) + stocks = result.scalars().all() + ranked: list[RankedStock] = [] + for stock in stocks: + overlap = len(set(stock.theme_tags).intersection(tags)) + score = min(1.0, 0.4 + overlap * 0.2) + if overlap == 0: + continue + ranked.append( + RankedStock( + ticker=stock.ticker, + name=stock.name, + score=round(score, 4), + rationale=f"보유 태그 {', '.join(stock.theme_tags[:3])}가 이벤트 수혜 축과 겹칩니다.", + ) + ) + ranked.sort(key=lambda item: item.score, reverse=True) + return ranked[offset : offset + limit] + + +def merge_ranked_themes(themes: Iterable[RankedTheme]) -> list[RankedTheme]: + merged: dict[str, dict[str, object]] = {} + + for theme in themes: + entry = merged.setdefault( + theme.name, + { + "confidence": [], + "theme_fit_score": [], + "tradability_score": [], + "gap_fade_risk": [], + "priced_in_risk": [], + "rationales": [], + "invalidations": [], + "leaders": {}, + "second_tier": {}, + }, + ) + + entry["confidence"].append(theme.confidence) + entry["theme_fit_score"].append(theme.theme_fit_score) + entry["tradability_score"].append(theme.tradability_score) + entry["gap_fade_risk"].append(theme.gap_fade_risk) + entry["priced_in_risk"].append(theme.priced_in_risk) + + rationales: list[str] = entry["rationales"] # type: ignore[assignment] + if theme.rationale not in rationales: + rationales.append(theme.rationale) + + invalidations: list[str] = entry["invalidations"] # type: ignore[assignment] + if theme.invalidation_condition not in invalidations: + 
invalidations.append(theme.invalidation_condition) + + _merge_ranked_stocks(entry["leaders"], theme.leaders) + _merge_ranked_stocks(entry["second_tier"], theme.second_tier) + + results: list[RankedTheme] = [] + for name, entry in merged.items(): + leaders_map: dict[str, RankedStock] = entry["leaders"] # type: ignore[assignment] + second_tier_map: dict[str, RankedStock] = entry["second_tier"] # type: ignore[assignment] + + for ticker in leaders_map: + second_tier_map.pop(ticker, None) + + confidences: list[float] = entry["confidence"] # type: ignore[assignment] + gap_fade_values: list[float] = entry["gap_fade_risk"] # type: ignore[assignment] + confidence = min(1.0, max(confidences) + 0.03 * max(0, len(confidences) - 1)) + leaders = sorted(leaders_map.values(), key=lambda item: item.score, reverse=True)[:2] + second_tier = sorted(second_tier_map.values(), key=lambda item: item.score, reverse=True)[:4] + + results.append( + RankedTheme( + name=name, + confidence=round(confidence, 4), + theme_fit_score=round(max(entry["theme_fit_score"]), 4), # type: ignore[arg-type] + tradability_score=round(max(entry["tradability_score"]), 4), # type: ignore[arg-type] + gap_fade_risk=round(sum(gap_fade_values) / len(gap_fade_values), 4), + priced_in_risk=round(max(entry["priced_in_risk"]), 4), # type: ignore[arg-type] + rationale=" / ".join((entry["rationales"])[:2]), # type: ignore[index] + invalidation_condition=" / ".join((entry["invalidations"])[:2]), # type: ignore[index] + leaders=leaders, + second_tier=second_tier, + ) + ) + + results.sort(key=lambda item: (item.confidence, item.tradability_score, item.theme_fit_score), reverse=True) + return results + + +def _merge_ranked_stocks(target: object, stocks: Iterable[RankedStock]) -> None: + stock_map = target if isinstance(target, dict) else {} + for stock in stocks: + current = stock_map.get(stock.ticker) + if current is None or stock.score > current.score: + stock_map[stock.ticker] = stock diff --git 
a/services/api/app/services/market/reaction_engine.py b/services/api/app/services/market/reaction_engine.py new file mode 100644 index 0000000..0cb9bb3 --- /dev/null +++ b/services/api/app/services/market/reaction_engine.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta + +import pandas as pd +import yfinance as yf + +from app.services.events.scoring import market_confirmation_score + + +@dataclass(slots=True) +class ReactionMetric: + symbol: str + label: str + asset_class: str + move_pct: float + volume_ratio: float + window_label: str + after_hours: bool + + +SYMBOL_MAP: dict[str, list[tuple[str, str, str]]] = { + "macro": [("^GSPC", "S&P 500", "index"), ("DX-Y.NYB", "DXY", "macro_asset")], + "monetary_policy": [("^TNX", "미국 10년물", "yield"), ("TLT", "장기채 ETF", "etf")], + "ai_semiconductor": [("SOXX", "SOXX", "sector_etf"), ("NVDA", "NVIDIA", "equity")], + "power_infrastructure": [("XLU", "유틸리티 ETF", "sector_etf"), ("VST", "Vistra", "equity")], + "defense": [("ITA", "방산 ETF", "sector_etf"), ("LMT", "Lockheed Martin", "equity")], + "energy_raw_materials": [("XLE", "에너지 ETF", "sector_etf"), ("CL=F", "WTI", "macro_asset")], +} + + +class YahooMarketReactionEngine: + async def measure(self, category: str, as_of: datetime) -> list[ReactionMetric]: + symbols = SYMBOL_MAP.get(category, SYMBOL_MAP["macro"]) + results: list[ReactionMetric] = [] + for symbol, label, asset_class in symbols: + data = yf.download( + tickers=symbol, + start=(as_of - timedelta(days=5)).date().isoformat(), + end=(as_of + timedelta(days=1)).date().isoformat(), + progress=False, + auto_adjust=False, + interval="1d", + ) + if data.empty: + continue + closes = _numeric_series(data, "Close") + if closes.empty: + continue + recent_closes = closes.tail(2) + previous_close = float(recent_closes.iloc[0]) + current_close = float(recent_closes.iloc[-1]) + + volumes = _numeric_series(data, "Volume") + current_volume = 
float(volumes.iloc[-1]) if not volumes.empty else 0.0 + volume_avg = float(volumes.tail(5).mean()) if not volumes.empty else 1.0 + move_pct = ((current_close - previous_close) / previous_close) * 100 if previous_close else 0.0 + volume_ratio = current_volume / volume_avg if volume_avg else 1.0 + results.append( + ReactionMetric( + symbol=symbol, + label=label, + asset_class=asset_class, + move_pct=round(move_pct, 4), + volume_ratio=round(volume_ratio, 4), + window_label="1d_close_to_close", + after_hours=False, + ) + ) + return results + + async def confirmation_score(self, category: str, as_of: datetime) -> float: + metrics = await self.measure(category, as_of) + return market_confirmation_score(metric.move_pct for metric in metrics) + + +def _numeric_series(data: pd.DataFrame, field: str) -> pd.Series: + if field not in data: + return pd.Series(dtype="float64") + + column = data[field] + if isinstance(column, pd.DataFrame): + if column.empty: + return pd.Series(dtype="float64") + # yfinance often returns a single-ticker DataFrame with MultiIndex columns. 
+ column = column.iloc[:, 0] + + numeric = pd.to_numeric(column, errors="coerce").dropna() + if isinstance(numeric, pd.Series): + return numeric + return pd.Series(numeric).dropna() + + +def summarize_reactions(metrics: list[ReactionMetric]) -> dict[str, object]: + total_move = round(sum(metric.move_pct for metric in metrics), 4) + return { + "totalMovePct": total_move, + "items": [ + { + "label": metric.label, + "movePct": metric.move_pct, + "commentary": f"{metric.window_label} 구간에서 {metric.asset_class} 반응 확인", + } + for metric in metrics + ], + } diff --git a/services/api/app/services/replay/weekly_replay.py b/services/api/app/services/replay/weekly_replay.py new file mode 100644 index 0000000..6a50ddc --- /dev/null +++ b/services/api/app/services/replay/weekly_replay.py @@ -0,0 +1,453 @@ +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass +from datetime import UTC, date, datetime, time, timedelta +from email.utils import parsedate_to_datetime +from statistics import mean +from urllib.parse import quote +from zoneinfo import ZoneInfo + +import feedparser +import httpx +import pandas as pd +import yfinance as yf +from bs4 import BeautifulSoup +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.entities import IngestedDocument, KoreanSecurity, PromptVersion, ThemeMap +from app.services.events.scoring import ( + CATEGORY_RULES, + classify_event, + cluster_key, + directness_score, + keyword_hit_count, + novelty_score, + normalize_text, + persistence_score, + surprise_score, +) +from app.services.korea.translation_engine import KoreaTranslationEngine, RankedTheme, merge_ranked_themes +from app.services.market.reaction_engine import YahooMarketReactionEngine, _numeric_series + +SEOUL = ZoneInfo("Asia/Seoul") +HTTP_HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"} +GOOGLE_NEWS_QUERIES = { + "ai_semiconductor": "semiconductor OR Nvidia 
OR AI chip OR HBM OR memory", + "power_infrastructure": "\"power grid\" OR transformer OR utility OR datacenter power", + "defense": "defense OR aerospace OR missile OR military contract", + "energy_raw_materials": "oil OR natural gas OR uranium OR copper OR Iran energy", + "logistics_supply_chain": "shipping OR freight OR logistics OR supply chain", +} + + +@dataclass(slots=True) +class ReplayDocument: + source_name: str + url: str + title: str + published_at: datetime + raw_content: str + parsed_content: dict[str, object] + + +@dataclass(slots=True) +class ReplayCluster: + category: str + title: str + summary: str + novelty_score: float + directness_score: float + surprise_score: float + persistence_score: float + first_seen_at: datetime + last_seen_at: datetime + source_count: int + keywords: list[str] + source_name: str + url: str + + +def compute_stock_move(prev_close: float, day_open: float, day_close: float) -> dict[str, float]: + if prev_close == 0: + return {"openGapPct": 0.0, "closeReturnPct": 0.0, "intradayMovePct": 0.0} + open_gap = ((day_open - prev_close) / prev_close) * 100 + close_return = ((day_close - prev_close) / prev_close) * 100 + intraday = ((day_close - day_open) / day_open) * 100 if day_open else 0.0 + return { + "openGapPct": round(open_gap, 2), + "closeReturnPct": round(close_return, 2), + "intradayMovePct": round(intraday, 2), + } + + +def classify_theme_outcome(avg_leader_close_return_pct: float) -> str: + if avg_leader_close_return_pct >= 3.0: + return "강한 적중" + if avg_leader_close_return_pct >= 1.0: + return "양호" + if avg_leader_close_return_pct > -1.0: + return "혼조" + return "부진" + + +class WeeklyReplayService: + def __init__(self) -> None: + self.reaction_engine = YahooMarketReactionEngine() + self.translation_engine = KoreaTranslationEngine() + self._price_cache: dict[str, pd.DataFrame] = {} + + async def build(self, session: AsyncSession, days: int = 7) -> dict[str, object]: + prompt_version = await session.scalar( + 
select(PromptVersion.version).order_by(PromptVersion.created_at.desc()).limit(1) + ) or "baseline-v1" + security_map = await self._security_market_map(session) + trading_days = self._recent_korea_trading_days(days) + + day_items: list[dict[str, object]] = [] + for market_date in trading_days: + as_of = datetime.combine(market_date, time(8, 30), tzinfo=SEOUL).astimezone(UTC) + prediction = await self._predict_for_as_of(session, as_of) + actual_market = self._build_actual_market(market_date) + predicted_themes = self._attach_actual_results( + market_date=market_date, + themes=prediction["themes"], + security_map=security_map, + ) + day_items.append( + { + "marketDateLabel": market_date.strftime("%Y-%m-%d"), + "asOfLabel": as_of.astimezone(SEOUL).strftime("%Y-%m-%d %H:%M KST"), + "promptVersion": prompt_version, + "evidencePackHash": self._evidence_pack_hash(prediction["evidenceItems"]), + "marketContext": actual_market, + "predictedThemes": predicted_themes, + "evidenceItems": prediction["evidenceItems"], + "summary": self._day_summary(predicted_themes, actual_market), + } + ) + + return { + "windowLabel": f"최근 {len(day_items)}거래일", + "promptVersion": prompt_version, + "aggregate": self._aggregate(day_items), + "days": day_items, + } + + async def _predict_for_as_of(self, session: AsyncSession, as_of: datetime) -> dict[str, object]: + mapped_categories = ( + await session.execute(select(ThemeMap.us_category).distinct()) + ).scalars().all() + documents = await self._load_documents(session, as_of) + fallback_documents = await self._fetch_fallback_news(as_of, mapped_categories) + documents.extend(fallback_documents) + + clusters = self._build_clusters(documents, mapped_categories) + themes: list[RankedTheme] = [] + evidence_items: list[dict[str, object]] = [] + + for cluster in clusters[:12]: + confirmation_score = await self.reaction_engine.confirmation_score(cluster.category, as_of) + evidence_items.append( + { + "title": cluster.title, + "categoryLabel": 
async def _fetch_fallback_news(
    self,
    as_of: datetime,
    mapped_categories: list[str],
) -> list[ReplayDocument]:
    """Fetch supplementary headlines from Google News RSS for mapped categories.

    One search is issued per entry of ``GOOGLE_NEWS_QUERIES`` whose category
    is in *mapped_categories*. Only entries published within the three days
    up to *as_of* are kept, and titles are de-duplicated case-insensitively
    across all queries.

    This is a best-effort source: a failed request or an entry with an
    unparseable date is logged or skipped instead of aborting the replay.
    """
    import logging  # local import: this module defines no logger of its own

    log = logging.getLogger(__name__)
    seoul_date = as_of.astimezone(SEOUL).date()
    start_date = seoul_date - timedelta(days=2)
    end_date = seoul_date + timedelta(days=1)
    earliest = as_of - timedelta(days=3)
    documents: list[ReplayDocument] = []
    seen_titles: set[str] = set()

    async with httpx.AsyncClient(timeout=20.0, headers=HTTP_HEADERS, follow_redirects=True) as client:
        for category, query in GOOGLE_NEWS_QUERIES.items():
            if category not in mapped_categories:
                continue
            search_query = quote(
                f"{query} after:{start_date.isoformat()} before:{end_date.isoformat()} site:reuters.com OR site:cnbc.com OR site:marketwatch.com"
            )
            url = f"https://news.google.com/rss/search?q={search_query}&hl=en-US&gl=US&ceid=US:en"
            try:
                response = await client.get(url)
                response.raise_for_status()
            except httpx.HTTPError as exc:
                log.warning("fallback news query failed for %s: %s", category, exc)
                continue
            parsed = feedparser.parse(response.text)

            for entry in parsed.entries[:15]:
                published_raw = entry.get("published") or entry.get("updated")
                if not published_raw:
                    continue
                try:
                    published_at = parsedate_to_datetime(published_raw)
                except (TypeError, ValueError):
                    # Not an RFC 2822 date; skip the entry.
                    continue
                if published_at.tzinfo is None:
                    # A naive result would be read as local time by
                    # astimezone(); treat it as UTC instead.
                    published_at = published_at.replace(tzinfo=UTC)
                published_at = published_at.astimezone(UTC)
                if not (earliest <= published_at <= as_of):
                    continue

                title = entry.get("title", "Untitled")
                title_key = title.strip().lower()
                if title_key in seen_titles:
                    continue

                summary_html = entry.get("summary", "")
                summary_text = BeautifulSoup(summary_html, "html.parser").get_text(" ", strip=True)
                detected_category = classify_event(title, summary_text)
                if detected_category in mapped_categories:
                    final_category = detected_category
                else:
                    # Keep the queried category only if its keywords really occur.
                    haystack = normalize_text(f"{title} {summary_text}")
                    if keyword_hit_count(haystack, CATEGORY_RULES[category]) == 0:
                        continue
                    final_category = category

                source = entry.get("source", {})
                source_name = source.get("title") or "Google News"
                documents.append(
                    ReplayDocument(
                        source_name=source_name,
                        url=entry.get("link") or source.get("href") or url,
                        title=title,
                        published_at=published_at,
                        raw_content=summary_text,
                        parsed_content={"tags": [final_category]},
                    )
                )
                seen_titles.add(title_key)
    return documents
def _recent_korea_trading_days(self, days: int) -> list[date]:
    """Return the last *days* dates present in the ``^KS11`` price history.

    Args:
        days: Number of trading days wanted. Zero or a negative value yields
            an empty list (``[-0:]`` used to return every day instead).

    Returns:
        Dates in the order of the history index, at most *days* long.
    """
    if days <= 0:
        return []
    # NOTE(review): date.today() uses the server's local date, not the Seoul
    # date - confirm this is intended.
    end = date.today()
    # 30 calendar days hold only about 20 sessions; widen the window for
    # larger requests so the result is not silently truncated.
    start = end - timedelta(days=max(30, days * 2))
    data = self._history_for_symbol("^KS11", start, end + timedelta(days=1))
    trading_days = list(data.index)
    return trading_days[-days:]
theme_items: list[dict[str, object]] = [] + for theme in themes: + stock_results: list[dict[str, object]] = [] + for tier, stocks in (("leader", theme.leaders), ("second_tier", theme.second_tier)): + for stock in stocks: + market = security_map.get(stock.ticker, "KOSPI") + symbol = f"{stock.ticker}.KS" if market == "KOSPI" else f"{stock.ticker}.KQ" + move = self._stock_move(symbol, market_date) + if move is None: + continue + stock_results.append( + { + "ticker": stock.ticker, + "name": stock.name, + "tier": tier, + "openGapPct": move["openGapPct"], + "closeReturnPct": move["closeReturnPct"], + "intradayMovePct": move["intradayMovePct"], + } + ) + + leader_returns = [item["closeReturnPct"] for item in stock_results if item["tier"] == "leader"] + avg_leader_return = round(mean(leader_returns), 2) if leader_returns else 0.0 + best_stock = max(stock_results, key=lambda item: item["closeReturnPct"], default=None) + theme_items.append( + { + "name": theme.name, + "confidence": theme.confidence, + "rationale": theme.rationale, + "invalidationCondition": theme.invalidation_condition, + "leaders": [{"ticker": stock.ticker, "name": stock.name} for stock in theme.leaders], + "secondTier": [{"ticker": stock.ticker, "name": stock.name} for stock in theme.second_tier], + "actualOutcome": { + "avgLeaderCloseReturnPct": avg_leader_return, + "outcomeLabel": classify_theme_outcome(avg_leader_return), + "bestStockLabel": None if best_stock is None else f"{best_stock['ticker']} {best_stock['name']}", + "bestCloseReturnPct": None if best_stock is None else best_stock["closeReturnPct"], + "stockResults": stock_results, + }, + } + ) + return theme_items + + def _day_summary(self, predicted_themes: list[dict[str, object]], market_context: dict[str, object]) -> str: + if not predicted_themes: + return f"예측 테마가 없었습니다. 실제 장은 {market_context['summary']}로 마감했습니다." 
+ top_theme = predicted_themes[0] + actual = top_theme["actualOutcome"] + return ( + f"{top_theme['name']} 예측의 주도주 평균 수익률은 " + f"{actual['avgLeaderCloseReturnPct']:+.2f}%였고 결과 평가는 {actual['outcomeLabel']}입니다." + ) + + def _aggregate(self, day_items: list[dict[str, object]]) -> dict[str, object]: + leader_returns: list[float] = [] + hit_days = 0 + for item in day_items: + themes = item["predictedThemes"] + if not themes: + continue + top_actual = themes[0]["actualOutcome"] + leader_returns.append(top_actual["avgLeaderCloseReturnPct"]) + if top_actual["outcomeLabel"] in {"강한 적중", "양호"}: + hit_days += 1 + avg_return = round(mean(leader_returns), 2) if leader_returns else 0.0 + return { + "daysAnalyzed": len(day_items), + "positiveHitDays": hit_days, + "avgLeaderCloseReturnPct": avg_return, + } + + def _index_move(self, symbol: str, market_date: date, label: str) -> dict[str, float | str]: + move = self._stock_move(symbol, market_date) + if move is None: + return {"label": label, "closeReturnPct": 0.0} + return {"label": label, "closeReturnPct": move["closeReturnPct"]} + + def _stock_move(self, symbol: str, market_date: date) -> dict[str, float] | None: + history = self._history_for_symbol(symbol, market_date - timedelta(days=10), market_date + timedelta(days=1)) + if history.empty or market_date not in history.index: + return None + dates = list(history.index) + current_index = dates.index(market_date) + if current_index == 0: + return None + prev_date = dates[current_index - 1] + prev_close = float(history.loc[prev_date, "Close"]) + day_open = float(history.loc[market_date, "Open"]) + day_close = float(history.loc[market_date, "Close"]) + return compute_stock_move(prev_close, day_open, day_close) + + def _history_for_symbol(self, symbol: str, start: date, end: date) -> pd.DataFrame: + cache_key = f"{symbol}:{start.isoformat()}:{end.isoformat()}" + cached = self._price_cache.get(cache_key) + if cached is not None: + return cached + + data = yf.download( + 
tickers=symbol, + start=start.isoformat(), + end=end.isoformat(), + progress=False, + auto_adjust=False, + interval="1d", + ) + if data.empty: + frame = pd.DataFrame(columns=["Open", "Close"]) + self._price_cache[cache_key] = frame + return frame + + frame = pd.DataFrame( + { + "Open": _numeric_series(data, "Open"), + "Close": _numeric_series(data, "Close"), + } + ).dropna() + frame.index = [pd.Timestamp(value).date() for value in frame.index] + self._price_cache[cache_key] = frame + return frame + + def _evidence_pack_hash(self, evidence_items: list[dict[str, object]]) -> str: + encoded = json.dumps(evidence_items, ensure_ascii=False, sort_keys=True).encode("utf-8") + return hashlib.sha256(encoded).hexdigest() diff --git a/services/api/app/workers/tasks.py b/services/api/app/workers/tasks.py new file mode 100644 index 0000000..81bcddb --- /dev/null +++ b/services/api/app/workers/tasks.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +import hashlib +import json +import uuid +from datetime import UTC, datetime + +from sqlalchemy import delete, select + +from app.core.config import get_settings +from app.core.logging import configure_logging, get_logger +from app.db.session import SessionLocal +from app.models.entities import ( + EventCluster, + IngestedDocument, + JobRun, + MarketReaction, + PredictionRun, + PromptVersion, + StockCandidate, + ThemePrediction, + ThemeMap, +) +from app.services.events.scoring import ( + classify_event, + cluster_key, + directness_score, + novelty_score, + persistence_score, + surprise_score, +) +from app.services.ingestion.connectors import default_connectors +from app.services.korea.translation_engine import KoreaTranslationEngine, RankedTheme, merge_ranked_themes +from app.services.market.reaction_engine import YahooMarketReactionEngine + +settings = get_settings() +configure_logging(settings.log_level) +logger = get_logger(__name__) + + +async def run_ingestion_job(ctx: dict, run_id: str) -> dict[str, int]: + await 
_mark_job_running(run_id) + try: + result = await _run_ingestion_core() + await _mark_job_completed(run_id, result) + logger.info("ingestion_completed", inserted=result["inserted"]) + return result + except Exception as exc: + await _mark_job_failed(run_id, str(exc)) + raise + + +async def run_analysis_job(ctx: dict, run_id: str, as_of_iso: str | None = None) -> dict[str, int]: + await _mark_job_running(run_id) + try: + result = await _run_analysis_core(as_of_iso=as_of_iso) + await _mark_job_completed(run_id, result) + logger.info( + "analysis_completed", + created_clusters=result["created_clusters"], + created_themes=result["created_themes"], + ) + return result + except Exception as exc: + await _mark_job_failed(run_id, str(exc)) + raise + + +async def run_full_pipeline_job( + ctx: dict, + run_id: str, + as_of_iso: str | None = None, +) -> dict[str, dict[str, int]]: + await _mark_job_running(run_id) + try: + ingestion_result = await _run_ingestion_core() + analysis_result = await _run_analysis_core(as_of_iso=as_of_iso) + result = { + "ingestion": ingestion_result, + "analysis": analysis_result, + } + await _mark_job_completed(run_id, result) + logger.info( + "full_pipeline_completed", + inserted=ingestion_result["inserted"], + created_clusters=analysis_result["created_clusters"], + created_themes=analysis_result["created_themes"], + ) + return result + except Exception as exc: + await _mark_job_failed(run_id, str(exc)) + raise + + +async def _run_ingestion_core() -> dict[str, int]: + connectors = default_connectors() + inserted = 0 + source_failures: list[dict[str, str]] = [] + async with SessionLocal() as session: + for connector in connectors: + try: + for document in await connector.fetch(): + existing = await session.execute( + select(IngestedDocument).where(IngestedDocument.dedupe_key == document.dedupe_key) + ) + if existing.scalar_one_or_none(): + continue + session.add( + IngestedDocument( + source_name=document.source_name, + external_id=None, + 
# Maximum number of not-yet-clustered documents handled per analysis pass.
_UNCLUSTERED_BATCH_SIZE = 100
# Number of leading characters of a document's raw content kept as the cluster summary.
_CLUSTER_SUMMARY_CHARS = 600
# Maximum number of most-recently-seen clusters that are scored and translated.
_MAX_ANALYSIS_CLUSTERS = 12
# A cluster counts as "market confirmed" at or above this confirmation score.
_MARKET_CONFIRMATION_THRESHOLD = 0.35


async def _run_analysis_core(as_of_iso: str | None = None) -> dict[str, int]:
    """Cluster new documents, refresh market reactions and persist a prediction run.

    The work happens inside a single database session and is committed once at
    the end:

    1. Up to ``_UNCLUSTERED_BATCH_SIZE`` documents without a cluster are
       classified and attached to an existing ``EventCluster`` (matched by
       ``cluster_key``) or to a newly created one.
    2. The ``_MAX_ANALYSIS_CLUSTERS`` most recently seen clusters whose category
       appears in ``ThemeMap.us_category`` get their market reactions replaced
       and their confirmation score updated.
    3. Each of those clusters is translated into ranked themes, the themes are
       merged, and a ``PredictionRun`` with its ``ThemePrediction`` and
       ``StockCandidate`` rows is stored.

    Args:
        as_of_iso: ISO-8601 reference time for the analysis. When omitted or
            empty, the current UTC time is used. A value without a UTC offset
            is interpreted as UTC so that the reference time is always
            timezone-aware, like the default.

    Returns:
        ``{"created_clusters": <new clusters>, "created_themes": <merged themes>}``.

    Raises:
        ValueError: If ``as_of_iso`` is not a valid ISO-8601 string.
    """
    as_of = datetime.fromisoformat(as_of_iso) if as_of_iso else datetime.now(UTC)
    if as_of.tzinfo is None:
        # fromisoformat() yields a naive datetime when the string carries no
        # offset; pin it to UTC so it never mixes with the aware default.
        as_of = as_of.replace(tzinfo=UTC)

    reaction_engine = YahooMarketReactionEngine()
    translation_engine = KoreaTranslationEngine()
    created_clusters = 0

    async with SessionLocal() as session:
        # --- 1. Attach unclustered documents to event clusters -------------
        unclustered_docs = (
            await session.execute(
                select(IngestedDocument)
                .where(IngestedDocument.event_cluster_id.is_(None))
                .order_by(IngestedDocument.published_at.desc())
                .limit(_UNCLUSTERED_BATCH_SIZE)
            )
        ).scalars().all()

        for doc in unclustered_docs:
            category = classify_event(doc.title, doc.raw_content)
            key = cluster_key(category, doc.title)
            tags = doc.parsed_content.get("tags", [])
            cluster = (
                await session.execute(select(EventCluster).where(EventCluster.cluster_key == key))
            ).scalar_one_or_none()

            if cluster is None:
                cluster = EventCluster(
                    category=category,
                    title=doc.title,
                    summary=doc.raw_content[:_CLUSTER_SUMMARY_CHARS],
                    novelty_score=novelty_score(1, 48.0),
                    directness_score=directness_score(doc.title, doc.raw_content),
                    surprise_score=surprise_score(doc.title, doc.raw_content),
                    persistence_score=persistence_score(category, doc.raw_content),
                    market_confirmation_score=0.0,
                    market_confirmed=False,
                    keywords=tags,
                    cluster_key=key,
                    first_seen_at=doc.published_at,
                    last_seen_at=doc.published_at,
                    source_count=1,
                )
                session.add(cluster)
                # Flush right away: the new row needs an id for the assignment
                # below, and later documents in this batch must be able to find
                # it through the cluster_key lookup above.
                await session.flush()
                created_clusters += 1
            else:
                cluster.first_seen_at = min(cluster.first_seen_at, doc.published_at)
                cluster.last_seen_at = max(cluster.last_seen_at, doc.published_at)
                cluster.source_count += 1
                cluster.keywords = sorted(set(cluster.keywords).union(tags))

            doc.event_cluster_id = cluster.id

        await session.flush()

        # --- 2. Refresh market reactions for the clusters under analysis ---
        mapped_categories = (
            await session.execute(select(ThemeMap.us_category).distinct())
        ).scalars().all()
        clusters = (
            await session.execute(
                select(EventCluster)
                .where(EventCluster.category.in_(mapped_categories))
                .order_by(EventCluster.last_seen_at.desc())
                .limit(_MAX_ANALYSIS_CLUSTERS)
            )
        ).scalars().all()

        # Both engine calls take only (category, as_of), and several clusters
        # usually share a category, so fetch once per category.
        reactions_by_category = {}
        translated_themes: list[RankedTheme] = []
        for cluster in clusters:
            if cluster.category not in reactions_by_category:
                reactions_by_category[cluster.category] = (
                    await reaction_engine.measure(cluster.category, as_of),
                    await reaction_engine.confirmation_score(cluster.category, as_of),
                )
            reactions, confirmation = reactions_by_category[cluster.category]

            cluster.market_confirmation_score = confirmation
            cluster.market_confirmed = confirmation >= _MARKET_CONFIRMATION_THRESHOLD

            # Replace, rather than append to, the reactions stored for this cluster.
            await session.execute(delete(MarketReaction).where(MarketReaction.cluster_id == cluster.id))
            for metric in reactions:
                session.add(
                    MarketReaction(
                        cluster_id=cluster.id,
                        asset_class=metric.asset_class,
                        symbol=metric.symbol,
                        window_label=metric.window_label,
                        move_pct=metric.move_pct,
                        volume_ratio=metric.volume_ratio,
                        after_hours=metric.after_hours,
                        raw_metrics={},
                    )
                )

            translated_themes.extend(
                await translation_engine.translate(
                    session,
                    category=cluster.category,
                    confirmation_score=cluster.market_confirmation_score,
                    persistence_score=cluster.persistence_score,
                )
            )

        # --- 3. Persist the prediction run ---------------------------------
        merged_themes = merge_ranked_themes(translated_themes)
        # Newest stored prompt version wins; fall back to the configured one.
        prompt_version = await session.scalar(
            select(PromptVersion.version).order_by(PromptVersion.created_at.desc()).limit(1)
        ) or settings.prompt_version

        prediction_run = PredictionRun(
            run_type="daily_analysis",
            market_date=as_of,
            evidence_pack_hash=_build_evidence_pack_hash(clusters),
            prompt_version=prompt_version,
            final_output={},
            # NOTE(review): uses the first merged theme's confidence; presumably
            # merge_ranked_themes returns themes best-first - confirm.
            final_confidence=merged_themes[0].confidence if merged_themes else 0.0,
            unresolved_uncertainty=[],
            status="completed",
        )
        session.add(prediction_run)
        await session.flush()  # populate prediction_run.id for the child rows

        for theme in merged_themes:
            theme_row = ThemePrediction(
                prediction_run_id=prediction_run.id,
                name=theme.name,
                rationale=theme.rationale,
                confidence=theme.confidence,
                theme_fit_score=theme.theme_fit_score,
                tradability_score=theme.tradability_score,
                gap_fade_risk=theme.gap_fade_risk,
                priced_in_risk=theme.priced_in_risk,
                invalidation_condition=theme.invalidation_condition,
                # "Check whether opening trading value stays among the top ranks"
                open_note="시가 거래대금 상위 유지 여부 확인",
                # "Check whether execution strength is maintained"
                fifteen_minute_note="체결 강도 유지 여부 확인",
                # "Check whether a follow-up US event connects"
                close_note="후속 미국 이벤트 연결 여부 확인",
                metadata_json={},
            )
            session.add(theme_row)
            await session.flush()  # populate theme_row.id for the candidates

            # Leaders first, then second-tier names; only the tier label differs.
            for tier, stocks in (("leader", theme.leaders), ("second_tier", theme.second_tier)):
                for stock in stocks:
                    session.add(
                        StockCandidate(
                            theme_prediction_id=theme_row.id,
                            ticker=stock.ticker,
                            name=stock.name,
                            tier=tier,
                            score=stock.score,
                            priced_in_risk=theme.priced_in_risk,
                            rationale=stock.rationale,
                        )
                    )

        prediction_run.final_output = {
            "themes": [
                {
                    "name": theme.name,
                    "confidence": theme.confidence,
                    "leaders": [leader.ticker for leader in theme.leaders],
                    "second_tier": [stock.ticker for stock in theme.second_tier],
                }
                for theme in merged_themes
            ]
        }

        await session.commit()

    return {"created_clusters": created_clusters, "created_themes": len(merged_themes)}
async def _update_job_run(run_id: str, *, status: str, timestamp_field: str, **fields: object) -> None:
    """Load one ``JobRun`` row, apply a status transition and commit it.

    Args:
        run_id: String form of the job run's UUID primary key.
        status: New value for ``JobRun.status``.
        timestamp_field: Name of the ``JobRun`` attribute that receives the
            current UTC time (``"started_at"`` or ``"finished_at"``).
        **fields: Further attribute/value pairs to set on the row, applied in
            the order given.

    Raises:
        ValueError: If ``run_id`` is not a valid UUID string.
    """
    async with SessionLocal() as session:
        run = await session.get(JobRun, uuid.UUID(run_id))
        if run is None:
            # Still a no-op, but leave a trace instead of failing silently.
            logger.warning("job_run_not_found", run_id=run_id, status=status)
            return
        run.status = status
        setattr(run, timestamp_field, datetime.now(UTC))
        for attribute, value in fields.items():
            setattr(run, attribute, value)
        await session.commit()


async def _mark_job_running(run_id: str) -> None:
    """Mark the job run as ``"running"``, stamp ``started_at`` and clear any error."""
    await _update_job_run(
        run_id,
        status="running",
        timestamp_field="started_at",
        error_message=None,
    )


async def _mark_job_completed(run_id: str, result: dict) -> None:
    """Mark the job run as ``"completed"``, stamp ``finished_at`` and store ``result``."""
    await _update_job_run(
        run_id,
        status="completed",
        timestamp_field="finished_at",
        result_json=result,
        error_message=None,
    )


async def _mark_job_failed(run_id: str, error_message: str) -> None:
    """Mark the job run as ``"failed"``, stamp ``finished_at`` and record the error."""
    await _update_job_run(
        run_id,
        status="failed",
        timestamp_field="finished_at",
        error_message=error_message,
    )
def test_promotable_requires_validation_improvement() -> None:
    """``promotable`` returns True for this baseline / train / validation triple."""
    # NOTE(review): EvaluationSnapshot is built positionally; the meaning of
    # each of the five fields is defined in app.services.evaluation.metrics.
    snapshots = (
        EvaluationSnapshot(0.42, 0.28, 0.36, 0.25, 0.31),  # baseline
        EvaluationSnapshot(0.48, 0.33, 0.41, 0.23, 0.28),  # candidate on train
        EvaluationSnapshot(0.47, 0.31, 0.4, 0.24, 0.29),  # candidate on validation
    )

    verdict = promotable(*snapshots)

    assert verdict is True


def test_promotable_blocks_false_positive_regression() -> None:
    """``promotable`` returns False when the candidate's fourth metric rises."""
    # Same baseline as above; the candidate's fourth value goes from 0.25 to
    # 0.3 (train) and 0.35 (validation) - presumably the false-positive
    # metric the test name refers to; confirm against EvaluationSnapshot.
    snapshots = (
        EvaluationSnapshot(0.42, 0.28, 0.36, 0.25, 0.31),  # baseline
        EvaluationSnapshot(0.5, 0.34, 0.41, 0.3, 0.28),  # candidate on train
        EvaluationSnapshot(0.47, 0.31, 0.4, 0.35, 0.29),  # candidate on validation
    )

    verdict = promotable(*snapshots)

    assert verdict is False
def test_classify_event_ai_semiconductor() -> None:
    """An AI-chip headline with GPU/HBM body text maps to ``ai_semiconductor``."""
    title = "NVIDIA raises AI chip guidance"
    content = "GPU and HBM demand surged"

    category = classify_event(title, content)

    assert category == "ai_semiconductor"


def test_classify_event_does_not_match_substring_inside_other_words() -> None:
    """Text that only contains "ai" inside "maintain" is not ``ai_semiconductor``."""
    title = "Agencies request comment to maintain capital strength"
    content = "The proposal helps maintain resilience in the banking system"

    category = classify_event(title, content)

    assert category != "ai_semiconductor"


def test_novelty_score_penalizes_repetition() -> None:
    """``novelty_score(1, 48)`` must exceed ``novelty_score(9, 4)``."""
    rarely_seen = novelty_score(1, 48)
    often_seen = novelty_score(9, 4)

    assert rarely_seen > often_seen


def test_directness_score_rewards_explicit_policy_terms() -> None:
    """A Fed rate-decision headline scores at least 0.5 for directness."""
    score = directness_score("Fed rate decision", "Official guidance from the Fed")

    assert score >= 0.5


def test_persistence_score_prefers_policy_categories() -> None:
    """The ``monetary_policy`` example outranks the ``biotech`` example."""
    policy_score = persistence_score("monetary_policy", "multi-year shift")
    biotech_score = persistence_score("biotech", "single trial")

    assert policy_score > biotech_score


def test_market_confirmation_score_uses_average_magnitude() -> None:
    """Moves of 1.2, -1.0 and 0.8 together score above 0.35."""
    moves = [1.2, -1.0, 0.8]

    score = market_confirmation_score(moves)

    assert score > 0.35
def test_merge_ranked_themes_dedupes_theme_and_stock_candidates() -> None:
    """Two themes with the same name merge into one with de-duplicated stocks.

    Ticker 042700 is a leader in the first theme and second-tier in the other;
    after merging it must appear only among the leaders. Both input rationales
    must survive in the merged rationale.
    """
    first_theme = RankedTheme(
        name="HBM/반도체 장비",
        confidence=0.72,
        theme_fit_score=0.91,
        tradability_score=0.76,
        gap_fade_risk=0.31,
        priced_in_risk=0.24,
        rationale="미국 AI 반도체 반응이 강함",
        invalidation_condition="미국 반도체 지수 급락",
        leaders=[
            RankedStock(ticker="000660", name="SK하이닉스", score=0.9, rationale="HBM 핵심"),
            RankedStock(ticker="042700", name="한미반도체", score=0.82, rationale="패키징 수혜"),
        ],
        second_tier=[
            RankedStock(ticker="089890", name="코세스", score=0.7, rationale="후공정 연계"),
        ],
    )
    second_theme = RankedTheme(
        name="HBM/반도체 장비",
        confidence=0.74,
        theme_fit_score=0.92,
        tradability_score=0.78,
        gap_fade_risk=0.35,
        priced_in_risk=0.28,
        rationale="메모리 체인 동반 강세",
        invalidation_condition="엔비디아 시간외 약세 전환",
        leaders=[
            RankedStock(ticker="000660", name="SK하이닉스", score=0.93, rationale="가장 직접적"),
        ],
        second_tier=[
            RankedStock(ticker="042700", name="한미반도체", score=0.79, rationale="중복 후보"),
            RankedStock(ticker="089890", name="코세스", score=0.72, rationale="중복 후보"),
        ],
    )

    merged = merge_ranked_themes([first_theme, second_theme])

    assert len(merged) == 1
    theme = merged[0]
    leader_tickers = [stock.ticker for stock in theme.leaders]
    second_tier_tickers = [stock.ticker for stock in theme.second_tier]
    assert leader_tickers == ["000660", "042700"]
    assert second_tier_tickers == ["089890"]
    # The merged rationale keeps the text of both source themes.
    assert "미국 AI 반도체 반응이 강함" in theme.rationale
    assert "메모리 체인 동반 강세" in theme.rationale
def test_classify_theme_outcome_has_clear_bands() -> None:
    """Each sample value maps to its expected outcome label."""
    # Glosses of the Korean labels: strong hit / good / mixed / weak.
    expected_by_value = (
        (3.2, "강한 적중"),
        (1.4, "양호"),
        (0.2, "혼조"),
        (-1.8, "부진"),
    )

    for value, expected_label in expected_by_value:
        assert classify_theme_outcome(value) == expected_label