1 change: 0 additions & 1 deletion src/dedalus_labs/_compat.py
@@ -144,7 +144,6 @@ def model_dump(
if (not PYDANTIC_V1) or hasattr(model, "model_dump"):
return model.model_dump(
mode=mode,
by_alias=True,
exclude=exclude,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
25 changes: 2 additions & 23 deletions src/dedalus_labs/lib/runner/core.py
@@ -30,6 +30,7 @@
from .types import Message, ToolCall, JsonValue, ToolResult, PolicyInput, PolicyContext
from ..._client import Dedalus, AsyncDedalus
from ...types.shared import MCPToolResult
from ..utils._stream import accumulate_tool_call

# Type alias for mcp_servers parameter - accepts strings, server objects, or mixed lists
MCPServersInput = Union[
@@ -1240,29 +1241,7 @@ def _execute_tool_calls_sync(
def _accumulate_tool_calls(self, deltas, acc: list[ToolCall]) -> None:
"""Accumulate streaming tool call deltas."""
for delta in deltas:
index = getattr(delta, "index", 0)

# Ensure we have enough entries in acc
while len(acc) <= index:
acc.append(
{
"id": "",
"type": "function",
"function": {"name": "", "arguments": ""},
}
)

if hasattr(delta, "id") and delta.id:
acc[index]["id"] = delta.id
if hasattr(delta, "function"):
fn = delta.function
if hasattr(fn, "name") and fn.name:
acc[index]["function"]["name"] = fn.name
if hasattr(fn, "arguments") and fn.arguments:
acc[index]["function"]["arguments"] += fn.arguments
thought_sig = getattr(delta, "thought_signature", None)
if thought_sig:
acc[index]["thought_signature"] = thought_sig
accumulate_tool_call(acc, delta)

@staticmethod
def _mk_kwargs(mc: _ModelConfig) -> Dict[str, Any]:
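For reference, a minimal sketch of how the shared helper reassembles streaming deltas. The SimpleNamespace objects are hypothetical stand-ins for real tool-call delta chunks, and the private import path mirrors the one added in this diff:

    from types import SimpleNamespace as NS
    from dedalus_labs.lib.utils._stream import accumulate_tool_call

    calls = []
    # The first delta carries the call id and function name; later deltas append argument fragments.
    accumulate_tool_call(calls, NS(index=0, id="call_1", function=NS(name="get_weather", arguments='{"city": "')))
    accumulate_tool_call(calls, NS(index=0, id=None, function=NS(name=None, arguments='Paris"}')))
    # calls[0] is now {"id": "call_1", "type": "function",
    #                  "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}}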
190 changes: 123 additions & 67 deletions src/dedalus_labs/lib/utils/_stream.py
@@ -7,136 +7,192 @@
from __future__ import annotations

import os

from dataclasses import dataclass, field
from collections.abc import AsyncIterator, Iterator
from typing import TYPE_CHECKING
from typing import Any, Dict, List, TYPE_CHECKING


if TYPE_CHECKING:
from ...types.chat.stream_chunk import StreamChunk

__all__ = [
"StreamResult",
"accumulate_tool_call",
"stream_async",
"stream_sync",
]


async def stream_async(stream: AsyncIterator[StreamChunk] | object) -> None:
@dataclass
class StreamResult:
"""Collected data from a consumed stream.

Returned by :func:`stream_async` and :func:`stream_sync` so callers
can inspect what happened after the stream is exhausted.
"""

content: str = ""
"""Concatenated text content from all ``delta.content`` fragments."""

tool_calls: List[Dict[str, Any]] = field(default_factory=list)
"""Reassembled tool calls accumulated from streaming deltas.

Each entry has the shape ``{"id": str, "type": "function", "function": {"name": str, "arguments": str}}``.
"""


def accumulate_tool_call(
tool_calls: List[Dict[str, Any]],
tc_delta: object,
) -> None:
"""Accumulate a single streaming tool-call delta into *tool_calls*.

Reassembles the incremental fragments (id, function name, argument
chunks, thought_signature) that arrive across multiple SSE chunks
into complete tool-call dicts keyed by ``tc_delta.index``.

This is the canonical implementation shared by both the stream
helpers and :class:`~dedalus_labs.lib.runner.core.DedalusRunner`.
"""
idx: int = getattr(tc_delta, "index", 0)
while len(tool_calls) <= idx:
tool_calls.append({"id": "", "type": "function", "function": {"name": "", "arguments": ""}})
entry = tool_calls[idx]

tc_id = getattr(tc_delta, "id", None)
if tc_id:
entry["id"] = tc_id

fn = getattr(tc_delta, "function", None)
if fn is not None:
if getattr(fn, "name", None):
entry["function"]["name"] = fn.name
if getattr(fn, "arguments", None):
entry["function"]["arguments"] += fn.arguments

thought_sig = getattr(tc_delta, "thought_signature", None)
if thought_sig:
entry["thought_signature"] = thought_sig


def _process_chunk(
chunk: object,
result: StreamResult,
verbose: bool,
) -> None:
"""Extract content and tool-call deltas from a single StreamChunk."""
if verbose:
extra = getattr(chunk, "__pydantic_extra__", None)
if isinstance(extra, dict):
meta = extra.get("dedalus_event")
if isinstance(meta, dict):
print(f"\n[EVENT] {meta}")

choices = getattr(chunk, "choices", None)
if not choices:
return

choice = choices[0]
delta = choice.delta

# Tool-call deltas
for tc in getattr(delta, "tool_calls", None) or []:
if verbose:
fn = getattr(tc, "function", None)
print(f"\n[TOOL_CALL] name={getattr(fn, 'name', None)} id={getattr(tc, 'id', None)}")
accumulate_tool_call(result.tool_calls, tc)

# Content
if delta.content:
print(delta.content, end="", flush=True)
result.content += delta.content

# Finish reason (verbose-only)
if verbose and getattr(choice, "finish_reason", None):
print(f"\n[FINISH] reason={choice.finish_reason}")


async def stream_async(stream: AsyncIterator[StreamChunk] | object) -> StreamResult:
"""Stream text content from an async streaming response.

Prints content tokens to stdout as they arrive **and** returns a
:class:`StreamResult` with the accumulated content and tool calls.

Supports both:
- Raw StreamChunk iterator from .create(stream=True) or DedalusRunner.run(stream=True)
- ChatCompletionStreamManager from .stream() (Pydantic models with event API)

Args:
stream: An async iterator of StreamChunk or a ChatCompletionStreamManager

Example:
>>> # With .create(stream=True)
>>> stream = await client.chat.completions.create(stream=True, ...)
>>> await stream_async(stream)
Returns:
A :class:`StreamResult` containing the full content and any tool calls.

>>> # With .stream() (Pydantic models)
>>> stream = client.chat.completions.stream(response_format=Model, ...)
>>> await stream_async(stream)
Example:
>>> stream = runner.run(input="...", model="...", stream=True)
>>> result = await stream_async(stream)
>>> print(result.tool_calls)
"""
verbose = os.environ.get("DEDALUS_SDK_VERBOSE", "").lower() in ("1", "true", "yes", "on", "debug")
result = StreamResult()

# Stream manager (event API) vs raw AsyncStream: discriminate via __aenter__ without __aiter__
if hasattr(stream, "__aenter__") and not hasattr(stream, "__aiter__"):
async with stream as event_stream:
async for event in event_stream:
if event.type == "content.delta":
print(event.delta, end="", flush=True)
result.content += event.delta
elif verbose and event.type == "content.done" and hasattr(event, "parsed") and event.parsed:
print(f"\n[PARSED] {type(event.parsed).__name__}")
print() # Final newline
return
return result

# Simple StreamChunk iterator case
async for chunk in stream:
# Print server-side metadata events if present (verbose-only)
if verbose:
extra = getattr(chunk, "__pydantic_extra__", None)
if isinstance(extra, dict):
meta = extra.get("dedalus_event")
if isinstance(meta, dict):
print(f"\n[EVENT] {meta}")

if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
# Print tool-call deltas as debug (verbose-only)
if verbose and getattr(delta, "tool_calls", None):
for tc in delta.tool_calls:
fn = getattr(tc, "function", None)
name = getattr(fn, "name", None)
tcid = getattr(tc, "id", None)
print(f"\n[TOOL_CALL] name={name} id={tcid}")
# Always print content
if delta.content:
print(delta.content, end="", flush=True)
# Print finish reason (verbose-only)
if verbose and getattr(choice, "finish_reason", None):
print(f"\n[FINISH] reason={choice.finish_reason}")
_process_chunk(chunk, result, verbose)
print() # Final newline
return result


def stream_sync(stream: Iterator[StreamChunk] | object) -> None:
def stream_sync(stream: Iterator[StreamChunk] | object) -> StreamResult:
"""Stream text content from a streaming response.

Prints content tokens to stdout as they arrive **and** returns a
:class:`StreamResult` with the accumulated content and tool calls.

Supports both:
- Raw StreamChunk iterator from .create(stream=True) or DedalusRunner.run(stream=True)
- ChatCompletionStreamManager from .stream() (Pydantic models with event API)

Args:
stream: An iterator of StreamChunk or a ChatCompletionStreamManager

Example:
>>> # With .create(stream=True)
>>> stream = client.chat.completions.create(stream=True, ...)
>>> stream_sync(stream)
Returns:
A :class:`StreamResult` containing the full content and any tool calls.

>>> # With .stream() (Pydantic models)
>>> stream = client.chat.completions.stream(response_format=Model, ...)
>>> stream_sync(stream)
Example:
>>> stream = runner.run(input="...", model="...", stream=True)
>>> result = stream_sync(stream)
>>> print(result.tool_calls)
"""
verbose = os.environ.get("DEDALUS_SDK_VERBOSE", "").lower() in ("1", "true", "yes", "on", "debug")
result = StreamResult()

# Stream manager (event API) vs raw Stream: discriminate via __enter__ without __iter__
if hasattr(stream, "__enter__") and not hasattr(stream, "__iter__"):
with stream as event_stream:
for event in event_stream:
if event.type == "content.delta":
print(event.delta, end="", flush=True)
result.content += event.delta
elif verbose and event.type == "content.done" and hasattr(event, "parsed") and event.parsed:
print(f"\n[PARSED] {type(event.parsed).__name__}")
print() # Final newline
return
return result

# Simple StreamChunk iterator case
for chunk in stream:
# Print server-side metadata events if present (verbose-only)
if verbose:
extra = getattr(chunk, "__pydantic_extra__", None)
if isinstance(extra, dict):
meta = extra.get("dedalus_event")
if isinstance(meta, dict):
print(f"\n[EVENT] {meta}")

if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
# Print tool-call deltas as debug (verbose-only)
if verbose and getattr(delta, "tool_calls", None):
for tc in delta.tool_calls:
fn = getattr(tc, "function", None)
name = getattr(fn, "name", None)
tcid = getattr(tc, "id", None)
print(f"\n[TOOL_CALL] name={name} id={tcid}")
# Always print content
if delta.content:
print(delta.content, end="", flush=True)
if verbose and getattr(choice, "finish_reason", None):
print(f"\n[FINISH] reason={choice.finish_reason}")
_process_chunk(chunk, result, verbose)
print() # Final newline
return result
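As a rough usage sketch of the new return value (the model id and prompt are placeholders, and a top-level ``Dedalus`` export is assumed), the sync helper now hands back the accumulated result instead of ``None``:

    from dedalus_labs import Dedalus
    from dedalus_labs.utils.stream import stream_sync

    client = Dedalus()
    stream = client.chat.completions.create(
        model="openai/gpt-4o-mini",  # placeholder model id
        messages=[{"role": "user", "content": "What's the weather in Paris?"}],
        stream=True,
    )
    result = stream_sync(stream)   # prints tokens as they arrive
    print(result.content)          # full concatenated text
    print(result.tool_calls)       # reassembled tool calls, if any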
4 changes: 2 additions & 2 deletions src/dedalus_labs/utils/stream.py
@@ -6,6 +6,6 @@

"""Stream utilities for printing model responses in real-time."""

from ..lib.utils._stream import stream_async, stream_sync
from ..lib.utils._stream import StreamResult, stream_async, stream_sync

__all__ = ["stream_async", "stream_sync"]
__all__ = ["StreamResult", "stream_async", "stream_sync"]