Build powerful voice-enabled AI agents by combining Pipecat's real-time audio pipelines with Google ADK's agent framework.
Pipecat excels at real-time voice applications. It handles audio streaming, VAD, STT, TTS, and transport protocols beautifully. But as your application grows more complex—managing conversation history, handling interruptions correctly, persisting sessions, calling tools—things get difficult. Pipecat's context management wasn't designed for sophisticated agent workflows.
Google ADK (Agent Development Kit) excels at building agents. It provides rich concepts for sessions, state management, tool definitions, multi-agent orchestration, evaluations, and much more. But ADK wasn't designed for real-time voice—it expects request/response patterns, not streaming audio.
pipecat-adk bridges these two worlds, letting you build voice applications with Pipecat's real-time capabilities while leveraging ADK's agent framework for everything else.
```bash
pip install pipecat-adk

# Or install from source
pip install -e /path/to/pipecat-adk
```

If you have an existing Pipecat application, here's what you need to change.
Before (standard Pipecat):

```python
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.google import GoogleLLMService
from pipecat.services.google.llm import GoogleLLMContext

# transport, stt_service, and tts_service are created elsewhere in your app
llm = GoogleLLMService(
    model="gemini-2.0-flash",
    api_key=os.getenv("GEMINI_API_KEY"),
)

context_aggregator = llm.create_context_aggregator(
    GoogleLLMContext(messages=[{"role": "system", "content": "You are helpful"}])
)

pipeline = Pipeline([
    transport.input(),
    stt_service,
    context_aggregator.user(),
    llm,
    tts_service,
    transport.output(),
    context_aggregator.assistant(),
])
```

After (pipecat-adk):

```python
from google.adk.agents import Agent
from google.adk.apps import App
from google.adk.sessions import DatabaseSessionService
from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.google.tts import GoogleTTSService

from pipecat_adk import AdkInterruptionPlugin, AdkLLMService, SessionParams, VqlTTSMixin

# transport and stt_service are created elsewhere in your app

# 1. Define your ADK agent and App
agent = Agent(
    name="helpful_assistant",  # Note: use underscores, not hyphens
    model="gemini-2.0-flash",
    instruction="You are a helpful assistant.",
)

app = App(
    name="my_app",
    root_agent=agent,
    plugins=[AdkInterruptionPlugin()],
)

# 2. Set up session management (DatabaseSessionService persists across restarts).
#    The awaits below must run inside an async function, e.g. your bot's entry point.
session_service = DatabaseSessionService(db_url="sqlite+aiosqlite:///sessions.db")
session_params = SessionParams(
    app_name=app.name,
    user_id="user_123",
    session_id="session_456",
)
existing = await session_service.get_session(**session_params.model_dump())
if not existing:
    await session_service.create_session(**session_params.model_dump())

# 3. Create the LLM service
llm = AdkLLMService(
    app=app,
    session_service=session_service,
    session_params=session_params,
)

# 4. Create context aggregators; the pipeline structure stays the same
context_aggregator = llm.create_context_aggregator()

# 5. Wrap TTS with VqlTTSMixin (required for [HEARD] interruption tracking)
class MyGoogleTTSService(VqlTTSMixin, GoogleTTSService):
    pass

tts = MyGoogleTTSService(voice_id=...)

pipeline = Pipeline([
    transport.input(),
    stt_service,
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    context_aggregator.assistant(),
])
```

The pipeline structure stays the same: you swap the LLM service and context aggregator, and wrap your TTS service with `VqlTTSMixin`.
The Problem: Pipecat manages conversation history by accumulating messages in an LLMContext. This works for simple cases, but breaks down when you need sophisticated history management, multi-turn reasoning, or agent handoffs.
Our Solution: ADK manages the conversation history. When a user speaks:

- `VqlUserContextAggregator` generates a `turn_id` (UUID) for the turn.
- It pushes `VqlContextFrame(turn_id, text)` downstream to `AdkLLMService`.
- `AdkLLMService` calls `runner.run_async(new_message=content)` directly. Nothing is persisted ahead of time, and the aggregator never writes to the session.

The user's transcription is sent to ADK unmodified, unless you override `_build_user_event()` to add context.
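The aggregator's side of this flow can be modeled in plain Python. This is an illustrative sketch, not pipecat-adk's code: `TurnFrame` and `start_user_turn` are hypothetical stand-ins for `VqlContextFrame` and the aggregator's turn handling.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class TurnFrame:
    """Hypothetical stand-in for VqlContextFrame: a turn reference plus the raw text."""
    turn_id: str
    text: str


def start_user_turn(transcript: str) -> TurnFrame:
    # Each user turn gets a fresh UUID; the transcript passes through unchanged.
    return TurnFrame(turn_id=str(uuid.uuid4()), text=transcript)


first = start_user_turn("What's the weather?")
second = start_user_turn("And tomorrow?")
print(first.text)                        # What's the weather?
print(first.turn_id != second.turn_id)   # True
```

The `turn_id` is what later ties the spoken TTS output back to the turn that produced it.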
Tradeoff: You can't use Pipecat's context inspection tools. All history lives in ADK sessions, which you access via session_service.get_session().
The Problem: Pipecat's context is ephemeral—restart the server and you lose everything. Building features like conversation replay, analytics, or multi-device continuity requires custom persistence logic.
Our Solution: Use any ADK session service. ADK provides:
- `InMemorySessionService` for development (history lost on restart)
- `DatabaseSessionService` for production persistence (requires `aiosqlite` for SQLite, or the appropriate async driver for other databases)
- Custom implementations for your specific needs
Every event is persisted automatically. You get full conversation history across restarts, audit trails for compliance, and session handoff between agents.
Tradeoff: You need to manage session IDs and ensure they're unique per conversation. You also need to handle session cleanup and expiration.
The Problem: When a user interrupts the AI mid-sentence, the LLM on the next turn sees the full planned response as if the user heard everything—leading to confusing conversations.
Our Solution: A deterministic, two-part mechanism:
- When interrupted, `VqlAssistantContextAggregator` knows exactly what TTS text was spoken (accumulated via `TTSTextFrame`). It pushes `VqlTurnCompletedFrame(interrupted=True, text=...)` upstream to `AdkLLMService`, which writes the `[HEARD]` event to ADK's session.
- Before the next LLM call, `AdkInterruptionPlugin` finds the `[HEARD]` marker in the request, locates the preceding model event, and replaces its full text with the heard portion. The marker is then removed from the request.
The LLM sees only what the user actually heard. The full response remains in ADK session history for auditing.
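The two steps above can be modeled in plain Python over a simplified request, a list of `(role, text)` pairs. This is a sketch, not the plugin's implementation: the marker format (assumed here to be `[HEARD] ` followed by the heard text), the marker's role, the event shapes, and the function names are all hypothetical.

```python
HEARD_PREFIX = "[HEARD] "  # assumed marker format; the real one may differ


def heard_text(spoken_chunks: list[str]) -> str:
    """Join the TTS text chunks that actually reached the user before the interruption."""
    return "".join(spoken_chunks)


def apply_heard_markers(request: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Replace each model reply with the heard portion recorded after it, then drop the marker."""
    result: list[tuple[str, str]] = []
    for role, text in request:
        if text.startswith(HEARD_PREFIX):
            # Find the most recent model event before the marker and truncate it.
            for i in range(len(result) - 1, -1, -1):
                if result[i][0] == "model":
                    result[i] = ("model", text[len(HEARD_PREFIX):])
                    break
            continue  # the marker itself never reaches the LLM
        result.append((role, text))
    return result


request = [
    ("user", "Tell me a story"),
    ("model", "Once upon a time there was a dragon who loved tea."),
    ("user", HEARD_PREFIX + heard_text(["Once upon ", "a time"])),
    ("user", "Actually, tell me a joke"),
]
print(apply_heard_markers(request))
# [('user', 'Tell me a story'), ('model', 'Once upon a time'), ('user', 'Actually, tell me a joke')]
```

The function builds a new list rather than mutating its input, mirroring how the stored history keeps the full response while only the outgoing request is rewritten.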
Tradeoff: The session history contains [HEARD] marker events. If you analyze raw session data you'll need to filter these.
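Once events are flattened to `(author, text)` pairs (the shape the testing utilities' `simplify_events` returns), filtering is a one-line comprehension. A minimal sketch, assuming the marker appears as a `[HEARD]` prefix in the event text and is authored as `user`; confirm both against your own session data. Because a marker is written on each interruption, counting them also gives a simple interruption metric.

```python
HEARD_PREFIX = "[HEARD]"  # assumed marker prefix; confirm against your session data


def without_heard_markers(events: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Drop [HEARD] marker events so analytics only see real turns."""
    return [(author, text) for author, text in events if not text.startswith(HEARD_PREFIX)]


def count_interruptions(events: list[tuple[str, str]]) -> int:
    """Count marker events; each one corresponds to an interrupted bot turn."""
    return sum(1 for _, text in events if text.startswith(HEARD_PREFIX))


events = [
    ("user", "Tell me a story"),
    ("agent", "Once upon a time there was a dragon."),
    ("user", "[HEARD] Once upon a time"),
    ("user", "Tell me a joke"),
]
print(without_heard_markers(events))
print(count_interruptions(events))  # 1
```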
The Problem: ADK tools and events often produce state changes that clients need to know about. Coordinating this state between the AI and your application is tedious.
Our Solution: Override _on_state_delta() in a subclass of AdkBasedLLMService. The bridge calls this for every ADK event that carries a state_delta, before any text frames from the same event—so the client receives state before the bot starts speaking.
```python
from pipecat.processors.frameworks.rtvi import RTVIServerMessageFrame

from pipecat_adk import AdkLLMService


class MyLLMService(AdkLLMService):
    async def _on_state_delta(self, state_delta: dict) -> None:
        # Forward state to your client via RTVI, WebSocket, etc.
        await self.push_frame(RTVIServerMessageFrame(
            data={"type": "state-sync", "state_delta": state_delta}
        ))
```

To inject events programmatically (e.g., a timeout or a form submission), override `process_frame` and call `_persist_and_run`:

```python
from google.genai.types import Content, Part

from pipecat_adk import AdkLLMService


class MyLLMService(AdkLLMService):
    async def process_frame(self, frame, direction):
        # UserIdleFrame: the frame your pipeline emits when the user goes idle
        # (import it from wherever it is defined in your app)
        if isinstance(frame, UserIdleFrame):
            # Persist a synthetic user message and run the agent on it
            await self._persist_and_run(
                content=Content(role="user", parts=[
                    Part(text="<system>User has been idle for 30s.</system>")
                ])
            )
        else:
            await super().process_frame(frame, direction)
```

Tradeoff: State integration requires subclassing `AdkLLMService`. This keeps the bridge lean but means simple state forwarding can't be done with configuration alone.
The Problem: When an AI calls a tool, you often want to mute the microphone or show a loading indicator. Standard integrations don't always emit the right frames at the right time.
Our Solution: When ADK executes a function call, the bridge pushes frames both upstream and downstream:

- `FunctionCallsStartedFrame` → enables `STTMuteFilter` to mute the mic
- `FunctionCallInProgressFrame` → lets your UI show "thinking..."
- ADK executes the function
- `FunctionCallResultFrame` → lets your UI show results and unmutes the mic
Tradeoff: Function calls are managed entirely by ADK. You define tools using ADK's FunctionTool or as plain Python functions, not Pipecat's function calling mechanism. The frames inform Pipecat of the lifecycle but don't let you intercept or modify the calls.
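For example, a tool can be an ordinary Python function with type hints and a docstring, which ADK uses to describe the tool to the model. The `get_weather` function below is hypothetical and returns canned data; you would register it with `Agent(..., tools=[get_weather])`.

```python
def get_weather(city: str) -> dict:
    """Return the current weather for a city.

    Args:
        city: Name of the city, e.g. "NYC".
    """
    # Hypothetical canned data; a real tool would call a weather API here.
    forecasts = {"NYC": "sunny", "London": "rainy"}
    condition = forecasts.get(city)
    if condition is None:
        return {"status": "error", "message": f"No forecast for {city}."}
    return {"status": "success", "city": city, "condition": condition}


print(get_weather("NYC"))  # {'status': 'success', 'city': 'NYC', 'condition': 'sunny'}
```

While ADK runs a tool like this, the bridge emits the lifecycle frames listed above; Pipecat only observes them.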
The Problem: You need to inject dynamic context into conversations—current time, user preferences, system state, etc.
Our Solution: Override _build_user_event() in a subclass of AdkLLMService:
```python
from datetime import datetime

from google.genai.types import Content, Part

from pipecat_adk import AdkLLMService


class MyLLMService(AdkLLMService):
    async def _build_user_event(self, text: str) -> Content:
        # Prepend a system note with dynamic context, then the user's words
        return Content(role="user", parts=[
            Part(text=f"<system>Current time: {datetime.now()}</system>"),
            Part(text=text),
        ])
```

Pass your custom service class instead of `AdkLLMService` when constructing the pipeline.
Tradeoff: This runs on every user message, so any work here delays the agent's response. Keep it lightweight: avoid slow database queries or API calls here, and if you can't avoid one, cache the result or bound it with a timeout.
pipecat-adk takes a fundamentally different approach to context management: ADK owns the conversation history, not Pipecat. This architectural decision enables ADK's powerful session management, but means some Pipecat ecosystem components won't work as expected.
Standard Pipecat components expect to read and write messages via `OpenAILLMContext`. Since ADK manages conversation history in its own session store, our context frames carry only a `turn_id` and the current turn's text, not the conversation's messages.
| Component | What It Does | Why It's Incompatible |
|---|---|---|
| Mem0 Memory Service | Enhances context with retrieved memories | Expects to add messages to context before LLM |
| LangChain Framework | Routes to LangChain agents | Alternative agent framework—use ADK or LangChain, not both |
| Strands Framework | Routes to Strands agents | Alternative agent framework—use ADK or Strands, not both |
| IVR Navigator | Stores messages for IVR mode switching | Expects to read/store messages from context |
| LLM Log Observer | Logs conversation messages | Will show empty context (use ADK session inspection instead) |
These Pipecat components work normally with pipecat-adk:
- STT services (Google, Deepgram, etc.)
- TTS services (Google, ElevenLabs, Cartesia, etc.)
- VAD analyzers (Silero, WebRTC)
- Transports (WebRTC, WebSocket)
- STTMuteFilter (receives function call lifecycle frames)
- UserIdleProcessor (receives lifecycle frames)
See `examples/assistant/` for a complete working application:

- `agent.py`: Defines the ADK `Agent` and includes `AdkInterruptionPlugin`
- `bot.py`: Sets up the Pipecat pipeline with `AdkBasedLLMService`
- `run.py`: FastAPI server for WebRTC signaling

To run:

```bash
cd examples/assistant
pip install -r requirements.txt
pip install -e ../..  # Install pipecat-adk in development mode
export GEMINI_API_KEY=your_key
python run.py
```

Open http://localhost:7860 to interact with the voice assistant.
pipecat-adk ships with a complete mock testing infrastructure so you can test your agents without real API calls. The test utilities live in `tests/mocks.py` and `tests/test_utils.py`; copy them into your project or add `tests/` to your Python path.
```python
import unittest

from google.adk.agents import Agent
from google.adk.apps import App

from mocks import MockLLM, TestRunner, Turn
from pipecat_adk import AdkInterruptionPlugin


class TestMyAgent(unittest.IsolatedAsyncioTestCase):
    async def test_greeting(self):
        # Scripted model: returns this reply without calling a real API
        mock_llm = MockLLM.single("Hello! How can I help?")
        agent = Agent(name="test_agent", model=mock_llm,
                      instruction="You are helpful.")
        app = App(name="agents", root_agent=agent,
                  plugins=[AdkInterruptionPlugin()])

        async with TestRunner(app=app) as runner:
            await runner.join()
            await runner.speak_and_wait_for_response("Hi")
            assert runner.last_bot_message == "Hello! How can I help?"
```

`app.name` must be `"agents"`: `TestRunner` uses a hardcoded session scoped to that name.
```python
from google.genai.types import Part

from mocks import MockLLM

# Single response
mock_llm = MockLLM.single("Hello!")

# Multi-turn: one response per user message
mock_llm = MockLLM.conversation(["Hello!", "Sure, I can help.", "You're welcome."])

# Function calls: a list of Parts per turn, or a string for text-only turns
mock_llm = MockLLM.from_parts([
    [Part.from_function_call(name="get_weather", args={"city": "NYC"})],
    "The weather in NYC is sunny.",
])
```

The number of scripted responses must match the number of conversation turns; otherwise later turns time out.
```python
# Assumes the app's agent uses:
#   mock_llm = MockLLM.conversation(
#       ["Hello! How can I help you today?", "Sure, I can help with that."])
async with TestRunner(app=app) as runner:
    await runner.join()

    # Speak and wait for bot reply
    await runner.speak_and_wait_for_response("Hello")
    assert runner.last_bot_message == "Hello! How can I help you today?"

    # Multi-turn with full transcript assertion
    await runner.speak_and_wait_for_response("I need help")
    assert runner.transcript == [
        Turn("user", "Hello"),
        Turn("bot", "Hello! How can I help you today?"),
        Turn("user", "I need help"),
        Turn("bot", "Sure, I can help with that."),
    ]
```

To test interruptions:

```python
async with TestRunner(app=app, tts_delay=0.05) as runner:
    await runner.join()
    await runner.speak("Tell me a long story")
    await runner.interrupt_bot("Wait, stop")
    # Bot receives the interruption; verify session state as needed
```

Use `tts_delay` to slow TTS output so the bot is still speaking when `interrupt_bot` fires.
```python
async with TestRunner(app=app) as runner:
    await runner.join()
    await runner.speak_and_wait_for_response("Set my theme to dark")

    # Inspect ADK session state (what tools wrote)
    state = await runner.session_state()
    assert state.get("theme") == "dark"

    # Inspect raw ADK events
    events = await runner.events()
```

```python
from test_utils import simplify_events

events = await runner.events()
simplified = simplify_events(events)
# Returns: [("user", "Hello"), ("agent", "Hi there!"), ...]
```

- `AdkLLMService(app, session_service, session_params)`: Main LLM service. Pass a pre-built ADK `App`, or pass `agent` + `plugins` and the service builds the `App` internally. Override `_build_user_event(text)` to inject custom context, `_on_state_delta(state_delta)` to forward state to clients, and `process_frame` + `_persist_and_run(content, state_delta)` to inject system events.
- `SessionParams(app_name, user_id, session_id)`: Holds the identifiers of an ADK session. `app_name` must match `App.name`.
- `AdkInterruptionPlugin`: ADK plugin for deterministic interruption handling. Include it in `App(plugins=[AdkInterruptionPlugin()])`.
- `VqlTTSMixin`: Mixin that must be applied to every TTS service. It overrides TTS context tracking to use `turn_id` instead of a random UUID, enabling `[HEARD]` tracking. Without it, interruptions are silently ignored. Usage: `class MyTTS(VqlTTSMixin, GoogleTTSService): pass`
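Why the mixin goes first in the base-class list: Python looks up methods left to right along the MRO, so the mixin's override wins over the TTS service's default. The classes and the `next_context_id` method below are hypothetical stand-ins, not pipecat's or pipecat-adk's actual API.

```python
import uuid


class BaseTTS:
    """Hypothetical TTS service that tags each utterance with a random ID."""

    def next_context_id(self) -> str:
        return str(uuid.uuid4())


class TurnIdMixin:
    """Hypothetical mixin: reuse the current turn's ID instead of a random one."""

    current_turn_id: str = ""

    def next_context_id(self) -> str:
        # Fall back to the next class in the MRO when no turn is active.
        return self.current_turn_id or super().next_context_id()


class MyTTS(TurnIdMixin, BaseTTS):
    pass


tts = MyTTS()
tts.current_turn_id = "turn-42"
print(tts.next_context_id())                     # turn-42
print([cls.__name__ for cls in MyTTS.__mro__])  # ['MyTTS', 'TurnIdMixin', 'BaseTTS', 'object']
```

Reversing the bases (`class MyTTS(BaseTTS, TurnIdMixin)`) would let the base class's random ID win, which is the failure mode the documentation warns about: interruptions go untracked.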
Created via `llm.create_context_aggregator()`:

- User aggregator (`VqlUserContextAggregator`): Generates a `turn_id` per user turn and pushes `VqlContextFrame(turn_id, text)` to `AdkLLMService`. Override `_build_user_event` on `AdkLLMService` to inject custom context.
- Assistant aggregator (`VqlAssistantContextAggregator`): Accumulates spoken TTS text per turn and pushes `VqlTurnCompletedFrame` upstream to `AdkLLMService`, which writes `[HEARD]` events on interruption.
- Python >= 3.12
- pipecat-ai >= 1.1.0, < 2.0.0
- google-adk >= 1.18.0
- google-genai >= 1.51.0
MIT License
Contributions are welcome! Please open an issue or submit a pull request.