forked from mrbrightsides/stc-bench
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparse_bench.py
More file actions
80 lines (69 loc) · 2.92 KB
/
parse_bench.py
File metadata and controls
80 lines (69 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import csv
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path

from make_bundle import create_bundle
# Caliper benchmark report to parse (output of a prior benchmark run).
INPUT_FILE = "outputs/run-2025-09-15_12-54-37.json"
# Run-level summary CSV: one row per benchmark run.
RUNS_CSV = Path("outputs/bench_runs.csv")
# Transaction-level CSV: one row per transaction in the run.
TX_CSV = Path("outputs/bench_tx.csv")
def parse_caliper_report():
    """Parse a Caliper benchmark JSON report into two CSV files.

    Reads INPUT_FILE and writes:
      * RUNS_CSV -- a single summary row with run-level metrics,
      * TX_CSV   -- one row per transaction (header-only file if the
        report contains no transactions).

    Raises:
        FileNotFoundError: if INPUT_FILE does not exist.
        json.JSONDecodeError: if the report is not valid JSON.
    """
    with open(INPUT_FILE, "r") as f:
        data = json.load(f)

    run_id = str(uuid.uuid4())

    # Hoist the nested sections once instead of chaining .get() per field.
    test = data.get("test", {})
    metrics = data.get("metrics", {})
    tps = metrics.get("tps", {})
    latency = metrics.get("latency", {})

    # --- bench_runs.csv ---
    run_info = {
        "run_id": run_id,
        # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "network": data.get("network", "unknown"),
        "scenario": test.get("name", "default_scenario"),
        "contract": data.get("contractAddress", "0x0"),
        "function_name": test.get("operation", "unknown"),
        "concurrency": test.get("workers", 1),
        "tx_per_user": test.get("txPerClient", 0),
        "tps_avg": tps.get("average", 0),
        "tps_peak": tps.get("peak", 0),
        "p50_ms": latency.get("50th", 0),
        "p95_ms": latency.get("95th", 0),
        "success_rate": metrics.get("successRate", 0),
    }
    RUNS_CSV.parent.mkdir(parents=True, exist_ok=True)
    with open(RUNS_CSV, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=run_info.keys())
        writer.writeheader()
        writer.writerow(run_info)

    # --- bench_tx.csv ---
    # Single source of truth for the column order; previously the
    # empty-report branch hard-coded a second copy that could drift.
    tx_fields = [
        "run_id", "tx_hash", "submitted_at", "mined_at", "latency_ms",
        "status", "gas_used", "gas_price_wei", "block_number", "function_name",
    ]
    tx_records = [
        {
            "run_id": run_id,
            "tx_hash": tx.get("hash"),
            "submitted_at": tx.get("submitted"),
            "mined_at": tx.get("mined"),
            "latency_ms": tx.get("latency", 0),
            "status": tx.get("status", "UNKNOWN"),
            "gas_used": tx.get("gasUsed", 0),
            "gas_price_wei": tx.get("gasPrice", 0),
            "block_number": tx.get("blockNumber", 0),
            "function_name": tx.get("function", "unknown"),
        }
        for tx in data.get("transactions", [])
    ]
    # DictWriter handles the empty case too: header row only, no data rows.
    with open(TX_CSV, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=tx_fields)
        writer.writeheader()
        writer.writerows(tx_records)

    print(f"✅ Parsed: {RUNS_CSV} & {TX_CSV}")
def bundle_if_ready():
    """Create an output bundle when both benchmark CSVs are present.

    Delegates to create_bundle() with the two CSV paths and the
    outputs directory; prints a warning if either file is missing.
    """
    # Guard clause: nothing to bundle until both CSVs exist.
    if not (RUNS_CSV.exists() and TX_CSV.exists()):
        print("⚠️ bench_runs.csv & bench_tx.csv belum ada, jalankan parse dulu.")
        return
    bundle_path = create_bundle(RUNS_CSV, TX_CSV, Path("outputs"))
    print("📦 Bundle created:", bundle_path)
# Script entry point: parse the Caliper report into CSVs, then bundle them.
if __name__ == "__main__":
    parse_caliper_report()
    bundle_if_ready()