diff --git a/src/server/src/fbi.gleam b/src/server/src/fbi.gleam index e4ff9f4..c2ecf23 100644 --- a/src/server/src/fbi.gleam +++ b/src/server/src/fbi.gleam @@ -55,9 +55,10 @@ pub fn main() { let assert Ok(registry) = run_registry.start() let assert Ok(pubsub_subject) = pubsub.start() let assert Ok(history_lock) = history_mutex.start() - reattach.run_all(db, cfg, registry) + reattach.run_all(db, cfg, registry, pubsub_subject) let assert Ok(_gc_scheduler) = gc_scheduler.start(db, cfg) - let assert Ok(_resume_scheduler) = resume_scheduler.start(db, cfg, registry) + let assert Ok(_resume_scheduler) = + resume_scheduler.start(db, cfg, registry, pubsub_subject) let ctx = Context( db: db, diff --git a/src/server/src/fbi/db/projects.gleam b/src/server/src/fbi/db/projects.gleam index eef252c..e1538eb 100644 --- a/src/server/src/fbi/db/projects.gleam +++ b/src/server/src/fbi/db/projects.gleam @@ -122,10 +122,7 @@ pub fn get(db: sqlight.Connection, id: Int) -> Result(Project, DbError) { ) } -pub fn insert( - db: sqlight.Connection, - p: NewProject, -) -> Result(Project, DbError) { +pub fn insert(db: sqlight.Connection, p: NewProject) -> Result(Project, DbError) { let sql = "INSERT INTO projects (name, repo_url, default_branch, devcontainer_override_json, instructions, diff --git a/src/server/src/fbi/db/usage.gleam b/src/server/src/fbi/db/usage.gleam new file mode 100644 index 0000000..d3541ed --- /dev/null +++ b/src/server/src/fbi/db/usage.gleam @@ -0,0 +1,400 @@ +import fbi/db/connection.{type DbError, SqlightError} +import fbi/usage_parser.{type RateLimitFields, type UsageEvent} +import gleam/dynamic/decode +import gleam/float +import gleam/int +import gleam/json +import gleam/list +import gleam/option.{type Option, None, Some} +import gleam/result +import sqlight + +// ── Insert a usage event and atomically update run token totals ─────────────── + +pub fn insert_event( + db: sqlight.Connection, + run_id: Int, + ts: Int, + event: UsageEvent, + rl: Option(RateLimitFields), 
+) -> Result(Nil, DbError) { + let rl_req_rem = option.then(rl, fn(r) { r.requests_remaining }) + let rl_req_lim = option.then(rl, fn(r) { r.requests_limit }) + let rl_tok_rem = option.then(rl, fn(r) { r.tokens_remaining }) + let rl_tok_lim = option.then(rl, fn(r) { r.tokens_limit }) + let rl_reset = option.then(rl, fn(r) { r.reset_at }) + + // Insert the raw event row + use _ <- result.try( + sqlight.query( + "INSERT INTO run_usage_events + (run_id, ts, model, + input_tokens, output_tokens, cache_read_tokens, cache_create_tokens, + rl_requests_remaining, rl_requests_limit, + rl_tokens_remaining, rl_tokens_limit, rl_reset_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + on: db, + with: [ + sqlight.int(run_id), + sqlight.int(ts), + sqlight.text(event.model), + sqlight.int(event.input_tokens), + sqlight.int(event.output_tokens), + sqlight.int(event.cache_read_tokens), + sqlight.int(event.cache_create_tokens), + nullable_int(rl_req_rem), + nullable_int(rl_req_lim), + nullable_int(rl_tok_rem), + nullable_int(rl_tok_lim), + nullable_int(rl_reset), + ], + expecting: decode.dynamic, + ) + |> result.map_error(SqlightError) + |> result.map(fn(_) { Nil }), + ) + + // Atomically accumulate on the run row + let total = + event.input_tokens + + event.output_tokens + + event.cache_read_tokens + + event.cache_create_tokens + sqlight.query( + "UPDATE runs SET + tokens_input = tokens_input + ?, + tokens_output = tokens_output + ?, + tokens_cache_read = tokens_cache_read + ?, + tokens_cache_create = tokens_cache_create + ?, + tokens_total = tokens_total + ? 
+ WHERE id = ?", + on: db, + with: [ + sqlight.int(event.input_tokens), + sqlight.int(event.output_tokens), + sqlight.int(event.cache_read_tokens), + sqlight.int(event.cache_create_tokens), + sqlight.int(total), + sqlight.int(run_id), + ], + expecting: decode.dynamic, + ) + |> result.map_error(SqlightError) + |> result.map(fn(_) { Nil }) +} + +pub fn bump_parse_errors( + db: sqlight.Connection, + run_id: Int, +) -> Result(Nil, DbError) { + sqlight.query( + "UPDATE runs SET usage_parse_errors = usage_parse_errors + 1 WHERE id = ?", + on: db, + with: [sqlight.int(run_id)], + expecting: decode.dynamic, + ) + |> result.map_error(SqlightError) + |> result.map(fn(_) { Nil }) +} + +// ── Rate-limit state ────────────────────────────────────────────────────────── + +pub type UpsertBucketResult { + BucketUpdated + ThresholdCrossed(bucket_id: String, threshold: Int, reset_at: Option(Int)) +} + +pub fn upsert_rate_limit_state( + db: sqlight.Connection, + plan: Option(String), + observed_at: Int, +) -> Result(Nil, DbError) { + sqlight.query( + "INSERT INTO rate_limit_state (id, plan, observed_at) + VALUES (1, ?, ?) 
+ ON CONFLICT(id) DO UPDATE SET + plan = COALESCE(excluded.plan, rate_limit_state.plan), + observed_at = excluded.observed_at", + on: db, + with: [nullable_str(plan), sqlight.int(observed_at)], + expecting: decode.dynamic, + ) + |> result.map_error(SqlightError) + |> result.map(fn(_) { Nil }) +} + +pub fn upsert_bucket( + db: sqlight.Connection, + bucket_id: String, + utilization: Float, + reset_at: Option(Int), + observed_at: Int, +) -> Result(UpsertBucketResult, DbError) { + // Read current threshold marker before upsert + let old_threshold_result = + sqlight.query( + "SELECT last_notified_threshold FROM rate_limit_buckets WHERE bucket_id = ?", + on: db, + with: [sqlight.text(bucket_id)], + expecting: decode.at([0], decode.optional(decode.int)), + ) + |> result.map_error(SqlightError) + + use _ <- result.try( + sqlight.query( + "INSERT INTO rate_limit_buckets + (bucket_id, utilization, reset_at, observed_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(bucket_id) DO UPDATE SET + utilization = excluded.utilization, + reset_at = excluded.reset_at, + observed_at = excluded.observed_at", + on: db, + with: [ + sqlight.text(bucket_id), + sqlight.float(utilization), + nullable_int(reset_at), + sqlight.int(observed_at), + ], + expecting: decode.dynamic, + ) + |> result.map_error(SqlightError) + |> result.map(fn(_) { Nil }), + ) + + let pct = float_to_pct(utilization) + let old_threshold = case old_threshold_result { + Ok([Some(t), ..]) -> t + Ok([None, ..]) -> 0 + _ -> 0 + } + + // Check if we just crossed a new threshold (75 or 90) + let new_threshold = case pct >= 90, pct >= 75 { + True, _ -> 90 + False, True -> 75 + False, False -> 0 + } + + case new_threshold > old_threshold && new_threshold > 0 { + False -> Ok(BucketUpdated) + True -> { + // Persist the notification marker + let _ = + sqlight.query( + "UPDATE rate_limit_buckets SET last_notified_threshold = ? 
WHERE bucket_id = ?", + on: db, + with: [sqlight.int(new_threshold), sqlight.text(bucket_id)], + expecting: decode.dynamic, + ) + Ok(ThresholdCrossed( + bucket_id: bucket_id, + threshold: new_threshold, + reset_at: reset_at, + )) + } + } +} + +// ── Full usage state (for GET /api/usage and WS snapshots) ─────────────────── + +pub type UsageStateRow { + UsageStateRow( + plan: Option(String), + observed_at: Option(Int), + last_error: Option(String), + last_error_at: Option(Int), + ) +} + +pub type BucketRow { + BucketRow( + id: String, + utilization: Float, + reset_at: Option(Int), + window_started_at: Option(Int), + ) +} + +/// Returns the full UsageState as a json.Json value (not yet serialised). +/// Callers can embed it in a larger envelope or call json.to_string(). +pub fn get_usage_state_value(db: sqlight.Connection, now_ms: Int) -> json.Json { + let state = load_state_row(db) + let buckets = load_bucket_rows(db) + encode_usage_state(state, buckets, now_ms) +} + +/// Convenience: returns the state as a serialised JSON string. 
+pub fn get_usage_state_json(db: sqlight.Connection, now_ms: Int) -> String { + get_usage_state_value(db, now_ms) |> json.to_string() +} + +fn load_state_row(db: sqlight.Connection) -> UsageStateRow { + let row_decoder = { + use plan <- decode.field(0, decode.optional(decode.string)) + use observed_at <- decode.field(1, decode.optional(decode.int)) + use last_error <- decode.field(2, decode.optional(decode.string)) + use last_error_at <- decode.field(3, decode.optional(decode.int)) + decode.success(UsageStateRow( + plan: plan, + observed_at: observed_at, + last_error: last_error, + last_error_at: last_error_at, + )) + } + case + sqlight.query( + "SELECT plan, observed_at, last_error, last_error_at FROM rate_limit_state WHERE id = 1", + on: db, + with: [], + expecting: row_decoder, + ) + { + Ok([row, ..]) -> row + _ -> + UsageStateRow( + plan: None, + observed_at: None, + last_error: Some("missing_credentials"), + last_error_at: None, + ) + } +} + +fn load_bucket_rows(db: sqlight.Connection) -> List(BucketRow) { + let row_decoder = { + use id <- decode.field(0, decode.string) + use util <- decode.field(1, decode.float) + use reset_at <- decode.field(2, decode.optional(decode.int)) + use window_started_at <- decode.field(3, decode.optional(decode.int)) + decode.success(BucketRow( + id: id, + utilization: util, + reset_at: reset_at, + window_started_at: window_started_at, + )) + } + case + sqlight.query( + "SELECT bucket_id, utilization, reset_at, window_started_at FROM rate_limit_buckets ORDER BY bucket_id", + on: db, + with: [], + expecting: row_decoder, + ) + { + Ok(rows) -> rows + Error(_) -> [] + } +} + +fn encode_usage_state( + state: UsageStateRow, + buckets: List(BucketRow), + now_ms: Int, +) -> json.Json { + let pacing_entries = + list.map(buckets, fn(b) { #(b.id, pacing_for_bucket(b, now_ms)) }) + + json.object([ + #("plan", json.nullable(state.plan, json.string)), + #("observed_at", json.nullable(state.observed_at, json.int)), + #("last_error", 
json.nullable(state.last_error, json.string)), + #("last_error_at", json.nullable(state.last_error_at, json.int)), + #( + "buckets", + json.array(buckets, fn(b) { + json.object([ + #("id", json.string(b.id)), + #("utilization", json.float(b.utilization)), + #("reset_at", json.nullable(b.reset_at, json.int)), + #("window_started_at", json.nullable(b.window_started_at, json.int)), + ]) + }), + ), + #( + "pacing", + json.object( + list.map(pacing_entries, fn(entry) { + let #(id, verdict) = entry + #(id, encode_pacing(verdict)) + }), + ), + ), + ]) +} + +// ── Pacing computation ──────────────────────────────────────────────────────── + +type PacingZone { + Chill + OnTrack + Hot + NoPacing +} + +type PacingVerdict { + PacingVerdict(delta: Float, zone: PacingZone) +} + +fn pacing_for_bucket(b: BucketRow, now_ms: Int) -> PacingVerdict { + case b.window_started_at, b.reset_at { + Some(started), Some(reset) -> { + let window_duration = reset - started + case window_duration > 0 { + False -> PacingVerdict(delta: 0.0, zone: NoPacing) + True -> { + let elapsed = now_ms - started + let elapsed_frac = + int.to_float(elapsed) /. int.to_float(window_duration) + let elapsed_frac = float.max(0.0, float.min(1.0, elapsed_frac)) + let delta = b.utilization -. elapsed_frac + let zone = case delta >=. 0.25 { + True -> Hot + False -> + case delta <=. 
-0.2 { + True -> Chill + False -> OnTrack + } + } + PacingVerdict(delta: delta, zone: zone) + } + } + } + _, _ -> PacingVerdict(delta: 0.0, zone: NoPacing) + } +} + +fn encode_pacing(v: PacingVerdict) -> json.Json { + json.object([ + #("delta", json.float(v.delta)), + #( + "zone", + json.string(case v.zone { + Chill -> "chill" + OnTrack -> "on_track" + Hot -> "hot" + NoPacing -> "none" + }), + ), + ]) +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn nullable_int(opt: Option(Int)) -> sqlight.Value { + case opt { + None -> sqlight.null() + Some(n) -> sqlight.int(n) + } +} + +fn nullable_str(opt: Option(String)) -> sqlight.Value { + case opt { + None -> sqlight.null() + Some(s) -> sqlight.text(s) + } +} + +fn float_to_pct(f: Float) -> Int { + float.round(f *. 100.0) +} diff --git a/src/server/src/fbi/git/history_ops.gleam b/src/server/src/fbi/git/history_ops.gleam index 7a904c8..863034c 100644 --- a/src/server/src/fbi/git/history_ops.gleam +++ b/src/server/src/fbi/git/history_ops.gleam @@ -47,11 +47,7 @@ pub fn squash_local( |> result.map(string.trim), ) use _ <- result_try( - git.run(repo_path, [ - "update-ref", - "refs/heads/" <> branch, - new_commit, - ]), + git.run(repo_path, ["update-ref", "refs/heads/" <> branch, new_commit]), ) Ok(Complete(sha: new_commit)) } @@ -83,10 +79,7 @@ pub fn mirror_rebase( } } -pub fn sync_in_container( - config: Config, - cid: String, -) -> Result(Outcome, String) { +pub fn sync_in_container(config: Config, cid: String) -> Result(Outcome, String) { exec_in_container(config, cid, "cd /workspace && git pull --no-rebase 2>&1") } diff --git a/src/server/src/fbi/git/repo.gleam b/src/server/src/fbi/git/repo.gleam index a42b739..ecc47ac 100644 --- a/src/server/src/fbi/git/repo.gleam +++ b/src/server/src/fbi/git/repo.gleam @@ -51,13 +51,7 @@ pub fn commit_files( ]), ) use num_output <- result_try( - git.run(repo_path, [ - "show", - "--no-renames", - "--pretty=", - "--numstat", - sha, - ]), + 
git.run(repo_path, ["show", "--no-renames", "--pretty=", "--numstat", sha]), ) let names = parse.parse_name_status(ns_output) let nums = parse.parse_numstat(num_output) @@ -74,14 +68,7 @@ pub fn file_diff( False -> empty_tree_sha } use output <- result_try( - git.run(repo_path, [ - "diff", - "--no-color", - parent_arg, - sha, - "--", - path, - ]), + git.run(repo_path, ["diff", "--no-color", parent_arg, sha, "--", path]), ) let truncated = string.byte_size(output) > diff_byte_cap let body = case truncated { @@ -97,18 +84,10 @@ pub fn branch_base_ahead_behind( default: String, ) -> Result(BranchBase, GitError) { use ahead_str <- result_try( - git.run(repo_path, [ - "rev-list", - "--count", - default <> ".." <> branch, - ]), + git.run(repo_path, ["rev-list", "--count", default <> ".." <> branch]), ) use behind_str <- result_try( - git.run(repo_path, [ - "rev-list", - "--count", - branch <> ".." <> default, - ]), + git.run(repo_path, ["rev-list", "--count", branch <> ".." <> default]), ) Ok(BranchBase( base: default, @@ -123,10 +102,7 @@ pub fn wip_files(repo_path: String) -> Result(Option(WipSnapshot), GitError) { Ok(snapshot_str) -> { let snapshot_sha = string.trim(snapshot_str) use parent_str <- result_try( - git.run(repo_path, [ - "rev-parse", - snapshot_sha <> "^", - ]), + git.run(repo_path, ["rev-parse", snapshot_sha <> "^"]), ) let parent_sha = string.trim(parent_str) use ns_output <- result_try( diff --git a/src/server/src/fbi/handlers/changes.gleam b/src/server/src/fbi/handlers/changes.gleam index bf7f078..df28d29 100644 --- a/src/server/src/fbi/handlers/changes.gleam +++ b/src/server/src/fbi/handlers/changes.gleam @@ -116,11 +116,7 @@ pub fn handle_submodule_commit_files( } } -pub fn handle_file_diff( - req: Request, - ctx: Context, - id_str: String, -) -> Response { +pub fn handle_file_diff(req: Request, ctx: Context, id_str: String) -> Response { case req.method { http.Get -> case int.parse(id_str) { diff --git 
a/src/server/src/fbi/handlers/mcp_servers.gleam b/src/server/src/fbi/handlers/mcp_servers.gleam index 4c0c621..7e4969a 100644 --- a/src/server/src/fbi/handlers/mcp_servers.gleam +++ b/src/server/src/fbi/handlers/mcp_servers.gleam @@ -18,11 +18,7 @@ pub fn handle_global(req: Request, ctx: Context) -> Response { } } -pub fn handle_global_one( - req: Request, - ctx: Context, - id_str: String, -) -> Response { +pub fn handle_global_one(req: Request, ctx: Context, id_str: String) -> Response { case int.parse(id_str) { Error(_) -> wisp.bad_request("Invalid MCP server ID") Ok(id) -> diff --git a/src/server/src/fbi/handlers/prompts.gleam b/src/server/src/fbi/handlers/prompts.gleam index 3c121ff..5c71db2 100644 --- a/src/server/src/fbi/handlers/prompts.gleam +++ b/src/server/src/fbi/handlers/prompts.gleam @@ -19,11 +19,7 @@ pub fn handle_recent( } } -fn serve_recent( - req: Request, - ctx: Context, - project_id_str: String, -) -> Response { +fn serve_recent(req: Request, ctx: Context, project_id_str: String) -> Response { case int.parse(project_id_str) { Error(_) -> wisp.bad_request("Invalid project ID") Ok(pid) -> { diff --git a/src/server/src/fbi/handlers/runs.gleam b/src/server/src/fbi/handlers/runs.gleam index 0d92e6c..f97667a 100644 --- a/src/server/src/fbi/handlers/runs.gleam +++ b/src/server/src/fbi/handlers/runs.gleam @@ -170,6 +170,7 @@ fn do_continue(req: Request, ctx: Context, run_id: Int) -> Response { ctx.db, ctx.config, new_run.id, + ctx.pubsub, ) { Error(reason) -> { @@ -211,11 +212,7 @@ fn do_continue(req: Request, ctx: Context, run_id: Int) -> Response { } } -pub fn handle_resume_now( - req: Request, - ctx: Context, - id_str: String, -) -> Response { +pub fn handle_resume_now(req: Request, ctx: Context, id_str: String) -> Response { case req.method { http.Post -> case int.parse(id_str) { @@ -241,7 +238,13 @@ fn do_resume_now(ctx: Context, id: Int) -> Response { Ok(run) -> case run.state { "awaiting_resume" -> { - run_reattach.resurrect(run, ctx.db, 
ctx.config, ctx.run_registry) + run_reattach.resurrect( + run, + ctx.db, + ctx.config, + ctx.run_registry, + ctx.pubsub, + ) wisp.response(202) } _ -> wisp.bad_request("Run is not awaiting_resume") @@ -395,7 +398,13 @@ fn do_create( } Ok(run) -> case - run_supervisor.start_run(ctx.run_registry, ctx.db, ctx.config, run.id) + run_supervisor.start_run( + ctx.run_registry, + ctx.db, + ctx.config, + run.id, + ctx.pubsub, + ) { Error(reason) -> { wisp.log_error( @@ -469,10 +478,7 @@ fn index_paged(ctx: Context, filter: runs.ListFilter) -> Response { } } -fn get_param( - qs: List(#(String, String)), - key: String, -) -> option.Option(String) { +fn get_param(qs: List(#(String, String)), key: String) -> option.Option(String) { case list.key_find(qs, key) { Ok(v) if v != "" -> Some(v) _ -> None diff --git a/src/server/src/fbi/handlers/shell_ws.gleam b/src/server/src/fbi/handlers/shell_ws.gleam index e5f9c03..c3ed059 100644 --- a/src/server/src/fbi/handlers/shell_ws.gleam +++ b/src/server/src/fbi/handlers/shell_ws.gleam @@ -222,6 +222,25 @@ fn handle_live_frame( let _ = mist.send_text_frame(conn, body) mist.continue(state) } + mist.Custom(types.UsageSnap(model, input, output, cache_read, cache_create)) -> { + let body = + json.object([ + #("type", json.string("usage")), + #( + "snapshot", + json.object([ + #("model", json.string(model)), + #("input_tokens", json.int(input)), + #("output_tokens", json.int(output)), + #("cache_read_tokens", json.int(cache_read)), + #("cache_create_tokens", json.int(cache_create)), + ]), + ), + ]) + |> json.to_string() + let _ = mist.send_text_frame(conn, body) + mist.continue(state) + } mist.Closed | mist.Shutdown -> { unsubscribe(state) mist.stop() @@ -250,10 +269,7 @@ fn unsubscribe(state: ConnState) -> Nil { fn send_state(conn: mist.WebsocketConnection, s: String) -> Nil { let body = - json.object([ - #("type", json.string("state")), - #("state", json.string(s)), - ]) + json.object([#("type", json.string("state")), #("state", 
json.string(s))]) |> json.to_string() let _ = mist.send_text_frame(conn, body) Nil diff --git a/src/server/src/fbi/handlers/uploads.gleam b/src/server/src/fbi/handlers/uploads.gleam index 98b8b58..1ebba8a 100644 --- a/src/server/src/fbi/handlers/uploads.gleam +++ b/src/server/src/fbi/handlers/uploads.gleam @@ -27,11 +27,7 @@ pub fn handle_draft_file( } } -pub fn handle_run_uploads( - req: Request, - _ctx: Context, - _id: String, -) -> Response { +pub fn handle_run_uploads(req: Request, _ctx: Context, _id: String) -> Response { case req.method { http.Get -> json.object([#("files", json.array([], json.string))]) diff --git a/src/server/src/fbi/handlers/usage.gleam b/src/server/src/fbi/handlers/usage.gleam index 0af23b7..469f5c8 100644 --- a/src/server/src/fbi/handlers/usage.gleam +++ b/src/server/src/fbi/handlers/usage.gleam @@ -1,5 +1,6 @@ import fbi/context.{type Context} import fbi/db/connection +import fbi/db/usage as usage_db import gleam/dynamic/decode import gleam/http import gleam/int @@ -9,20 +10,11 @@ import gleam/result import sqlight import wisp.{type Request, type Response} -/// Returns a minimal UsageState. Plan, buckets, and pacing aren't tracked -/// yet — the frontend treats this as "unavailable" and shows the badge. -pub fn handle_state(req: Request, _ctx: Context) -> Response { +/// Returns the current UsageState from the DB. 
+pub fn handle_state(req: Request, ctx: Context) -> Response { case req.method { http.Get -> - json.object([ - #("plan", json.null()), - #("observed_at", json.null()), - #("last_error", json.string("missing_credentials")), - #("last_error_at", json.null()), - #("buckets", json.array([], json.string)), - #("pacing", json.object([])), - ]) - |> json.to_string() + usage_db.get_usage_state_json(ctx.db, now_ms()) |> wisp.json_response(200) _ -> wisp.method_not_allowed([http.Get]) } diff --git a/src/server/src/fbi/handlers/wip.gleam b/src/server/src/fbi/handlers/wip.gleam index 3fa062e..de189e9 100644 --- a/src/server/src/fbi/handlers/wip.gleam +++ b/src/server/src/fbi/handlers/wip.gleam @@ -120,8 +120,7 @@ fn exec_discard(ctx: Context, cid: String) -> Response { sock, cid, [ - "sh", - "-c", + "sh", "-c", "cd /workspace && git restore --staged --worktree . && git clean -fd", ], "agent", diff --git a/src/server/src/fbi/run/actor.gleam b/src/server/src/fbi/run/actor.gleam index c0b18d5..6af7d59 100644 --- a/src/server/src/fbi/run/actor.gleam +++ b/src/server/src/fbi/run/actor.gleam @@ -1,6 +1,7 @@ import fbi/config.{type Config} import fbi/db/runs as runs_db import fbi/docker +import fbi/pubsub import fbi/run/container_monitor import fbi/run/registry.{type RegistryMsg, Unregister} import fbi/run/types.{ @@ -11,6 +12,7 @@ import fbi/run/types.{ Starting, StateChanged, Subscribe, Unsubscribe, Waiting, WaitingTimeout, WorkerFailed, WorkerReady, WriteStdin, } +import fbi/run/usage_tailer.{type TailerMsg} import gleam/bit_array import gleam/erlang/process.{type Subject} import gleam/int @@ -32,6 +34,8 @@ pub type State { actor_subject: Subject(RunMsg), stdin_sock: Option(docker.Socket), registry: Subject(RegistryMsg), + pubsub: Subject(pubsub.PubsubMsg), + tailer: Option(Subject(TailerMsg)), ) } @@ -41,6 +45,7 @@ pub fn start( config: Config, bc: Subject(BroadcastMsg), registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Result(Subject(RunMsg), 
actor.StartError) { actor.new_with_initialiser(500, fn(subject) { State( @@ -52,6 +57,8 @@ pub fn start( actor_subject: subject, stdin_sock: None, registry: registry, + pubsub: pubsub_subject, + tailer: None, ) |> actor.initialised |> actor.returning(subject) @@ -74,6 +81,7 @@ pub fn start_attached( config: Config, bc: Subject(BroadcastMsg), registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Result(Subject(RunMsg), actor.StartError) { actor.new_with_initialiser(500, fn(subject) { let stdin_sock = case @@ -92,6 +100,8 @@ pub fn start_attached( None } } + // Start the tailer immediately for reattached runs + let tailer = start_tailer(run_id, config, db, pubsub_subject, bc) State( run_id: run_id, db: db, @@ -101,6 +111,8 @@ pub fn start_attached( actor_subject: subject, stdin_sock: stdin_sock, registry: registry, + pubsub: pubsub_subject, + tailer: tailer, ) |> actor.initialised |> actor.returning(subject) @@ -112,6 +124,38 @@ pub fn start_attached( |> result.map(fn(started) { started.data }) } +fn start_tailer( + run_id: Int, + config: Config, + db: sqlight.Connection, + pubsub_subject: Subject(pubsub.PubsubMsg), + bc: Subject(BroadcastMsg), +) -> Option(Subject(TailerMsg)) { + case usage_tailer.start(run_id, config, db, pubsub_subject, bc) { + Ok(subject) -> Some(subject) + Error(e) -> { + wisp.log_warning( + "run " + <> int.to_string(run_id) + <> " usage tailer failed to start: " + <> actor_start_error_to_string(e), + ) + None + } + } +} + +@external(erlang, "erlang", "term_to_binary") +fn actor_start_error_to_binary(e: actor.StartError) -> BitArray + +fn actor_start_error_to_string(e: actor.StartError) -> String { + let bits = actor_start_error_to_binary(e) + case bit_array.to_string(bits) { + Ok(s) -> s + Error(_) -> "unknown error" + } +} + fn handle(state: State, msg: RunMsg) -> actor.Next(State, RunMsg) { case state.phase, msg { // ── Starting ───────────────────────────────────────────────────────────── @@ -276,11 +320,14 @@ fn 
transition_to_running( None } } + let tailer = + start_tailer(state.run_id, state.config, state.db, state.pubsub, bc) actor.continue( State( ..state, phase: Running(cid, branch, bc, cols, rows), stdin_sock: stdin_sock, + tailer: tailer, ), ) } @@ -328,6 +375,11 @@ fn transition_to_finishing( cid: String, ) -> actor.Next(State, RunMsg) { wisp.log_debug("run " <> int.to_string(state.run_id) <> " cleaning up") + // Stop the tailer so it does a final sweep before the run dir is cleaned + case state.tailer { + None -> Nil + Some(t) -> process.send(t, usage_tailer.Stop) + } process.send(bc, BroadcastShutdown) process.send(state.registry, Unregister(state.run_id)) case state.stdin_sock { diff --git a/src/server/src/fbi/run/devcontainer_fetcher.gleam b/src/server/src/fbi/run/devcontainer_fetcher.gleam index e9a1b36..7d4b414 100644 --- a/src/server/src/fbi/run/devcontainer_fetcher.gleam +++ b/src/server/src/fbi/run/devcontainer_fetcher.gleam @@ -32,10 +32,7 @@ fn do_fetch( <> int.to_string(now_ms()) <> "-" <> int.to_string(unique_int()) - let env = [ - #("SSH_AUTH_SOCK", ssh_auth_sock), - #("GIT_TERMINAL_PROMPT", "0"), - ] + let env = [#("SSH_AUTH_SOCK", ssh_auth_sock), #("GIT_TERMINAL_PROMPT", "0")] let result = try_fetch(repo_url, tmp_dir, env, on_log) let _ = simplifile.delete(tmp_dir) result @@ -52,8 +49,13 @@ fn try_fetch( run_cmd( git, [ - "clone", "--depth=1", "--filter=blob:none", "--sparse", "--no-tags", - repo_url, tmp_dir, + "clone", + "--depth=1", + "--filter=blob:none", + "--sparse", + "--no-tags", + repo_url, + tmp_dir, ], env, ) diff --git a/src/server/src/fbi/run/image_builder.gleam b/src/server/src/fbi/run/image_builder.gleam index 6a2cfb5..683bb58 100644 --- a/src/server/src/fbi/run/image_builder.gleam +++ b/src/server/src/fbi/run/image_builder.gleam @@ -278,8 +278,13 @@ fn build_devcontainer( fbi_cmd_run_streaming( npx, [ - "-y", "@devcontainers/cli@0.67.0", "build", "--workspace-folder", - tmp_dir, "--image-name", tag, + "-y", + 
"@devcontainers/cli@0.67.0", + "build", + "--workspace-folder", + tmp_dir, + "--image-name", + tag, ], [], on_chunk, @@ -414,10 +419,7 @@ fn build_post_layer( on_log("[fbi] applying post-build layer → " <> final_tag <> "\n") buildx_build_with_files( final_tag, - dict.from_list([ - #("Dockerfile", dockerfile), - #("postbuild.sh", postbuild), - ]), + dict.from_list([#("Dockerfile", dockerfile), #("postbuild.sh", postbuild)]), on_log, ) |> result.map_error(fn(e) { "build_post_layer: " <> e }) @@ -446,7 +448,13 @@ fn buildx_build_with_files( use _ <- result.try(write_build_context(files, context_dir)) let docker = find_executable("docker") let args = [ - "buildx", "build", "--load", "--progress=plain", "-t", tag, context_dir, + "buildx", + "build", + "--load", + "--progress=plain", + "-t", + tag, + context_dir, ] let on_chunk = fn(bin: BitArray) -> Nil { case bit_array.to_string(bin) { diff --git a/src/server/src/fbi/run/reattach.gleam b/src/server/src/fbi/run/reattach.gleam index 790b2a3..53e4a6b 100644 --- a/src/server/src/fbi/run/reattach.gleam +++ b/src/server/src/fbi/run/reattach.gleam @@ -4,6 +4,7 @@ import fbi/db/projects import fbi/db/runs.{type Run, type RunOutcome, RunOutcome} import fbi/db/settings import fbi/docker +import fbi/pubsub import fbi/run/actor as run_actor import fbi/run/broadcaster import fbi/run/registry.{type RegistryMsg, Register} @@ -26,6 +27,7 @@ pub fn run_all( db: sqlight.Connection, config: Config, registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Nil { case runs.list_non_terminal(db) { Error(e) -> @@ -36,7 +38,9 @@ pub fn run_all( wisp.log_info( "reattach: " <> int.to_string(list.length(rs)) <> " non-terminal run(s)", ) - list.each(rs, fn(run) { reattach_one(run, db, config, registry) }) + list.each(rs, fn(run) { + reattach_one(run, db, config, registry, pubsub_subject) + }) } } } @@ -46,6 +50,7 @@ fn reattach_one( db: sqlight.Connection, config: Config, registry: Subject(RegistryMsg), + pubsub_subject: 
Subject(pubsub.PubsubMsg), ) -> Nil { case run.state { "queued" -> { @@ -55,8 +60,10 @@ fn reattach_one( let _ = runs.delete(db, run.id) Nil } - "running" | "waiting" -> reattach_active(run, db, config, registry) - "awaiting_resume" -> reattach_awaiting(run, db, config, registry) + "running" | "waiting" -> + reattach_active(run, db, config, registry, pubsub_subject) + "awaiting_resume" -> + reattach_awaiting(run, db, config, registry, pubsub_subject) _ -> Nil } } @@ -66,6 +73,7 @@ fn reattach_active( db: sqlight.Connection, config: Config, registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Nil { case run.container_id { None -> { @@ -109,7 +117,7 @@ fn reattach_active( } Ok(docker.ContainerExited(code)) -> finish_exited(run, db, config, code) Ok(docker.ContainerRunning) -> - attach_live(run, cid, db, config, registry) + attach_live(run, cid, db, config, registry, pubsub_subject) } } } @@ -121,6 +129,7 @@ fn attach_live( db: sqlight.Connection, config: Config, registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Nil { wisp.log_info( "reattach: run " @@ -147,6 +156,7 @@ fn attach_live( config, bc, registry, + pubsub_subject, ) { Error(_) -> { @@ -157,8 +167,6 @@ fn attach_live( } Ok(actor_subject) -> { process.send(registry, Register(run.id, actor_subject)) - // Make sure DB state is "running" with the right cid (idempotent if - // it was already running; corrects a stale "waiting" transition). 
let _ = runs.mark_running(db, run.id, cid, now_ms()) Nil } @@ -190,10 +198,11 @@ fn reattach_awaiting( db: sqlight.Connection, config: Config, registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Nil { let now = now_ms() case run.next_resume_at { - Some(t) if t <= now -> resurrect(run, db, config, registry) + Some(t) if t <= now -> resurrect(run, db, config, registry, pubsub_subject) _ -> { wisp.log_debug( "reattach: run " @@ -213,6 +222,7 @@ pub fn resurrect( db: sqlight.Connection, config: Config, registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Nil { case run.claude_session_id { None -> { @@ -281,8 +291,9 @@ pub fn resurrect( ), now, ) - // Start a supervisor + worker for the new child. - case supervisor_start(db, config, registry, child.id) { + case + supervisor_start(db, config, registry, pubsub_subject, child.id) + { Error(reason) -> wisp.log_warning( "reattach: supervisor_start for child " @@ -321,6 +332,7 @@ fn supervisor_start( db: sqlight.Connection, config: Config, registry: Subject(RegistryMsg), + pubsub_subject: Subject(pubsub.PubsubMsg), run_id: Int, ) -> Result(#(Subject(RunMsg), Subject(BroadcastMsg)), String) { use bc <- result.try( @@ -328,7 +340,7 @@ fn supervisor_start( |> result.map_error(fn(_) { "failed to start broadcaster" }), ) use actor_subject <- result.try( - run_actor.start(run_id, db, config, bc, registry) + run_actor.start(run_id, db, config, bc, registry, pubsub_subject) |> result.map_error(fn(_) { "failed to start run actor" }), ) process.send(registry, Register(run_id, actor_subject)) diff --git a/src/server/src/fbi/run/resume_scheduler.gleam b/src/server/src/fbi/run/resume_scheduler.gleam index 5123002..5254f53 100644 --- a/src/server/src/fbi/run/resume_scheduler.gleam +++ b/src/server/src/fbi/run/resume_scheduler.gleam @@ -1,6 +1,7 @@ import fbi/config.{type Config} import fbi/db/connection import fbi/db/runs +import fbi/pubsub import fbi/run/reattach import 
fbi/run/registry.{type RegistryMsg} import gleam/erlang/process @@ -24,6 +25,7 @@ type State { db: sqlight.Connection, config: Config, registry: process.Subject(RegistryMsg), + pubsub: process.Subject(pubsub.PubsubMsg), self: process.Subject(ResumeMsg), ) } @@ -32,10 +34,17 @@ pub fn start( db: sqlight.Connection, config: Config, registry: process.Subject(RegistryMsg), + pubsub_subject: process.Subject(pubsub.PubsubMsg), ) -> Result(process.Subject(ResumeMsg), actor.StartError) { actor.new_with_initialiser(500, fn(subject) { process.send_after(subject, interval_ms, Tick) - State(db: db, config: config, registry: registry, self: subject) + State( + db: db, + config: config, + registry: registry, + pubsub: pubsub_subject, + self: subject, + ) |> actor.initialised |> actor.returning(subject) |> actor.selecting(process.new_selector() |> process.select(subject)) @@ -71,7 +80,13 @@ fn tick(state: State) -> Nil { <> " run(s)", ) list.each(due, fn(run) { - reattach.resurrect(run, state.db, state.config, state.registry) + reattach.resurrect( + run, + state.db, + state.config, + state.registry, + state.pubsub, + ) }) } } diff --git a/src/server/src/fbi/run/supervisor.gleam b/src/server/src/fbi/run/supervisor.gleam index 698946e..92019cc 100644 --- a/src/server/src/fbi/run/supervisor.gleam +++ b/src/server/src/fbi/run/supervisor.gleam @@ -1,4 +1,5 @@ import fbi/config.{type Config} +import fbi/pubsub import fbi/run/actor as run_actor import fbi/run/broadcaster import fbi/run/registry.{type RegistryMsg, Register} @@ -12,13 +13,14 @@ pub fn start_run( db: sqlight.Connection, config: Config, run_id: Int, + pubsub_subject: Subject(pubsub.PubsubMsg), ) -> Result(#(Subject(RunMsg), Subject(BroadcastMsg)), String) { use bc <- result.try( broadcaster.start() |> result.map_error(fn(_) { "failed to start broadcaster" }), ) use actor_subject <- result.try( - run_actor.start(run_id, db, config, bc, registry) + run_actor.start(run_id, db, config, bc, registry, pubsub_subject) |> 
result.map_error(fn(_) { "failed to start run actor" }), ) process.send(registry, Register(run_id, actor_subject)) diff --git a/src/server/src/fbi/run/types.gleam b/src/server/src/fbi/run/types.gleam index e6fd7dc..96f511e 100644 --- a/src/server/src/fbi/run/types.gleam +++ b/src/server/src/fbi/run/types.gleam @@ -61,4 +61,11 @@ pub type TerminalEvent { StateChanged(state: String) TitleChanged(title: String) Snapshot(ansi: String, cols: Int, rows: Int, byte_offset: Int) + UsageSnap( + model: String, + input_tokens: Int, + output_tokens: Int, + cache_read_tokens: Int, + cache_create_tokens: Int, + ) } diff --git a/src/server/src/fbi/run/usage_tailer.gleam b/src/server/src/fbi/run/usage_tailer.gleam new file mode 100644 index 0000000..1030e9b --- /dev/null +++ b/src/server/src/fbi/run/usage_tailer.gleam @@ -0,0 +1,296 @@ +import fbi/config.{type Config} +import fbi/db/connection +import fbi/db/usage as usage_db +import fbi/pubsub +import fbi/run/types.{type BroadcastMsg, BroadcastEvent, UsageSnap} +import fbi/usage_parser +import gleam/bit_array +import gleam/dict.{type Dict} +import gleam/dynamic.{type Dynamic} +import gleam/dynamic/decode +import gleam/erlang/process.{type Subject} +import gleam/int +import gleam/json +import gleam/list +import gleam/option.{type Option, None, Some} +import gleam/otp/actor +import gleam/result +import gleam/string +import simplifile +import sqlight +import wisp + +const poll_interval_ms = 2000 + +pub type TailerMsg { + Tick + Stop +} + +type State { + State( + run_id: Int, + mount_dir: String, + db: sqlight.Connection, + pubsub: Subject(pubsub.PubsubMsg), + broadcaster: Subject(BroadcastMsg), + offsets: Dict(String, Int), + self: Subject(TailerMsg), + ) +} + +pub fn start( + run_id: Int, + config: Config, + db: sqlight.Connection, + pubsub_subject: Subject(pubsub.PubsubMsg), + broadcaster: Subject(BroadcastMsg), +) -> Result(Subject(TailerMsg), actor.StartError) { + let mount_dir = config.runs_dir <> "/" <> int.to_string(run_id) <> 
"/mount" + actor.new_with_initialiser(500, fn(subject) { + process.send_after(subject, poll_interval_ms, Tick) + State( + run_id: run_id, + mount_dir: mount_dir, + db: db, + pubsub: pubsub_subject, + broadcaster: broadcaster, + offsets: dict.new(), + self: subject, + ) + |> actor.initialised + |> actor.returning(subject) + |> actor.selecting(process.new_selector() |> process.select(subject)) + |> Ok + }) + |> actor.on_message(fn(state: State, msg: TailerMsg) { handle(state, msg) }) + |> actor.start + |> result.map(fn(started) { started.data }) +} + +fn handle(state: State, msg: TailerMsg) -> actor.Next(State, TailerMsg) { + case msg { + Tick -> { + let new_offsets = sweep(state) + process.send_after(state.self, poll_interval_ms, Tick) + actor.continue(State(..state, offsets: new_offsets)) + } + Stop -> { + let _ = sweep(state) + actor.stop() + } + } +} + +// ── Filesystem sweep ───────────────────────────────────────────────────────── + +fn sweep(state: State) -> Dict(String, Int) { + let paths = collect_jsonl_files(state.mount_dir) + list.fold(paths, state.offsets, fn(offsets, path) { + let current_offset = dict.get(offsets, path) |> result.unwrap(0) + let new_offset = process_file(state, path, current_offset) + dict.insert(offsets, path, new_offset) + }) +} + +fn collect_jsonl_files(dir: String) -> List(String) { + case simplifile.read_directory(dir) { + Error(_) -> [] + Ok(entries) -> + list.flat_map(entries, fn(entry) { + let path = dir <> "/" <> entry + case simplifile.is_directory(path) { + Ok(True) -> collect_jsonl_files(path) + _ -> + case string.ends_with(entry, ".jsonl") { + True -> [path] + False -> [] + } + } + }) + } +} + +fn process_file(state: State, path: String, offset: Int) -> Int { + case simplifile.read_bits(path) { + Error(_) -> offset + Ok(bits) -> { + let total_bytes = bit_array.byte_size(bits) + case total_bytes <= offset { + True -> offset + False -> { + let new_bytes_count = total_bytes - offset + case bit_array.slice(bits, offset, 
new_bytes_count) { + Error(_) -> offset + Ok(new_bits) -> + case bit_array.to_string(new_bits) { + Error(_) -> offset + Ok(chunk) -> { + let all_parts = string.split(chunk, "\n") + // Consume only complete lines (everything before the last element, + // which may be an unterminated partial line). + let #(complete, _partial) = split_last(all_parts) + let consumed = + list.fold(complete, 0, fn(acc, l) { + // +1 for the \n that was stripped by string.split + acc + string.byte_size(l) + 1 + }) + list.each(complete, fn(line) { process_line(state, line) }) + offset + consumed + } + } + } + } + } + } + } +} + +fn split_last(parts: List(String)) -> #(List(String), String) { + case list.reverse(parts) { + [] -> #([], "") + [last, ..rest] -> #(list.reverse(rest), last) + } +} + +// ── Line processing ─────────────────────────────────────────────────────────── + +fn process_line(state: State, line: String) -> Nil { + let now = now_ms() + case usage_parser.parse_line(line) { + usage_parser.SkipLine -> Nil + usage_parser.ErrorLine(reason) -> { + wisp.log_debug( + "run " + <> int.to_string(state.run_id) + <> " usage parse error: " + <> reason, + ) + let _ = usage_db.bump_parse_errors(state.db, state.run_id) + Nil + } + usage_parser.UsageLine(usage: event, rate_limits: rl) -> { + case usage_db.insert_event(state.db, state.run_id, now, event, rl) { + Error(e) -> + wisp.log_warning( + "run " + <> int.to_string(state.run_id) + <> " usage db error: " + <> connection.describe_error(e), + ) + Ok(_) -> Nil + } + + // Live broadcast to this run's WebSocket subscribers + process.send( + state.broadcaster, + BroadcastEvent(UsageSnap( + model: event.model, + input_tokens: event.input_tokens, + output_tokens: event.output_tokens, + cache_read_tokens: event.cache_read_tokens, + cache_create_tokens: event.cache_create_tokens, + )), + ) + + // Rate-limit bucket update + case rl { + None -> Nil + Some(fields) -> process_rate_limits(state, fields, now) + } + } + } +} + +fn process_rate_limits( + 
state: State, + rl: usage_parser.RateLimitFields, + now: Int, +) -> Nil { + let _ = usage_db.upsert_rate_limit_state(state.db, None, now) + + let utilization = + derive_utilization(rl.tokens_remaining, rl.tokens_limit) + |> option.or(derive_utilization(rl.requests_remaining, rl.requests_limit)) + + case utilization { + None -> Nil + Some(util) -> { + // Derive window_started_at from reset_at - 5h + let window_started = case rl.reset_at { + None -> None + Some(reset) -> Some(reset - five_hour_window_ms) + } + + let bucket_result = + usage_db.upsert_bucket(state.db, "five_hour", util, rl.reset_at, now) + + // Persist window_started_at on first observation + case window_started { + None -> Nil + Some(ws) -> { + let _ = + sqlight.query( + "UPDATE rate_limit_buckets SET window_started_at = ? WHERE bucket_id = 'five_hour' AND window_started_at IS NULL", + on: state.db, + with: [sqlight.int(ws)], + expecting: decode.dynamic, + ) + Nil + } + } + + let state_value = usage_db.get_usage_state_value(state.db, now_ms()) + let snapshot_msg = + json.object([ + #("type", json.string("snapshot")), + #("state", state_value), + ]) + |> json.to_string() + + publish_usage(state.pubsub, snapshot_msg) + + case bucket_result { + Ok(usage_db.ThresholdCrossed(bid, thresh, reset)) -> { + let threshold_msg = + json.object([ + #("type", json.string("threshold_crossed")), + #("bucket_id", json.string(bid)), + #("threshold", json.int(thresh)), + #("reset_at", json.nullable(reset, json.int)), + ]) + |> json.to_string() + publish_usage(state.pubsub, threshold_msg) + } + _ -> Nil + } + } + } +} + +fn derive_utilization( + remaining: Option(Int), + limit: Option(Int), +) -> Option(Float) { + case remaining, limit { + Some(rem), Some(lim) if lim > 0 -> + Some(1.0 -. int.to_float(rem) /. 
int.to_float(lim)) + _, _ -> None + } +} + +fn publish_usage(ps: Subject(pubsub.PubsubMsg), msg: String) -> Nil { + process.send(ps, pubsub.Publish(topic: "usage", message: to_dynamic(msg))) +} + +// ── FFI ─────────────────────────────────────────────────────────────────────── + +const five_hour_window_ms = 18_000_000 + +@external(erlang, "fbi_time", "now_ms") +fn now_ms() -> Int + +/// Cast a String to Dynamic — safe because Gleam strings are Erlang binaries, +/// which decode correctly with decode.string on the subscriber side. +@external(erlang, "fbi_time", "to_dynamic") +fn to_dynamic(s: String) -> Dynamic diff --git a/src/server/src/fbi/usage_parser.gleam b/src/server/src/fbi/usage_parser.gleam new file mode 100644 index 0000000..c2834a2 --- /dev/null +++ b/src/server/src/fbi/usage_parser.gleam @@ -0,0 +1,254 @@ +import gleam/dynamic/decode +import gleam/int +import gleam/json +import gleam/option.{type Option, None, Some} + +pub type UsageEvent { + UsageEvent( + model: String, + input_tokens: Int, + output_tokens: Int, + cache_read_tokens: Int, + cache_create_tokens: Int, + ) +} + +pub type RateLimitFields { + RateLimitFields( + requests_remaining: Option(Int), + requests_limit: Option(Int), + tokens_remaining: Option(Int), + tokens_limit: Option(Int), + reset_at: Option(Int), + ) +} + +pub type ParseResult { + UsageLine(usage: UsageEvent, rate_limits: Option(RateLimitFields)) + SkipLine + ErrorLine(reason: String) +} + +// ── Intermediate types ──────────────────────────────────────────────────────── + +type MessageData { + MessageData( + role: Option(String), + model: Option(String), + input_tokens: Option(Int), + output_tokens: Option(Int), + cache_read: Option(Int), + cache_create: Option(Int), + ) +} + +type RawLine { + RawLine( + line_type: String, + message: Option(MessageData), + rate_limits: Option(RateLimitFields), + ) +} + +// ── Decoder helpers ─────────────────────────────────────────────────────────── + +fn int_or_string_decoder() -> 
decode.Decoder(Option(Int)) { + decode.one_of(decode.map(decode.int, Some), [ + decode.then(decode.string, fn(s) { + case int.parse(s) { + Ok(n) -> decode.success(Some(n)) + Error(_) -> decode.success(None) + } + }), + ]) +} + +// Decoder for the rateLimits object (applied to the value at key "rateLimits"). +fn rl_inner_decoder() -> decode.Decoder(RateLimitFields) { + use req_rem <- decode.optional_field( + "anthropic-ratelimit-unified-5h-requests-remaining", + None, + int_or_string_decoder(), + ) + use req_lim <- decode.optional_field( + "anthropic-ratelimit-unified-5h-requests-limit", + None, + int_or_string_decoder(), + ) + use tok_rem <- decode.optional_field( + "anthropic-ratelimit-unified-5h-tokens-remaining", + None, + int_or_string_decoder(), + ) + use tok_lim <- decode.optional_field( + "anthropic-ratelimit-unified-5h-tokens-limit", + None, + int_or_string_decoder(), + ) + use reset_str <- decode.optional_field( + "anthropic-ratelimit-unified-5h-reset", + None, + decode.optional(decode.string), + ) + decode.success(RateLimitFields( + requests_remaining: req_rem, + requests_limit: req_lim, + tokens_remaining: tok_rem, + tokens_limit: tok_lim, + reset_at: option.then(reset_str, parse_iso8601_ms), + )) +} + +// Decoder for the "message" sub-object. 
+fn message_decoder() -> decode.Decoder(MessageData) { + use role <- decode.optional_field( + "role", + None, + decode.optional(decode.string), + ) + use model <- decode.optional_field( + "model", + None, + decode.optional(decode.string), + ) + use input_tokens <- decode.optional_field( + "usage", + None, + decode.optional(decode.field("input_tokens", decode.int, decode.success)), + ) + use output_tokens <- decode.optional_field( + "usage", + None, + decode.optional(decode.field("output_tokens", decode.int, decode.success)), + ) + use cache_read <- decode.optional_field( + "usage", + None, + decode.optional(decode.field( + "cache_read_input_tokens", + decode.int, + decode.success, + )), + ) + use cache_create <- decode.optional_field( + "usage", + None, + decode.optional(decode.field( + "cache_creation_input_tokens", + decode.int, + decode.success, + )), + ) + decode.success(MessageData( + role: role, + model: model, + input_tokens: input_tokens, + output_tokens: output_tokens, + cache_read: cache_read, + cache_create: cache_create, + )) +} + +// Top-level line decoder. 
+fn line_decoder() -> decode.Decoder(RawLine) { + use line_type <- decode.field("type", decode.string) + use message <- decode.optional_field( + "message", + None, + decode.optional(message_decoder()), + ) + use rate_limits <- decode.optional_field( + "rateLimits", + None, + decode.optional(rl_inner_decoder()), + ) + decode.success(RawLine( + line_type: line_type, + message: message, + rate_limits: rate_limits, + )) +} + +// ── Public API ──────────────────────────────────────────────────────────────── + +pub fn parse_line(raw: String) -> ParseResult { + let trimmed = string_trim(raw) + case trimmed { + "" -> SkipLine + _ -> { + case json.parse(trimmed, line_decoder()) { + Error(_) -> ErrorLine("not valid JSON or unexpected shape") + Ok(line) -> interpret(line) + } + } + } +} + +fn interpret(line: RawLine) -> ParseResult { + case line.line_type { + "assistant" -> interpret_assistant(line) + _ -> SkipLine + } +} + +fn interpret_assistant(line: RawLine) -> ParseResult { + case line.message { + None -> ErrorLine("assistant turn missing message") + Some(msg) -> interpret_message(msg, line.rate_limits) + } +} + +fn interpret_message( + msg: MessageData, + rl: Option(RateLimitFields), +) -> ParseResult { + case msg.role { + Some(r) if r != "assistant" -> SkipLine + _ -> + case msg.model { + None -> ErrorLine("assistant turn missing model") + Some(model) -> build_usage_line(model, msg, rl) + } + } +} + +fn build_usage_line( + model: String, + msg: MessageData, + rl: Option(RateLimitFields), +) -> ParseResult { + let has_usage = + option.is_some(msg.input_tokens) + || option.is_some(msg.output_tokens) + || option.is_some(msg.cache_read) + || option.is_some(msg.cache_create) + case has_usage { + False -> ErrorLine("assistant turn missing usage") + True -> + UsageLine( + usage: UsageEvent( + model: model, + input_tokens: option.unwrap(msg.input_tokens, 0), + output_tokens: option.unwrap(msg.output_tokens, 0), + cache_read_tokens: option.unwrap(msg.cache_read, 0), + 
cache_create_tokens: option.unwrap(msg.cache_create, 0), + ), + rate_limits: rl, + ) + } +} + +// ── ISO 8601 → Unix ms ──────────────────────────────────────────────────────── + +fn parse_iso8601_ms(iso: String) -> Option(Int) { + case erlang_parse_iso8601(iso) { + Ok(ms) -> Some(ms) + Error(_) -> None + } +} + +@external(erlang, "fbi_time", "parse_iso8601_ms") +fn erlang_parse_iso8601(iso: String) -> Result(Int, String) + +@external(erlang, "string", "trim") +fn string_trim(s: String) -> String diff --git a/src/server/src/fbi_time.erl b/src/server/src/fbi_time.erl index a594c1a..d7c08da 100644 --- a/src/server/src/fbi_time.erl +++ b/src/server/src/fbi_time.erl @@ -1,4 +1,38 @@ -module(fbi_time). --export([now_ms/0]). +-export([now_ms/0, parse_iso8601_ms/1, to_dynamic/1]). + +%% Coerce any Gleam value to gleam/dynamic.Dynamic (identity cast). +to_dynamic(X) -> X. now_ms() -> erlang:system_time(millisecond). + +% Parse an ISO 8601 UTC string like "2026-04-22T18:00:00Z" into Unix milliseconds. +% Returns {ok, Ms} | {error, Reason}. +parse_iso8601_ms(Iso) -> + Str = binary_to_list(Iso), + case catch parse_iso_str(Str) of + {'EXIT', _} -> {error, <<"parse error">>}; + {ok, Ms} -> {ok, Ms}; + {error, R} -> {error, R} + end. 
+ +parse_iso_str(Str) -> + % "2026-04-22T18:00:00Z" or "2026-04-22T18:00:00+00:00" + case string:tokens(Str, "T") of + [Date, TimeZ] -> + [Y, Mo, D] = [list_to_integer(X) || X <- string:tokens(Date, "-")], + % Keep only the HH:MM:SS part: stop at a timezone suffix ("Z", "+hh:mm", "-hh:mm") + Time = lists:takewhile(fun(C) -> not lists:member(C, "Z+-.") end, TimeZ), + TimeParts = string:tokens(Time, ":"), + [H, Mi, S] = case TimeParts of + [HH, MM, SS] -> [list_to_integer(HH), list_to_integer(MM), trunc(float(list_to_integer(SS)))]; + [HH, MM] -> [list_to_integer(HH), list_to_integer(MM), 0]; + [HH] -> [list_to_integer(HH), 0, 0]; + _ -> [0, 0, 0] + end, + Seconds = calendar:datetime_to_gregorian_seconds({{Y, Mo, D}, {H, Mi, S}}) + - calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}), + {ok, Seconds * 1000}; + _ -> + {error, <<"bad format">>} + end. diff --git a/src/server/test/fbi/git/repo_test.gleam b/src/server/test/fbi/git/repo_test.gleam index c723962..5493d80 100644 --- a/src/server/test/fbi/git/repo_test.gleam +++ b/src/server/test/fbi/git/repo_test.gleam @@ -106,11 +106,7 @@ pub fn wip_files_with_snapshot_returns_diff_test() { git.run(work, ["commit-tree", tree, "-p", head_sha, "-m", "wip snapshot"]) let snapshot_sha = string.trim(commit_str) let assert Ok(_) = - git.run(work, [ - "push", - "origin", - snapshot_sha <> ":refs/fbi/wip-snapshot", - ]) + git.run(work, ["push", "origin", snapshot_sha <> ":refs/fbi/wip-snapshot"]) let assert Ok(option.Some(snapshot)) = repo.wip_files(bare) snapshot.parent_sha |> should.equal(head_sha) snapshot.snapshot_sha |> should.equal(snapshot_sha) @@ -132,11 +128,7 @@ pub fn wip_files_no_diff_returns_none_test() { git.run(work, ["commit-tree", tree, "-p", head_sha, "-m", "wip snapshot"]) let snapshot_sha = string.trim(commit_str) let assert Ok(_) = - git.run(work, [ - "push", - "origin", - snapshot_sha <> ":refs/fbi/wip-snapshot", - ]) + git.run(work, ["push", "origin", snapshot_sha <> ":refs/fbi/wip-snapshot"]) repo.wip_files(bare) |> should.equal(Ok(option.None)) } diff --git 
a/src/server/test/fbi/run/actor_test.gleam b/src/server/test/fbi/run/actor_test.gleam index 37f5191..1010623 100644 --- a/src/server/test/fbi/run/actor_test.gleam +++ b/src/server/test/fbi/run/actor_test.gleam @@ -2,6 +2,7 @@ import fbi/config import fbi/db/migrations import fbi/db/projects import fbi/db/runs +import fbi/pubsub import fbi/run/actor as run_actor import fbi/run/broadcaster import fbi/run/registry @@ -74,9 +75,10 @@ pub fn agent_status_changed_in_running_updates_db_and_broadcasts_test() { let #(db, cfg) = test_setup() let assert Ok(bc) = broadcaster.start() let assert Ok(reg) = registry.start() + let assert Ok(ps) = pubsub.start() let event_sub = process.new_subject() process.send(bc, BroadcastSubscribe(event_sub)) - let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg) + let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg, ps) // Transition to Running process.send(actor_subject, WorkerReady("test-cid", "main", 80, 24)) process.sleep(50) @@ -97,7 +99,8 @@ pub fn agent_status_changed_ignored_when_not_running_test() { let #(db, cfg) = test_setup() let assert Ok(bc) = broadcaster.start() let assert Ok(reg) = registry.start() - let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg) + let assert Ok(ps) = pubsub.start() + let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg, ps) // Send in Starting phase — must not crash or change state process.send(actor_subject, AgentStatusChanged("waiting")) process.sleep(50) @@ -109,7 +112,8 @@ pub fn worker_failed_transitions_to_failed_test() { let #(db, cfg) = test_setup() let assert Ok(bc) = broadcaster.start() let assert Ok(reg) = registry.start() - let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg) + let assert Ok(ps) = pubsub.start() + let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg, ps) process.send(actor_subject, WorkerFailed("simulated failure")) // Give the actor time to process the message process.sleep(50) @@ -124,7 
+128,8 @@ pub fn start_and_shutdown_test() { let #(db, cfg) = test_setup() let assert Ok(bc) = broadcaster.start() let assert Ok(reg) = registry.start() - let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg) + let assert Ok(ps) = pubsub.start() + let assert Ok(actor_subject) = run_actor.start(1, db, cfg, bc, reg, ps) // Verify the actor starts correctly in Starting phase // by sending Shutdown which should stop the actor cleanly process.send(actor_subject, types.Shutdown)