Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/active-root-run-id.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@perstack/react": patch
---

Add activeRootRunId to useRun and useJobStream for immediate root run identification from startRun events
6 changes: 4 additions & 2 deletions packages/react/src/hooks/use-job-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export type JobStreamState = {
activities: ActivityOrGroup[]
streaming: StreamingState
latestActivity: ActivityOrGroup | null
/** The runId of the active root run, derived from startRun events (available immediately) */
activeRootRunId: string | null
isConnected: boolean
error: Error | null
}
Expand All @@ -21,7 +23,7 @@ export function useJobStream(options: {
}): JobStreamState {
const { jobId, connect, enabled = true } = options
const shouldConnect = Boolean(jobId && enabled)
const { activities, streaming, addEvent } = useRun()
const { activities, streaming, activeRootRunId, addEvent } = useRun()
const [isConnected, setIsConnected] = useState(false)
const [error, setError] = useState<Error | null>(null)
const addEventRef = useRef(addEvent)
Expand Down Expand Up @@ -63,5 +65,5 @@ export function useJobStream(options: {
[activities],
)

return { activities, streaming, latestActivity, isConnected, error }
return { activities, streaming, latestActivity, activeRootRunId, isConnected, error }
}
91 changes: 91 additions & 0 deletions packages/react/src/hooks/use-run.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ describe("useRun hook", () => {
expect(result.current.activities).toEqual([])
expect(result.current.streaming.runs).toEqual({})
expect(result.current.isComplete).toBe(false)
expect(result.current.activeRootRunId).toBeNull()
expect(result.current.eventCount).toBe(0)
})

Expand Down Expand Up @@ -619,6 +620,96 @@ describe("useRun hook", () => {
expect(result.current.eventCount).toBe(1)
})

it("sets activeRootRunId on root startRun event", () => {
const { result } = renderHook(() => useRun())

act(() => {
result.current.addEvent(
createBaseEvent({
type: "startRun",
runId: "root-run-1",
inputMessages: [{ type: "userMessage", contents: [{ type: "textPart", text: "Hello" }] }],
initialCheckpoint: { status: "init" },
} as Partial<RunEvent>) as RunEvent,
)
})

expect(result.current.activeRootRunId).toBe("root-run-1")
})

it("does not set activeRootRunId on delegated startRun event", () => {
const { result } = renderHook(() => useRun())

act(() => {
result.current.addEvent(
createBaseEvent({
type: "startRun",
runId: "root-run-1",
inputMessages: [{ type: "userMessage", contents: [{ type: "textPart", text: "Hello" }] }],
initialCheckpoint: { status: "init" },
} as Partial<RunEvent>) as RunEvent,
)
})

expect(result.current.activeRootRunId).toBe("root-run-1")

act(() => {
result.current.addEvent(
createBaseEvent({
type: "startRun",
runId: "child-run-2",
inputMessages: [
{ type: "userMessage", contents: [{ type: "textPart", text: "Delegated" }] },
],
initialCheckpoint: {
status: "init",
delegatedBy: {
expert: { key: "root-expert@1.0.0" },
toolCallId: "tc-1",
toolName: "delegate",
checkpointId: "cp-1",
runId: "root-run-1",
},
},
} as Partial<RunEvent>) as RunEvent,
)
})

expect(result.current.activeRootRunId).toBe("root-run-1")
})

it("updates activeRootRunId on continuation startRun event", () => {
const { result } = renderHook(() => useRun())

act(() => {
result.current.addEvent(
createBaseEvent({
type: "startRun",
runId: "run-1",
inputMessages: [{ type: "userMessage", contents: [{ type: "textPart", text: "Hello" }] }],
initialCheckpoint: { status: "init" },
} as Partial<RunEvent>) as RunEvent,
)
})

expect(result.current.activeRootRunId).toBe("run-1")

act(() => {
result.current.addEvent(
createBaseEvent({
type: "startRun",
runId: "run-2",
inputMessages: [
{ type: "userMessage", contents: [{ type: "textPart", text: "Continue" }] },
],
initialCheckpoint: { status: "completed" },
} as Partial<RunEvent>) as RunEvent,
)
})

expect(result.current.activeRootRunId).toBe("run-2")
})

it("appends historical events", () => {
const { result } = renderHook(() => useRun())

Expand Down
12 changes: 12 additions & 0 deletions packages/react/src/hooks/use-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ export type RunResult = {
streaming: StreamingState
/** Whether the run is complete */
isComplete: boolean
/** The runId of the active root run (not delegated), derived from startRun events */
activeRootRunId: string | null
/** Number of events processed */
eventCount: number
/** Add a new event to be processed */
Expand All @@ -174,6 +176,7 @@ export function useRun(): RunResult {
const [streaming, setStreaming] = useState<StreamingState>({ runs: {} })
const [eventCount, setEventCount] = useState(0)
const [isComplete, setIsComplete] = useState(false)
const [activeRootRunId, setActiveRootRunId] = useState<string | null>(null)

const stateRef = useRef<ActivityProcessState>(createInitialActivityProcessState())

Expand Down Expand Up @@ -205,6 +208,14 @@ export function useRun(): RunResult {
(rs) => rs.isComplete && !rs.delegatedBy,
)
setIsComplete(rootRunComplete)

if ("type" in event && event.type === "startRun" && "runId" in event) {
const startRunId = event.runId as string
const runState = stateRef.current.runStates.get(startRunId)
if (runState && !runState.delegatedBy) {
setActiveRootRunId(startRunId)
}
}
}, [])

const clearRunStreaming = useCallback((runId: string) => {
Expand Down Expand Up @@ -261,6 +272,7 @@ export function useRun(): RunResult {
activities,
streaming,
isComplete,
activeRootRunId,
eventCount,
addEvent,
appendHistoricalEvents,
Expand Down