diff --git a/hermes-plugin/pipeline/__init__.py b/hermes-plugin/pipeline/__init__.py
new file mode 100644
index 0000000..32e2981
--- /dev/null
+++ b/hermes-plugin/pipeline/__init__.py
@@ -0,0 +1,297 @@
+"""Pipeline components for SoloFlow.
+
+Implements Haystack-style component-based orchestration:
+- Components = atomic capability units
+- Pipelines = data flow + branching + looping
+- Full observability at component level
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+import uuid
+from collections import deque
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any, Callable, Awaitable, Optional
+
+logger = logging.getLogger("soloflow.pipeline")
+
+
+class ComponentStatus(str, Enum):
+    """Status of a pipeline component."""
+
+    IDLE = "idle"
+    RUNNING = "running"
+    COMPLETED = "completed"
+    FAILED = "failed"
+
+
+@dataclass
+class ComponentResult:
+    """Result from a pipeline component."""
+
+    component_id: str
+    status: ComponentStatus
+    output: Any = None
+    error: Optional[str] = None
+    duration_ms: float = 0.0
+    token_usage: dict = field(default_factory=dict)
+
+    def to_dict(self) -> dict:
+        """Return the result as a plain dict, with status as its string value."""
+        return {
+            "component_id": self.component_id,
+            "status": self.status.value,
+            "output": self.output,
+            "error": self.error,
+            "duration_ms": self.duration_ms,
+            "token_usage": self.token_usage,
+        }
+
+
+@dataclass
+class PipelineComponent:
+    """A component in a pipeline.
+
+    Key insight from Haystack:
+    - Components are atomic capability units
+    - Each component has clear input/output
+    - Full observability at component level
+    """
+
+    component_id: str
+    name: str
+    description: str = ""
+    handler: Callable[..., Awaitable[Any]] | None = None
+    input_schema: dict = field(default_factory=dict)
+    output_schema: dict = field(default_factory=dict)
+
+    async def run(self, **kwargs) -> ComponentResult:
+        """Run the component.
+
+        Awaits ``handler(**kwargs)``; without a handler the keyword
+        arguments themselves become the output.
+
+        Returns:
+            A COMPLETED result carrying the output, or a FAILED result
+            carrying ``str(exc)`` when the handler raised an ``Exception``;
+            such errors are reported through the result, not propagated.
+        """
+        # perf_counter is monotonic, so a wall-clock adjustment cannot skew
+        # the measured duration.
+        start_time = time.perf_counter()
+
+        try:
+            if self.handler:
+                output = await self.handler(**kwargs)
+            else:
+                output = kwargs
+        except Exception as e:
+            duration_ms = (time.perf_counter() - start_time) * 1000
+            # Keep the traceback in the log; the result only carries str(e).
+            logger.exception("Component %s failed", self.component_id)
+            return ComponentResult(
+                component_id=self.component_id,
+                status=ComponentStatus.FAILED,
+                error=str(e),
+                duration_ms=duration_ms,
+            )
+
+        duration_ms = (time.perf_counter() - start_time) * 1000
+        return ComponentResult(
+            component_id=self.component_id,
+            status=ComponentStatus.COMPLETED,
+            output=output,
+            duration_ms=duration_ms,
+        )
+
+    def to_dict(self) -> dict:
+        """Return the component's id, name and description as a dict."""
+        return {
+            "component_id": self.component_id,
+            "name": self.name,
+            "description": self.description,
+        }
+
+
+class Pipeline:
+    """A pipeline of components with data flow.
+
+    Key patterns from Haystack:
+    - Pipeline defines data flow between components
+    - Edges order execution; components run in topological order
+    - Full observability at each step
+
+    Loops are not executed: components that sit on (or behind) a cycle
+    are skipped with a warning.
+    """
+
+    def __init__(self, name: str) -> None:
+        self.name = name
+        self._components: dict[str, PipelineComponent] = {}
+        self._edges: list[tuple[str, str]] = []
+        # Results of every component executed so far, across run() calls.
+        self._results: list[ComponentResult] = []
+
+    def add_component(self, component: PipelineComponent) -> None:
+        """Add a component to the pipeline, keyed by its component_id."""
+        self._components[component.component_id] = component
+
+    def add_edge(self, from_id: str, to_id: str) -> None:
+        """Add a directed edge: ``from_id`` runs before ``to_id``."""
+        self._edges.append((from_id, to_id))
+
+    def _execution_order(self) -> list[str]:
+        """Return component ids in topological order (Kahn's algorithm).
+
+        Ties are broken by registration order. Edges that mention an
+        unregistered component are ignored. When no component is free of
+        incoming edges, only the first registered component is returned.
+        """
+        indegree = dict.fromkeys(self._components, 0)
+        successors: dict[str, list[str]] = {cid: [] for cid in self._components}
+        for from_id, to_id in self._edges:
+            if from_id in indegree and to_id in indegree:
+                successors[from_id].append(to_id)
+                indegree[to_id] += 1
+
+        ready = deque(cid for cid, degree in indegree.items() if degree == 0)
+        order: list[str] = []
+        while ready:
+            cid = ready.popleft()
+            order.append(cid)
+            for successor in successors[cid]:
+                indegree[successor] -= 1
+                if indegree[successor] == 0:
+                    ready.append(successor)
+
+        if not order:
+            # Every component has an incoming edge: fall back to running
+            # just the first registered one.
+            return list(self._components)[:1]
+
+        if len(order) < len(self._components):
+            scheduled = set(order)
+            skipped = [cid for cid in self._components if cid not in scheduled]
+            logger.warning(
+                "Pipeline %s: skipping components blocked by a cycle: %s",
+                self.name,
+                skipped,
+            )
+        return order
+
+    async def run(self, initial_input: dict) -> dict:
+        """Run the pipeline.
+
+        Components run one at a time in topological order. The first one
+        receives ``initial_input``; each later one receives the previous
+        component's output (wrapped as ``{"result": output}`` when it is
+        not a dict). Execution stops at the first failed component.
+
+        Returns:
+            On success, ``success=True`` with the last component's
+            ``output`` and ``components_run`` for this call. On failure,
+            ``success=False`` with ``error`` and ``failed_component``.
+        """
+        current_input = initial_input
+        final_output = None
+        components_run = 0
+
+        for comp_id in self._execution_order():
+            result = await self._components[comp_id].run(**current_input)
+            self._results.append(result)
+            components_run += 1
+
+            if result.status == ComponentStatus.FAILED:
+                return {
+                    "success": False,
+                    "error": result.error,
+                    "failed_component": comp_id,
+                }
+
+            final_output = result.output
+            if isinstance(result.output, dict):
+                current_input = result.output
+            else:
+                current_input = {"result": result.output}
+
+        return {
+            "success": True,
+            "output": final_output,
+            "components_run": components_run,
+        }
+
+    def get_results(self) -> list[dict]:
+        """Get results from all components."""
+        return [r.to_dict() for r in self._results]
+
+    def to_dict(self) -> dict:
+        """Return the pipeline's name, components and edges as a dict."""
+        return {
+            "name": self.name,
+            "components": [c.to_dict() for c in self._components.values()],
+            # Copy so callers cannot mutate the pipeline's edge list.
+            "edges": list(self._edges),
+        }
+
+
+# Predefined components
+class PromptBuilderComponent(PipelineComponent):
+    """Builds prompts from templates and context."""
+
+    def __init__(self) -> None:
+        super().__init__(
+            component_id="prompt_builder",
+            name="Prompt Builder",
+            description="Builds prompts from templates and context",
+        )
+
+    async def run(self, template: str = "", context: dict | None = None, **kwargs) -> ComponentResult:
+        """Fill ``{key}`` placeholders in ``template`` from ``context``."""
+        start_time = time.perf_counter()
+
+        prompt = template
+        if context:
+            for key, value in context.items():
+                prompt = prompt.replace(f"{{{key}}}", str(value))
+
+        duration_ms = (time.perf_counter() - start_time) * 1000
+        return ComponentResult(
+            component_id=self.component_id,
+            status=ComponentStatus.COMPLETED,
+            output={"prompt": prompt},
+            duration_ms=duration_ms,
+        )
+
+
+class RouterComponent(PipelineComponent):
+    """Routes to different paths based on conditions."""
+
+    def __init__(self) -> None:
+        super().__init__(
+            component_id="router",
+            name="Router",
+            description="Routes to different paths based on conditions",
+        )
+
+    async def run(self, input_text: str = "", **kwargs) -> ComponentResult:
+        """Pick a route label ("quick", "deep" or "default") for ``input_text``."""
+        start_time = time.perf_counter()
+
+        # Simple routing logic
+        if len(input_text) < 100:
+            route = "quick"
+        elif "analyze" in input_text.lower():
+            route = "deep"
+        else:
+            route = "default"
+
+        duration_ms = (time.perf_counter() - start_time) * 1000
+        return ComponentResult(
+            component_id=self.component_id,
+            status=ComponentStatus.COMPLETED,
+            output={"route": route, "input": input_text},
+            duration_ms=duration_ms,
+        )
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
new file mode 100644
index 0000000..70b06fc
--- /dev/null
+++ b/tests/pipeline/test_pipeline.py
@@ -0,0 +1,128 @@
+"""Tests for pipeline components."""
+
+import sys
+from pathlib import Path
+import pytest
+
+sys.path.insert(0, str(Path(__file__).parent.parent.parent / "hermes-plugin"))
+
+from pipeline import Pipeline, PipelineComponent, ComponentStatus, ComponentResult
+
+
+@pytest.fixture
+def pipeline():
+    p = Pipeline("test-pipeline")
+    return p
+
+
+class TestPipelineComponent:
+    @pytest.mark.asyncio
+    async def test_run_success(self):
+        async def handler(text):
+            return {"result": text.upper()}
+
+        component = PipelineComponent(
+            component_id="test",
+            name="Test",
+            handler=handler,
+        )
+
+        result = await component.run(text="hello")
+        assert result.status == ComponentStatus.COMPLETED
+        assert result.output["result"] == "HELLO"
+
+    @pytest.mark.asyncio
+    async def test_run_failure(self):
+        async def handler():
+            raise ValueError("Test error")
+
+        component = PipelineComponent(
+            component_id="test",
+            name="Test",
+            handler=handler,
+        )
+
+        result = await component.run()
+        assert result.status == ComponentStatus.FAILED
+        assert "Test error" in result.error
+
+
+class TestPipeline:
+    def test_add_component(self, pipeline):
+        component = PipelineComponent(
+            component_id="c1",
+            name="Component 1",
+        )
+        pipeline.add_component(component)
+        assert "c1" in pipeline._components
+
+    def test_add_edge(self, pipeline):
+        pipeline.add_edge("c1", "c2")
+        assert ("c1", "c2") in pipeline._edges
+
+    @pytest.mark.asyncio
+    async def test_run_single_component(self, pipeline):
+        async def handler(text):
+            return {"result": text.upper()}
+
+        component = PipelineComponent(
+            component_id="c1",
+            name="Upper",
+            handler=handler,
+        )
+        pipeline.add_component(component)
+
+        result = await pipeline.run({"text": "hello"})
+        assert result["success"] is True
+        assert result["output"]["result"] == "HELLO"
+
+    @pytest.mark.asyncio
+    async def test_run_pipeline_failure(self, pipeline):
+        async def handler():
+            raise ValueError("Failed")
+
+        component = PipelineComponent(
+            component_id="c1",
+            name="Fail",
+            handler=handler,
+        )
+        pipeline.add_component(component)
+
+        result = await pipeline.run({})
+        assert result["success"] is False
+        assert "Failed" in result["error"]
+
+    @pytest.mark.asyncio
+    async def test_run_follows_edges(self, pipeline):
+        async def upper(text):
+            return {"text": text.upper()}
+
+        async def exclaim(text):
+            return {"text": text + "!"}
+
+        # Registered out of order on purpose: the edge decides the order.
+        pipeline.add_component(
+            PipelineComponent(component_id="c2", name="Exclaim", handler=exclaim)
+        )
+        pipeline.add_component(
+            PipelineComponent(component_id="c1", name="Upper", handler=upper)
+        )
+        pipeline.add_edge("c1", "c2")
+
+        result = await pipeline.run({"text": "hello"})
+        assert result["success"] is True
+        assert result["output"]["text"] == "HELLO!"
+        assert result["components_run"] == 2
+
+    def test_to_dict(self, pipeline):
+        component = PipelineComponent(
+            component_id="c1",
+            name="Test",
+        )
+        pipeline.add_component(component)
+        pipeline.add_edge("c1", "c2")
+
+        d = pipeline.to_dict()
+        assert d["name"] == "test-pipeline"
+        assert len(d["components"]) == 1
+        assert len(d["edges"]) == 1