근거 이벤트
diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts
index dbd80f9..92ce251 100644
--- a/packages/contracts/src/index.ts
+++ b/packages/contracts/src/index.ts
@@ -153,6 +153,29 @@ export const WeeklyReplayPayloadSchema = z.object({
positiveHitDays: z.number(),
avgLeaderCloseReturnPct: z.number()
}),
+ calibrationSummary: z.object({
+ analysisMode: z.string(),
+ role: z.string(),
+ model: z.string(),
+ summary: z.string(),
+ daysWithMeaningfulMismatch: z.number(),
+ averageDeviationScore: z.number(),
+ topDriverLabels: z.array(z.string()),
+ suggestedAdjustments: z.array(
+ z.object({
+ target: z.string(),
+ adjustment: z.string(),
+ rationale: z.string()
+ })
+ ),
+ watchouts: z.array(z.string()),
+ confidence: z.number()
+ }),
+ tradeTimingSummary: z.object({
+ summary: z.string(),
+ averageExtraReturnPct: z.number(),
+ topSignalLabels: z.array(z.string())
+ }),
days: z.array(
z.object({
marketDateLabel: z.string(),
@@ -212,7 +235,42 @@ export const WeeklyReplayPayloadSchema = z.object({
url: z.string()
})
),
- summary: z.string()
+ summary: z.string(),
+ calibrationReview: z.object({
+ mismatchLabel: z.string(),
+ deviationScore: z.number(),
+ calibrationConfidence: z.number(),
+ summary: z.string(),
+ drivers: z.array(
+ z.object({
+ code: z.string(),
+ label: z.string(),
+ impactLabel: z.string(),
+ reason: z.string(),
+ evidence: z.string()
+ })
+ ),
+ suggestedActions: z.array(
+ z.object({
+ target: z.string(),
+ adjustment: z.string(),
+ rationale: z.string()
+ })
+ )
+ }),
+ tradeOptimization: z.object({
+ availabilityLabel: z.string(),
+ ticker: z.string(),
+ name: z.string(),
+ barInterval: z.string(),
+ entryAssumption: z.string(),
+ bestExitTimeLabel: z.string().nullable(),
+ bestExitReturnPct: z.number().nullable(),
+ closeReturnPct: z.number().nullable(),
+ extraReturnPct: z.number().nullable(),
+ chartSignal: z.string(),
+ executionNote: z.string()
+ })
})
)
});
diff --git a/scripts/seed_reference_data.py b/scripts/seed_reference_data.py
index 63aad44..7758196 100644
--- a/scripts/seed_reference_data.py
+++ b/scripts/seed_reference_data.py
@@ -74,6 +74,7 @@
("skeptic_analyst", "gpt-5.4-mini"),
("korea_translator", "gpt-5.4"),
("final_judge", "gpt-5.4"),
+ ("replay_calibrator", "gpt-5.4-mini"),
]
THEME_MAPS = [
diff --git a/services/api/app/core/config.py b/services/api/app/core/config.py
index 88f2701..e2a0168 100644
--- a/services/api/app/core/config.py
+++ b/services/api/app/core/config.py
@@ -36,6 +36,7 @@ class Settings(BaseSettings):
model_skeptic_analyst: str = "gpt-5.4-mini"
model_korea_translator: str = "gpt-5.4"
model_final_judge: str = "gpt-5.4"
+ model_replay_calibrator: str = "gpt-5.4-mini"
alpha_vantage_api_key: str | None = None
polygon_api_key: str | None = None
@@ -44,4 +45,3 @@ class Settings(BaseSettings):
@lru_cache
def get_settings() -> Settings:
return Settings()
-
diff --git a/services/api/app/repositories/dashboard.py b/services/api/app/repositories/dashboard.py
index 9aa62b3..7c3ad57 100644
--- a/services/api/app/repositories/dashboard.py
+++ b/services/api/app/repositories/dashboard.py
@@ -46,6 +46,7 @@
"skeptic_analyst": "리스크 분석가",
"korea_translator": "한국장 번역 분석가",
"final_judge": "최종 판단자",
+ "replay_calibrator": "리플레이 보정 분석가",
}
ROLE_ORDER = {
@@ -55,6 +56,7 @@
"skeptic_analyst": 3,
"korea_translator": 4,
"final_judge": 5,
+ "replay_calibrator": 6,
}
diff --git a/services/api/app/schemas/api.py b/services/api/app/schemas/api.py
index 24672a1..42b8d7a 100644
--- a/services/api/app/schemas/api.py
+++ b/services/api/app/schemas/api.py
@@ -172,6 +172,29 @@ class WeeklyReplayThemeOutcome(BaseModel):
stockResults: list[WeeklyReplayStockResult]
+class WeeklyReplayCalibrationDriver(BaseModel):
+ code: str
+ label: str
+ impactLabel: str
+ reason: str
+ evidence: str
+
+
+class WeeklyReplayCalibrationAction(BaseModel):
+ target: str
+ adjustment: str
+ rationale: str
+
+
+class WeeklyReplayCalibrationReview(BaseModel):
+ mismatchLabel: str
+ deviationScore: float
+ calibrationConfidence: float
+ summary: str
+ drivers: list[WeeklyReplayCalibrationDriver]
+ suggestedActions: list[WeeklyReplayCalibrationAction]
+
+
class WeeklyReplayThemeItem(BaseModel):
name: str
confidence: float
@@ -208,6 +231,8 @@ class WeeklyReplayDay(BaseModel):
predictedThemes: list[WeeklyReplayThemeItem]
evidenceItems: list[WeeklyReplayEvidenceItem]
summary: str
+ calibrationReview: WeeklyReplayCalibrationReview
+ tradeOptimization: WeeklyReplayTradeOptimization
class WeeklyReplayAggregate(BaseModel):
@@ -216,10 +241,45 @@ class WeeklyReplayAggregate(BaseModel):
avgLeaderCloseReturnPct: float
+class WeeklyReplayCalibrationSummary(BaseModel):
+ analysisMode: str
+ role: str
+ model: str
+ summary: str
+ daysWithMeaningfulMismatch: int
+ averageDeviationScore: float
+ topDriverLabels: list[str]
+ suggestedAdjustments: list[WeeklyReplayCalibrationAction]
+ watchouts: list[str]
+ confidence: float
+
+
+class WeeklyReplayTradeOptimization(BaseModel):
+ availabilityLabel: str
+ ticker: str
+ name: str
+ barInterval: str
+ entryAssumption: str
+ bestExitTimeLabel: str | None
+ bestExitReturnPct: float | None
+ closeReturnPct: float | None
+ extraReturnPct: float | None
+ chartSignal: str
+ executionNote: str
+
+
+class WeeklyReplayTradeTimingSummary(BaseModel):
+ summary: str
+ averageExtraReturnPct: float
+ topSignalLabels: list[str]
+
+
class WeeklyReplayResponse(BaseModel):
windowLabel: str
promptVersion: str
aggregate: WeeklyReplayAggregate
+ calibrationSummary: WeeklyReplayCalibrationSummary
+ tradeTimingSummary: WeeklyReplayTradeTimingSummary
days: list[WeeklyReplayDay]
diff --git a/services/api/app/services/agents/router.py b/services/api/app/services/agents/router.py
index b1aedd9..64aefc9 100644
--- a/services/api/app/services/agents/router.py
+++ b/services/api/app/services/agents/router.py
@@ -13,6 +13,7 @@ def __init__(self, settings: Settings) -> None:
"skeptic_analyst": settings.model_skeptic_analyst,
"korea_translator": settings.model_korea_translator,
"final_judge": settings.model_final_judge,
+ "replay_calibrator": settings.model_replay_calibrator,
}
def model_for(self, role: str) -> str:
@@ -26,5 +27,9 @@ def model_for(self, role: str) -> str:
"skeptic_analyst": "과적합, 선반영, 데이터 누락, 반례 가능성을 제기하고 확신을 낮출 요소를 정리한다.",
"korea_translator": "미국 검증 이벤트를 한국 상장사와 테마로 명시적으로 번역하고 1등주/2등주를 구분한다.",
"final_judge": "모든 에이전트의 합의와 이견을 비교하고 근거 부족 영역을 불확실성으로 남긴다.",
+ "replay_calibrator": (
+ "리플레이 결과에서 예측과 실제 한국장 반응이 왜 달라졌는지 분석한다. "
+ "반드시 JSON 객체만 반환하고 summary, keyDrivers, recommendedAdjustments, watchouts, confidence를 포함한다. "
+ "recommendedAdjustments는 target, adjustment, rationale 필드를 가진 객체 배열이어야 한다."
+ ),
}
-
diff --git a/services/api/app/services/replay/calibration.py b/services/api/app/services/replay/calibration.py
new file mode 100644
index 0000000..f6444cc
--- /dev/null
+++ b/services/api/app/services/replay/calibration.py
@@ -0,0 +1,338 @@
+from __future__ import annotations
+
+from collections import Counter
+from statistics import mean
+from typing import Any
+
+from pydantic import BaseModel, Field, ValidationError
+
+
+class CalibrationActionPayload(BaseModel):
+ target: str
+ adjustment: str
+ rationale: str
+
+
+class CalibrationAgentPayload(BaseModel):
+ summary: str
+ keyDrivers: list[str] = Field(default_factory=list)
+ recommendedAdjustments: list[CalibrationActionPayload] = Field(default_factory=list)
+ watchouts: list[str] = Field(default_factory=list)
+ confidence: float = 0.55
+
+
+def build_day_calibration_review(day_item: dict[str, Any]) -> dict[str, Any]:
+ predicted_themes = day_item.get("predictedThemes", [])
+ market_context = day_item.get("marketContext", {})
+ evidence_items = day_item.get("evidenceItems", [])
+
+ if not predicted_themes:
+ return {
+ "mismatchLabel": "신호 부족",
+ "deviationScore": 0.18,
+ "calibrationConfidence": 0.52,
+ "summary": "당일에는 유의미한 예측 테마가 없어 보정에 활용할 만한 신호가 제한적이었습니다.",
+ "drivers": [
+ {
+ "code": "no_theme_generated",
+ "label": "테마 신호 부족",
+ "impactLabel": "중간",
+ "reason": "장전 시점 근거 패킷에서 한국장으로 번역할 만한 강한 테마가 충분히 형성되지 않았습니다.",
+ "evidence": market_context.get("summary", "시장 요약 없음"),
+ }
+ ],
+ "suggestedActions": [
+ {
+ "target": "테마 생성 임계치",
+ "adjustment": "모니터링 유지",
+ "rationale": "신호가 적은 날에는 억지 예측보다 공백을 허용하는 쪽이 안전합니다.",
+ }
+ ],
+ }
+
+ top_theme = predicted_themes[0]
+ actual_outcome = top_theme.get("actualOutcome", {})
+ leader_results = [item for item in actual_outcome.get("stockResults", []) if item.get("tier") == "leader"]
+ second_tier_results = [
+ item for item in actual_outcome.get("stockResults", []) if item.get("tier") == "second_tier"
+ ]
+ avg_leader_return = float(actual_outcome.get("avgLeaderCloseReturnPct", 0.0))
+ confidence = float(top_theme.get("confidence", 0.0))
+ avg_confirmation = (
+ mean(float(item.get("marketConfirmationScore", 0.0)) for item in evidence_items)
+ if evidence_items
+ else 0.0
+ )
+ kospi_return = float(market_context.get("kospiCloseReturnPct", 0.0))
+ kosdaq_return = float(market_context.get("kosdaqCloseReturnPct", 0.0))
+ avg_open_gap = mean(float(item.get("openGapPct", 0.0)) for item in leader_results) if leader_results else 0.0
+ avg_intraday = mean(float(item.get("intradayMovePct", 0.0)) for item in leader_results) if leader_results else 0.0
+ avg_second_tier_return = (
+ mean(float(item.get("closeReturnPct", 0.0)) for item in second_tier_results)
+ if second_tier_results
+ else None
+ )
+
+ drivers: list[dict[str, str]] = []
+ actions: list[dict[str, str]] = []
+ severity = 0.22
+
+ def add_driver(
+ code: str,
+ label: str,
+ impact_label: str,
+ reason: str,
+ evidence: str,
+ action_target: str,
+ action_adjustment: str,
+ action_rationale: str,
+ floor: float,
+ ) -> None:
+ nonlocal severity
+ if any(existing["code"] == code for existing in drivers):
+ return
+ drivers.append(
+ {
+ "code": code,
+ "label": label,
+ "impactLabel": impact_label,
+ "reason": reason,
+ "evidence": evidence,
+ }
+ )
+ actions.append(
+ {
+ "target": action_target,
+ "adjustment": action_adjustment,
+ "rationale": action_rationale,
+ }
+ )
+ severity = max(severity, floor)
+
+ if min(kospi_return, kosdaq_return) <= -2.0 and avg_leader_return <= 0.5:
+ add_driver(
+ code="broad_market_headwind",
+ label="시장 전반 역풍",
+ impact_label="높음",
+ reason="테마 자체보다 한국장 전체 리스크 오프 흐름이 더 강하게 작용했습니다.",
+ evidence=f"KOSPI {kospi_return:+.2f}% / KOSDAQ {kosdaq_return:+.2f}%",
+ action_target="시장 테이프 필터",
+ action_adjustment="하향 보정 강화",
+ action_rationale="장전 예측 전에 KOSPI·KOSDAQ 선행 리스크 오프 신호가 강하면 테마 점수를 감산해야 합니다.",
+ floor=0.82,
+ )
+
+ if confidence >= 0.72 and avg_leader_return < 1.0:
+ add_driver(
+ code="confidence_overshoot",
+ label="확신 과대",
+ impact_label="높음",
+ reason="예측 확신에 비해 실제 주도주 성과가 충분히 따라오지 못했습니다.",
+ evidence=f"예측 확신 {confidence:.2f} / 실제 평균 수익률 {avg_leader_return:+.2f}%",
+ action_target="최종 확신 점수",
+ action_adjustment="상단 캡 축소",
+ action_rationale="확인 점수가 약한 날에는 높은 확신을 제한해야 합니다.",
+ floor=0.76,
+ )
+
+ if avg_confirmation < 0.35 and confidence >= 0.65:
+ add_driver(
+ code="weak_confirmation",
+ label="미국 반응 확인 부족",
+ impact_label="중간",
+ reason="뉴스는 있었지만 미국 시장 반응이 충분히 동조하지 않았습니다.",
+ evidence=f"평균 시장 확인 점수 {avg_confirmation:.2f}",
+ action_target="시장 확인 점수",
+ action_adjustment="하한 상향",
+ action_rationale="교차 자산 확인이 약하면 한국 테마 번역 점수를 낮춰야 합니다.",
+ floor=0.68,
+ )
+
+ if avg_open_gap >= 2.5 and avg_intraday <= -1.0:
+ add_driver(
+ code="gap_fade",
+ label="갭상승 후 이탈",
+ impact_label="높음",
+ reason="시가에서 이미 선반영된 뒤 장중에 수급이 빠지며 수익이 축소됐습니다.",
+ evidence=f"평균 시가 갭 {avg_open_gap:+.2f}% / 장중 변동 {avg_intraday:+.2f}%",
+ action_target="갭 페이드 리스크",
+ action_adjustment="상향",
+ action_rationale="장전 강세 예측이라도 시가 과열 패턴은 별도 경고로 분리해야 합니다.",
+ floor=0.8,
+ )
+
+ if avg_second_tier_return is not None and avg_second_tier_return - avg_leader_return >= 2.0:
+ add_driver(
+ code="leader_selection_miss",
+ label="주도주 선별 미스",
+ impact_label="중간",
+ reason="같은 테마 안에서도 2등주가 주도주보다 더 강했습니다.",
+ evidence=f"주도주 {avg_leader_return:+.2f}% / 2등주 {avg_second_tier_return:+.2f}%",
+ action_target="리더 랭킹 가중치",
+ action_adjustment="재조정",
+ action_rationale="한국 수급 민감도와 단기 탄력 점수를 더 크게 반영할 필요가 있습니다.",
+ floor=0.64,
+ )
+
+ if confidence < 0.58 and avg_leader_return >= 3.0:
+ add_driver(
+ code="underweighted_signal",
+ label="신호 과소평가",
+ impact_label="중간",
+ reason="낮게 본 신호가 실제로는 강한 테마 수익으로 이어졌습니다.",
+ evidence=f"예측 확신 {confidence:.2f} / 실제 평균 수익률 {avg_leader_return:+.2f}%",
+ action_target="테마 번역 점수",
+ action_adjustment="상향 검토",
+ action_rationale="같은 유형의 사건에서 한국 민감도가 더 높았는지 재평가해야 합니다.",
+ floor=0.61,
+ )
+
+ if not drivers:
+ drivers.append(
+ {
+ "code": "aligned_result",
+ "label": "예측과 결과 정합",
+ "impactLabel": "낮음",
+ "reason": "주요 예측과 실제 테마 성과가 대체로 같은 방향으로 움직였습니다.",
+ "evidence": f"주도주 평균 수익률 {avg_leader_return:+.2f}%",
+ }
+ )
+ actions.append(
+ {
+ "target": "현재 스코어링",
+ "adjustment": "유지",
+ "rationale": "현 규칙을 유지하되 같은 패턴이 반복되는지 추가 관찰하면 됩니다.",
+ }
+ )
+
+ mismatch_label = _mismatch_label(drivers[0]["code"], avg_leader_return)
+ calibration_confidence = min(
+ 0.92,
+ round(0.52 + min(len(evidence_items), 4) * 0.05 + (0.08 if leader_results else 0.0), 2),
+ )
+ summary = (
+ f"{top_theme.get('name', '상위 테마')} 예측은 {mismatch_label}으로 평가됩니다. "
+ f"핵심 원인은 {drivers[0]['label']}이며, {drivers[0]['evidence']}가 가장 큰 차이 신호였습니다."
+ )
+
+ return {
+ "mismatchLabel": mismatch_label,
+ "deviationScore": round(severity, 2),
+ "calibrationConfidence": calibration_confidence,
+ "summary": summary,
+ "drivers": drivers[:3],
+ "suggestedActions": _dedupe_actions(actions)[:3],
+ }
+
+
+def build_weekly_calibration_summary(
+ day_items: list[dict[str, Any]],
+ role: str,
+ model: str,
+ model_payload: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+ reviews = [item["calibrationReview"] for item in day_items if item.get("calibrationReview")]
+ deviation_scores = [float(review["deviationScore"]) for review in reviews]
+ meaningful_days = [review for review in reviews if float(review["deviationScore"]) >= 0.55]
+ driver_counts = Counter(
+ driver["label"]
+ for review in reviews
+ for driver in review.get("drivers", [])[:1]
+ )
+ top_driver_labels = [label for label, _count in driver_counts.most_common(3)]
+ action_pool = _dedupe_actions(
+ [
+ action
+ for review in meaningful_days
+ for action in review.get("suggestedActions", [])
+ ]
+ )[:4]
+
+ base_summary = (
+ f"최근 {len(reviews)}거래일 중 {len(meaningful_days)}일에서 의미 있는 오차가 확인됐습니다. "
+ f"반복 원인은 {', '.join(top_driver_labels) if top_driver_labels else '뚜렷한 반복 없음'}입니다."
+ )
+ watchouts = [
+ "장전 강세 신호라도 KOSDAQ 급락 구간에서는 감산 필터를 우선 적용하세요.",
+ "시가 갭이 과하면 추격보다 15분 후 체결 강도 확인이 필요합니다.",
+ ]
+
+ analysis_mode = "deterministic"
+ summary = base_summary
+ confidence = 0.58
+
+ if model_payload:
+ analysis_mode = "model_augmented"
+ summary = model_payload.get("summary") or base_summary
+ confidence = float(model_payload.get("confidence", 0.58))
+ if not top_driver_labels:
+ top_driver_labels = model_payload.get("keyDrivers", [])[:3]
+ if model_payload.get("recommendedAdjustments"):
+ action_pool = _dedupe_actions(
+ [*action_pool, *model_payload["recommendedAdjustments"]]
+ )[:4]
+ if model_payload.get("watchouts"):
+ watchouts = model_payload["watchouts"][:4]
+
+ return {
+ "analysisMode": analysis_mode,
+ "role": role,
+ "model": model,
+ "summary": summary,
+ "daysWithMeaningfulMismatch": len(meaningful_days),
+ "averageDeviationScore": round(mean(deviation_scores), 2) if deviation_scores else 0.0,
+ "topDriverLabels": top_driver_labels,
+ "suggestedAdjustments": action_pool,
+ "watchouts": watchouts,
+ "confidence": round(confidence, 2),
+ }
+
+
+def normalize_calibration_agent_payload(raw: dict[str, Any]) -> dict[str, Any] | None:
+ try:
+ payload = CalibrationAgentPayload.model_validate(raw)
+ except ValidationError:
+ return None
+
+ return {
+ "summary": payload.summary,
+ "keyDrivers": payload.keyDrivers[:4],
+ "recommendedAdjustments": [
+ {
+ "target": action.target,
+ "adjustment": action.adjustment,
+ "rationale": action.rationale,
+ }
+ for action in payload.recommendedAdjustments[:4]
+ ],
+ "watchouts": payload.watchouts[:4],
+ "confidence": round(float(payload.confidence), 2),
+ }
+
+
+def _dedupe_actions(actions: list[dict[str, str]]) -> list[dict[str, str]]:
+ deduped: list[dict[str, str]] = []
+ seen: set[tuple[str, str]] = set()
+ for action in actions:
+ key = (action["target"], action["adjustment"])
+ if key in seen:
+ continue
+ seen.add(key)
+ deduped.append(action)
+ return deduped
+
+
+def _mismatch_label(primary_code: str, avg_leader_return: float) -> str:
+ if primary_code == "broad_market_headwind":
+ return "시장 역풍"
+ if primary_code == "gap_fade":
+ return "갭상승 후 이탈"
+ if primary_code == "leader_selection_miss":
+ return "주도주 선별 미스"
+ if primary_code == "underweighted_signal":
+ return "신호 과소평가"
+ if primary_code in {"confidence_overshoot", "weak_confirmation"}:
+ return "과신 실패"
+ if avg_leader_return >= 1.0:
+ return "정합"
+ return "보통 오차"
diff --git a/services/api/app/services/replay/trade_timing.py b/services/api/app/services/replay/trade_timing.py
new file mode 100644
index 0000000..32d0813
--- /dev/null
+++ b/services/api/app/services/replay/trade_timing.py
@@ -0,0 +1,162 @@
+from __future__ import annotations
+
+from statistics import mean
+from typing import Any
+
+import pandas as pd
+
+
+def analyze_trade_optimization(
+ market_date_label: str,
+ ticker: str,
+ name: str,
+ intraday: pd.DataFrame,
+) -> dict[str, Any]:
+ if intraday.empty:
+ return _unavailable_trade_optimization(ticker=ticker, name=name)
+
+ bars = intraday.sort_index().copy()
+ if "Open" not in bars or "High" not in bars or "Close" not in bars:
+ return _unavailable_trade_optimization(ticker=ticker, name=name)
+
+ entry_price = float(bars.iloc[0]["Open"])
+ if entry_price <= 0:
+ return _unavailable_trade_optimization(ticker=ticker, name=name)
+
+ high_at = bars["High"].astype(float).idxmax()
+ high_row = bars.loc[high_at]
+ best_exit_price = float(high_row["High"])
+ close_price = float(bars.iloc[-1]["Close"])
+ best_exit_return = round(((best_exit_price - entry_price) / entry_price) * 100, 2)
+ close_return = round(((close_price - entry_price) / entry_price) * 100, 2)
+ extra_return = round(best_exit_return - close_return, 2)
+
+ vwap = _vwap_series(bars)
+ post_peak = bars.loc[high_at:]
+ vwap_break = None
+ if not post_peak.empty:
+ for timestamp, row in post_peak.iloc[1:].iterrows():
+ if float(row["Close"]) < float(vwap.loc[timestamp]):
+ vwap_break = timestamp
+ break
+
+ signal_label = _signal_label(high_at, bars.index[-1], extra_return, vwap_break is not None)
+ execution_note = _execution_note(
+ market_date_label=market_date_label,
+ high_at=high_at,
+ best_exit_return=best_exit_return,
+ close_return=close_return,
+ extra_return=extra_return,
+ vwap_break=vwap_break,
+ )
+
+ return {
+ "availabilityLabel": "available",
+ "ticker": ticker,
+ "name": name,
+ "barInterval": "5m",
+ "entryAssumption": "시가 진입 가정",
+ "bestExitTimeLabel": high_at.tz_convert("Asia/Seoul").strftime("%H:%M KST")
+ if getattr(high_at, "tzinfo", None)
+ else high_at.strftime("%H:%M KST"),
+ "bestExitReturnPct": best_exit_return,
+ "closeReturnPct": close_return,
+ "extraReturnPct": extra_return,
+ "chartSignal": signal_label,
+ "executionNote": execution_note,
+ }
+
+
+def summarize_trade_optimizations(day_items: list[dict[str, Any]]) -> dict[str, Any]:
+ available = [
+ item["tradeOptimization"]
+ for item in day_items
+ if item.get("tradeOptimization", {}).get("availabilityLabel") == "available"
+ ]
+ if not available:
+ return {
+ "summary": "장중 5분봉 데이터가 부족해 최적 매도 타이밍을 요약하지 못했습니다.",
+ "averageExtraReturnPct": 0.0,
+ "topSignalLabels": [],
+ }
+
+ signal_counts: dict[str, int] = {}
+ for item in available:
+ signal_counts[item["chartSignal"]] = signal_counts.get(item["chartSignal"], 0) + 1
+ ordered_signals = [label for label, _count in sorted(signal_counts.items(), key=lambda pair: (-pair[1], pair[0]))]
+ avg_extra_return = round(mean(float(item["extraReturnPct"]) for item in available), 2)
+
+ return {
+ "summary": (
+ f"시가 진입 가정 기준으로 종가까지 보유하는 것보다 평균 {avg_extra_return:+.2f}%p 더 유리했던 "
+ f"매도 힌트는 {', '.join(ordered_signals[:2]) if ordered_signals else '없음'}이었습니다."
+ ),
+ "averageExtraReturnPct": avg_extra_return,
+ "topSignalLabels": ordered_signals[:3],
+ }
+
+
+def _signal_label(high_at, last_at, extra_return: float, has_vwap_break: bool) -> str:
+ if extra_return <= 0.3:
+ return "종가 보유 우위"
+ hour = int(high_at.strftime("%H"))
+ minute = int(high_at.strftime("%M"))
+ total_minutes = hour * 60 + minute
+
+ if total_minutes <= 10 * 60:
+ return "시초 급등 고점"
+ if has_vwap_break:
+ return "고점 후 VWAP 이탈"
+ if total_minutes <= 11 * 60 + 30:
+ return "오전 추세 둔화"
+ if high_at == last_at:
+ return "종가 직전 재가속"
+ return "오후 재돌파"
+
+
+def _execution_note(
+ market_date_label: str,
+ high_at,
+ best_exit_return: float,
+ close_return: float,
+ extra_return: float,
+ vwap_break,
+) -> str:
+ base = (
+ f"{market_date_label}에는 5분봉 기준 {high_at.strftime('%H:%M')} 부근이 최대 수익 구간이었고 "
+ f"시가 대비 최고 {best_exit_return:+.2f}%까지 도달했습니다."
+ )
+ if extra_return <= 0.3:
+ return f"{base} 종가 수익률도 {close_return:+.2f}%로 비슷해 종가 보유가 크게 불리하지 않았습니다."
+ if vwap_break is not None:
+ return (
+ f"{base} 이후 {vwap_break.strftime('%H:%M')} 무렵 VWAP 하향 이탈이 나타나 "
+ f"종가 보유 대비 {extra_return:+.2f}%p 더 유리했습니다."
+ )
+ return f"{base} 종가까지 보유했을 때보다 {extra_return:+.2f}%p 더 높은 수익을 확보할 수 있었습니다."
+
+
+def _unavailable_trade_optimization(ticker: str, name: str) -> dict[str, Any]:
+ return {
+ "availabilityLabel": "unavailable",
+ "ticker": ticker,
+ "name": name,
+ "barInterval": "5m",
+ "entryAssumption": "시가 진입 가정",
+ "bestExitTimeLabel": None,
+ "bestExitReturnPct": None,
+ "closeReturnPct": None,
+ "extraReturnPct": None,
+ "chartSignal": "데이터 부족",
+ "executionNote": "해당 일자의 장중 데이터를 확보하지 못해 최적 매도 시점을 계산하지 못했습니다.",
+ }
+
+
+def _vwap_series(bars: pd.DataFrame) -> pd.Series:
+ volume = bars["Volume"].fillna(0).astype(float)
+ typical_price = (
+ bars["High"].astype(float) + bars["Low"].astype(float) + bars["Close"].astype(float)
+ ) / 3
+ cumulative_volume = volume.cumsum().replace(0, pd.NA).ffill()
+ vwap = (typical_price * volume).cumsum() / cumulative_volume
+ return vwap.ffill().fillna(bars["Close"].astype(float))
diff --git a/services/api/app/services/replay/weekly_replay.py b/services/api/app/services/replay/weekly_replay.py
index 2e3fe3d..06d2bd9 100644
--- a/services/api/app/services/replay/weekly_replay.py
+++ b/services/api/app/services/replay/weekly_replay.py
@@ -32,6 +32,14 @@
)
from app.services.korea.translation_engine import KoreaTranslationEngine, RankedTheme, merge_ranked_themes
from app.services.market.reaction_engine import YahooMarketReactionEngine, _numeric_series
+from app.services.replay.calibration import (
+ build_day_calibration_review,
+ build_weekly_calibration_summary,
+ normalize_calibration_agent_payload,
+)
+from app.services.replay.trade_timing import analyze_trade_optimization, summarize_trade_optimizations
+from app.services.agents.openai_client import AgentOrchestrator
+from app.core.config import get_settings
SEOUL = ZoneInfo("Asia/Seoul")
HTTP_HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
@@ -106,10 +114,13 @@ def sort_day_items_latest_first(day_items: list[dict[str, object]]) -> list[dict
class WeeklyReplayService:
def __init__(self) -> None:
+ self.settings = get_settings()
self.reaction_engine = YahooMarketReactionEngine()
self.translation_engine = KoreaTranslationEngine()
+ self.agent_orchestrator = AgentOrchestrator(self.settings)
self._price_cache: dict[str, pd.DataFrame] = {}
self._price_cache_bounds: dict[str, tuple[date, date]] = {}
+ self._intraday_cache: dict[tuple[str, date], pd.DataFrame] = {}
async def build(self, session: AsyncSession, days: int = 7) -> dict[str, object]:
cache_key = await self._cache_key(session, days)
@@ -128,6 +139,23 @@ async def build(self, session: AsyncSession, days: int = 7) -> dict[str, object]
"windowLabel": "최근 0거래일",
"promptVersion": prompt_version,
"aggregate": {"daysAnalyzed": 0, "positiveHitDays": 0, "avgLeaderCloseReturnPct": 0.0},
+ "calibrationSummary": {
+ "analysisMode": "deterministic",
+ "role": "replay_calibrator",
+ "model": self.agent_orchestrator.router.model_for("replay_calibrator"),
+ "summary": "분석할 거래일이 없어 보정 요약을 만들지 않았습니다.",
+ "daysWithMeaningfulMismatch": 0,
+ "averageDeviationScore": 0.0,
+ "topDriverLabels": [],
+ "suggestedAdjustments": [],
+ "watchouts": [],
+ "confidence": 0.0,
+ },
+ "tradeTimingSummary": {
+ "summary": "분석할 거래일이 없어 매도 타이밍 요약을 만들지 않았습니다.",
+ "averageExtraReturnPct": 0.0,
+ "topSignalLabels": [],
+ },
"days": [],
}
_WEEKLY_REPLAY_CACHE[cache_key] = (now, result)
@@ -208,15 +236,133 @@ async def build(self, session: AsyncSession, days: int = 7) -> dict[str, object]
)
ordered_day_items = sort_day_items_latest_first(day_items)
+ for day_item in ordered_day_items:
+ day_item["calibrationReview"] = build_day_calibration_review(day_item)
+ await self._attach_trade_optimizations(ordered_day_items, security_map)
+
+ calibration_summary = await self._build_calibration_summary(
+ prompt_version=prompt_version,
+ day_items=ordered_day_items,
+ )
+ trade_timing_summary = summarize_trade_optimizations(ordered_day_items)
result = {
"windowLabel": f"최근 {len(day_items)}거래일",
"promptVersion": prompt_version,
"aggregate": self._aggregate(ordered_day_items),
+ "calibrationSummary": calibration_summary,
+ "tradeTimingSummary": trade_timing_summary,
"days": ordered_day_items,
}
_WEEKLY_REPLAY_CACHE[cache_key] = (now, result)
return result
+ async def _build_calibration_summary(
+ self,
+ prompt_version: str,
+ day_items: list[dict[str, object]],
+ ) -> dict[str, object]:
+ role = "replay_calibrator"
+ model = self.agent_orchestrator.router.model_for(role)
+ evidence_pack = {
+ "prompt_version": prompt_version,
+ "days": [
+ {
+ "market_date": item["marketDateLabel"],
+ "summary": item["summary"],
+ "market_context": item["marketContext"],
+ "top_theme": item["predictedThemes"][0]["name"] if item["predictedThemes"] else None,
+ "calibration_review": item["calibrationReview"],
+ }
+ for item in day_items
+ ],
+ "aggregate": self._aggregate(day_items),
+ }
+
+ model_payload: dict[str, object] | None = None
+ if self.settings.openai_api_key:
+ result = await self.agent_orchestrator.run_role(
+ role=role,
+ evidence_pack=evidence_pack,
+ background=False,
+ )
+ model_payload = normalize_calibration_agent_payload(result.content)
+
+ return build_weekly_calibration_summary(
+ day_items=day_items,
+ role=role,
+ model=model,
+ model_payload=model_payload,
+ )
+
+ async def _attach_trade_optimizations(
+ self,
+ day_items: list[dict[str, object]],
+ security_map: dict[str, str],
+ ) -> None:
+ targets: list[tuple[dict[str, object], str, str, str, date]] = []
+ for day_item in day_items:
+ market_date = datetime.strptime(str(day_item["marketDateLabel"]), "%Y-%m-%d").date()
+ top_theme = day_item["predictedThemes"][0] if day_item["predictedThemes"] else None
+ if not top_theme:
+ day_item["tradeOptimization"] = {
+ "availabilityLabel": "unavailable",
+ "ticker": "-",
+ "name": "데이터 없음",
+ "barInterval": "5m",
+ "entryAssumption": "시가 진입 가정",
+ "bestExitTimeLabel": None,
+ "bestExitReturnPct": None,
+ "closeReturnPct": None,
+ "extraReturnPct": None,
+ "chartSignal": "테마 부재",
+ "executionNote": "상위 예측 테마가 없어 매도 최적화 분석을 생략했습니다.",
+ }
+ continue
+
+ leader_results = [
+ stock for stock in top_theme["actualOutcome"]["stockResults"] if stock["tier"] == "leader"
+ ]
+ selected = max(
+ leader_results or top_theme["actualOutcome"]["stockResults"],
+ key=lambda stock: stock["closeReturnPct"],
+ default=None,
+ )
+ if selected is None:
+ day_item["tradeOptimization"] = {
+ "availabilityLabel": "unavailable",
+ "ticker": "-",
+ "name": "데이터 없음",
+ "barInterval": "5m",
+ "entryAssumption": "시가 진입 가정",
+ "bestExitTimeLabel": None,
+ "bestExitReturnPct": None,
+ "closeReturnPct": None,
+ "extraReturnPct": None,
+ "chartSignal": "데이터 부족",
+ "executionNote": "장중 가격 데이터가 없어 최적 매도 시점을 계산하지 못했습니다.",
+ }
+ continue
+
+ market = security_map.get(selected["ticker"], "KOSPI")
+ symbol = f"{selected['ticker']}.KS" if market == "KOSPI" else f"{selected['ticker']}.KQ"
+ targets.append((day_item, symbol, selected["ticker"], selected["name"], market_date))
+
+ await asyncio.gather(
+ *[
+ asyncio.to_thread(self._intraday_for_symbol_day, symbol, market_date)
+ for _day_item, symbol, _ticker, _name, market_date in targets
+ ]
+ )
+
+ for day_item, symbol, ticker, name, market_date in targets:
+ intraday = self._intraday_for_symbol_day(symbol, market_date)
+ day_item["tradeOptimization"] = analyze_trade_optimization(
+ market_date_label=str(day_item["marketDateLabel"]),
+ ticker=ticker,
+ name=name,
+ intraday=intraday,
+ )
+
async def _prefetch_histories(self, symbols: set[str], start: date, end: date) -> None:
await asyncio.gather(
*[
@@ -606,6 +752,35 @@ def _history_for_symbol(self, symbol: str, start: date, end: date) -> pd.DataFra
self._price_cache_bounds[symbol] = (fetch_start, fetch_end)
return frame[(frame.index >= start) & (frame.index < end)]
+ def _intraday_for_symbol_day(self, symbol: str, market_date: date) -> pd.DataFrame:
+ cache_key = (symbol, market_date)
+ cached = self._intraday_cache.get(cache_key)
+ if cached is not None:
+ return cached
+
+ ticker = yf.Ticker(symbol)
+ frame = ticker.history(
+ interval="5m",
+ start=market_date.isoformat(),
+ end=(market_date + timedelta(days=1)).isoformat(),
+ auto_adjust=False,
+ actions=False,
+ prepost=False,
+ )
+ if frame.empty:
+ self._intraday_cache[cache_key] = frame
+ return frame
+
+ frame = frame.copy()
+ frame.columns = [str(column) for column in frame.columns]
+ if getattr(frame.index, "tz", None) is None:
+ frame.index = pd.to_datetime(frame.index).tz_localize("Asia/Seoul")
+ else:
+ frame.index = pd.to_datetime(frame.index).tz_convert("Asia/Seoul")
+ frame = frame[frame.index.date == market_date]
+ self._intraday_cache[cache_key] = frame
+ return frame
+
def _evidence_pack_hash(self, evidence_items: list[dict[str, object]]) -> str:
encoded = json.dumps(evidence_items, ensure_ascii=False, sort_keys=True).encode("utf-8")
return hashlib.sha256(encoded).hexdigest()
diff --git a/services/api/tests/test_replay_analysis_helpers.py b/services/api/tests/test_replay_analysis_helpers.py
new file mode 100644
index 0000000..393af2f
--- /dev/null
+++ b/services/api/tests/test_replay_analysis_helpers.py
@@ -0,0 +1,98 @@
+from __future__ import annotations
+
+import pandas as pd
+
+from app.services.replay.calibration import build_day_calibration_review
+from app.services.replay.trade_timing import analyze_trade_optimization, summarize_trade_optimizations
+
+
+def test_build_day_calibration_review_flags_market_headwind_and_gap_fade() -> None:
+ review = build_day_calibration_review(
+ {
+ "marketContext": {
+ "kospiCloseReturnPct": -2.8,
+ "kosdaqCloseReturnPct": -3.6,
+ "summary": "KOSPI -2.80% / KOSDAQ -3.60%",
+ },
+ "evidenceItems": [{"marketConfirmationScore": 0.21}],
+ "predictedThemes": [
+ {
+ "name": "HBM/반도체 장비",
+ "confidence": 0.82,
+ "actualOutcome": {
+ "avgLeaderCloseReturnPct": -2.4,
+ "stockResults": [
+ {
+ "ticker": "042700",
+ "name": "한미반도체",
+ "tier": "leader",
+ "openGapPct": 3.4,
+ "closeReturnPct": -2.4,
+ "intradayMovePct": -4.6,
+ }
+ ],
+ },
+ }
+ ],
+ }
+ )
+
+ assert review["mismatchLabel"] == "시장 역풍"
+ assert any(driver["code"] == "broad_market_headwind" for driver in review["drivers"])
+ assert any(action["target"] == "시장 테이프 필터" for action in review["suggestedActions"])
+
+
+def test_analyze_trade_optimization_finds_best_exit_before_close() -> None:
+ bars = pd.DataFrame(
+ {
+ "Open": [100.0, 104.0, 105.0, 103.0],
+ "High": [104.0, 108.0, 106.0, 103.0],
+ "Low": [99.5, 103.5, 102.0, 101.0],
+ "Close": [104.0, 105.0, 103.0, 102.0],
+ "Volume": [1000, 1400, 1300, 1200],
+ },
+ index=pd.date_range("2026-04-08 09:05", periods=4, freq="5min", tz="Asia/Seoul"),
+ )
+
+ optimization = analyze_trade_optimization(
+ market_date_label="2026-04-08",
+ ticker="042700",
+ name="한미반도체",
+ intraday=bars,
+ )
+
+ assert optimization["availabilityLabel"] == "available"
+ assert optimization["bestExitTimeLabel"] == "09:10 KST"
+ assert optimization["bestExitReturnPct"] > optimization["closeReturnPct"]
+ assert optimization["extraReturnPct"] > 0
+
+
+def test_summarize_trade_optimizations_aggregates_signal_labels() -> None:
+ summary = summarize_trade_optimizations(
+ [
+ {
+ "tradeOptimization": {
+ "availabilityLabel": "available",
+ "chartSignal": "고점 후 VWAP 이탈",
+ "extraReturnPct": 2.4,
+ }
+ },
+ {
+ "tradeOptimization": {
+ "availabilityLabel": "available",
+ "chartSignal": "고점 후 VWAP 이탈",
+ "extraReturnPct": 1.1,
+ }
+ },
+ {
+ "tradeOptimization": {
+ "availabilityLabel": "available",
+ "chartSignal": "시초 급등 고점",
+ "extraReturnPct": 0.7,
+ }
+ },
+ ]
+ )
+
+ assert summary["topSignalLabels"][0] == "고점 후 VWAP 이탈"
+ assert summary["averageExtraReturnPct"] == 1.4