
think41/pipecat-adk

 
 


pipecat-adk

Build powerful voice-enabled AI agents by combining Pipecat's real-time audio pipelines with Google ADK's agent framework.

The Problem

Pipecat excels at real-time voice applications. It handles audio streaming, VAD, STT, TTS, and transport protocols beautifully. But as your application grows more complex—managing conversation history, handling interruptions correctly, persisting sessions, calling tools—things get difficult. Pipecat's context management wasn't designed for sophisticated agent workflows.

Google ADK (Agent Development Kit) excels at building agents. It provides rich concepts for sessions, state management, tool definitions, multi-agent orchestration, evaluations, and much more. But ADK wasn't designed for real-time voice—it expects request/response patterns, not streaming audio.

pipecat-adk bridges these two worlds, letting you build voice applications with Pipecat's real-time capabilities while leveraging ADK's agent framework for everything else.

Installation

pip install pipecat-adk

# Or install from source
pip install -e /path/to/pipecat-adk

Getting Started

If you have an existing Pipecat application, here's what you need to change:

Before (Standard Pipecat)

import os

from pipecat.services.google import GoogleLLMService
from pipecat.services.google.llm import GoogleLLMContext
from pipecat.pipeline.pipeline import Pipeline

# transport, stt_service, and tts_service are created elsewhere in your bot

llm = GoogleLLMService(
    model="gemini-2.0-flash",
    api_key=os.getenv("GEMINI_API_KEY"),
)

context_aggregator = llm.create_context_aggregator(
    GoogleLLMContext(messages=[{"role": "system", "content": "You are helpful"}])
)

pipeline = Pipeline([
    transport.input(),
    stt_service,
    context_aggregator.user(),
    llm,
    tts_service,
    transport.output(),
    context_aggregator.assistant(),
])

After (With pipecat-adk)

from pipecat_adk import AdkLLMService, AdkInterruptionPlugin, VqlTTSMixin, SessionParams
from google.adk.agents import Agent
from google.adk.apps import App
from google.adk.sessions import DatabaseSessionService
from pipecat.services.google.tts import GoogleTTSService
from pipecat.pipeline.pipeline import Pipeline

# 1. Define your ADK agent and App
agent = Agent(
    name="helpful_assistant",  # Note: use underscores, not hyphens
    model="gemini-2.0-flash",
    instruction="You are a helpful assistant.",
)
app = App(
    name="my_app",
    root_agent=agent,
    plugins=[AdkInterruptionPlugin()],
)

# 2. Wrap TTS with VqlTTSMixin (required for [HEARD] interruption tracking)
class MyGoogleTTSService(VqlTTSMixin, GoogleTTSService):
    pass


# Call from your async bot entry point; transport and stt_service are created as before
async def build_pipeline(transport, stt_service) -> Pipeline:
    # 3. Set up session management (DatabaseSessionService persists across restarts)
    session_service = DatabaseSessionService(db_url="sqlite+aiosqlite:///sessions.db")
    session_params = SessionParams(
        app_name=app.name,
        user_id="user_123",
        session_id="session_456",
    )
    existing = await session_service.get_session(**session_params.model_dump())
    if not existing:
        await session_service.create_session(**session_params.model_dump())

    # 4. Create the LLM service
    llm = AdkLLMService(
        app=app,
        session_service=session_service,
        session_params=session_params,
    )

    # 5. Create context aggregators; the pipeline structure stays the same
    context_aggregator = llm.create_context_aggregator()

    tts = MyGoogleTTSService(voice_id=...)

    return Pipeline([
        transport.input(),
        stt_service,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

The pipeline structure stays the same: you swap the LLM service and context aggregator, and wrap your TTS service with VqlTTSMixin.

Key Challenges Solved

1. Context Management

The Problem: Pipecat manages conversation history by accumulating messages in an LLMContext. This works for simple cases, but breaks down when you need sophisticated history management, multi-turn reasoning, or agent handoffs.

Our Solution: ADK manages the conversation history. When a user speaks:

  1. VqlUserContextAggregator generates a turn_id (UUID) for the turn.
  2. It pushes VqlContextFrame(turn_id, text) downstream to AdkLLMService.
  3. AdkLLMService calls runner.run_async(new_message=content) directly. The aggregator does not pre-persist anything or write to the session itself.

By default, the user's transcription is sent to ADK without modification; override _build_user_event() to change this.
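The three steps above can be modeled in plain Python. This is a simplified sketch under stated assumptions, not pipecat-adk's code: UserTurnFrame, start_user_turn, and build_user_message are hypothetical stand-ins for VqlContextFrame and the aggregator/service logic, and a plain dict replaces google.genai.types.Content so the sketch runs with the standard library alone.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class UserTurnFrame:
    """Hypothetical stand-in for VqlContextFrame(turn_id, text)."""

    turn_id: str
    text: str


def start_user_turn(transcription: str) -> UserTurnFrame:
    """Steps 1-2: tag the finished user turn with a fresh UUID; the text is kept as-is."""
    return UserTurnFrame(turn_id=str(uuid.uuid4()), text=transcription)


def build_user_message(frame: UserTurnFrame) -> dict:
    """Step 3 payload (default behavior): the transcription passes through unmodified.

    A plain dict stands in for google.genai.types.Content in this sketch.
    """
    return {"role": "user", "parts": [{"text": frame.text}]}


if __name__ == "__main__":
    turn = start_user_turn("What is the weather like?")
    print(turn.turn_id)  # a random UUID4, different on every run
    print(build_user_message(turn))
```

The turn_id is what later ties TTS output and interruption tracking back to this specific turn.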

Tradeoff: You can't use Pipecat's context inspection tools. All history lives in ADK sessions, which you access via session_service.get_session().

2. Persistence and Replayability

The Problem: Pipecat's context is ephemeral—restart the server and you lose everything. Building features like conversation replay, analytics, or multi-device continuity requires custom persistence logic.

Our Solution: Use any ADK session service. ADK provides:

  • InMemorySessionService for development (history lost on restart)
  • DatabaseSessionService for production persistence (requires aiosqlite for SQLite or the appropriate async driver for other databases)
  • Custom implementations for your specific needs

Every event is persisted to the session service automatically. With a persistent backend such as DatabaseSessionService, you get full conversation history across restarts, audit trails for compliance, and session handoff between agents.

Tradeoff: You need to manage session IDs and ensure they're unique per conversation. You also need to handle session cleanup and expiration.

3. Interruption Handling

The Problem: When a user interrupts the AI mid-sentence, the LLM on the next turn sees the full planned response as if the user heard everything—leading to confusing conversations.

Our Solution: A deterministic, two-part mechanism:

  1. When interrupted, VqlAssistantContextAggregator knows exactly what TTS text was spoken (accumulated via TTSTextFrame). It pushes VqlTurnCompletedFrame(interrupted=True, text=...) upstream to AdkLLMService, which writes the [HEARD] event to ADK's session.
  2. Before the next LLM call, AdkInterruptionPlugin finds the [HEARD] marker in the request, locates the preceding model event, and replaces its full text with the heard portion. The marker is then removed from the request.

The LLM sees only what the user actually heard. The full response remains in ADK session history for auditing.
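The rewrite in step 2 can be illustrated with a list-based sketch. It assumes each request message is a dict with role and text, and that a marker is a message whose text starts with "[HEARD]" followed by the heard portion. Both are assumptions: the plugin's real marker encoding and request types (ADK request contents) are not documented here.

```python
HEARD_PREFIX = "[HEARD]"  # assumed marker format; the plugin's real encoding may differ


def apply_heard_markers(contents: list[dict]) -> list[dict]:
    """Return a copy of the request contents in which each [HEARD] marker replaces
    the text of the preceding model message and is then dropped."""
    result: list[dict] = []
    for item in contents:
        text = item["text"]
        if text.startswith(HEARD_PREFIX):
            heard = text[len(HEARD_PREFIX):].strip()
            # Walk back to the most recent model message and truncate it
            for prev in reversed(result):
                if prev["role"] == "model":
                    prev["text"] = heard
                    break
            continue  # the marker itself never reaches the LLM
        result.append(dict(item))  # copy so the stored history stays untouched
    return result
```

Because the function works on copies, the full response remains intact in the input list, matching the "full response remains in session history" behavior described above.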

Tradeoff: The session history contains [HEARD] marker events. If you analyze raw session data you'll need to filter these.
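For offline analysis, a filter like the one below drops the markers. It assumes (author, text) pairs shaped like the output of simplify_events in the test utilities, and the same assumed "[HEARD]" text prefix; adjust the predicate to the plugin's actual marker format.

```python
HEARD_PREFIX = "[HEARD]"  # assumed marker format; verify against your session data


def without_heard_markers(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Drop [HEARD] marker events, keeping the full (unabridged) responses."""
    return [(author, text) for author, text in pairs if not text.startswith(HEARD_PREFIX)]
```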

4. State Management

The Problem: ADK tools and events often produce state changes that clients need to know about. Coordinating this state between the AI and your application is tedious.

Our Solution: Override _on_state_delta() in a subclass of AdkLLMService. The bridge calls this for every ADK event that carries a state_delta, before any text frames from the same event, so the client receives state before the bot starts speaking.

from pipecat.processors.frameworks.rtvi import RTVIServerMessageFrame
from pipecat_adk import AdkLLMService

class MyLLMService(AdkLLMService):
    async def _on_state_delta(self, state_delta: dict) -> None:
        # Forward state to your client via RTVI, WebSocket, etc.
        await self.push_frame(RTVIServerMessageFrame(
            data={"type": "state-sync", "state_delta": state_delta}
        ))
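The ordering guarantee (state before text for the same event) can be modeled as below. FakeEvent and frames_for_event are hypothetical; they only illustrate the ordering the bridge is described as providing, not how it is implemented.

```python
from dataclasses import dataclass, field


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK event: optional state_delta plus text parts."""

    text_parts: list[str] = field(default_factory=list)
    state_delta: dict = field(default_factory=dict)


def frames_for_event(event: FakeEvent) -> list[tuple[str, object]]:
    """Emit a state frame first (only if the event changed state), then one text frame per part."""
    frames: list[tuple[str, object]] = []
    if event.state_delta:
        frames.append(("state", dict(event.state_delta)))
    frames.extend(("text", part) for part in event.text_parts)
    return frames
```

Emitting state first means a client can, for example, switch its theme before the bot says "Switched to dark mode."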

To inject events programmatically (e.g., a timeout or a form submission), override process_frame and call _persist_and_run:

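from google.genai.types import Content, Part
from pipecat_adk import AdkLLMService

# UserIdleFrame is a placeholder: match whichever idle frame your pipeline emits
class MyLLMService(AdkLLMService):
    async def process_frame(self, frame, direction):
        if isinstance(frame, UserIdleFrame):
            # Write a synthetic user event to the ADK session and run the agent on it
            await self._persist_and_run(
                content=Content(role="user", parts=[
                    Part(text="<system>User has been idle for 30s.</system>")
                ])
            )
        else:
            await super().process_frame(frame, direction)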

Tradeoff: State integration requires subclassing AdkLLMService. This keeps the bridge lean but means simple state forwarding can't be done with configuration alone.

5. Function Call Lifecycle

The Problem: When an AI calls a tool, you often want to mute the microphone or show a loading indicator. Standard integrations don't always emit the right frames at the right time.

Our Solution: When ADK executes a function call, the bridge pushes frames both upstream and downstream:

  1. FunctionCallsStartedFrame → enables STTMuteFilter to mute the mic
  2. FunctionCallInProgressFrame → lets your UI show "thinking..."
  3. ADK executes the function
  4. FunctionCallResultFrame → lets your UI show results, unmutes mic
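The sequence above, and how a mute filter reacts to it, can be simulated with asyncio. MicGate and run_tool_with_lifecycle are hypothetical stand-ins: frame names are plain strings here, and MicGate only approximates STTMuteFilter's function-call muting.

```python
import asyncio
from typing import Any, Awaitable, Callable


class MicGate:
    """Hypothetical stand-in for STTMuteFilter: muted between the start and result frames."""

    def __init__(self) -> None:
        self.muted = False
        self.log: list[str] = []

    def on_frame(self, name: str) -> None:
        self.log.append(name)
        if name == "FunctionCallsStartedFrame":
            self.muted = True
        elif name == "FunctionCallResultFrame":
            self.muted = False


async def run_tool_with_lifecycle(
    gate: MicGate, tool: Callable[..., Awaitable[Any]], **args: Any
) -> Any:
    """Emit the lifecycle frame names in the documented order around one tool call."""
    gate.on_frame("FunctionCallsStartedFrame")    # step 1: mic muted
    gate.on_frame("FunctionCallInProgressFrame")  # step 2: UI can show "thinking..."
    result = await tool(**args)                   # step 3: ADK executes the function
    gate.on_frame("FunctionCallResultFrame")      # step 4: results shown, mic unmuted
    return result
```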

Tradeoff: Function calls are managed entirely by ADK. You define tools using ADK's FunctionTool or as plain Python functions, not Pipecat's function calling mechanism. The frames inform Pipecat of the lifecycle but don't let you intercept or modify the calls.
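A tool can be a plain Python function. This sketch relies on ADK's convention of deriving the tool's name, parameters, and description from the function signature and docstring; the get_weather function and its hard-coded forecast data are illustrative only.

```python
def get_weather(city: str) -> dict:
    """Return the current weather forecast for a city.

    ADK builds the tool schema from this signature and docstring, so keep both
    descriptive. The data below is hard-coded for illustration.
    """
    forecasts = {"nyc": "sunny", "london": "rainy"}
    forecast = forecasts.get(city.strip().lower())
    if forecast is None:
        return {"status": "error", "error_message": f"No forecast available for {city}."}
    return {"status": "success", "city": city, "forecast": forecast}
```

Register it with Agent(..., tools=[get_weather]). ADK runs it when the model requests get_weather, and the bridge emits the lifecycle frames around the call.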

6. Custom Context Injection

The Problem: You need to inject dynamic context into conversations—current time, user preferences, system state, etc.

Our Solution: Override _build_user_event() in a subclass of AdkLLMService:

from pipecat_adk import AdkLLMService
from google.genai.types import Content, Part
from datetime import datetime

class MyLLMService(AdkLLMService):
    async def _build_user_event(self, text: str) -> Content:
        return Content(role="user", parts=[
            Part(text=f"<system>Current time: {datetime.now()}</system>"),
            Part(text=text),
        ])

Pass your custom service class instead of AdkLLMService when constructing the pipeline.

Tradeoff: This runs on every user message, so anything it awaits adds latency to every response. Keep it lightweight: avoid slow database queries or API calls here, or cache their results so most turns don't wait on them.
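One way to keep _build_user_event cheap is to cache slow lookups with a time-to-live. CachedValue is a hypothetical helper, not part of pipecat-adk, and the 60-second TTL is an arbitrary example value.

```python
import asyncio
import time
from typing import Awaitable, Callable


class CachedValue:
    """Caches the result of a slow async fetch for ttl_seconds (hypothetical helper).

    Not guarded against concurrent refreshes; two simultaneous misses may both fetch.
    """

    def __init__(self, fetch: Callable[[], Awaitable[str]], ttl_seconds: float = 60.0) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._value: str | None = None
        self._fetched_at = 0.0

    async def get(self) -> str:
        now = time.monotonic()
        # Refetch only on first use or once the cached value is older than the TTL
        if self._value is None or now - self._fetched_at > self._ttl:
            self._value = await self._fetch()
            self._fetched_at = now
        return self._value


if __name__ == "__main__":
    async def load_preferences() -> str:
        await asyncio.sleep(0.1)  # pretend this is a slow database query
        return "prefers metric units"

    async def main() -> None:
        prefs = CachedValue(load_preferences, ttl_seconds=60.0)
        print(await prefs.get())  # first call waits for the fetch
        print(await prefs.get())  # second call is served from the cache

    asyncio.run(main())
```

In a subclass, create the cache once (for example in __init__) and await its get() inside _build_user_event; the attribute name you store it under is up to you.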

Pipecat Ecosystem Limitations

pipecat-adk takes a fundamentally different approach to context management: ADK owns the conversation history, not Pipecat. This architectural decision enables ADK's powerful session management, but means some Pipecat ecosystem components won't work as expected.

Why This Matters

Standard Pipecat components expect to read and write messages via OpenAILLMContext. Since ADK manages conversation history in its own session store, our context frames carry only a turn_id and the current user text, not the accumulated message history.

Incompatible Components

| Component | What It Does | Why It's Incompatible |
|---|---|---|
| Mem0 Memory Service | Enhances context with retrieved memories | Expects to add messages to context before the LLM |
| LangChain Framework | Routes to LangChain agents | Alternative agent framework: use ADK or LangChain, not both |
| Strands Framework | Routes to Strands agents | Alternative agent framework: use ADK or Strands, not both |
| IVR Navigator | Stores messages for IVR mode switching | Expects to read/store messages from context |
| LLM Log Observer | Logs conversation messages | Will show empty context (use ADK session inspection instead) |

Compatible Components

These Pipecat components work normally with pipecat-adk:

  • STT services (Google, Deepgram, etc.)
  • TTS services (Google, ElevenLabs, Cartesia, etc.)
  • VAD analyzers (Silero, WebRTC)
  • Transports (WebRTC, WebSocket)
  • STTMuteFilter (receives function call lifecycle frames)
  • UserIdleProcessor (receives lifecycle frames)

Complete Example

See examples/assistant/ for a complete working application:

  • agent.py: Defines the ADK Agent and includes AdkInterruptionPlugin
  • bot.py: Sets up the Pipecat pipeline with AdkBasedLLMService
  • run.py: FastAPI server for WebRTC signaling

To run:

cd examples/assistant
pip install -r requirements.txt
pip install -e ../..  # Install pipecat-adk in development mode
export GEMINI_API_KEY=your_key
python run.py

Open http://localhost:7860 to interact with the voice assistant.

Testing Your Application

pipecat-adk ships with a complete mock testing infrastructure so you can test your agents without real API calls. The test utilities live in tests/mocks.py and tests/test_utils.py—copy them to your project or add tests/ to your path.

Basic Test Structure

import unittest
from google.adk.agents import Agent
from google.adk.apps import App
from pipecat_adk import AdkInterruptionPlugin
from mocks import MockLLM, TestRunner, Turn

class TestMyAgent(unittest.IsolatedAsyncioTestCase):
    async def test_greeting(self):
        mock_llm = MockLLM.single("Hello! How can I help?")

        agent = Agent(name="test_agent", model=mock_llm,
                      instruction="You are helpful.")
        app = App(name="agents", root_agent=agent,
                  plugins=[AdkInterruptionPlugin()])

        async with TestRunner(app=app) as runner:
            await runner.join()
            await runner.speak_and_wait_for_response("Hi")

            assert runner.last_bot_message == "Hello! How can I help?"

The app.name must be "agents": TestRunner uses a hardcoded session scoped to that name.

MockLLM

# Single response
mock_llm = MockLLM.single("Hello!")

# Multi-turn — one response per user message
mock_llm = MockLLM.conversation(["Hello!", "Sure, I can help.", "You're welcome."])

# Function calls — list of Parts per turn, or a string for text-only turns
from google.genai.types import Part
mock_llm = MockLLM.from_parts([
    [Part.from_function_call(name="get_weather", args={"city": "NYC"})],
    "The weather in NYC is sunny.",
])

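Supply one response per LLM call, not per user message: a function-call turn consumes two (the function call, then the model's follow-up text after the tool returns), as in the from_parts example above. If you supply too few responses, later turns will time out.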

Conversational API

async with TestRunner(app=app) as runner:
    await runner.join()

    # Speak and wait for bot reply
    await runner.speak_and_wait_for_response("Hello")
    assert runner.last_bot_message == "Hello! How can I help you today?"

    # Multi-turn with full transcript assertion
    await runner.speak_and_wait_for_response("I need help")
    assert runner.transcript == [
        Turn("user", "Hello"),
        Turn("bot", "Hello! How can I help you today?"),
        Turn("user", "I need help"),
        Turn("bot", "Sure, I can help with that."),
    ]

Testing Interruptions

async with TestRunner(app=app, tts_delay=0.05) as runner:
    await runner.join()
    await runner.speak("Tell me a long story")
    await runner.interrupt_bot("Wait, stop")
    # Bot receives the interruption; verify session state as needed

Use tts_delay to slow TTS output so the bot is still speaking when interrupt_bot fires.

Gray-Box Inspection

async with TestRunner(app=app) as runner:
    await runner.join()
    await runner.speak_and_wait_for_response("Set my theme to dark")

    # Inspect ADK session state (what tools wrote)
    state = await runner.session_state()
    assert state.get("theme") == "dark"

    # Inspect raw ADK events
    events = await runner.events()

Test Utilities

from test_utils import simplify_events

events = await runner.events()
simplified = simplify_events(events)
# Returns: [("user", "Hello"), ("agent", "Hi there!"), ...]
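For readers curious about what such a helper does, here is a rough approximation. It is not the shipped tests/test_utils.py code: it assumes ADK-style events with an author and content.parts[*].text, maps any non-"user" author to "agent", and skips events without text (such as function-call-only events).

```python
from types import SimpleNamespace


def simplify_events_sketch(events) -> list[tuple[str, str]]:
    """Collapse ADK-style events into (speaker, text) pairs (approximation only)."""
    pairs: list[tuple[str, str]] = []
    for event in events:
        content = getattr(event, "content", None)
        if content is None or not content.parts:
            continue
        # Join the text parts; parts without text (e.g. function calls) are ignored
        text = "".join(part.text for part in content.parts if getattr(part, "text", None))
        if not text:
            continue
        speaker = "user" if event.author == "user" else "agent"
        pairs.append((speaker, text))
    return pairs


if __name__ == "__main__":
    def _event(author, *texts):
        parts = [SimpleNamespace(text=t) for t in texts]
        return SimpleNamespace(author=author, content=SimpleNamespace(parts=parts))

    print(simplify_events_sketch([_event("user", "Hello"), _event("helpful_assistant", "Hi there!")]))
    # [('user', 'Hello'), ('agent', 'Hi there!')]
```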

API Reference

Core Classes

  • AdkLLMService(app, session_service, session_params): Main LLM service. Pass a pre-built ADK App, or pass agent + plugins and the service builds the App internally. Override _build_user_event(text) to inject custom context, _on_state_delta(state_delta) to forward state to clients, and process_frame + _persist_and_run(content, state_delta) to inject system events.
  • SessionParams(app_name, user_id, session_id): Model holding the session identifiers (the examples pass session_params.model_dump() to ADK session-service calls). app_name must match the App.name.
  • AdkInterruptionPlugin: ADK plugin for deterministic interruption handling. Include in App(plugins=[AdkInterruptionPlugin()]).
  • VqlTTSMixin: Mixin that must be applied to every TTS service. Overrides TTS context tracking to use turn_id instead of a random UUID, enabling [HEARD] tracking. Without this, interruptions are silently ignored. Usage: class MyTTS(VqlTTSMixin, GoogleTTSService): pass

Context Aggregators

Created via llm.create_context_aggregator():

  • User aggregator (VqlUserContextAggregator): Generates a turn_id per user turn and pushes VqlContextFrame(turn_id, text) to AdkLLMService. Override _build_user_event on AdkLLMService to inject custom context.
  • Assistant aggregator (VqlAssistantContextAggregator): Accumulates spoken TTS text per turn. Pushes VqlTurnCompletedFrame upstream to AdkLLMService, which writes [HEARD] events on interruption.

Requirements

  • Python >= 3.12
  • pipecat-ai >= 1.1.0, < 2.0.0
  • google-adk >= 1.18.0
  • google-genai >= 1.51.0

License

MIT License

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

About

Integrate Google ADK agents with Pipecat pipelines for real-time voice AI applications.

Languages

  • Python 90.7%
  • TypeScript 9.3%