Sync agent trajectories from Arize Phoenix to Kaizen and automatically generate tips/guidelines.
The Phoenix sync module:
- Fetches agent trajectories from Phoenix's REST API
- Deduplicates already-processed trajectories (by `span_id`)
- Converts messages to OpenAI format
- Generates tips/guidelines using an LLM
- Stores both trajectories and tips in Kaizen
No additional dependencies are required; Phoenix API calls use only the Python standard library.
| Variable | Default | Description |
|---|---|---|
| `PHOENIX_URL` | `http://localhost:6006` | Phoenix server URL |
| `PHOENIX_PROJECT` | `default` | Phoenix project name |
| `KAIZEN_NAMESPACE_ID` | `kaizen` | Target namespace for stored entities |
| `KAIZEN_BACKEND` | `milvus` | Backend provider (`milvus` or `filesystem`) |
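As a minimal sketch, the variables above can be resolved to their documented defaults with only the standard library (the helper name `load_phoenix_settings` is illustrative, not part of Kaizen):

```python
import os

def load_phoenix_settings() -> dict:
    """Read sync settings from the environment, falling back to the
    documented defaults. Variable names match the table above."""
    return {
        "phoenix_url": os.environ.get("PHOENIX_URL", "http://localhost:6006"),
        "project": os.environ.get("PHOENIX_PROJECT", "default"),
        "namespace_id": os.environ.get("KAIZEN_NAMESPACE_ID", "kaizen"),
        "backend": os.environ.get("KAIZEN_BACKEND", "milvus"),
    }
```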
```bash
# Basic sync with defaults
uv run python -m kaizen.cli.cli sync phoenix

# Custom Phoenix URL and namespace
uv run python -m kaizen.cli.cli sync phoenix \
  --url http://phoenix.example.com:6006 \
  --namespace my_namespace

# Fetch more spans and include errors
uv run python -m kaizen.cli.cli sync phoenix \
  --limit 500 \
  --include-errors

# Full options
uv run python -m kaizen.cli.cli sync phoenix \
  --url http://localhost:6006 \
  --namespace production \
  --project my_project \
  --limit 200 \
  --include-errors
```

| Option | Short | Description |
|---|---|---|
| `--url` | `-u` | Phoenix server URL |
| `--namespace` | `-n` | Target Kaizen namespace |
| `--project` | `-p` | Phoenix project name |
| `--limit` | | Max spans to fetch (default: 100) |
| `--include-errors` | | Include failed/error spans |
```python
from kaizen.sync.phoenix_sync import PhoenixSync

# Initialize syncer
syncer = PhoenixSync(
    phoenix_url="http://localhost:6006",
    namespace_id="my_namespace",
    project="default",
)

# Run sync
result = syncer.sync(limit=100, include_errors=False)
print(f"Processed: {result.processed}")
print(f"Skipped: {result.skipped}")
print(f"Tips generated: {result.tips_generated}")
print(f"Errors: {result.errors}")
```

The syncer calls Phoenix's REST API:
```
GET /v1/projects/{project}/spans?limit=N&cursor=CURSOR
```
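Since Phoenix API calls use only the standard library, the paginated fetch can be sketched with `urllib` alone. The helper names and the response fields (`data`, `next_cursor`) are assumptions for illustration, not Phoenix's documented schema:

```python
import json
import urllib.parse
import urllib.request

def spans_url(base_url: str, project: str, limit: int, cursor=None) -> str:
    """Build the GET /v1/projects/{project}/spans URL shown above."""
    params = {"limit": str(limit)}
    if cursor:
        params["cursor"] = cursor
    return (f"{base_url}/v1/projects/{urllib.parse.quote(project)}/spans"
            f"?{urllib.parse.urlencode(params)}")

def fetch_spans(base_url: str, project: str, limit: int = 100) -> list:
    """Follow cursor pagination until `limit` spans are collected."""
    spans, cursor = [], None
    while len(spans) < limit:
        with urllib.request.urlopen(spans_url(base_url, project, limit, cursor)) as resp:
            page = json.load(resp)
        spans.extend(page.get("data", []))   # assumed payload key
        cursor = page.get("next_cursor")     # assumed cursor key
        if not cursor:
            break
    return spans[:limit]
```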
Only `litellm_request` spans with prompt messages are processed.
Each processed trajectory stores its `span_id` in metadata. On subsequent syncs, already-processed span IDs are skipped.
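A minimal sketch of that dedup step, assuming the already-stored span IDs have been collected into a set (the names here are hypothetical, not Kaizen's internals):

```python
def dedupe(spans: list, stored_span_ids: set) -> list:
    """Keep only spans whose span_id has not been processed before."""
    seen = set(stored_span_ids)
    fresh = []
    for span in spans:
        if span["span_id"] in seen:
            continue  # already stored on a previous sync
        seen.add(span["span_id"])  # also guards against dupes within this batch
        fresh.append(span)
    return fresh

# If "a" was stored on a previous sync, only "b" survives:
batch = [{"span_id": "a"}, {"span_id": "b"}, {"span_id": "a"}]
fresh = dedupe(batch, {"a"})
```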
Anthropic/Claude message format is converted to OpenAI format:

- Tool use blocks → `tool_calls` array
- Tool results → separate `tool` role messages
- Thinking blocks → preserved in `thinking` field
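The tool-use and thinking cases above can be sketched for a single assistant message as follows. The input side follows Anthropic's content-block shape; the helper itself is an illustration, not Kaizen's converter (tool results, handled as separate `tool` role messages, are omitted for brevity):

```python
import json

def to_openai(anthropic_msg: dict) -> dict:
    """Convert one Anthropic-style assistant message to OpenAI format."""
    out = {"role": anthropic_msg["role"], "content": None}
    text_parts, tool_calls, thinking = [], [], None
    for block in anthropic_msg["content"]:
        if block["type"] == "text":
            text_parts.append(block["text"])
        elif block["type"] == "thinking":
            thinking = block["thinking"]
        elif block["type"] == "tool_use":
            # Tool use block -> entry in the OpenAI tool_calls array
            tool_calls.append({
                "id": block["id"],
                "type": "function",
                "function": {"name": block["name"],
                             "arguments": json.dumps(block["input"])},
            })
    out["content"] = "".join(text_parts) or None
    if tool_calls:
        out["tool_calls"] = tool_calls
    if thinking is not None:
        out["thinking"] = thinking  # preserved, per the note above
    return out
```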
The `generate_tips()` function analyzes the trajectory and produces actionable guidelines using an LLM.
Two entity types are stored:

- `trajectory` - Individual messages with metadata (trace_id, span_id, model, role)
- `guideline` - Generated tips with conflict resolution enabled
```
┌─────────────┐     fetch       ┌───────────────────┐
│   Phoenix   │ ───────────────→│    PhoenixSync    │
│   (spans)   │                 │                   │
└─────────────┘                 └─────────┬─────────┘
                                          │
                    ┌─────────────────────┼─────────────────────┐
                    │                     │                     │
                    ▼                     ▼                     ▼
            ┌───────────────┐     ┌───────────────┐     ┌───────────────┐
            │  Deduplicate  │     │  Convert to   │     │   Generate    │
            │ (by span_id)  │     │ OpenAI format │     │     Tips      │
            └───────────────┘     └───────────────┘     └───────┬───────┘
                                                                │
                                                                ▼
                                                        ┌───────────────┐
                                                        │    Kaizen     │
                                                        │    Backend    │
                                                        └───────────────┘
```
```cron
# Sync every hour
0 * * * * cd /path/to/kaizen && uv run python -m kaizen.cli.cli sync phoenix --limit 100
```

```ini
# /etc/systemd/system/kaizen-sync.service
[Unit]
Description=Kaizen Phoenix Sync

[Service]
Type=oneshot
WorkingDirectory=/path/to/kaizen
ExecStart=/path/to/uv run python -m kaizen.cli.cli sync phoenix
Environment=PHOENIX_URL=http://localhost:6006
Environment=KAIZEN_NAMESPACE_ID=production
```

```ini
# /etc/systemd/system/kaizen-sync.timer
[Unit]
Description=Run Kaizen Phoenix Sync hourly

[Timer]
OnCalendar=hourly
Persistent=true

[Install]
WantedBy=timers.target
```

Ensure Phoenix is running and accessible at the configured URL:
```bash
curl "http://localhost:6006/v1/projects/default/spans?limit=1"
```

- Check that spans have `name="litellm_request"`
- Verify spans contain `gen_ai.prompt.*` attributes
- Use `--include-errors` to include failed spans
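The same connectivity check can be done from Python with only the standard library. The helper below is a hypothetical convenience, and treating any HTTP 200 as "reachable" is an assumption about Phoenix's response:

```python
import urllib.request

def phoenix_reachable(base_url: str = "http://localhost:6006") -> bool:
    """Return True if the spans endpoint answers with HTTP 200."""
    url = f"{base_url}/v1/projects/default/spans?limit=1"
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200
    except OSError:  # connection refused, DNS failure, timeout, non-2xx
        return False
```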
Ensure an LLM API key is configured:

```bash
export OPENAI_API_KEY=sk-...
# or for other providers
export ANTHROPIC_API_KEY=sk-...
```