diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java b/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java index 88e5be749..85828bfd1 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java @@ -188,8 +188,8 @@ private Flux streamWithHttpClient( Instant start = Instant.now(); boolean useMultimodal = requiresMultiModalApi(); - // Get effective options - GenerateOptions effectiveOptions = options != null ? options : defaultOptions; + // Merge options with defaultOptions (options takes precedence) + GenerateOptions effectiveOptions = GenerateOptions.mergeOptions(options, defaultOptions); ToolChoice toolChoice = effectiveOptions.getToolChoice(); // Format messages using formatter diff --git a/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java b/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java index 37b0badf7..ef9fa151a 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java +++ b/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java @@ -16,6 +16,8 @@ package io.agentscope.core.pipeline; import io.agentscope.core.agent.AgentBase; +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.StreamOptions; import io.agentscope.core.exception.CompositeAgentException; import io.agentscope.core.message.Msg; import java.util.ArrayList; @@ -227,6 +229,131 @@ public String getDescription() { return description; } + // ==================== Streaming API ==================== + + /** + * Stream execution events from all agents with default options. + * + *

Events from multiple agents are merged (concurrent mode) or concatenated + * (sequential mode) based on the pipeline configuration. + * + * @param input Input message to distribute to all agents + * @return Flux of events emitted during execution from all agents + */ + public Flux stream(Msg input) { + return stream(input, StreamOptions.defaults()); + } + + /** + * Stream execution events from all agents with specified options. + * + *

Events from multiple agents are merged (concurrent mode) or concatenated + * (sequential mode) based on the pipeline configuration. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @return Flux of events emitted during execution from all agents + */ + public Flux stream(Msg input, StreamOptions options) { + return stream(input, options, null); + } + + /** + * Stream execution events from all agents with structured output support. + * + *

Events from multiple agents are merged (concurrent mode) or concatenated + * (sequential mode) based on the pipeline configuration. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @param structuredOutputClass The class type for structured output (optional) + * @return Flux of events emitted during execution from all agents + */ + public Flux stream(Msg input, StreamOptions options, Class structuredOutputClass) { + if (agents.isEmpty()) { + return Flux.empty(); + } + + StreamOptions effectiveOptions = options != null ? options : StreamOptions.defaults(); + + return enableConcurrent + ? streamConcurrent(input, effectiveOptions, structuredOutputClass) + : streamSequential(input, effectiveOptions, structuredOutputClass); + } + + /** + * Stream events from all agents concurrently. + * + *

All agents execute in parallel and their events are merged into a single stream. + * Events may arrive interleaved from different agents. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @param structuredOutputClass The class type for structured output (optional) + * @return Flux of merged events from all agents + */ + private Flux streamConcurrent( + Msg input, StreamOptions options, Class structuredOutputClass) { + List errors = + Collections.synchronizedList(new ArrayList<>()); + + List> agentFluxes = + agents.stream() + .map( + agent -> { + Flux flux = + structuredOutputClass != null + ? agent.stream( + input, options, structuredOutputClass) + : agent.stream(input, options); + + return flux.subscribeOn(scheduler) + .doOnError( + throwable -> + errors.add( + new CompositeAgentException + .AgentExceptionInfo( + agent.getAgentId(), + agent.getName(), + throwable))) + .onErrorResume(e -> Flux.empty()); + }) + .toList(); + + return Flux.merge(agentFluxes) + .doOnComplete( + () -> { + if (!errors.isEmpty()) { + throw new CompositeAgentException( + "Multiple agent streaming failures occurred", errors); + } + }); + } + + /** + * Stream events from all agents sequentially. + * + *

Agents execute one after another. Events from each agent are emitted + * in order before the next agent starts. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @param structuredOutputClass The class type for structured output (optional) + * @return Flux of concatenated events from all agents + */ + private Flux streamSequential( + Msg input, StreamOptions options, Class structuredOutputClass) { + List> chain = new ArrayList<>(); + for (AgentBase agent : agents) { + Flux flux = + structuredOutputClass != null + ? agent.stream(input, options, structuredOutputClass) + : agent.stream(input, options); + chain.add(flux); + } + return Flux.concat(chain); + } + @Override public String toString() { return String.format( diff --git a/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java b/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java index 3d0b65b93..ed9e46475 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java +++ b/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java @@ -145,11 +145,11 @@ private Mono handlePreCall(PreCallEvent event) { if (retrievedDocs == null || retrievedDocs.isEmpty()) { return Mono.just(event); } - List enhancedMessages = new ArrayList<>(); + List enhancedMessages = new ArrayList<>(inputMessages.size() + 1); // Build enhanced messages with knowledge context Msg enhancedMessage = createEnhancedMessages(retrievedDocs); - enhancedMessages.addAll(inputMessages); enhancedMessages.add(enhancedMessage); + enhancedMessages.addAll(inputMessages); event.setInputMessages(enhancedMessages); return Mono.just(event); }) diff --git a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java index 58ad38a9a..32d917908 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java @@ -23,6 +23,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.agentscope.core.ReActAgent; +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.EventType; +import io.agentscope.core.agent.StreamOptions; import io.agentscope.core.agent.test.MockModel; import io.agentscope.core.agent.test.TestUtils; import io.agentscope.core.exception.CompositeAgentException; @@ -32,6 +35,7 @@ import java.time.Duration; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -213,4 +217,152 @@ private ReActAgent createAgent(String name, MockModel model) { // safety .build(); } + + // ==================== Streaming Tests ==================== + + @Test + @DisplayName("Should stream events from all agents when running concurrently") + void shouldStreamEventsConcurrently() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2)); + + Msg input = TestUtils.createUserMessage("User", "stream fanout"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Streaming pipeline should produce events"); + assertFalse(events.isEmpty(), "Events should not be empty"); + + // Verify we got AGENT_RESULT events from both agents + long agentResultCount = + events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).count(); + assertEquals(2, agentResultCount, "Expected AGENT_RESULT from each agent"); + + // Verify models were called + assertEquals(1, model1.getCallCount(), "First model should be invoked once"); + assertEquals(1, model2.getCallCount(), "Second model should be invoked once"); + } + + @Test + @DisplayName("Should stream events sequentially when configured") + void shouldStreamEventsSequentially() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2), false); + + Msg input = TestUtils.createUserMessage("User", "sequential stream"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Sequential streaming should return events"); + assertFalse(pipeline.isConcurrentEnabled(), "Pipeline should be sequential"); + + // Verify we got AGENT_RESULT events + long agentResultCount = + events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).count(); + assertEquals(2, agentResultCount, "Expected AGENT_RESULT from each agent"); + + // Find agent result events and verify order + List agentResults = + events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).toList(); + + assertEquals( + "Agent1", + agentResults.get(0).getMessage().getName(), + "First agent response should lead"); + assertEquals( + "Agent2", + agentResults.get(1).getMessage().getName(), + "Second agent response should follow"); + } + + @Test + @DisplayName("Should stream with custom StreamOptions") + void shouldStreamWithCustomOptions() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2)); + + // Only stream AGENT_RESULT events + StreamOptions options = + StreamOptions.builder() + .eventTypes(EventType.AGENT_RESULT) + .incremental(true) + .build(); + + Msg input = TestUtils.createUserMessage("User", "options test"); + List events = pipeline.stream(input, options).collectList().block(TIMEOUT); + + assertNotNull(events, "Streaming with options should return events"); + + // All events should be AGENT_RESULT type + boolean allAgentResults = + events.stream().allMatch(e -> e.getType() == EventType.AGENT_RESULT); + assertTrue(allAgentResults, "All events should be AGENT_RESULT"); + } + + @Test + @DisplayName("Should return empty flux for empty pipeline") + void shouldReturnEmptyFluxForEmptyPipeline() { + FanoutPipeline pipeline = new FanoutPipeline(List.of()); + + Msg input = TestUtils.createUserMessage("User", "empty pipeline"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Should return empty list"); + assertTrue(events.isEmpty(), "Empty pipeline should produce no events"); + } + + @Test + @DisplayName("Should stream events through builder-created pipeline") + void shouldStreamEventsViaBuilder() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + ReActAgent agent3 = createAgent("Agent3", model3); + + FanoutPipeline pipeline = + FanoutPipeline.builder() + .addAgent(agent1) + .addAgents(List.of(agent2, agent3)) + .sequential() + .build(); + + Msg input = TestUtils.createUserMessage("User", "builder stream"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Builder-produced pipeline should stream events"); + + // Verify agent results in order (sequential mode) + List agentNames = + events.stream() + .filter(e -> e.getType() == EventType.AGENT_RESULT) + .map(e -> e.getMessage().getName()) + .toList(); + + assertEquals( + List.of("Agent1", "Agent2", "Agent3"), + agentNames, + "Sequential streaming should maintain insertion order"); + } + + @Test + @DisplayName("Should collect events count correctly in concurrent streaming") + void shouldCollectCorrectEventCountConcurrently() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + ReActAgent agent3 = createAgent("Agent3", model3); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2, agent3)); + + Msg input = TestUtils.createUserMessage("User", "count test"); + + AtomicInteger eventCount = new AtomicInteger(0); + pipeline.stream(input).doOnNext(event -> eventCount.incrementAndGet()).blockLast(TIMEOUT); + + assertTrue( + eventCount.get() >= 3, + "Should have at least 3 AGENT_RESULT events (one per agent)"); + } }