Skip to content
12 changes: 12 additions & 0 deletions tests/trace/test_weave_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,12 @@ def models(text):
"pricing_level": "default",
"pricing_level_id": "default",
"created_by": "system",
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens_total_cost": 0.0,
"cache_creation_input_tokens_total_cost": 0.0,
"cache_read_input_token_cost": 0.0,
"cache_creation_input_token_cost": 0.0,
}
)

Expand All @@ -1716,6 +1722,12 @@ def models(text):
"pricing_level": "default",
"pricing_level_id": "default",
"created_by": "system",
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens_total_cost": 0.0,
"cache_creation_input_tokens_total_cost": 0.0,
"cache_read_input_token_cost": 0.0,
"cache_creation_input_token_cost": 0.0,
}
)

Expand Down
4 changes: 4 additions & 0 deletions tests/trace_server/costs/test_insert_costs.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def test_insert_costs_into_db(self, mock_datetime, mock_uuid4):
"USD",
cost.get("output", 0),
"USD",
cost.get("cache_read_input", 0),
cost.get("cache_creation_input", 0),
"system",
created_at,
)
Expand All @@ -142,6 +144,8 @@ def test_insert_costs_into_db(self, mock_datetime, mock_uuid4):
"prompt_token_cost_unit",
"completion_token_cost",
"completion_token_cost_unit",
"cache_read_input_token_cost",
"cache_creation_input_token_cost",
"created_by",
"created_at",
],
Expand Down
187 changes: 157 additions & 30 deletions tests/trace_server/query_builder/test_costs_query.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions tests/trace_server/test_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,12 @@ def test_opentelemetry_cost_calculation(self, client: weave_client.WeaveClient):
"pricing_level": "default",
"pricing_level_id": "default",
"created_by": "system",
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens_total_cost": 0,
"cache_creation_input_tokens_total_cost": 0,
"cache_read_input_token_cost": 0,
"cache_creation_input_token_cost": 0,
}
)

