From cf2fedae6fefdb4d7675ce67865b4ddf359f5d13 Mon Sep 17 00:00:00 2001 From: user040131 Date: Fri, 10 Apr 2026 03:00:32 +0900 Subject: [PATCH] fix: speed up weekly replay loading --- .../replay/weekly-replay-client.tsx | 21 +- .../api/app/services/replay/weekly_replay.py | 303 +++++++++++++----- services/api/tests/test_weekly_replay.py | 2 +- 3 files changed, 238 insertions(+), 88 deletions(-) diff --git a/apps/web/components/replay/weekly-replay-client.tsx b/apps/web/components/replay/weekly-replay-client.tsx index f08fd37..34cb7b6 100644 --- a/apps/web/components/replay/weekly-replay-client.tsx +++ b/apps/web/components/replay/weekly-replay-client.tsx @@ -9,6 +9,10 @@ import { formatPct, formatScore } from "@/lib/format"; import { Badge } from "@/components/shared/badge"; import { Card } from "@/components/shared/card"; +const API_BASE_URL = + process.env.NEXT_PUBLIC_API_BASE_URL ?? "http://localhost:8000/api/v1"; +const REPLAY_TIMEOUT_MS = 90000; + function outcomeVariant(label: string): "ok" | "warn" | "danger" { if (label === "강한 반응" || label === "양호") { return "ok"; @@ -30,18 +34,15 @@ export function WeeklyReplayClient() { useEffect(() => { const controller = new AbortController(); - const timeoutId = window.setTimeout(() => controller.abort(), 30000); + const timeoutId = window.setTimeout(() => controller.abort(), REPLAY_TIMEOUT_MS); async function load() { try { - const response = await fetch( - `${process.env.NEXT_PUBLIC_API_BASE_URL ?? "http://localhost:8000/api/v1"}/replay/weekly`, - { - cache: "no-store", - headers: { "Content-Type": "application/json" }, - signal: controller.signal - } - ); + const response = await fetch(`${API_BASE_URL}/replay/weekly`, { + cache: "no-store", + headers: { "Content-Type": "application/json" }, + signal: controller.signal + }); if (!response.ok) { throw new Error(`API request failed: ${response.status} ${response.statusText}`); @@ -50,7 +51,7 @@ export function WeeklyReplayClient() { const payload = await response.json(); const parsed = WeeklyReplayPayloadSchema.parse(payload); startTransition(() => { - setData(parsed as WeeklyReplayPayload); + setData(parsed); setError(null); }); } catch (caughtError) { diff --git a/services/api/app/services/replay/weekly_replay.py b/services/api/app/services/replay/weekly_replay.py index 6a50ddc..62253f6 100644 --- a/services/api/app/services/replay/weekly_replay.py +++ b/services/api/app/services/replay/weekly_replay.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import hashlib import json from dataclasses import dataclass @@ -14,7 +15,7 @@ import pandas as pd import yfinance as yf from bs4 import BeautifulSoup -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.entities import IngestedDocument, KoreanSecurity, PromptVersion, ThemeMap @@ -41,6 +42,8 @@ "energy_raw_materials": "oil OR natural gas OR uranium OR copper OR Iran energy", "logistics_supply_chain": "shipping OR freight OR logistics OR supply chain", } +CACHE_TTL = timedelta(minutes=15) +_WEEKLY_REPLAY_CACHE: dict[str, tuple[datetime, dict[str, object]]] = {} @dataclass(slots=True) @@ -89,7 +92,7 @@ def classify_theme_outcome(avg_leader_close_return_pct: float) -> str: if avg_leader_close_return_pct >= 1.0: return "양호" if avg_leader_close_return_pct > -1.0: - return "혼조" + return "보통" return "부진" @@ -98,18 +101,85 @@ def __init__(self) -> None: self.reaction_engine = YahooMarketReactionEngine() self.translation_engine = KoreaTranslationEngine() self._price_cache: dict[str, pd.DataFrame] = {} + self._price_cache_bounds: dict[str, tuple[date, date]] = {} async def build(self, session: AsyncSession, days: int = 7) -> dict[str, object]: + cache_key = await self._cache_key(session, days) + now = datetime.now(UTC) + cached = _WEEKLY_REPLAY_CACHE.get(cache_key) + if cached and now - cached[0] <= CACHE_TTL: + return cached[1] + prompt_version = await session.scalar( select(PromptVersion.version).order_by(PromptVersion.created_at.desc()).limit(1) ) or "baseline-v1" security_map = await self._security_market_map(session) trading_days = self._recent_korea_trading_days(days) + if not trading_days: + result = { + "windowLabel": "최근 0거래일", + "promptVersion": prompt_version, + "aggregate": {"daysAnalyzed": 0, "positiveHitDays": 0, "avgLeaderCloseReturnPct": 0.0}, + "days": [], + } + _WEEKLY_REPLAY_CACHE[cache_key] = (now, result) + return result - day_items: list[dict[str, object]] = [] + earliest_as_of = datetime.combine(trading_days[0], time(8, 30), tzinfo=SEOUL).astimezone(UTC) + latest_as_of = datetime.combine(trading_days[-1], time(8, 30), tzinfo=SEOUL).astimezone(UTC) + mapped_categories = ( + await session.execute(select(ThemeMap.us_category).distinct()) + ).scalars().all() + + documents = await self._load_documents_window( + session=session, + start_at=earliest_as_of - timedelta(days=10), + end_at=latest_as_of, + ) + if len(documents) < 20: + documents.extend( + await self._fetch_fallback_news_window( + start_at=earliest_as_of, + end_at=latest_as_of, + mapped_categories=mapped_categories, + ) + ) + + predicted_day_items: list[dict[str, object]] = [] for market_date in trading_days: as_of = datetime.combine(market_date, time(8, 30), tzinfo=SEOUL).astimezone(UTC) - prediction = await self._predict_for_as_of(session, as_of) + prediction = await self._predict_for_as_of( + session=session, + as_of=as_of, + mapped_categories=mapped_categories, + documents=documents, + ) + predicted_day_items.append( + { + "market_date": market_date, + "as_of": as_of, + "prediction": prediction, + } + ) + + symbols_to_prefetch = {"^KS11", "^KQ11"} + for item in predicted_day_items: + for theme in item["prediction"]["themes"]: + for stock in [*theme.leaders, *theme.second_tier]: + market = security_map.get(stock.ticker, "KOSPI") + symbols_to_prefetch.add(f"{stock.ticker}.KS" if market == "KOSPI" else f"{stock.ticker}.KQ") + + await self._prefetch_histories( + symbols=symbols_to_prefetch, + start=trading_days[0] - timedelta(days=10), + end=trading_days[-1] + timedelta(days=1), + ) + + day_items: list[dict[str, object]] = [] + for item in predicted_day_items: + market_date = item["market_date"] + as_of = item["as_of"] + prediction = item["prediction"] actual_market = self._build_actual_market(market_date) predicted_themes = self._attach_actual_results( market_date=market_date, @@ -129,22 +199,43 @@ async def build(self, session: AsyncSession, days: int = 7) -> dict[str, object] } ) - return { + result = { "windowLabel": f"최근 {len(day_items)}거래일", "promptVersion": prompt_version, "aggregate": self._aggregate(day_items), "days": day_items, } + _WEEKLY_REPLAY_CACHE[cache_key] = (now, result) + return result + + async def _prefetch_histories(self, symbols: set[str], start: date, end: date) -> None: + await asyncio.gather( + *[ + asyncio.to_thread(self._history_for_symbol, symbol, start, end) + for symbol in sorted(symbols) + ] + ) - async def _predict_for_as_of(self, session: AsyncSession, as_of: datetime) -> dict[str, object]: - mapped_categories = ( - await session.execute(select(ThemeMap.us_category).distinct()) - ).scalars().all() - documents = await self._load_documents(session, as_of) - fallback_documents = await self._fetch_fallback_news(as_of, mapped_categories) - documents.extend(fallback_documents) + async def _cache_key(self, session: AsyncSession, days: int) -> str: + latest_doc_created_at = await session.scalar(select(func.max(IngestedDocument.created_at))) + latest_prompt_created_at = await session.scalar(select(func.max(PromptVersion.created_at))) + payload = { + "days": days, + "latest_doc_created_at": latest_doc_created_at.isoformat() if latest_doc_created_at else None, + "latest_prompt_created_at": latest_prompt_created_at.isoformat() if latest_prompt_created_at else None, + } + encoded = json.dumps(payload, sort_keys=True).encode("utf-8") + return hashlib.sha256(encoded).hexdigest() - clusters = self._build_clusters(documents, mapped_categories) + async def _predict_for_as_of( + self, + session: AsyncSession, + as_of: datetime, + mapped_categories: list[str], + documents: list[ReplayDocument], + ) -> dict[str, object]: + filtered_documents = self._documents_for_as_of(documents, as_of) + clusters = self._build_clusters(filtered_documents, mapped_categories) themes: list[RankedTheme] = [] evidence_items: list[dict[str, object]] = [] @@ -173,16 +264,22 @@ async def _predict_for_as_of(self, session: AsyncSession, as_of: datetime) -> di merged_themes = merge_ranked_themes(themes)[:3] return {"themes": merged_themes, "evidenceItems": evidence_items[:5]} - async def _load_documents(self, session: AsyncSession, as_of: datetime) -> list[ReplayDocument]: + async def _load_documents_window( + self, + session: AsyncSession, + start_at: datetime, + end_at: datetime, + ) -> list[ReplayDocument]: docs = ( await session.execute( select(IngestedDocument) - .where(IngestedDocument.published_at >= as_of - timedelta(days=10)) - .where(IngestedDocument.published_at <= as_of) + .where(IngestedDocument.published_at >= start_at) + .where(IngestedDocument.published_at <= end_at) .order_by(IngestedDocument.published_at.desc()) - .limit(300) + .limit(500) ) ).scalars().all() + return [ ReplayDocument( source_name=doc.source_name, @@ -195,66 +292,113 @@ async def _load_documents(self, session: AsyncSession, as_of: datetime) -> list[ for doc in docs ] - async def _fetch_fallback_news( + async def _fetch_fallback_news_window( self, - as_of: datetime, + start_at: datetime, + end_at: datetime, mapped_categories: list[str], ) -> list[ReplayDocument]: - start_date = as_of.astimezone(SEOUL).date() - timedelta(days=2) - end_date = as_of.astimezone(SEOUL).date() + timedelta(days=1) + start_date = start_at.astimezone(SEOUL).date() - timedelta(days=2) + end_date = end_at.astimezone(SEOUL).date() + timedelta(days=1) + requests_to_run = [ + (category, query) + for category, query in GOOGLE_NEWS_QUERIES.items() + if category in mapped_categories + ] + if not requests_to_run: + return [] + + async with httpx.AsyncClient(timeout=20.0, headers=HTTP_HEADERS, follow_redirects=True) as client: + tasks = [ + self._fetch_google_news_category( + client=client, + category=category, + query=query, + start_at=start_at, + end_at=end_at, + start_date=start_date, + end_date=end_date, + mapped_categories=mapped_categories, + ) + for category, query in requests_to_run + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + documents: list[ReplayDocument] = [] seen_titles: set[str] = set() + for result in results: + if isinstance(result, Exception): + continue + for document in result: + title_key = document.title.strip().lower() + if title_key in seen_titles: + continue + seen_titles.add(title_key) + documents.append(document) - async with httpx.AsyncClient(timeout=20.0, headers=HTTP_HEADERS, follow_redirects=True) as client: - for category, query in GOOGLE_NEWS_QUERIES.items(): - if category not in mapped_categories: + return documents + + async def _fetch_google_news_category( + self, + client: httpx.AsyncClient, + category: str, + query: str, + start_at: datetime, + end_at: datetime, + start_date: date, + end_date: date, + mapped_categories: list[str], + ) -> list[ReplayDocument]: + search_query = quote( + f"{query} after:{start_date.isoformat()} before:{end_date.isoformat()} " + "site:reuters.com OR site:cnbc.com OR site:marketwatch.com" + ) + url = f"https://news.google.com/rss/search?q={search_query}&hl=en-US&gl=US&ceid=US:en" + response = await client.get(url) + response.raise_for_status() + parsed = feedparser.parse(response.text) + documents: list[ReplayDocument] = [] + + for entry in parsed.entries[:15]: + published_raw = entry.get("published") or entry.get("updated") + if not published_raw: + continue + + published_at = parsedate_to_datetime(published_raw).astimezone(UTC) + if not (start_at - timedelta(days=1) <= published_at <= end_at): + continue + + title = entry.get("title", "Untitled") + summary_html = entry.get("summary", "") + summary_text = BeautifulSoup(summary_html, "html.parser").get_text(" ", strip=True) + detected_category = classify_event(title, summary_text) + if detected_category in mapped_categories: + final_category = detected_category + else: + haystack = normalize_text(f"{title} {summary_text}") + if keyword_hit_count(haystack, CATEGORY_RULES[category]) == 0: continue - search_query = quote( - f"{query} after:{start_date.isoformat()} before:{end_date.isoformat()} site:reuters.com OR site:cnbc.com OR site:marketwatch.com" + final_category = category + + source = entry.get("source", {}) + source_name = source.get("title") or "Google News" + documents.append( + ReplayDocument( + source_name=source_name, + url=entry.get("link") or source.get("href") or url, + title=title, + published_at=published_at, + raw_content=summary_text, + parsed_content={"tags": [final_category]}, ) - url = f"https://news.google.com/rss/search?q={search_query}&hl=en-US&gl=US&ceid=US:en" - response = await client.get(url) - response.raise_for_status() - parsed = feedparser.parse(response.text) - - for entry in parsed.entries[:15]: - published_raw = entry.get("published") or entry.get("updated") - if not published_raw: - continue - published_at = parsedate_to_datetime(published_raw).astimezone(UTC) - if not (as_of - timedelta(days=3) <= published_at <= as_of): - continue - - title = entry.get("title", "Untitled") - title_key = title.strip().lower() - if title_key in seen_titles: - continue + ) - summary_html = entry.get("summary", "") - summary_text = BeautifulSoup(summary_html, "html.parser").get_text(" ", strip=True) - detected_category = classify_event(title, summary_text) - if detected_category in mapped_categories: - final_category = detected_category - else: - haystack = normalize_text(f"{title} {summary_text}") - if keyword_hit_count(haystack, CATEGORY_RULES[category]) == 0: - continue - final_category = category - source = entry.get("source", {}) - source_name = source.get("title") or "Google News" - documents.append( - ReplayDocument( - source_name=source_name, - url=entry.get("link") or source.get("href") or url, - title=title, - published_at=published_at, - raw_content=summary_text, - parsed_content={"tags": [final_category]}, - ) - ) - seen_titles.add(title_key) return documents + def _documents_for_as_of(self, documents: list[ReplayDocument], as_of: datetime) -> list[ReplayDocument]: + window_start = as_of - timedelta(days=10) + return [document for document in documents if window_start <= document.published_at <= as_of] + def _build_clusters( self, documents: list[ReplayDocument], @@ -373,11 +517,11 @@ def _attach_actual_results( def _day_summary(self, predicted_themes: list[dict[str, object]], market_context: dict[str, object]) -> str: if not predicted_themes: - return f"예측 테마가 없었습니다. 실제 장은 {market_context['summary']}로 마감했습니다." + return f"예상 테마가 없었습니다. 실제 시장은 {market_context['summary']}로 마감했습니다." top_theme = predicted_themes[0] actual = top_theme["actualOutcome"] return ( - f"{top_theme['name']} 예측의 주도주 평균 수익률은 " + f"{top_theme['name']} 예상의 주도주 평균 수익률은 " f"{actual['avgLeaderCloseReturnPct']:+.2f}%였고 결과 평가는 {actual['outcomeLabel']}입니다." ) @@ -420,22 +564,26 @@ def _stock_move(self, symbol: str, market_date: date) -> dict[str, float] | None return compute_stock_move(prev_close, day_open, day_close) def _history_for_symbol(self, symbol: str, start: date, end: date) -> pd.DataFrame: - cache_key = f"{symbol}:{start.isoformat()}:{end.isoformat()}" - cached = self._price_cache.get(cache_key) - if cached is not None: - return cached + cached = self._price_cache.get(symbol) + bounds = self._price_cache_bounds.get(symbol) + if cached is not None and bounds is not None and bounds[0] <= start and bounds[1] >= end: + return cached[(cached.index >= start) & (cached.index < end)] + fetch_start = start if bounds is None else min(start, bounds[0]) + fetch_end = end if bounds is None else max(end, bounds[1]) data = yf.download( tickers=symbol, - start=start.isoformat(), - end=end.isoformat(), + start=fetch_start.isoformat(), + end=fetch_end.isoformat(), progress=False, auto_adjust=False, interval="1d", ) + if data.empty: frame = pd.DataFrame(columns=["Open", "Close"]) - self._price_cache[cache_key] = frame + self._price_cache[symbol] = frame + self._price_cache_bounds[symbol] = (fetch_start, fetch_end) return frame frame = pd.DataFrame( @@ -445,8 +593,9 @@ def _history_for_symbol(self, symbol: str, start: date, end: date) -> pd.DataFra } ).dropna() frame.index = [pd.Timestamp(value).date() for value in frame.index] - self._price_cache[cache_key] = frame - return frame + self._price_cache[symbol] = frame + self._price_cache_bounds[symbol] = (fetch_start, fetch_end) + return frame[(frame.index >= start) & (frame.index < end)] def _evidence_pack_hash(self, evidence_items: list[dict[str, object]]) -> str: encoded = json.dumps(evidence_items, ensure_ascii=False, sort_keys=True).encode("utf-8") diff --git a/services/api/tests/test_weekly_replay.py b/services/api/tests/test_weekly_replay.py index 139b86a..dae61cd 100644 --- a/services/api/tests/test_weekly_replay.py +++ b/services/api/tests/test_weekly_replay.py @@ -12,5 +12,5 @@ def test_compute_stock_move_uses_previous_close_as_base() -> None: def test_classify_theme_outcome_has_clear_bands() -> None: assert classify_theme_outcome(3.2) == "강한 적중" assert classify_theme_outcome(1.4) == "양호" - assert classify_theme_outcome(0.2) == "혼조" + assert classify_theme_outcome(0.2) == "보통" assert classify_theme_outcome(-1.8) == "부진"