feat: DB context prefetching to eliminate planner discovery steps #416
Open
yassinejebbouri wants to merge 10 commits into
The previous implementation retried failures with a simple loop inside _call_relevancy. Moved retry/backoff logic to LiteLLM Router (exponential backoff, circuit breaker). Per-request timeout was also missing: the Router constructor timeout is not forwarded automatically to individual .completion() calls, so hung WatsonX requests could block indefinitely. Now passed explicitly on every call.
Two new parallel execution strategies for the N×M FM↔sensor mapping:
- _mapping_adaptive_ceiling: fires all pairs concurrently from t=0 and halves the semaphore limit immediately on any 500 error. Avoids the AIMD ramp-up penalty for small N, where additive increase finishes only after all work is already done.
- _mapping_hedged: ceiling-start combined with speculative duplicate requests. If any call stalls past FMSR_HEDGE_AFTER_S (default 8s), a rescue copy is fired on a background thread. Whichever copy responds first wins, capping tail latency at ~2×hedge_after_s instead of the full 90s Router timeout.
Also removed unused intermediate implementations (_mapping_batched_parallel, _mapping_async, _call_relevancy_async) that were never wired into the benchmark, and cleaned up the asyncio import left behind after their removal.
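The hedging idea described above can be sketched with the standard library alone. This is a minimal illustration, not the PR's _mapping_hedged: the helper names (hedged_call, make_flaky_call) and the fake relevancy function are hypothetical, and the real code issues LLM calls through the LiteLLM Router rather than sleeping.

```python
import concurrent.futures as cf
import threading
import time


def hedged_call(fn, args, hedge_after_s, pool):
    """Run fn(*args); if it stalls past hedge_after_s, race it against a duplicate.

    Returns the result of whichever copy finishes first (hypothetical helper).
    """
    primary = pool.submit(fn, *args)
    done, _ = cf.wait([primary], timeout=hedge_after_s)
    if done:
        return primary.result()
    # The primary is stalling: fire a speculative rescue copy and race the two.
    rescue = pool.submit(fn, *args)
    done, _ = cf.wait([primary, rescue], return_when=cf.FIRST_COMPLETED)
    return next(iter(done)).result()


def make_flaky_call(slow_s, fast_s):
    """Build a stand-in for an LLM call whose first invocation hangs."""
    lock = threading.Lock()
    count = 0

    def call(sensor, failure_mode):
        nonlocal count
        with lock:
            count += 1
            n = count
        # Only the first invocation is slow; later copies answer quickly.
        time.sleep(slow_s if n == 1 else fast_s)
        return (sensor, failure_mode, n)

    return call


flaky = make_flaky_call(slow_s=0.5, fast_s=0.01)
with cf.ThreadPoolExecutor(max_workers=4) as pool:
    start = time.monotonic()
    result = hedged_call(flaky, ("vibration", "bearing wear"), 0.05, pool)
    elapsed = time.monotonic() - start
print(result, f"{elapsed:.2f}s")
```

Note that this sketch re-raises if the first copy to finish failed; a fuller version would likely fall back to the other copy in that case.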
psutil: hardware sampling (CPU%, memory RSS, thread count) during runs.
filelock: safe append-only writes to the shared JSONL results file when multiple processes run the benchmark concurrently or resume after a crash.
matplotlib: benchmark visualization plots (wall time, speedup, per-call latency distribution, hardware utilization).
…nv var
The benchmark controls which parallelization path runs by passing FMSR_STRATEGY in the subprocess environment when spawning the server. The tool interface (inputs, outputs) is unchanged; only the internal N×M dispatch is selected at startup from the env var:
- sequential: one LLM call at a time (baseline)
- parallel: fixed thread pool (FMSR_PARALLEL_WORKERS workers)
- adaptive_ceiling: ceiling-start semaphore, halves on 500 error
- hedged: ceiling-start + speculative duplicate on stall
Default remains parallel with 2 workers, matching the original behaviour when FMSR_STRATEGY is not set.
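A rough sketch of how startup selection from the environment could look. The function name select_strategy and the choice to raise on an unknown value are assumptions for illustration; only the variable names FMSR_STRATEGY and FMSR_PARALLEL_WORKERS, the four strategy names, and the default (parallel, 2 workers) come from the description above.

```python
import os

STRATEGIES = ("sequential", "parallel", "adaptive_ceiling", "hedged")


def select_strategy(env=None):
    """Return (strategy_name, parallel_workers) from an environment mapping."""
    env = os.environ if env is None else env
    name = env.get("FMSR_STRATEGY", "parallel")
    if name not in STRATEGIES:
        # Assumption: fail loudly rather than silently falling back.
        raise ValueError(f"unknown FMSR_STRATEGY {name!r}; expected one of {STRATEGIES}")
    workers = int(env.get("FMSR_PARALLEL_WORKERS", "2"))
    return name, workers


# The benchmark side would pass the choice through the child environment,
# e.g. as the env= argument when spawning the server subprocess.
child_env = {**os.environ, "FMSR_STRATEGY": "hedged"}
print(select_strategy(child_env))
```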
…ntation
bench_hardware.py — samples CPU%, memory RSS, and thread count via psutil
at a configurable interval during each run
bench_stats.py — t-distribution confidence intervals, per-call stat
aggregation, and the build_summary() roll-up used by
the main benchmark and plot generator
bench_instrumentation.py — timing hooks that patch fmsr._call_relevancy to
capture per-call latency and phase boundaries; used by
the in-process debug runner (test_scenario.py), not by
the MCP benchmark where the server runs out-of-process
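For the t-distribution confidence intervals mentioned for bench_stats.py, a minimal standard-library sketch follows. It is not the module's actual code: mean_ci95 is a hypothetical name, and the small lookup table of two-sided 95% critical values stands in for a proper quantile function such as scipy.stats.t.ppf. With 3 runs per scenario there are 2 degrees of freedom, so the critical value is about 4.303 rather than the normal-approximation 1.96.

```python
import math
import statistics

# Two-sided 95% critical values of Student's t, keyed by degrees of freedom.
T_CRIT_95 = {1: 12.706, 2: 4.303, 3: 3.182, 4: 2.776, 5: 2.571}


def mean_ci95(samples):
    """Return (mean, half_width) of a 95% t-based confidence interval."""
    n = len(samples)
    if n < 2:
        raise ValueError("need at least two samples for a confidence interval")
    df = n - 1
    if df not in T_CRIT_95:
        raise ValueError("extend T_CRIT_95 or use a t quantile function")
    mean = statistics.fmean(samples)
    # Standard error of the mean, using the sample standard deviation.
    sem = statistics.stdev(samples) / math.sqrt(n)
    return mean, T_CRIT_95[df] * sem


# Illustrative wall times in seconds (made-up values, not benchmark results).
mean, half = mean_ci95([10.0, 12.0, 14.0])
print(f"{mean:.2f} +/- {half:.2f} s")
```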
Covers wall time grouped bars, speedup line charts, per-call latency box plots, hardware utilization, phase breakdowns, and scenario scaling. Strategies and colors are configurable; defaults match the 4-strategy set.
Calls get_failure_mode_sensor_mapping through the FMSR MCP server (stdio subprocess) for each of 4 strategies across 15 scenarios × 3 runs. This matches the real agent execution path exactly: same subprocess spawn, same stdio protocol, same tool interface as workflow/executor.py.
Key design points:
- Sensors fetched live from iot-mcp-server (CouchDB), not hardcoded
- Failure modes fetched live from fmsr-mcp-server (YAML), not hardcoded
- Per-scenario sensor/FM slices derived from the real fetched lists using keyword filtering that mirrors what the agent query implies
- Strategy selected by passing FMSR_STRATEGY to the server subprocess env
- Resume support: skips (run, scenario, strategy) triples already in JSONL
- Results written to results_mcp/ to preserve existing results/
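The resume behaviour in the list above could be sketched as follows. The record field names (run, scenario, strategy) and both helper names are assumptions; the actual JSONL schema is not shown in this PR text.

```python
import json


def done_triples(lines):
    """Collect (run, scenario, strategy) triples from JSONL lines.

    `lines` is any iterable of strings, for example an open results file.
    """
    done = set()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            rec = json.loads(line)
            done.add((rec["run"], rec["scenario"], rec["strategy"]))
        except (json.JSONDecodeError, KeyError, TypeError):
            # Tolerate a truncated or malformed line left behind by a crash.
            continue
    return done


def pending(runs, scenarios, strategies, done):
    """List the triples that still need to be executed, in run order."""
    return [
        (r, s, st)
        for r in runs
        for s in scenarios
        for st in strategies
        if (r, s, st) not in done
    ]


existing = [
    json.dumps({"run": 0, "scenario": 109, "strategy": "hedged"}),
    '{"run": 0, "scen',  # partial write from an interrupted run
]
print(pending([0], [109], ["hedged", "parallel"], done_triples(existing)))
```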
test_scenario.py — debug tool that imports the FMSR server in-process
and patches _call_relevancy to print every (sensor, FM)
pair live as it executes. Sensors and failure modes are
fetched from the live MCP servers at startup. Useful for
tracing individual LLM calls without running the full
benchmark.
Usage: uv run python -m src.benchmarking.test_scenario
--scenario 109 --strategy hedged
eval_fmsr.py — original sequential-vs-parallel evaluation script that
predates the multi-run benchmark; kept as a reference
baseline and for quick one-off comparisons.
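The patching approach used by test_scenario.py and bench_instrumentation.py (wrapping _call_relevancy to observe each call) might look roughly like this. The sketch uses a stand-in namespace instead of the real FMSR server module, and the (sensor, failure_mode) signature is an assumption, which is why the wrapper forwards arbitrary arguments.

```python
import functools
import time
import types


def trace_calls(module, name, log):
    """Replace module.<name> with a timing wrapper; return the original."""
    original = getattr(module, name)

    @functools.wraps(original)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return original(*args, **kwargs)
        finally:
            # Record the call even when the wrapped function raises.
            log.append({"args": args, "seconds": time.perf_counter() - start})

    setattr(module, name, wrapper)
    return original


# Stand-in for the in-process server module (hypothetical).
fake_fmsr = types.SimpleNamespace(
    _call_relevancy=lambda sensor, failure_mode: sensor in failure_mode
)

calls = []
original = trace_calls(fake_fmsr, "_call_relevancy", calls)
fake_fmsr._call_relevancy("vibration", "excess vibration in bearing")
print(calls[0]["args"])
setattr(fake_fmsr, "_call_relevancy", original)  # restore when done
```

As the file descriptions note, this only works in-process; in the MCP benchmark the server runs as a separate subprocess, so a patch applied in the benchmark process would not reach it.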
Planner (planner.py):
- Add optional context_block injection into the plan prompt
- When real asset/sensor/failure-mode data is available up-front, the LLM receives it as a concrete context block and is instructed to skip discovery steps and write direct tool arguments instead
Runner (runner.py):
- Add _prefetch_context(), which calls IoTAgent.assets, IoTAgent.sensors, and FMSRAgent.get_failure_modes via live MCP servers before planning
- Returns (context_str, call_timings) with per-call wall times for assets, sensors, and failure modes separately
- run() accepts a prefetch=True flag; sub-call timings are recorded as named phases (prefetch_assets, prefetch_sensors, prefetch_failure_modes) for fine-grained breakdown in downstream analysis
Benchmark (bench_opt0.py):
- N_RUNS configurable runs per scenario (default 3, override via env)
- Records wall time, plan steps, failed steps, and full phase breakdown, including per-prefetch-call timings, per individual run
- Aggregates mean +/- std / min / max across runs per scenario x condition
- Computes net time saved = (baseline_execute - prefetch_execute) - prefetch_overhead
- Prints side-by-side comparison table with speedup and net-saved columns
- Generates 7 plots: wall_time, plan_steps, speedup, phase_breakdown, prefetch_overhead_breakdown, net_time_saved, failed_steps
- Baseline strategy: sequential (matching the proposal's 70-call serial baseline)
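The net-time-saved formula can be worked through in a few lines. This sketch assumes the prefetch overhead is the sum of the three named prefetch phases; the function name and the sample timings are illustrative only and are not benchmark results.

```python
def net_time_saved(baseline_execute_s, prefetch_execute_s, prefetch_phases):
    """(baseline_execute - prefetch_execute) - prefetch_overhead, in seconds."""
    # Assumption: overhead is the total of the per-call prefetch phases.
    overhead = sum(prefetch_phases.values())
    return (baseline_execute_s - prefetch_execute_s) - overhead


phases = {
    "prefetch_assets": 1.0,
    "prefetch_sensors": 2.0,
    "prefetch_failure_modes": 1.5,
}
# Made-up timings: execution drops by 12 s, prefetching costs 4.5 s.
print(net_time_saved(30.0, 18.0, phases))
```

A negative value would mean the prefetch calls cost more than the discovery steps they replaced.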
Summary
Before planning begins, prefetch assets, sensors, and failure modes from the
MCP servers and inject the results into the planner prompt. This allows the
planner to skip discovery steps entirely and go directly to the answer.
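A minimal sketch of the prefetch-then-inject flow, assuming zero-argument fetchers in place of the live MCP calls. The function names, prompt wording, and sample data here are hypothetical; only the (context_str, call_timings) return shape and the prefetch_* phase names follow the description in this PR.

```python
import time


def prefetch_context(fetchers):
    """Call each fetcher once; return (context_str, call_timings).

    `fetchers` maps a name such as "assets" to a zero-argument callable
    returning a list of strings (stand-ins for the MCP server calls).
    """
    sections, timings = [], {}
    for name, fetch in fetchers.items():
        start = time.perf_counter()
        items = fetch()
        timings[f"prefetch_{name}"] = time.perf_counter() - start
        sections.append(f"{name}: " + ", ".join(items))
    return "\n".join(sections), timings


def build_plan_prompt(question, context_block=None):
    """Append the context block to the plan prompt only when one is given."""
    prompt = f"Question: {question}\n"
    if context_block:
        prompt += (
            "Known context (skip discovery steps and use these values "
            "as direct tool arguments):\n" + context_block + "\n"
        )
    return prompt


context, timings = prefetch_context({
    "assets": lambda: ["asset-1"],
    "sensors": lambda: ["temperature", "vibration"],
    "failure_modes": lambda: ["bearing wear"],
})
print(build_plan_prompt("Which sensors detect bearing wear?", context))
print(sorted(timings))
```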
Results
Files changed
- src/workflow/runner.py: _prefetch_context() method and prefetch=True flag on run()
Test plan
- Run runner.run(question, prefetch=True) and confirm discovery steps are absent from the plan
- Run src/benchmarking/bench_opt0.py to reproduce speedup results