-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
357 lines (289 loc) · 13.2 KB
/
main.py
File metadata and controls
357 lines (289 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import requests
from datetime import datetime, timedelta, timezone
import time
import os
import sys
from collections import defaultdict
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from dotenv import load_dotenv
# ────────────────────────────────────────────────
# CONFIGURATION ───────────────────────────────────
# ────────────────────────────────────────────────
load_dotenv() # optional — only if you still want .env support
# Base URL of the Tempo instance that is searched for traces. Read from the
# environment (or .env) only; falls back to a local default when unset/empty.
TEMPO_BASE = os.getenv("TEMPO_BASE") or "http://localhost:3200"
# GitHub credentials are seeded from the environment here and can be
# overwritten at runtime through the POST /configure-github endpoint.
GITHUB_TOKEN: str | None = os.getenv("GITHUB_TOKEN")
GITHUB_REPO: str | None = os.getenv("GITHUB_REPO") # e.g. "username/repo"
# Labels attached to every issue created by this service.
GITHUB_LABELS = ["performance", "latency", "errors", "tempo"]
# TraceQL filters used for the "slow" and "errored" sections of the report.
TRACEQL_QUERY_SLOW = '{ duration > 500ms }'
TRACEQL_QUERY_ERRORS = '{ status = error }'
# Value sent as the "limit" parameter of each Tempo search request.
LIMIT = 100
# Minimum combined number of slow + errored roots required before an issue is filed.
MIN_TRACES_TO_CREATE_ISSUE = 1
# Durations (ms) at or above this value get an extra highlight marker in reports.
MAX_DURATION_HIGHLIGHT_MS = 2500
# ────────────────────────────────────────────────
# Data Models ─────────────────────────────────────
# ────────────────────────────────────────────────
class GitHubConfig(BaseModel):
    """Request body accepted by the ``/configure-github`` endpoint."""

    # Sent as the Bearer credential when issues are created.
    token: str
    # Target repository, e.g. "username/repo".
    repo: str
class TracesReportResponse(BaseModel):
    """Response schema of the ``/traces-report`` endpoint."""

    # One representative trace per root name, for each report section.
    slow_traces: list[dict]
    error_traces: list[dict]
    # Lengths of the two lists above.
    slow_count: int
    error_count: int
    # ISO-8601 timestamp (UTC) of when the report was built.
    generated_at: str
# ────────────────────────────────────────────────
# Core Functions ──────────────────────────────────
# ────────────────────────────────────────────────
def fetch_traces(query: str, sort_by_duration: bool = True) -> list:
    """Search Tempo for traces from the last 72 hours matching *query*.

    Args:
        query: TraceQL expression passed as the ``q`` search parameter.
        sort_by_duration: When true, order results by ``durationMs``
            (longest first); otherwise by ``startTimeUnixNano`` (newest first).

    Returns:
        The sorted list of trace dicts, or an empty list if the request,
        decoding or sorting fails (the failure is printed to stderr).

    Raises:
        ValueError: If ``TEMPO_BASE`` is not configured.
    """
    if not TEMPO_BASE:
        raise ValueError("TEMPO_BASE is not configured")

    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(hours=72)
    search_params = {
        "start": int(window_start.timestamp()),
        "end": int(window_end.timestamp()),
        "limit": LIMIT,
        "q": query,
    }

    def _duration_of(trace):
        return trace.get("durationMs", 0)

    def _start_of(trace):
        return int(trace.get("startTimeUnixNano", 0))

    try:
        response = requests.get(
            f"{TEMPO_BASE}/api/search", params=search_params, timeout=40
        )
        response.raise_for_status()
        found = response.json().get("traces", [])
        found.sort(
            key=_duration_of if sort_by_duration else _start_of,
            reverse=True,
        )
    except Exception as exc:
        # Best-effort: a failed search yields an empty report section.
        print(f"Tempo search failed ({query}): {exc}", file=sys.stderr)
        return []
    return found
def deduplicate_by_root(traces: list) -> list:
    """Return the first trace seen for each distinct ``rootTraceName``.

    Traces without a ``rootTraceName`` key share one bucket. The result
    keeps the order in which each root name first appeared, so with
    pre-sorted input the "best" trace per root is the one retained.
    """
    first_per_root = {}
    for candidate in traces:
        root_name = candidate.get("rootTraceName", "—unknown—")
        # setdefault leaves an already-recorded root untouched.
        first_per_root.setdefault(root_name, candidate)
    return list(first_per_root.values())
def get_grafana_link(trace_id: str, base_url: str = "https://grafana.rocketgraph.app") -> str:
    """Build a Grafana Explore URL that opens *trace_id* in the Tempo datasource.

    Args:
        trace_id: Trace identifier to look up. It is percent-encoded before
            being embedded, so placeholder or unexpected values still yield a
            well-formed URL.
        base_url: Root URL of the Grafana instance; a trailing slash is
            tolerated. Defaults to the previously hard-coded host.

    Returns:
        The Explore URL with the pre-encoded ``left`` JSON query state.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    grafana_root = base_url.rstrip("/")
    encoded_id = quote(str(trace_id), safe="")
    return (
        f"{grafana_root}/explore?"
        "orgId=1&left=%7B%22datasource%22:%22tempo%22,%22queries%22:%5B%7B"
        f"%22refId%22:%22A%22,%22queryType%22:%22traceId%22,%22query%22:%22{encoded_id}%22"
        "%7D%5D%7D"
    )
def generate_slow_prompt(unique_slow: list) -> str:
    """Render the copy-paste prompt describing the slow root endpoints.

    Args:
        unique_slow: Trace dicts, one per root name, in the order they
            should be listed.

    Returns:
        A newline-joined prompt, or a short "nothing found" sentence when
        the list is empty.
    """
    if not unique_slow:
        return "No slow root endpoints (>500ms) found in the last 72 hours."

    intro = (
        "You are debugging performance in a Python/FastAPI codebase.",
        "Below are the slowest unique root endpoints (by rootTraceName) from the last 72 hours.",
        "Each has duration > 500 ms. Focus on these high-impact endpoints.",
        "Goal: suggest concrete places in code to investigate and optimize (DB queries, external calls, loops, blocking code, etc.).",
        "Be specific about likely files/functions based on endpoint names.",
        "Prioritize: biggest potential wins first.",
        "",
        "Endpoints to investigate (sorted slowest → fastest):",
        "",
    )
    outro = (
        "",
        "Open the Grafana links → look at span waterfall, attributes, DB calls, HTTP calls.",
        "Suggest 3–5 most promising optimizations per endpoint.",
        "Start now.",
    )

    entries = []
    for rank, trace in enumerate(unique_slow, 1):
        millis = trace.get("durationMs", 0)
        # Flag anything at or above the highlight threshold.
        badge = " **VERY SLOW**" if millis >= MAX_DURATION_HIGHLIGHT_MS else ""
        tid = trace.get("traceID", "—")
        entries.extend((
            f"{rank}. **{trace.get('rootTraceName', '—')}** ({millis} ms{badge})",
            f" - TraceID: {tid}",
            f" - Service: {trace.get('rootServiceName', '—')}",
            f" - Grafana: {get_grafana_link(tid)}",
            "",
        ))

    return "\n".join([*intro, *entries, *outro])
def generate_errors_prompt(unique_errors: list) -> str:
    """Render the copy-paste prompt describing the errored root endpoints.

    Args:
        unique_errors: Trace dicts, one per root name, in the order they
            should be listed.

    Returns:
        A newline-joined prompt, or a short "nothing found" sentence when
        the list is empty.
    """
    if not unique_errors:
        return "No errored root endpoints found in the last 72 hours."

    preamble = (
        "You are debugging errors in a Python/FastAPI codebase using distributed traces.",
        "Below are unique root endpoints (by rootTraceName) that had at least one error span in the last 72 hours.",
        "Goal: help find where and why these operations are failing → propose fixes.",
        "Look especially for exception messages, status codes, failed external calls, DB errors, etc. in Grafana.",
        "Be specific about likely code locations based on endpoint names.",
        "",
        "Errored root endpoints (most recent first):",
        "",
    )
    closing = (
        "",
        "Open each Grafana link → find the red error spans, read exception messages / status.",
        "Suggest fixes: better error handling, input validation, retries, dependency fixes, etc.",
        "Group similar errors if patterns emerge.",
        "Start now.",
    )

    entries = []
    for position, trace in enumerate(unique_errors, 1):
        # startTimeUnixNano is in nanoseconds; convert to seconds for datetime.
        started_seconds = int(trace.get("startTimeUnixNano", 0)) / 1_000_000_000
        started_at = datetime.fromtimestamp(started_seconds, tz=timezone.utc)
        tid = trace.get("traceID", "—")
        entries.extend((
            f"{position}. **{trace.get('rootTraceName', '—')}**",
            f" - TraceID: {tid}",
            f" - Service: {trace.get('rootServiceName', '—')}",
            f" - Started: {started_at.strftime('%Y-%m-%d %H:%M:%S UTC')}",
            f" - Grafana: {get_grafana_link(tid)}",
            "",
        ))

    return "\n".join([*preamble, *entries, *closing])
def build_issue_body(slow: list, errors: list) -> str:
    """Compose the Markdown body of the GitHub report issue.

    The body contains a summary header, a numbered list of slow endpoints,
    a numbered list of errored endpoints, and the two generated prompts
    wrapped in ``text`` code fences.

    Args:
        slow: Deduplicated slow trace dicts, in display order.
        errors: Deduplicated errored trace dicts, in display order.

    Returns:
        The issue body as a single newline-joined string.
    """
    # The format string already carries the "UTC" suffix, so the header line
    # must not append it a second time.
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    lines = [
        "**Daily Performance & Errors Report**",
        f"Generated: {now_str}",
        f"Slow roots (>500ms): {len(slow)} | Errored roots: {len(errors)}",
        "Window: last 72 hours",
        "---",
        "",
        "## Slow Endpoints",
    ]

    for i, t in enumerate(slow, 1):
        dur = t.get("durationMs", 0)
        dur_str = f"{dur} ms" + (" **CRITICAL**" if dur >= MAX_DURATION_HIGHLIGHT_MS else "")
        # Use the same "—" placeholders as the prompt generators so a trace
        # missing a field degrades gracefully instead of raising KeyError.
        root = t.get("rootTraceName", "—")
        trace_id = t.get("traceID", "—")
        lines.append(f"{i}. **{root}** – {dur_str}")
        lines.append(f" TraceID: `{trace_id}` | [Grafana]({get_grafana_link(trace_id)})")

    lines.extend(["", "## Errored Endpoints"])

    for i, t in enumerate(errors, 1):
        # startTimeUnixNano is in nanoseconds; convert to seconds for datetime.
        start_unix = int(t.get("startTimeUnixNano", 0)) / 1_000_000_000
        start_str = datetime.fromtimestamp(start_unix, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
        root = t.get("rootTraceName", "—")
        trace_id = t.get("traceID", "—")
        lines.append(f"{i}. **{root}**")
        lines.append(
            f" TraceID: `{trace_id}` | Started: {start_str} | [Grafana]({get_grafana_link(trace_id)})"
        )

    lines.extend([
        "",
        "---",
        "## Cursor Prompts (copy-paste into Cursor)",
        "",
        "### 1. Debug Slow Endpoints",
        "```text",
        generate_slow_prompt(slow),
        "```",
        "",
        "### 2. Debug Errored Endpoints",
        "```text",
        generate_errors_prompt(errors),
        "```",
        "",
        "*Generated automatically*",
    ])
    return "\n".join(lines)
def create_github_issue(title: str, body: str):
    """Create an issue in the configured GitHub repository.

    Args:
        title: Issue title.
        body: Issue body (Markdown).

    Returns:
        The decoded JSON response for the created issue, or ``None`` when
        the GitHub configuration is missing or the request fails.
    """
    if not (GITHUB_TOKEN and GITHUB_REPO):
        print("GitHub config missing → skipping issue creation")
        return None

    try:
        response = requests.post(
            f"https://api.github.com/repos/{GITHUB_REPO}/issues",
            json={"title": title, "body": body, "labels": GITHUB_LABELS},
            headers={
                "Authorization": f"Bearer {GITHUB_TOKEN}",
                "Accept": "application/vnd.github+json",
            },
            timeout=15,
        )
        response.raise_for_status()
        created = response.json()
        print(f"Issue created: {created['html_url']}")
    except Exception as exc:
        # Best-effort: report the failure and let the caller carry on.
        print(f"GitHub issue creation failed: {exc}", file=sys.stderr)
        return None
    return created
def run_report(create_issue: bool = True) -> dict:
    """Fetch slow and errored traces, deduplicate them and optionally file an issue.

    Args:
        create_issue: When true, and enough roots were found, and GitHub is
            configured, a GitHub issue is created from the report.

    Returns:
        A dict with ``slow_traces``, ``error_traces``, ``slow_count``,
        ``error_count`` and ``generated_at``; ``github_issue_url`` is added
        only when an issue was actually created.
    """
    print("Starting report generation...")

    unique_slow = deduplicate_by_root(
        fetch_traces(TRACEQL_QUERY_SLOW, sort_by_duration=True)
    )
    unique_errors = deduplicate_by_root(
        fetch_traces(TRACEQL_QUERY_ERRORS, sort_by_duration=False)
    )
    slow_count = len(unique_slow)
    error_count = len(unique_errors)
    print(f"Slow: {slow_count} unique roots | Errors: {error_count} unique roots")

    report = {
        "slow_traces": unique_slow,
        "error_traces": unique_errors,
        "slow_count": slow_count,
        "error_count": error_count,
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }

    # Nothing more to do unless an issue was requested and there is enough to report.
    if not create_issue or slow_count + error_count < MIN_TRACES_TO_CREATE_ISSUE:
        return report

    if not (GITHUB_TOKEN and GITHUB_REPO):
        print("GitHub config missing → skipping issue creation")
        return report

    title = f"Perf & Errors – {slow_count} slow + {error_count} errored roots"
    issue = create_github_issue(title, build_issue_body(unique_slow, unique_errors))
    if issue:
        report["github_issue_url"] = issue["html_url"]
    return report
# ────────────────────────────────────────────────
# FastAPI ─────────────────────────────────────────
# ────────────────────────────────────────────────
# ASGI application object; the endpoint and lifecycle decorators below attach to it.
app = FastAPI(title="Daily Perf & Errors Reporter")
# Scheduler that runs the daily report job; started and stopped by the
# startup/shutdown handlers below.
scheduler = BackgroundScheduler()
@app.on_event("startup")
def startup():
    """Log the effective configuration and schedule the daily report job."""
    token_state = "set" if GITHUB_TOKEN else "not set"
    print(f"Startup config → TEMPO_BASE={TEMPO_BASE}")
    print(f"Startup config → GitHub repo={GITHUB_REPO}, token={token_state}")

    # Pin the trigger to UTC explicitly: without a timezone the cron fields
    # are evaluated in the host's local zone, which would contradict the
    # "00:00 UTC" schedule announced below on any non-UTC machine.
    midnight_utc = CronTrigger(hour=0, minute=0, timezone="UTC")
    scheduler.add_job(lambda: run_report(create_issue=True), midnight_utc)
    scheduler.start()
    print("Daily report scheduled for 00:00 UTC")
@app.on_event("shutdown")
def shutdown():
    """Stop the background scheduler when the application shuts down."""
    # shutdown() raises if the scheduler was never started (for example when
    # startup failed before scheduler.start()), so only stop a running one.
    if scheduler.running:
        scheduler.shutdown()
@app.post("/configure-github", response_model=dict)
def configure_github(config: GitHubConfig):
    """Replace the GitHub token and repository used for issue creation.

    Args:
        config: New token and repository; surrounding whitespace is stripped.

    Returns:
        A status dict echoing the repository and whether a token is set.

    Raises:
        HTTPException: 400 when the token is blank or the repository is not
            of the form ``owner/name``. The previous configuration is left
            untouched in that case.
    """
    global GITHUB_TOKEN, GITHUB_REPO

    token = config.token.strip()
    repo = config.repo.strip()

    # Validate before mutating the globals so a bad request cannot silently
    # disable issue creation or produce a malformed GitHub API URL.
    if not token:
        raise HTTPException(status_code=400, detail="token must not be empty")
    owner, separator, name = repo.partition("/")
    if not (owner and separator and name) or "/" in name:
        raise HTTPException(status_code=400, detail="repo must be in the form 'owner/name'")

    GITHUB_TOKEN = token
    GITHUB_REPO = repo
    print(f"GitHub config updated via API → repo={GITHUB_REPO}, token length={len(GITHUB_TOKEN)}")
    return {
        "status": "github_config_updated",
        "repo": GITHUB_REPO,
        "token_set": bool(GITHUB_TOKEN),
    }
@app.post("/trigger")
def trigger(create_issue: bool = True):
    """Run the report immediately and return it together with a status marker.

    Args:
        create_issue: Forwarded to ``run_report``; whether an issue may be filed.
    """
    generated = run_report(create_issue=create_issue)
    response = {
        "status": "report_triggered",
        "create_issue_attempted": create_issue,
    }
    # Report fields are merged last, after the status keys.
    response.update(generated)
    return response
@app.get("/traces-report", response_model=TracesReportResponse)
def get_traces_report():
    """Build a fresh report without creating a GitHub issue and return it."""
    report = run_report(create_issue=False)
    wanted = (
        "slow_traces",
        "error_traces",
        "slow_count",
        "error_count",
        "generated_at",
    )
    # Pass on only the fields declared by the response model.
    return TracesReportResponse(**{field: report[field] for field in wanted})
@app.get("/health")
def health():
    """Report liveness plus the current Tempo and GitHub configuration state."""
    has_token = bool(GITHUB_TOKEN)
    github_state = {
        "configured": bool(GITHUB_TOKEN and GITHUB_REPO),
        "repo": GITHUB_REPO,
        "token_set": has_token,
    }
    return {"status": "ok", "tempo_base": TEMPO_BASE, "github": github_state}
# To run: uvicorn main:app --reload
# (replace "main" with this module's name, without the .py extension, if the file is renamed)