Expand Down
86 changes: 86 additions & 0 deletions tests/trace_server/test_trace_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
"output_tokens",
"requests",
"total_tokens",
"cache_read_input_tokens",
"cache_creation_input_tokens",
"prompt_tokens_total_cost",
"completion_tokens_total_cost",
"cache_read_input_tokens_total_cost",
"cache_creation_input_tokens_total_cost",
"prompt_token_cost",
"completion_token_cost",
"cache_read_input_token_cost",
"cache_creation_input_token_cost",
"prompt_token_cost_unit",
"completion_token_cost_unit",
"effective_date",
Expand Down Expand Up @@ -778,3 +784,83 @@ def test_calls_usage_handles_missing_usage(
assert usage.completion_tokens == expected_root[1]
assert usage.total_tokens == expected_root[2]
assert usage.requests == expected_root[3]


def test_aggregate_usage_with_cache_tokens_rolls_up() -> None:
    """Cache token counts and costs are extracted, rolled up, and merged correctly."""
    model = "claude-3.5-sonnet"
    parent_id = "root"
    leaf_id = "child"

    # Build a call with per-model token counts and per-model cost totals.
    def _cache_call(call_id, parent, tokens, costs):
        return _make_call(
            call_id,
            parent,
            _usage_summary({model: tokens}, costs={model: costs}),
        )

    calls = [
        _cache_call(
            parent_id,
            None,
            {
                "input_tokens": 100,
                "output_tokens": 50,
                "cache_read_input_tokens": 80,
                "cache_creation_input_tokens": 20,
                "requests": 1,
            },
            {
                "prompt_tokens_total_cost": 0.3,
                "completion_tokens_total_cost": 0.15,
                "cache_read_input_tokens_total_cost": 0.04,
                "cache_creation_input_tokens_total_cost": 0.05,
            },
        ),
        _cache_call(
            leaf_id,
            parent_id,
            {
                "input_tokens": 60,
                "output_tokens": 30,
                "cache_read_input_tokens": 40,
                "cache_creation_input_tokens": 10,
                "requests": 1,
            },
            {
                "prompt_tokens_total_cost": 0.18,
                "completion_tokens_total_cost": 0.09,
                "cache_read_input_tokens_total_cost": 0.02,
                "cache_creation_input_tokens_total_cost": 0.0,
            },
        ),
    ]

    # Token counts roll up from the child into the root's aggregate.
    with_costs = usage_utils.aggregate_usage_with_descendants(calls, include_costs=True)
    rolled = with_costs[parent_id][model]
    assert rolled.prompt_tokens == 160
    assert rolled.completion_tokens == 80
    assert rolled.cache_read_input_tokens == 120
    assert rolled.cache_creation_input_tokens == 30
    assert rolled.requests == 2

    # Cache cost totals roll up as well.
    assert rolled.cache_read_input_tokens_total_cost == pytest.approx(0.06)
    assert rolled.cache_creation_input_tokens_total_cost == pytest.approx(0.05)

    leaf = with_costs[leaf_id][model]
    assert leaf.cache_read_input_tokens == 40
    assert leaf.cache_creation_input_tokens == 10

    # With include_costs disabled, the cache cost fields stay unset (None).
    without_costs = usage_utils.aggregate_usage_with_descendants(
        calls, include_costs=False
    )
    bare = without_costs[parent_id][model]
    assert bare.cache_read_input_tokens_total_cost is None
    assert bare.cache_creation_input_tokens_total_cost is None
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def _get_usage_metric_extraction_sql(metric: str, json_col: str) -> str:
ifNull(toFloat64OrNull(JSONExtractRaw({json_col}, 'completion_tokens')), 0) +
ifNull(toFloat64OrNull(JSONExtractRaw({json_col}, 'output_tokens')), 0)
)"""
elif metric in {"cache_read_input_tokens", "cache_creation_input_tokens"}:
return f"ifNull(toFloat64OrNull(JSONExtractRaw({json_col}, '{metric}')), 0)"
else:
return f"toFloat64OrNull(JSONExtractRaw({json_col}, '{metric}'))"

Expand Down
42 changes: 38 additions & 4 deletions weave/trace_server/clickhouse_trace_server_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -1089,12 +1089,24 @@ def _get_prices_for_models(

prices: dict[str, dict[str, float]] = {}
for row in result.result_rows:
llm_id, prompt_cost, completion_cost = row
(
llm_id,
prompt_cost,
completion_cost,
cache_read_cost,
cache_creation_cost,
) = row
prices[llm_id] = {
"prompt_token_cost": float(prompt_cost) if prompt_cost else 0.0,
"completion_token_cost": float(completion_cost)
if completion_cost
else 0.0,
"cache_read_input_token_cost": float(cache_read_cost)
if cache_read_cost
else 0.0,
"cache_creation_input_token_cost": float(cache_creation_cost)
if cache_creation_cost
else 0.0,
}
return prices

Expand Down Expand Up @@ -1126,22 +1138,42 @@ def _compute_costs_for_buckets(
model_prices = prices.get(model, {})
prompt_cost = model_prices.get("prompt_token_cost", 0.0)
completion_cost = model_prices.get("completion_token_cost", 0.0)
cache_read_cost = model_prices.get("cache_read_input_token_cost", 0.0)
cache_creation_cost = model_prices.get(
"cache_creation_input_token_cost", 0.0
)

input_tokens = bucket.get("sum_input_tokens", 0) or 0
output_tokens = bucket.get("sum_output_tokens", 0) or 0
cache_read_tokens = bucket.get("sum_cache_read_input_tokens", 0) or 0
cache_creation_tokens = (
bucket.get("sum_cache_creation_input_tokens", 0) or 0
)

# Subtract cache tokens from input: they are billed at cache
# rates, not the regular prompt rate.
net_input_tokens = (
input_tokens - cache_read_tokens - cache_creation_tokens
)

if "input_cost" in requested_cost_metrics:
bucket["sum_input_cost"] = input_tokens * prompt_cost
bucket["sum_input_cost"] = net_input_tokens * prompt_cost

if "output_cost" in requested_cost_metrics:
bucket["sum_output_cost"] = output_tokens * completion_cost

if "total_cost" in requested_cost_metrics:
input_cost = bucket.get("sum_input_cost", input_tokens * prompt_cost)
input_cost = bucket.get(
"sum_input_cost", net_input_tokens * prompt_cost
)
output_cost = bucket.get(
"sum_output_cost", output_tokens * completion_cost
)
bucket["sum_total_cost"] = input_cost + output_cost
cache_read_total = cache_read_tokens * cache_read_cost
cache_creation_total = cache_creation_tokens * cache_creation_cost
bucket["sum_total_cost"] = (
input_cost + output_cost + cache_read_total + cache_creation_total
)
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

def call_stats(self, req: tsi.CallStatsReq) -> tsi.CallStatsRes:
"""Return call statistics grouped by bucket with requested aggregations.
Expand Down Expand Up @@ -5742,6 +5774,8 @@ def cost_create(self, req: tsi.CostCreateReq) -> tsi.CostCreateRes:
),
"prompt_token_cost": cost.prompt_token_cost,
"completion_token_cost": cost.completion_token_cost,
"cache_read_input_token_cost": cost.cache_read_input_token_cost,
"cache_creation_input_token_cost": cost.cache_creation_input_token_cost,
"prompt_token_cost_unit": cost.prompt_token_cost_unit,
"completion_token_cost_unit": cost.completion_token_cost_unit,
}
Expand Down
8 changes: 8 additions & 0 deletions weave/trace_server/costs/insert_costs.py
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 filter_out_current_costs ignores cache cost fields, causing updated cache pricing to never be seeded

