Milestone: v0.3.0 | Tier: Creative Bet | Effort: Medium–Large
Problem
Firewall.apply() waits for the complete driver response before transforming. For large results (database queries, LLM-generated tool outputs, file listings), this:
- Blocks until the entire response arrives — high latency for large payloads
- Wastes memory — entire response must fit in memory before any processing
- Prevents incremental delivery — caller can't start processing until everything is ready
- Can't adapt — if budget is exceeded mid-stream, there's no way to switch to more aggressive summarization for remaining chunks
Proposed Change
1. Streaming firewall method
```python
class Firewall:
    async def apply_stream(
        self,
        response_chunks: AsyncIterator[dict],
        *,
        response_mode: ResponseMode,
        budget: int,
        sensitivity: SensitivityTag,
        allowed_fields: list[str] | None = None,
    ) -> AsyncIterator[dict]:
        """Stream response chunks through the firewall with progressive summarization."""
```
2. Progressive summarization strategy
As chunks arrive:
- Phase 1 (budget > 60%): Pass chunks through with redaction only (minimal transformation).
- Phase 2 (budget 30%–60%): Switch to table/summary mode for remaining chunks.
- Phase 3 (budget 10%–30%): Aggressive summarization — emit only key-value summaries.
- Phase 4 (budget < 10%): Emit handle references only — store full data in HandleStore for later expand().
Budget tracking is incremental — each yielded chunk decrements the remaining budget.
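The phase thresholds above could be selected from the remaining budget fraction roughly as follows. This is a minimal sketch; the `Phase` enum and `phase_for` helper are hypothetical names introduced here, not existing code.

```python
from enum import Enum

class Phase(Enum):
    REDACT_ONLY = 1    # budget > 60%: pass through with redaction only
    TABLE_SUMMARY = 2  # 30%-60%: table/summary mode
    AGGRESSIVE = 3     # 10%-30%: key-value summaries only
    HANDLE_ONLY = 4    # < 10%: emit handle references only

def phase_for(remaining: int, total: int) -> Phase:
    """Map the remaining budget fraction to a summarization phase."""
    frac = remaining / total if total else 0.0
    if frac > 0.60:
        return Phase.REDACT_ONLY
    if frac > 0.30:
        return Phase.TABLE_SUMMARY
    if frac > 0.10:
        return Phase.AGGRESSIVE
    return Phase.HANDLE_ONLY
```

Because each yielded chunk decrements the remaining budget, the phase is re-evaluated per chunk, so a single stream can pass through several phases as it drains the budget.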
3. Streaming driver protocol
Extend the Driver protocol to support streaming:
```python
class StreamingDriver(Protocol):
    async def execute_stream(
        self,
        operation: str,
        params: dict[str, Any],
        constraints: dict[str, Any],
    ) -> AsyncIterator[dict[str, Any]]:
        """Execute an operation and yield response chunks."""
```
Drivers can implement execute_stream() optionally. The kernel detects support and uses streaming when available, falling back to execute() + single-chunk wrapping.
4. Kernel integration
```python
async def invoke_stream(
    self, token: str, params: dict,
) -> AsyncIterator[Frame]:
    """Invoke a capability with streaming response."""
```
Each yielded Frame contains a chunk of the response. The final frame has is_final=True.
Acceptance Criteria
- apply_stream() yields transformed chunks as they arrive (not buffered)
- StreamingDriver protocol is optional — non-streaming drivers still work
- invoke_stream() works end-to-end with a streaming driver
Affected Files
src/agent_kernel/firewall/transform.py (add apply_stream())
src/agent_kernel/drivers/base.py (add StreamingDriver protocol)
src/agent_kernel/kernel.py (add invoke_stream())
src/agent_kernel/models.py (add is_final to Frame or new streaming model)
tests/test_firewall.py (streaming tests with mock async iterators)
tests/test_kernel.py (end-to-end streaming tests)
Dependencies
Risk
Medium — streaming + summarization interaction is complex. Edge cases around chunk boundaries and partial records need careful testing.