-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
123 lines (99 loc) · 3.96 KB
/
Copy pathexample.py
File metadata and controls
123 lines (99 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
"""Example demonstrating agentape record/replay functionality."""
import os
from dotenv import load_dotenv
from openai import OpenAI
import agentape
# Load environment variables from .env file
load_dotenv()
# Note: this example reads OPENROUTER_API_KEY from the environment; define it
# in your .env file (loaded above) or export it in your shell.
def demo_basic_usage():
    """Record one chat completion to a tape, replay it, and compare the two.

    Progress is printed to stdout. An ``AssertionError`` is raised when the
    replayed message content is not equal to the recorded one.
    """
    divider = "=" * 60
    tape_path = "tapes/example.yaml"
    model = "openai/gpt-oss-120b:free"
    messages = [{"role": "user", "content": "Say 'Hello from agentape!'"}]

    print(divider)
    print("AgentTape Example: Basic Record/Replay")
    print(divider)

    # Step 1: build an OpenAI client that points at OpenRouter, then wrap it.
    openrouter_client = OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=os.environ.get("OPENROUTER_API_KEY"),
    )
    client = agentape.wrap(openrouter_client)
    print("\n✓ Wrapped OpenAI client with OpenRouter")

    # Step 2: issue the request inside a record context.
    print("\n[RECORD MODE] Making API call and recording to tape...")
    with agentape.record(tape_path):
        live = client.chat.completions.create(model=model, messages=messages)
        recorded_content = live.choices[0].message.content
        print(f"Response: {recorded_content}")
    print("✓ Saved to tapes/example.yaml")

    # Step 3: issue the very same request inside a replay context.
    print("\n[REPLAY MODE] Loading cached response from tape...")
    with agentape.replay(tape_path):
        cached = client.chat.completions.create(model=model, messages=messages)
        replayed_content = cached.choices[0].message.content
        print(f"Response: {replayed_content}")
    print("✓ No API call made - response came from tape!")

    # The demo only makes sense if both passes produced the same text.
    assert recorded_content == replayed_content, (
        "Recorded and replayed responses should match"
    )
    print("\n✓ Recorded and replayed responses match perfectly\n")
def _chunk_text(chunk):
    """Return the delta text carried by one streamed chunk, or "" if none.

    Handles both chunk shapes used in this example: objects read through
    ``choices`` / ``delta`` / ``content`` attributes, and plain dicts using
    the same names as keys. A chunk with no choices, no delta, or empty
    content yields "".
    """

    def field(obj, name):
        # Dicts are read by key, everything else by attribute; missing -> None.
        if isinstance(obj, dict):
            return obj.get(name)
        return getattr(obj, name, None)

    choices = field(chunk, "choices")
    if not choices:
        # Guard against chunks with an empty or absent choices list, which
        # would otherwise raise IndexError on ``choices[0]``.
        return ""
    delta = field(choices[0], "delta")
    return field(delta, "content") or ""


def _print_stream(stream):
    """Print the text of every chunk in *stream* on a single "Response: " line."""
    print("Response: ", end="", flush=True)
    for chunk in stream:
        text = _chunk_text(chunk)
        if text:
            print(text, end="", flush=True)
    print()


def demo_streaming():
    """Record a streamed chat completion to a tape, then replay it.

    Progress and the streamed text are printed to stdout. The same model and
    messages are sent in both the record and the replay pass.
    """
    divider = "=" * 60
    tape_path = "tapes/streaming_example.yaml"
    # One model name for both passes. The original replay call sent
    # "gpt-4o-mini", which differed from the recorded request.
    # NOTE(review): whether agentape matches requests on the model field is
    # not visible from this file - confirm.
    model = "openai/gpt-oss-120b:free"
    messages = [{"role": "user", "content": "Count to 5"}]

    print(divider)
    print("AgentTape Example: Streaming Support")
    print(divider)

    client = agentape.wrap(
        OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=os.environ.get("OPENROUTER_API_KEY"),
        )
    )

    # Record streaming: the stream is consumed inside the record context.
    print("\n[RECORD MODE] Streaming API call...")
    with agentape.record(tape_path):
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
        )
        _print_stream(stream)
    print("✓ Streaming response saved to tape")

    # Replay streaming: identical request, consumed inside the replay context.
    print("\n[REPLAY MODE] Replaying streamed response...")
    with agentape.replay(tape_path):
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
        )
        _print_stream(stream)
    print("✓ Streamed response replayed from tape\n")
if __name__ == "__main__":
    # Script entry point: run both demos in order and report the outcome.
    try:
        demo_basic_usage()
        print("\n" + "=" * 60 + "\n")
        demo_streaming()
    except Exception as e:
        # Top-level boundary: show a friendly hint instead of a traceback.
        print(f"\n❌ Error: {e}")
        print(
            "\nMake sure OPENROUTER_API_KEY is set in your environment for record mode."
        )
        print("Or use replay mode with existing tapes.")
        # Exit non-zero so shells and CI can tell the run failed; previously
        # the script printed the error and still exited with status 0.
        raise SystemExit(1)
    else:
        print("\nAll examples completed successfully!")
        print("\nCheck the 'tapes/' directory to see the recorded YAML files.")