diff --git a/apps/web/app/(pages)/events/page.tsx b/apps/web/app/(pages)/events/page.tsx index 549ec40..7df48ac 100644 --- a/apps/web/app/(pages)/events/page.tsx +++ b/apps/web/app/(pages)/events/page.tsx @@ -11,8 +11,8 @@ export default async function EventsPage() {
{data.clusters.map((cluster) => ( @@ -45,43 +45,61 @@ export default async function EventsPage() {

원문 출처

-
    - {cluster.sources.map((source) => ( -
  • - - {source.name} - - 수집 {source.fetchedAtLabel} -
  • - ))} -
+ {cluster.sources.length > 0 ? ( +
    + {cluster.sources.map((source) => ( +
  • + + {source.name} + + 수집 {source.fetchedAtLabel} +
  • + ))} +
+ ) : ( +

+ 아직 연결된 원문 출처가 없습니다. +

+ )}
-

미국 반응

-
    - {cluster.reactions.map((reaction) => ( -
  • - {reaction.label}: {formatPct(reaction.movePct)} / {reaction.window} -
  • - ))} -
+

미국 시장 반응

+ {cluster.reactions.length > 0 ? ( +
    + {cluster.reactions.map((reaction) => ( +
  • + {reaction.label}: {formatPct(reaction.movePct)} / {reaction.window} +
  • + ))} +
+ ) : ( +

+ 아직 시장 반응 데이터가 기록되지 않았습니다. +

+ )}

에이전트 코멘터리

-
    - {cluster.agentNotes.map((note) => ( -
  • - {note.role} - {" · "} - {note.note} -
  • - ))} -
+ {cluster.agentNotes.length > 0 ? ( +
    + {cluster.agentNotes.map((note) => ( +
  • + {note.role} + {" : "} + {note.note} +
  • + ))} +
+ ) : ( +

+ 아직 기록된 에이전트 코멘터리가 없습니다. 최신 분석을 한 번 더 실행해 주세요. +

+ )}
diff --git a/services/api/app/repositories/dashboard.py b/services/api/app/repositories/dashboard.py index 5050044..9aa62b3 100644 --- a/services/api/app/repositories/dashboard.py +++ b/services/api/app/repositories/dashboard.py @@ -39,6 +39,24 @@ ) from app.services.replay.weekly_replay import WeeklyReplayService +ROLE_LABELS = { + "macro_analyst": "매크로 분석가", + "sector_analyst": "섹터 분석가", + "microstructure_analyst": "수급 분석가", + "skeptic_analyst": "리스크 분석가", + "korea_translator": "한국장 번역 분석가", + "final_judge": "최종 판단자", +} + +ROLE_ORDER = { + "macro_analyst": 0, + "sector_analyst": 1, + "microstructure_analyst": 2, + "skeptic_analyst": 3, + "korea_translator": 4, + "final_judge": 5, +} + def _label_datetime(value: datetime | None) -> str: if value is None: @@ -101,7 +119,7 @@ async def fetch_dashboard(session: AsyncSession) -> DashboardResponse: confidence=theme.confidence, tradabilityScore=theme.tradability_score, gapFadeRisk=theme.gap_fade_risk, - marketView="다음 영업일 시가 및 거래대금 확인 필요", + marketView="다음 영업일 시가와 거래대금 확인이 필요합니다.", ) for theme in theme_predictions ] @@ -126,7 +144,7 @@ async def fetch_dashboard(session: AsyncSession) -> DashboardResponse: { "label": item.symbol, "movePct": item.move_pct, - "commentary": f"{item.window_label} 구간에서 {item.asset_class} 반응 확인", + "commentary": f"{item.window_label} 구간에서 {item.asset_class} 반응을 확인했습니다.", } for item in reactions ], @@ -147,9 +165,9 @@ async def fetch_dashboard(session: AsyncSession) -> DashboardResponse: themeBoard=theme_board, leaderBoard=leader_board, riskFlags=[ - "시가 과열 추격 금지", - "미국 반응 미확인 이벤트는 우선순위 하향", - "정책성 이벤트는 후속 반응 동반 확인", + "시가 과열 추격은 피하고 거래대금 확인을 우선하세요.", + "미국 반응이 약한 이벤트는 우선순위를 낮게 보세요.", + "정책성 이벤트는 후속 기사와 선물 움직임을 함께 확인하세요.", ], topTheme=theme_board[0] if theme_board else None, topLeader=leader_board[0] if leader_board else None, @@ -157,10 +175,23 @@ async def fetch_dashboard(session: AsyncSession) -> DashboardResponse: async def fetch_event_explorer(session: AsyncSession) -> EventExplorerResponse: - clusters = ( - await session.execute(select(EventCluster).order_by(EventCluster.last_seen_at.desc()).limit(20)) - ).scalars().all() + latest_prediction_run_id = await _latest_prediction_run_id(session) + agent_runs = await _latest_agent_runs(session, latest_prediction_run_id) + analyzed_cluster_keys = sorted( + { + run.output_json.get("cluster_key") + for run in agent_runs + if isinstance(run.output_json, dict) and run.output_json.get("cluster_key") + } + ) + + cluster_statement = select(EventCluster).order_by(EventCluster.last_seen_at.desc()) + if analyzed_cluster_keys: + cluster_statement = cluster_statement.where(EventCluster.cluster_key.in_(analyzed_cluster_keys)) + + clusters = (await session.execute(cluster_statement.limit(20))).scalars().all() payload: list[EventClusterResponse] = [] + for cluster in clusters: sources = ( await session.execute(select(IngestedDocument).where(IngestedDocument.event_cluster_id == cluster.id)) @@ -168,9 +199,7 @@ async def fetch_event_explorer(session: AsyncSession) -> EventExplorerResponse: reactions = ( await session.execute(select(MarketReaction).where(MarketReaction.cluster_id == cluster.id)) ).scalars().all() - notes = ( - await session.execute(select(AgentRun).order_by(AgentRun.created_at.desc()).limit(4)) - ).scalars().all() + payload.append( EventClusterResponse( id=str(cluster.id), @@ -198,15 +227,10 @@ async def fetch_event_explorer(session: AsyncSession) -> EventExplorerResponse: } for reaction in reactions ], - agentNotes=[ - { - "role": note.role, - "note": note.output_json.get("thesis", "에이전트 결과 없음"), - } - for note in notes - ], + agentNotes=_agent_notes_for_cluster(agent_runs, cluster.cluster_key), ) ) + return EventExplorerResponse(clusters=payload) @@ -253,24 +277,23 @@ async def fetch_replay(session: AsyncSession, date_label: str) -> ReplayResponse prompt_version = await session.scalar( select(PromptVersion.version).order_by(PromptVersion.created_at.desc()).limit(1) ) + latest_prediction_run_id = await _latest_prediction_run_id(session) clusters = ( await session.execute(select(EventCluster).order_by(EventCluster.last_seen_at.desc()).limit(5)) ).scalars().all() - notes = ( - await session.execute(select(AgentRun).order_by(AgentRun.created_at.desc()).limit(6)) - ).scalars().all() + notes = await _latest_agent_runs(session, latest_prediction_run_id, limit=6) return ReplayResponse( asOfLabel=date_label if date_label != "latest" else _label_datetime(datetime.now(UTC)), promptVersion=prompt_version or "baseline-v1", evidencePackHash="latest-evidence-pack", - predictionSummary="미국 검증 이벤트를 기준으로 다음 한국장 테마를 재구성합니다.", - actualSummary="실제 결과는 차후 리플레이 데이터 적재 후 연결됩니다.", - outcomeSummary="현재는 초기 상태라 리플레이 기록이 제한적입니다.", + predictionSummary="미국 검증 이벤트를 기반으로 다음 한국장 테마와 주도주를 재구성했습니다.", + actualSummary="실제 결과는 같은 페이지의 리플레이/평가 데이터와 함께 비교합니다.", + outcomeSummary="불확실성은 에이전트별 주장과 확신도 차이로 확인할 수 있습니다.", evidenceItems=[{"title": cluster.title, "summary": cluster.summary[:200]} for cluster in clusters], agentDebate=[ { - "role": note.role, - "thesis": note.output_json.get("thesis", "기록 없음"), + "role": ROLE_LABELS.get(note.role, note.role), + "thesis": _extract_agent_note(note.output_json), "confidence": float(note.output_json.get("confidence", 0.4)), } for note in notes @@ -320,7 +343,7 @@ async def fetch_evaluation_summary(session: AsyncSession) -> EvaluationSummaryRe promptLeaderboard=[ { "version": "baseline-v1", - "description": "규칙 기반 번역 + 멀티 에이전트 기본 조합", + "description": "규칙 기반 번역과 멀티 에이전트 판단을 결합한 기본 설정입니다.", "score": 0.62, "themeHitRate": theme_hit_rate, "leaderHitRate": leader_hit_rate, @@ -389,19 +412,47 @@ def _summarize_job_result(run: JobRun) -> str | None: if ingestion and analysis: return ( f"문서 {ingestion.get('inserted', 0)}건 수집, " - f"클러스터 {analysis.get('created_clusters', 0)}건, " + f"클러스터 {analysis.get('created_clusters', 0)}건 " f"테마 {analysis.get('created_themes', 0)}건 생성" ) if "inserted" in run.result_json: return f"문서 {run.result_json.get('inserted', 0)}건 수집" if "created_clusters" in run.result_json: return ( - f"클러스터 {run.result_json.get('created_clusters', 0)}건, " + f"클러스터 {run.result_json.get('created_clusters', 0)}건 " f"테마 {run.result_json.get('created_themes', 0)}건 생성" ) return None +def _extract_agent_note(output_json: dict) -> str: + for key in ("thesis", "summary", "decision", "rationale"): + value = output_json.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return "의견이 아직 기록되지 않았습니다." + + +def _agent_notes_for_cluster(agent_runs: list[AgentRun], cluster_key: str) -> list[dict[str, str]]: + cluster_runs = [ + run + for run in agent_runs + if isinstance(run.output_json, dict) and run.output_json.get("cluster_key") == cluster_key + ] + cluster_runs.sort(key=lambda run: (ROLE_ORDER.get(run.role, 99), run.created_at)) + + notes_by_role: dict[str, dict[str, str]] = {} + for run in cluster_runs: + if run.role in notes_by_role: + continue + notes_by_role[run.role] = { + "role": ROLE_LABELS.get(run.role, run.role), + "note": _extract_agent_note(run.output_json), + } + + return list(notes_by_role.values()) + + async def _latest_prediction_run_id(session: AsyncSession) -> UUID | None: return await session.scalar( select(PredictionRun.id) @@ -454,3 +505,21 @@ async def _latest_stock_candidates( deduped[stock.ticker] = stock return sorted(deduped.values(), key=lambda item: (item.tier != "leader", -item.score))[:limit] + + +async def _latest_agent_runs( + session: AsyncSession, + prediction_run_id: UUID | None, + limit: int = 40, +) -> list[AgentRun]: + if prediction_run_id is None: + return [] + + return ( + await session.execute( + select(AgentRun) + .where(AgentRun.prediction_run_id == prediction_run_id) + .order_by(AgentRun.created_at.desc()) + .limit(limit) + ) + ).scalars().all() diff --git a/services/api/app/services/agents/openai_client.py b/services/api/app/services/agents/openai_client.py index aad5f1b..ef5fa59 100644 --- a/services/api/app/services/agents/openai_client.py +++ b/services/api/app/services/agents/openai_client.py @@ -51,7 +51,10 @@ async def run_role( role=role, model=model, content={ - "thesis": f"{role} 로컬 대체 분석: API 키가 없어 결정론적 규칙 기반 결과를 반환합니다.", + "thesis": ( + f"{role} 로컬 대체 분석: API 키가 없어 " + "결정론적 규칙 기반 결과를 반환합니다." + ), "confidence": 0.45, "evidence": evidence_pack, "input_hash": input_hash, @@ -72,9 +75,9 @@ async def run_role( { "type": "input_text", "text": ( - "당신은 개인용 크로스마켓 리서치 시스템의 전문 분석가다. " - "증거가 부족하면 확신을 낮추고 불확실성을 명시한다. " - "반드시 JSON 문자열만 반환한다." + "당신은 개인용 크로스마켓 리서치 시스템의 전문 분석가입니다. " + "증거가 부족하면 확신을 낮추고 불확실성을 명시하세요. " + "반드시 JSON 객체만 반환하세요." ), } ], @@ -115,4 +118,3 @@ async def run_role( cached_input_tokens=cached_tokens, cost_usd=0.0, ) - diff --git a/services/api/app/workers/tasks.py b/services/api/app/workers/tasks.py index 81bcddb..1149207 100644 --- a/services/api/app/workers/tasks.py +++ b/services/api/app/workers/tasks.py @@ -11,6 +11,7 @@ from app.core.logging import configure_logging, get_logger from app.db.session import SessionLocal from app.models.entities import ( + AgentRun, EventCluster, IngestedDocument, JobRun, @@ -21,6 +22,7 @@ ThemePrediction, ThemeMap, ) +from app.services.agents.openai_client import AgentOrchestrator, AgentResult from app.services.events.scoring import ( classify_event, cluster_key, @@ -143,6 +145,7 @@ async def _run_analysis_core(as_of_iso: str | None = None) -> dict[str, int]: as_of = datetime.fromisoformat(as_of_iso) if as_of_iso else datetime.now(UTC) reaction_engine = YahooMarketReactionEngine() translation_engine = KoreaTranslationEngine() + agent_orchestrator = AgentOrchestrator(settings) created_clusters = 0 async with SessionLocal() as session: @@ -234,6 +237,8 @@ async def _run_analysis_core(as_of_iso: str | None = None) -> dict[str, int]: ) ) + await session.flush() + merged_themes = merge_ranked_themes(translated_themes) prompt_version = await session.scalar( select(PromptVersion.version).order_by(PromptVersion.created_at.desc()).limit(1) @@ -252,6 +257,13 @@ async def _run_analysis_core(as_of_iso: str | None = None) -> dict[str, int]: session.add(prediction_run) await session.flush() + cluster_agent_results = await _run_cluster_agent_rounds( + orchestrator=agent_orchestrator, + clusters=clusters, + session=session, + prediction_run_id=prediction_run.id, + ) + for theme in merged_themes: theme_row = ThemePrediction( prediction_run_id=prediction_run.id, @@ -306,7 +318,8 @@ async def _run_analysis_core(as_of_iso: str | None = None) -> dict[str, int]: "second_tier": [stock.ticker for stock in theme.second_tier], } for theme in merged_themes - ] + ], + "agentCommentaryCount": len(cluster_agent_results), } await session.commit() @@ -328,6 +341,117 @@ def _build_evidence_pack_hash(clusters: list[EventCluster]) -> str: return hashlib.sha256(encoded).hexdigest() +async def _run_cluster_agent_rounds( + orchestrator: AgentOrchestrator, + clusters: list[EventCluster], + session, + prediction_run_id, +) -> list[AgentRun]: + saved_runs: list[AgentRun] = [] + selected_clusters = clusters + + for cluster in selected_clusters: + measured_reactions = ( + await session.execute(select(MarketReaction).where(MarketReaction.cluster_id == cluster.id)) + ).scalars().all() + evidence_pack = { + "cluster_id": str(cluster.id), + "cluster_key": cluster.cluster_key, + "title": cluster.title, + "category": cluster.category, + "summary": cluster.summary, + "scores": { + "novelty": cluster.novelty_score, + "directness": cluster.directness_score, + "surprise": cluster.surprise_score, + "persistence": cluster.persistence_score, + "market_confirmation": cluster.market_confirmation_score, + }, + "reactions": [ + { + "symbol": reaction.symbol, + "asset_class": reaction.asset_class, + "window": reaction.window_label, + "move_pct": reaction.move_pct, + } + for reaction in measured_reactions + ], + } + + role_results: list[AgentResult] = [] + for role in ( + "macro_analyst", + "sector_analyst", + "microstructure_analyst", + "skeptic_analyst", + "korea_translator", + ): + result = await orchestrator.run_role(role=role, evidence_pack=evidence_pack, background=False) + role_results.append(result) + saved_runs.append( + AgentRun( + prediction_run_id=prediction_run_id, + role=role, + model=result.model, + input_hash=result.content.get("input_hash") or _agent_input_hash(role, evidence_pack), + output_json={ + **result.content, + "cluster_id": str(cluster.id), + "cluster_key": cluster.cluster_key, + "category": cluster.category, + "title": cluster.title, + }, + cost_usd=result.cost_usd, + latency_ms=result.latency_ms, + cached_input_tokens=result.cached_input_tokens, + ) + ) + + judge_evidence = { + **evidence_pack, + "agent_views": [ + { + "role": result.role, + "thesis": result.content.get("thesis", ""), + "confidence": result.content.get("confidence", 0.4), + } + for result in role_results + ], + } + judge_result = await orchestrator.run_role( + role="final_judge", + evidence_pack=judge_evidence, + background=False, + ) + saved_runs.append( + AgentRun( + prediction_run_id=prediction_run_id, + role="final_judge", + model=judge_result.model, + input_hash=judge_result.content.get("input_hash") or _agent_input_hash("final_judge", judge_evidence), + output_json={ + **judge_result.content, + "cluster_id": str(cluster.id), + "cluster_key": cluster.cluster_key, + "category": cluster.category, + "title": cluster.title, + }, + cost_usd=judge_result.cost_usd, + latency_ms=judge_result.latency_ms, + cached_input_tokens=judge_result.cached_input_tokens, + ) + ) + + session.add_all(saved_runs) + await session.flush() + return saved_runs + + +def _agent_input_hash(role: str, evidence_pack: dict) -> str: + encoded = json.dumps({"role": role, "evidence_pack": evidence_pack}, sort_keys=True).encode("utf-8") + return hashlib.sha256(encoded).hexdigest() + + async def _mark_job_running(run_id: str) -> None: async with SessionLocal() as session: run = await session.get(JobRun, uuid.UUID(run_id)) diff --git a/services/api/tests/test_event_explorer_notes.py b/services/api/tests/test_event_explorer_notes.py new file mode 100644 index 0000000..538a92a --- /dev/null +++ b/services/api/tests/test_event_explorer_notes.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import uuid +from datetime import UTC, datetime, timedelta + +from app.models.entities import AgentRun +from app.repositories.dashboard import _agent_notes_for_cluster + + +def _agent_run(role: str, cluster_key: str, *, minutes: int, thesis: str) -> AgentRun: + return AgentRun( + prediction_run_id=uuid.uuid4(), + role=role, + model="gpt-test", + input_hash=f"{role}-{cluster_key}-{minutes}", + output_json={"cluster_key": cluster_key, "thesis": thesis}, + created_at=datetime.now(UTC) + timedelta(minutes=minutes), + ) + + +def test_agent_notes_are_scoped_and_ordered_by_cluster() -> None: + runs = [ + _agent_run("sector_analyst", "cluster-a", minutes=2, thesis="섹터 관점 코멘트"), + _agent_run("macro_analyst", "cluster-b", minutes=1, thesis="다른 클러스터 코멘트"), + _agent_run("final_judge", "cluster-a", minutes=3, thesis="최종 판단"), + _agent_run("macro_analyst", "cluster-a", minutes=1, thesis="매크로 관점 코멘트"), + ] + + notes = _agent_notes_for_cluster(runs, "cluster-a") + + assert notes == [ + {"role": "매크로 분석가", "note": "매크로 관점 코멘트"}, + {"role": "섹터 분석가", "note": "섹터 관점 코멘트"}, + {"role": "최종 판단자", "note": "최종 판단"}, + ] + + +def test_agent_notes_keep_first_note_per_role() -> None: + runs = [ + _agent_run("macro_analyst", "cluster-a", minutes=1, thesis="첫 번째 의견"), + _agent_run("macro_analyst", "cluster-a", minutes=2, thesis="두 번째 의견"), + ] + + notes = _agent_notes_for_cluster(runs, "cluster-a") + + assert notes == [{"role": "매크로 분석가", "note": "첫 번째 의견"}]