The filter_out_current_costs function at weave/trace_server/costs/insert_costs.py:116-150 determines whether a cost entry already exists in the DB by comparing only prompt_token_cost (mapped from cost["input"]), completion_token_cost (mapped from cost["output"]), and effective_date. It does not compare the new cache_read_input or cache_creation_input fields. Similarly, get_current_costs at weave/trace_server/costs/insert_costs.py:22-39 only queries llm_id, prompt_token_cost, completion_token_cost, effective_date — it doesn't fetch cache cost columns at all.

This means if cost_checkpoint.json is updated to add cache pricing for a model that already has matching prompt/completion costs and effective_date in the database, the entry will be incorrectly filtered out as a duplicate, and the new cache costs will never be inserted.

(Refers to lines 130-143)

Prompt for agents
In weave/trace_server/costs/insert_costs.py, update get_current_costs (lines 22-39) to also SELECT cache_read_input_token_cost and cache_creation_input_token_cost from llm_token_prices. Then update filter_out_current_costs (lines 116-150) to unpack those additional columns from the current_costs tuples and include them in the comparison at lines 132-135. Add two additional math.isclose checks: one comparing cache_read_input_token_cost with cost.get('cache_read_input', 0) and another comparing cache_creation_input_token_cost with cost.get('cache_creation_input', 0).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def get_current_costs(
class CostDetails(TypedDict):
input: float
output: float
cache_read_input: float
cache_creation_input: float
provider: str
created_at: str

Expand Down Expand Up @@ -69,6 +71,8 @@ def insert_costs_into_db(client: Client, data: dict[str, list[CostDetails]]) ->
provider_id = cost.get("provider", "default")
input_token_cost = cost.get("input", 0)
output_token_cost = cost.get("output", 0)
cache_read_input_token_cost = cost.get("cache_read_input", 0)
cache_creation_input_token_cost = cost.get("cache_creation_input", 0)
date_str = cost.get(
"created_at", datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
Expand All @@ -87,6 +91,8 @@ def insert_costs_into_db(client: Client, data: dict[str, list[CostDetails]]) ->
"USD",
output_token_cost,
"USD",
cache_read_input_token_cost,
cache_creation_input_token_cost,
"system",
created_at,
),
Expand All @@ -106,6 +112,8 @@ def insert_costs_into_db(client: Client, data: dict[str, list[CostDetails]]) ->
"prompt_token_cost_unit",
"completion_token_cost",
"completion_token_cost_unit",
"cache_read_input_token_cost",
"cache_creation_input_token_cost",
"created_by",
"created_at",
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE llm_token_prices DROP COLUMN IF EXISTS cache_read_input_token_cost;
ALTER TABLE llm_token_prices DROP COLUMN IF EXISTS cache_creation_input_token_cost;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE llm_token_prices ADD COLUMN IF NOT EXISTS cache_read_input_token_cost Float DEFAULT 0;
ALTER TABLE llm_token_prices ADD COLUMN IF NOT EXISTS cache_creation_input_token_cost Float DEFAULT 0;
45 changes: 43 additions & 2 deletions weave/trace_server/sqlite_trace_server.py
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ def _cost_usage_from_summary(
"requests": _safe_int_for_costs(usage.get("requests")),
# Match ClickHouse: keep total_tokens as-reported rather than deriving it.
"total_tokens": _safe_int_for_costs(usage.get("total_tokens")),
"cache_read_input_tokens": _safe_int_for_costs(
usage.get("cache_read_input_tokens")
),
"cache_creation_input_tokens": _safe_int_for_costs(
usage.get("cache_creation_input_tokens")
),
}
return normalized_usage

Expand Down Expand Up @@ -437,6 +443,8 @@ def setup_tables(self) -> None:
effective_date TEXT NOT NULL,
prompt_token_cost REAL NOT NULL,
completion_token_cost REAL NOT NULL,
cache_read_input_token_cost REAL NOT NULL DEFAULT 0,
cache_creation_input_token_cost REAL NOT NULL DEFAULT 0,
prompt_token_cost_unit TEXT NOT NULL,
completion_token_cost_unit TEXT NOT NULL,
created_by TEXT NOT NULL,
Expand Down Expand Up @@ -671,6 +679,8 @@ def _ensure_default_costs(self, cursor: sqlite3.Cursor) -> bool:
row["effective_date"],
row["prompt_token_cost"],
row["completion_token_cost"],
row.get("cache_read_input_token_cost", 0),
row.get("cache_creation_input_token_cost", 0),
row["prompt_token_cost_unit"],
row["completion_token_cost_unit"],
row["created_by"],
Expand All @@ -689,11 +699,13 @@ def _ensure_default_costs(self, cursor: sqlite3.Cursor) -> bool:
effective_date,
prompt_token_cost,
completion_token_cost,
cache_read_input_token_cost,
cache_creation_input_token_cost,
prompt_token_cost_unit,
completion_token_cost_unit,
created_by,
created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
default_rows,
)
Expand Down Expand Up @@ -767,6 +779,8 @@ def _apply_costs_to_calls(
effective_date,
prompt_token_cost,
completion_token_cost,
cache_read_input_token_cost,
cache_creation_input_token_cost,
prompt_token_cost_unit,
completion_token_cost_unit,
created_by,
Expand Down Expand Up @@ -800,6 +814,8 @@ def _apply_costs_to_calls(
"effective_date",
"prompt_token_cost",
"completion_token_cost",
"cache_read_input_token_cost",
"cache_creation_input_token_cost",
"prompt_token_cost_unit",
"completion_token_cost_unit",
"created_by",
Expand Down Expand Up @@ -836,18 +852,43 @@ def _apply_costs_to_calls(

prompt_cost = float(best_row["prompt_token_cost"] or 0.0)
completion_cost = float(best_row["completion_token_cost"] or 0.0)
cache_read_cost = float(
best_row.get("cache_read_input_token_cost") or 0.0
)
cache_creation_cost = float(
best_row.get("cache_creation_input_token_cost") or 0.0
)
prompt_tokens = usage["prompt_tokens"]
completion_tokens = usage["completion_tokens"]
cache_read_input_tokens = usage.get("cache_read_input_tokens", 0)
cache_creation_input_tokens = usage.get(
"cache_creation_input_tokens", 0
)

call_costs[llm_id] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"cache_read_input_tokens": cache_read_input_tokens,
"cache_creation_input_tokens": cache_creation_input_tokens,
"requests": usage["requests"],
"total_tokens": usage["total_tokens"],
"prompt_tokens_total_cost": prompt_tokens * prompt_cost,
# Subtract cached tokens: they are billed at the cache
# rate, not the regular input rate.
"prompt_tokens_total_cost": (
prompt_tokens
- cache_read_input_tokens
- cache_creation_input_tokens
)
* prompt_cost,
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
"completion_tokens_total_cost": completion_tokens * completion_cost,
"cache_read_input_tokens_total_cost": cache_read_input_tokens
* cache_read_cost,
"cache_creation_input_tokens_total_cost": cache_creation_input_tokens
* cache_creation_cost,
"prompt_token_cost": prompt_cost,
"completion_token_cost": completion_cost,
"cache_read_input_token_cost": cache_read_cost,
"cache_creation_input_token_cost": cache_creation_cost,
"prompt_token_cost_unit": best_row["prompt_token_cost_unit"],
"completion_token_cost_unit": best_row[
"completion_token_cost_unit"
Expand Down
Loading
Loading