Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions components/backend/types/agui.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@ package types

import "time"

// Timestamp format constants for AG-UI events and metadata.
// These ensure consistent timestamp formatting across the codebase.
const (
// AGUITimestampFormat is used for event timestamps that require nanosecond precision.
// This preserves event ordering when multiple events occur in rapid succession.
// Format: "2006-01-02T15:04:05.999999999Z07:00" (RFC3339 with nanoseconds)
// Used in: BaseEvent.Timestamp, streamed events
AGUITimestampFormat = time.RFC3339Nano

// AGUIMetadataTimestampFormat is used for run/session metadata timestamps.
// This is sufficient for human-readable timestamps where nanosecond precision isn't needed.
// Format: "2006-01-02T15:04:05Z07:00" (RFC3339)
// Used in: AGUIRunMetadata.StartedAt, AGUIRunMetadata.FinishedAt
AGUIMetadataTimestampFormat = time.RFC3339
)

// AG-UI Event Types as defined in the protocol specification
// See: https://docs.ag-ui.com/concepts/events
const (
Expand Down Expand Up @@ -62,7 +78,7 @@ type BaseEvent struct {
Type string `json:"type"`
ThreadID string `json:"threadId"`
RunID string `json:"runId"`
Timestamp string `json:"timestamp"`
Timestamp string `json:"timestamp"` // Format: AGUITimestampFormat (RFC3339Nano)
// Optional fields
MessageID string `json:"messageId,omitempty"`
ParentRunID string `json:"parentRunId,omitempty"`
Expand Down Expand Up @@ -293,7 +309,7 @@ func NewBaseEvent(eventType, threadID, runID string) BaseEvent {
Type: eventType,
ThreadID: threadID,
RunID: runID,
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
Timestamp: time.Now().UTC().Format(AGUITimestampFormat),
}
}

Expand Down
11 changes: 10 additions & 1 deletion components/backend/websocket/agui.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func RouteAGUIEvent(sessionID string, event map[string]interface{}) {

// If no active run found, check if event has a runId we should create
if activeRunState == nil {
// Ensure timestamp is set before any early returns
if event["timestamp"] == nil || event["timestamp"] == "" {
event["timestamp"] = time.Now().UTC().Format(types.AGUITimestampFormat)
}

// Don't create lazy runs for terminal events - they should only apply to existing runs
if isTerminalEventType(eventType) {
go persistAGUIEventMap(sessionID, "", event)
Expand Down Expand Up @@ -163,13 +168,17 @@ func RouteAGUIEvent(sessionID string, event map[string]interface{}) {
threadID = eventThreadID
}

// Fill in missing IDs only if not present
// Fill in missing IDs and timestamp
if event["threadId"] == nil || event["threadId"] == "" {
event["threadId"] = threadID
}
if event["runId"] == nil || event["runId"] == "" {
event["runId"] = runID
}
// Add timestamp if not present - critical for message timestamp tracking
if event["timestamp"] == nil || event["timestamp"] == "" {
event["timestamp"] = time.Now().UTC().Format(types.AGUITimestampFormat)
}

// Broadcast to run-specific SSE subscribers
activeRunState.BroadcastFull(event)
Expand Down
6 changes: 5 additions & 1 deletion components/backend/websocket/agui_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,17 @@ func handleStreamedEvent(sessionID, runID, threadID, jsonData string, runState *

eventType, _ := event["type"].(string)

// Ensure threadId and runId are set
// Ensure threadId, runId, and timestamp are set
if _, ok := event["threadId"]; !ok {
event["threadId"] = threadID
}
if _, ok := event["runId"]; !ok {
event["runId"] = runID
}
// Add timestamp if not present - critical for message timestamp tracking
if _, ok := event["timestamp"]; !ok {
event["timestamp"] = time.Now().UTC().Format(types.AGUITimestampFormat)
}

// Check for terminal events
switch eventType {
Expand Down
17 changes: 14 additions & 3 deletions components/backend/websocket/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,14 @@ func (c *MessageCompactor) handleTextMessageStart(event map[string]interface{})
if role == "" {
role = types.RoleAssistant
}
// Preserve timestamp from the event
timestamp, _ := event["timestamp"].(string)

c.currentMessage = &types.Message{
ID: messageID,
Role: role,
Content: "",
ID: messageID,
Role: role,
Content: "",
Timestamp: timestamp,
}
}

Expand Down Expand Up @@ -228,17 +231,25 @@ func (c *MessageCompactor) handleToolCallEnd(event map[string]interface{}) {
tc.Status = "error"
}

// Preserve timestamp from the event
timestamp, _ := event["timestamp"].(string)

// Add to message
// Check if we need to create a new message or add to current
if c.currentMessage != nil && c.currentMessage.Role == types.RoleAssistant {
// Add to current message
c.currentMessage.ToolCalls = append(c.currentMessage.ToolCalls, tc)
// Update timestamp if not already set
if c.currentMessage.Timestamp == "" && timestamp != "" {
c.currentMessage.Timestamp = timestamp
}
} else {
// Create new message for this tool call
c.messages = append(c.messages, types.Message{
ID: uuid.New().String(),
Role: types.RoleAssistant,
ToolCalls: []types.ToolCall{tc},
Timestamp: timestamp,
})
}

Expand Down
2 changes: 1 addition & 1 deletion components/backend/websocket/legacy_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func MigrateLegacySessionToAGUI(sessionID string) error {
"type": types.EventTypeMessagesSnapshot,
"threadId": sessionID,
"runId": "legacy-migration",
"timestamp": time.Now().UTC().Format(time.RFC3339Nano),
"timestamp": time.Now().UTC().Format(types.AGUITimestampFormat),
"messages": messages,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,6 @@ export function WelcomeExperience({

{/* Message Content */}
<div className="flex-1 min-w-0">
{/* Timestamp */}
<div className="text-[10px] text-muted-foreground/60 mb-1">just now</div>
<div className="rounded-lg bg-card">
{/* Content */}
<p className="text-sm text-muted-foreground leading-relaxed whitespace-pre-wrap mb-[0.2rem]">
Expand Down Expand Up @@ -400,13 +398,11 @@ export function WelcomeExperience({
</div>
</div>

{/* Message Content */}
<div className="flex-1 min-w-0">
{/* Timestamp */}
<div className="text-[10px] text-muted-foreground/60 mb-1">just now</div>
<div className="rounded-lg bg-card">
{/* Content */}
<p className="text-sm text-muted-foreground leading-relaxed mb-[0.2rem]">
{/* Message Content */}
<div className="flex-1 min-w-0">
<div className="rounded-lg bg-card">
{/* Content */}
<p className="text-sm text-muted-foreground leading-relaxed mb-[0.2rem]">
{!isSetupTypingComplete ? (
<>
{setupDisplayedText.slice(0, -3)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,8 @@ export default function ProjectSessionDetailPage({
const allToolCalls = new Map<string, { tc: AGUIToolCall; timestamp: string }>();

for (const msg of aguiState.messages) {
// Use msg.timestamp from backend, fallback to current time for legacy messages
// Note: After backend fix, new messages will have proper timestamps
const timestamp = msg.timestamp || new Date().toISOString();

if (msg.toolCalls && Array.isArray(msg.toolCalls)) {
Expand Down Expand Up @@ -791,7 +793,8 @@ export default function ProjectSessionDetailPage({
if (!allToolCalls.has(streamingToolId)) {
allToolCalls.set(streamingToolId, {
tc: streamingTC,
timestamp: new Date().toISOString()
// Use timestamp from currentToolCall if available, fallback to current time for legacy
timestamp: aguiState.pendingToolCalls?.get(streamingToolId)?.timestamp || new Date().toISOString()
});
}
}
Expand All @@ -807,7 +810,8 @@ export default function ProjectSessionDetailPage({
if (!allToolCalls.has(tc.id)) {
allToolCalls.set(tc.id, {
tc: tc,
timestamp: new Date().toISOString(),
// Use timestamp from child message if available, fallback to current time
timestamp: childMsg.timestamp || new Date().toISOString(),
});
}
}
Expand Down Expand Up @@ -852,6 +856,7 @@ export default function ProjectSessionDetailPage({

// Phase C: Process messages and build hierarchical structure
for (const msg of aguiState.messages) {
// Use msg.timestamp from backend, fallback to current time for legacy messages
const timestamp = msg.timestamp || new Date().toISOString();

// Handle text content by role
Expand Down Expand Up @@ -974,7 +979,8 @@ export default function ProjectSessionDetailPage({
type: "agent_message",
content: { type: "text_block", text: aguiState.currentMessage.content },
model: "claude",
timestamp: new Date().toISOString(),
// Use timestamp from currentMessage (captured from TEXT_MESSAGE_START), fallback to current time
timestamp: aguiState.currentMessage.timestamp || new Date().toISOString(),
streaming: true,
} as MessageObject & { streaming?: boolean });
}
Expand Down Expand Up @@ -1004,7 +1010,8 @@ export default function ProjectSessionDetailPage({
.map(childMsg => {
const childTC = childMsg.toolCalls?.[0];
if (!childTC) return null;
return createToolMessage(childTC, new Date().toISOString());
// Use timestamp from child message if available
return createToolMessage(childTC, childMsg.timestamp || new Date().toISOString());
})
.filter((c): c is ToolUseMessages => c !== null);

Expand All @@ -1014,7 +1021,8 @@ export default function ProjectSessionDetailPage({
const childInput = parseToolArgs(childTool.args || "");
children.push({
type: "tool_use_messages",
timestamp: new Date().toISOString(),
// Use timestamp from pending tool call (captured from TOOL_CALL_START)
timestamp: childTool.timestamp || new Date().toISOString(),
toolUseBlock: {
type: "tool_use_block",
id: childId,
Expand Down Expand Up @@ -1045,7 +1053,8 @@ export default function ProjectSessionDetailPage({

const streamingToolMessage: HierarchicalToolMessage = {
type: "tool_use_messages",
timestamp: new Date().toISOString(),
// Use timestamp from pending tool call (captured from TOOL_CALL_START)
timestamp: pendingTool.timestamp || new Date().toISOString(),
toolUseBlock: {
type: "tool_use_block",
id: toolId,
Expand Down
8 changes: 8 additions & 0 deletions components/frontend/src/hooks/use-agui-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
id: newState.currentMessage.id || crypto.randomUUID(),
role: newState.currentMessage.role || AGUIRole.ASSISTANT,
content: newState.currentMessage.content,
timestamp: event.timestamp,
}
newState.messages = [...newState.messages, msg]
onMessage?.(msg)
Expand Down Expand Up @@ -169,6 +170,7 @@
id: event.messageId || null,
role: event.role,
content: '',
timestamp: event.timestamp, // Capture timestamp from event
}
return newState
}
Expand Down Expand Up @@ -214,6 +216,7 @@
id: messageId,
role: newState.currentMessage.role || AGUIRole.ASSISTANT,
content: newState.currentMessage.content,
timestamp: event.timestamp,
}
newState.messages = [...newState.messages, msg]
onMessage?.(msg)
Expand All @@ -235,6 +238,7 @@
name: event.toolCallName || 'unknown_tool',
args: '',
parentToolUseId: parentToolId,
timestamp: event.timestamp, // Capture timestamp from event
});
newState.pendingToolCalls = updatedPending;

Expand Down Expand Up @@ -415,6 +419,7 @@
toolCallId: toolCallId,
name: toolCallName,
toolCalls: [completedToolCall],
timestamp: event.timestamp,
}
messages.push(toolMessage)
}
Expand Down Expand Up @@ -546,6 +551,7 @@
thinking: actualRawData.thinking as string,
signature: actualRawData.signature as string,
},
timestamp: event.timestamp,
}
newState.messages = [...newState.messages, msg]
onMessage?.(msg)
Expand All @@ -562,6 +568,7 @@
id: messageId,
role: AGUIRole.USER,
content: actualRawData.content as string,
timestamp: event.timestamp,
}
newState.messages = [...newState.messages, msg]
onMessage?.(msg)
Expand All @@ -575,6 +582,7 @@
id: (actualRawData.id as string) || crypto.randomUUID(),
role: actualRawData.role as AGUIMessage['role'],
content: actualRawData.content as string,
timestamp: event.timestamp,
}
newState.messages = [...newState.messages, msg]
onMessage?.(msg)
Expand All @@ -598,7 +606,7 @@
return newState
})
},
[onEvent, onMessage, onError],

Check warning on line 609 in components/frontend/src/hooks/use-agui-stream.ts

View workflow job for this annotation

GitHub Actions / lint-frontend

React Hook useCallback has a missing dependency: 'onTraceId'. Either include it or remove the dependency array
)

// Connect to the AG-UI event stream
Expand Down
2 changes: 2 additions & 0 deletions components/frontend/src/types/agui.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ export type PendingToolCall = {
name: string
args: string
parentToolUseId?: string
timestamp?: string // Timestamp from TOOL_CALL_START event
}

// Feedback type for messages
Expand All @@ -325,6 +326,7 @@ export type AGUIClientState = {
id: string | null
role: AGUIRoleValue | null
content: string
timestamp?: string // Timestamp from TEXT_MESSAGE_START event
} | null
// DEPRECATED: Use pendingToolCalls instead for parallel tool call support
currentToolCall: {
Expand Down
Loading