
[Bug]: Concurrency Issue in AgentBase.call() #732

@wuji1428

Description


The current implementation of AgentBase.call() uses doFinally to reset the Agent's running state (running.set(false)).

In multi-threaded environments (e.g., a ChatModel that uses subscribeOn(Schedulers.boundedElastic())), the doFinally callback runs on the background thread after the terminal signal has already been passed to the downstream subscriber, so from the caller's point of view the reset is not ordered before signal delivery. When a caller uses .block() to suspend the main thread, the main thread can wake up and start the next call before the background thread has reset the state flag. The result is an unexpected IllegalStateException.
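The interleaving described above can be modeled without Reactor. The following is a minimal sketch using only the JDK, not the actual AgentBase code: the class DoFinallyRaceModel, its acquire() method, and both latches are hypothetical stand-ins. The `completed` latch plays the role of .block() waking up, and `secondCallDone` deliberately widens the race window so that the unlucky ordering happens on every run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the agent's "running" guard; NOT the real AgentBase.
class DoFinallyRaceModel {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Models the guard at the start of call(): reject if already running.
    void acquire() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Agent is still running");
        }
    }

    // Waits on a latch, converting interruption into an unchecked exception.
    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Returns true if the second call is rejected because the reset came too late.
    static boolean secondCallRejected() {
        DoFinallyRaceModel agent = new DoFinallyRaceModel();
        CountDownLatch completed = new CountDownLatch(1);      // stands in for block() returning
        CountDownLatch secondCallDone = new CountDownLatch(1); // forces the unlucky interleaving

        agent.acquire(); // first call marks the agent as busy

        Thread worker = new Thread(() -> {
            completed.countDown();     // 1. terminal signal reaches the subscriber
            await(secondCallDone);     // 2. cleanup is delayed (widened on purpose)
            agent.running.set(false);  // 3. doFinally-style reset happens last
        });
        worker.start();

        await(completed); // the caller wakes up here, as it would after block()
        boolean rejected = false;
        try {
            agent.acquire(); // the next call starts before the reset has run
        } catch (IllegalStateException e) {
            rejected = true;
        } finally {
            secondCallDone.countDown(); // let the worker finish its cleanup
        }

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("second call rejected: " + secondCallRejected());
    }
}
```

In the real pipeline nothing forces step 2 to take long, which is why the failure would be intermittent rather than constant.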

Reproduction

@Test
public void testConcurrency() {
    AgentBase agent = new AgentBase();
    agent.call()
        .map(msg -> {
            // Time-consuming map operation
            return msg;
        })
        .block(); // May return before doFinally has executed running.set(false)

    // Subsequent call issued immediately afterwards
    agent.call().block(); // Throws unexpected IllegalStateException
}

Impact

  • Logic Anomalies: Sequential calls to the Agent may fail due to cleanup delays, reducing system reliability and making errors unpredictable.
  • Over-extended Resource Holding: Once AgentBase.call() completes its internal execution, the Agent should release its "busy" state. Downstream processing (such as .map()) belongs to the user's business logic and should not prevent the Agent from accepting new calls.

Necessity for Modification

The current implementation does not guarantee any ordering between delivery of the completion signal and the state reset. For thread-safe lifecycle management, the reset must run synchronously within the reactive stream, before the terminal signal reaches the downstream subscriber.
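The ordering requirement can be sketched with the JDK alone: reset the flag first, then deliver the completion signal. The class ResetBeforeSignalModel and its methods are hypothetical and only model the idea, they are not the project's fix. In Reactor terms this would roughly correspond to doing the reset in a hook that fires before the signal is propagated (for a Mono, doOnTerminate, with doOnCancel covering cancellation) instead of doFinally. Whether that combination suits AgentBase is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the proposed ordering; NOT the real AgentBase.
class ResetBeforeSignalModel {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Models the guard at the start of call(): reject if already running.
    void acquire() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Agent is still running");
        }
    }

    // Returns true if a call issued right after completion is accepted.
    static boolean secondCallAccepted() {
        ResetBeforeSignalModel agent = new ResetBeforeSignalModel();
        CountDownLatch completed = new CountDownLatch(1); // stands in for block() returning

        agent.acquire(); // first call marks the agent as busy

        Thread worker = new Thread(() -> {
            agent.running.set(false); // 1. reset the state first
            completed.countDown();    // 2. only then deliver the completion signal
        });
        worker.start();

        try {
            // countDown() happens-before a successful await(), so the reset is visible here.
            completed.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        try {
            agent.acquire(); // the next call sees running == false
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("second call accepted: " + secondCallAccepted());
    }
}
```

Because the reset precedes the signal, the caller cannot observe completion while the flag is still set, regardless of thread scheduling.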

Metadata

Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Project status: Done
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests