-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprocess_event_stream.py
More file actions
63 lines (55 loc) · 2.69 KB
/
process_event_stream.py
File metadata and controls
63 lines (55 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
import sys
from collections.abc import AsyncIterable
from typing import cast
from agent_framework import AgentExecutorResponse
from agent_framework import AgentResponseUpdate
from agent_framework import Message
from agent_framework import WorkflowEvent
from agent_framework.orchestrations import AgentRequestInfoResponse
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, AgentRequestInfoResponse] | None:
"""Process events from the workflow stream to capture human feedback requests."""
requests: dict[str, AgentExecutorResponse] = {}
async for event in stream:
if event.type == "request_info" and isinstance(event.data, AgentExecutorResponse):
requests[event.request_id] = event.data
if event.type == "output":
# The output of the workflow comes from the orchestrator and it's a list of messages
print("\n" + "=" * 60)
print("DISCUSSION COMPLETE")
print("=" * 60)
print("Final discussion summary:")
# To make the type checker happy, we cast event.data to the expected type
outputs = cast(list[Message], event.data)
for msg in outputs:
speaker = msg.author_name or msg.role
print(f"[{speaker}]: {msg.text}")
responses: dict[str, AgentRequestInfoResponse] = {}
if requests:
for request_id, request in requests.items():
# # Display pre-agent context for human input
# print("\n" + "-" * 40)
# print("INPUT REQUESTED")
# print(
# f"Agent {request.executor_id} just responded with: '{request.agent_response.text}'. "
# "Please provide your feedback."
# )
# print("-" * 40)
if request.full_conversation:
print("Conversation context:")
recent = (
request.full_conversation[-2:] if len(request.full_conversation) > 2 else request.full_conversation
)
for msg in recent:
name = msg.author_name or msg.role
text = (msg.text or "")[:350]
print(f" [{name}]: {text}\n...")
print("-" * 40)
# Get human input to steer the agent
user_input = input(f"Feedback for {request.executor_id} (or 'skip' to approve): ") # noqa: ASYNC250
if user_input.lower() == "skip":
user_input = AgentRequestInfoResponse.approve()
else:
user_input = AgentRequestInfoResponse.from_strings([user_input])
responses[request_id] = user_input
return responses if responses else None