Skip to content

[Arrow Flight RPC] Producer-side back-pressure under slow consumers#21899

Open
rishabhmaurya wants to merge 1 commit into
opensearch-project:mainfrom
rishabhmaurya:flight-rpc-slow-consumer-it
Open

[Arrow Flight RPC] Producer-side back-pressure under slow consumers#21899
rishabhmaurya wants to merge 1 commit into
opensearch-project:mainfrom
rishabhmaurya:flight-rpc-slow-consumer-it

Conversation

@rishabhmaurya
Copy link
Copy Markdown
Contributor

@rishabhmaurya rishabhmaurya commented May 30, 2026

Summary

FlightServerChannel.sendResponseBatch now honours gRPC's isReady() contract: the producer thread parks on BackpressureStrategy.waitForListener before each batch is queued, and resumes only after gRPC reports the per-stream outbound buffer has drained below setOnReadyThreshold. Slow consumers throttle producer wall-clock instead of OOMing the flight pool.

Context

3.1 used BaseFlightProducer with a synchronous BackpressureStrategy.waitForListener gate before every putNext. When we moved to the async/queue model (flight-eventloop-N + FlightOutboundHandler.BatchTask), the goal was to match the existing OpenSearch transport's fire-and-forget shape — producer enqueues quickly and returns, ordering is enforced by a single-threaded eventloop. That solved the concurrent-segment-search contention problem (multiple producer threads writing to one channel without serialising on a per-channel mutex), but in doing so we dropped the isReady() gate. Without it, the unguarded path ignored gRPC's flow-control signal and let buffers accumulate until the flight pool allocator threw OutOfMemoryException.

This PR restores the gate without giving up the async fire-and-forget shape. awaitReadyOrThrow runs on the producer thread before the BatchTask is submitted to the eventloop. The eventloop and its single-threaded ordering guarantees are unchanged.

What changed

  • FlightServerChannel registers a CompositeBackpressureStrategy and exposes awaitReadyOrThrow(). The strategy's cancel callback runs the channel's existing onChannelCancelled cleanup before notifying parked threads, so a thread waking from waitForListener always observes the cancelled state.
  • FlightOutboundHandler.sendResponseBatch calls awaitReadyOrThrow() before getExecutor().execute(...).
  • New settings arrow.flight.channel.outbound_buffer_threshold (default 64 MiB) and arrow.flight.channel.ready_timeout (default 60s, 100ms minimum). The threshold is wired through to OSFlightServer.Builder.backpressureThreshold.

Settings

Setting Default Behaviour
arrow.flight.channel.ready_timeout 60s Maximum time the producer parks before failing the batch with StreamErrorCode.TIMED_OUT.
arrow.flight.channel.outbound_buffer_threshold 64mb Per-stream gRPC watermark. Must be strictly smaller than native.allocator.pool.flight.max.

Tuning and operational concerns

Sizing the flight pool

outbound_buffer_threshold is per stream; native.allocator.pool.flight.max is per node, shared across all streams.

flight pool max  >=  N concurrent streams × (threshold + ~16 MiB headroom)

The threshold is a watermark, not a max-message-size — a single batch larger than the threshold ships in one shot, with the producer parking on the next sendResponseBatch. See docs/backpressure.md for full sizing guidance.

Thread-pool exhaustion under slow consumers

The producer thread parks under a slow consumer (up to ready_timeout). The action handler runs on whichever thread pool it was registered against; N concurrent slow streams hold N threads parked simultaneously, which can starve the pool.

Mitigations:

  • Size ready_timeout to fail unresponsive streams within an acceptable window.
  • Register stream actions on a pool sized for streaming workloads (not on shared CPU-bound pools): a pool of K threads both bounds concurrent streams to K and isolates the parking from unrelated traffic. A dedicated admission-control layer is a more flexible alternative, out of scope here.

⚠️ Virtual threads for park-bound producers

For workloads where the producer is mostly bottlenecked on awaitReadyOrThrow (lots of parking, little compute per batch), the code that registers the stream action can dispatch its handler on a virtual-thread executor it owns. Park time then doesn't consume a platform thread. CPU-bound work (e.g. building each batch) should still run on a sized platform-thread pool to avoid pinning the carrier — typically by submitting that work to a separate executor and awaiting the result from the virtual thread. The framework itself does not assume virtual threads; this is a decision for the action's registrant.

Tests

  • Unit tests cover CompositeBackpressureStrategy (100% coverage), FlightServerChannel.awaitReadyOrThrow (fast path, parks-until-ready, timeout, cancel-during-wait, already-cancelled, handler registration), FlightOutboundHandler gating path, and ServerConfig settings parsing / bounds.
  • BackpressureProducerIT exercises a slow consumer; asserts the stream completes cleanly, all batches arrive, and producer wall-clock reflects consumer pacing (proves the producer was actually throttled).
  • Concurrency-sensitive unit tests use assertBusy on Thread.State.TIMED_WAITING rather than fixed sleeps to avoid flakiness. Stable across multiple reruns.

Coverage on touched code: CompositeBackpressureStrategy 100%, FlightServerChannel 82%, ArrowFlightProducer 77%, FlightOutboundHandler 70%. Uncovered lines are pre-existing defensive paths.

Test plan

  • ./gradlew :plugins:arrow-flight-rpc:test
  • ./gradlew :plugins:arrow-flight-rpc:check -x internalClusterTest
  • ./gradlew :plugins:arrow-flight-rpc:internalClusterTest --tests "*BackpressureProducerIT" (stable across reruns)
  • ./gradlew :plugins:arrow-flight-rpc:jacocoTestReport

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 30, 2026

PR Reviewer Guide 🔍

(Review updated until commit cadf774)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Thread-pool exhaustion risk

The producer thread parks in awaitReadyOrThrow() for up to readyTimeoutMillis (default 60s) under a slow consumer. If the action handler is registered on a bounded thread pool (e.g., SEARCH), N concurrent slow streams will hold N threads parked simultaneously and can starve the pool. The javadoc warns about this, but the default 60s timeout is long enough to cause operational impact before the stream fails. Operators must either size the action's thread pool to accommodate concurrent parked threads or implement admission control, which may not be obvious from the setting alone.

/**
 * Parks the calling thread until gRPC signals it can accept another batch. Called
 * from the producer thread before the batch is submitted to the channel's executor.
 *
 * <p><b>Warning:</b> the calling thread may park for up to {@code readyTimeoutMillis}
 * under a slow consumer. If the action handler is registered on a bounded thread
 * pool (e.g. {@code SEARCH}), N concurrent slow streams will hold N threads parked
 * simultaneously and can starve the pool. Operators should size the action's thread
 * pool — or limit concurrent streams via admission control — accordingly.
 *
 * @throws StreamException with {@link StreamErrorCode#TIMED_OUT} if the consumer
 *         remains not-ready longer than {@code readyTimeoutMillis}, or
 *         {@link StreamErrorCode#CANCELLED} if the client cancelled.
 */
Flaky timing assertion

The test asserts wall-clock must be >= 0.4 × (BATCH_COUNT × CONSUMER_SLEEP_MS) to prove back-pressure throttled the producer. The 0.4 multiplier is intended to absorb CI variance, but under extreme load or scheduler delays the producer may still complete faster than expected (e.g., if the consumer thread is starved and doesn't actually sleep for the full duration, or if batches are processed in bursts). A false negative here would mask a regression where back-pressure is not working. Consider adding a lower bound on the number of times awaitReadyOrThrow actually parked, or verifying that the flight pool allocation stayed below a threshold, rather than relying solely on wall-clock.

long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
assertEquals("All batches must arrive successfully under back-pressure", BATCH_COUNT, batchesReceived.get());

// Wall-clock must reflect consumer pacing — without back-pressure the producer
// would race ahead and finish near-instantly relative to the consumer's sleep
// budget. 0.4x absorbs startup jitter and CI variance without false negatives.
long minExpectedMillis = (long) ((BATCH_COUNT * CONSUMER_SLEEP_MS) * 0.4);
assertTrue(
    "Wall-clock " + elapsedMillis + "ms must reflect consumer pacing (>=" + minExpectedMillis + "ms)",
    elapsedMillis >= minExpectedMillis
);

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 30, 2026

PR Code Suggestions ✨

Latest suggestions up to cadf774

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent resource leak on gate failure

If awaitReadyOrThrow throws a StreamException, the BatchTask is never closed because
the executor never runs. This leaks the task's resources. Wrap the gate call in a
try-catch that closes the task on failure before re-throwing.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [134-142]

 // Block the producer thread before queuing the batch so a slow consumer
 // throttles allocation rather than letting the eventloop's queue grow.
-flightChannel.awaitReadyOrThrow();
+try {
+    flightChannel.awaitReadyOrThrow();
+} catch (Exception e) {
+    task.close();
+    throw e;
+}
 
 flightChannel.getExecutor().execute(threadPool.getThreadContext().preserveContext(() -> {
     try (BatchTask ignored = task) {
         processBatchTask(task);
     } catch (Exception e) {
Suggestion importance[1-10]: 9

__

Why: This identifies a critical resource leak: if awaitReadyOrThrow throws, the BatchTask is never closed, leaking the batch's resources. The suggested fix properly closes the task before re-throwing, preventing memory leaks under back-pressure failures.

High
General
Guard assertions after latch timeout

If the latch times out, failure.get() and batchesReceived.get() are checked anyway,
potentially masking the root cause. Guard subsequent assertions so they only run
when the latch succeeded, making timeout failures immediately obvious.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/BackpressureProducerIT.java [188-192]

-assertTrue("Stream should finish within 90s", latch.await(90, TimeUnit.SECONDS));
+boolean completed = latch.await(90, TimeUnit.SECONDS);
+assertTrue("Stream should finish within 90s", completed);
+if (!completed) {
+    return; // Skip further assertions if the stream timed out
+}
 assertNull("Producer must not surface any failure under back-pressure: " + failure.get(), failure.get());
 
 long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
 assertEquals("All batches must arrive successfully under back-pressure", BATCH_COUNT, batchesReceived.get());
Suggestion importance[1-10]: 5

__

Why: The suggestion improves test clarity by preventing subsequent assertions from running after a timeout, making timeout failures more obvious. This is a reasonable test hygiene improvement but doesn't fix a functional bug.

Low
Improve unexpected result handling clarity

The default case logs a warning but then throws an exception. If the unexpected
result is non-fatal or transient, consider whether the stream should fail
immediately. Alternatively, if this truly represents an internal inconsistency,
ensure the exception message is actionable for operators.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java [109-132]

 public void awaitReadyOrThrow() {
     if (cancelled || serverStreamListener.isCancelled()) {
         throw StreamException.cancelled("stream cancelled before back-pressure wait");
     }
     BackpressureStrategy.WaitResult result = bp.waitForListener(readyTimeoutMillis);
     switch (result) {
         case READY:
-            // Re-check cancelled state: Arrow's CallbackBackpressureStrategy returns
-            // READY when both isReady() and isCancelled() are true at wakeup. Consult
-            // the listener directly (the channel's own cancelled flag is set by the
-            // cancel callback, which races with the wakeup itself).
             if (cancelled || serverStreamListener.isCancelled()) {
                 throw StreamException.cancelled("stream cancelled concurrently with ready");
             }
             return;
         case CANCELLED:
             throw StreamException.cancelled("stream cancelled while waiting for consumer");
         case TIMEOUT:
             throw new StreamException(StreamErrorCode.TIMED_OUT, "consumer not ready after " + readyTimeoutMillis + "ms");
         default:
-            logger.warn("unexpected back-pressure wait result: {}", result);
+            logger.error("Unexpected back-pressure wait result: {}. This indicates a bug in BackpressureStrategy.", result);
             throw new StreamException(StreamErrorCode.INTERNAL, "unexpected back-pressure wait result: " + result);
     }
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion to change logger.warn to logger.error is a minor improvement in log severity for an internal inconsistency. However, the default case already handles the situation appropriately by throwing an exception, so the impact is minimal.

Low

Previous suggestions

Suggestions up to commit 3757039
CategorySuggestion                                                                                                                                    Impact
Possible issue
Ensure idempotent cancellation cleanup

The onChannelCancelled method checks cancelled without synchronization, creating a
race condition where multiple threads could observe cancelled == false
simultaneously and execute the cleanup block multiple times. This can lead to
duplicate recordCallEnd calls or double-close. Make this method synchronized or use
AtomicBoolean.compareAndSet to ensure idempotent cleanup.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java [135-141]

-private void onChannelCancelled() {
+private synchronized void onChannelCancelled() {
     if (!cancelled) {
         cancelled = true;
         callTracker.recordCallEnd(StreamErrorCode.CANCELLED.name());
         close();
     }
 }
Suggestion importance[1-10]: 6

__

Why: Valid concern about the check-then-act race condition in onChannelCancelled. While the cancelled flag is volatile, the compound operation (check + set + cleanup) is not atomic. Using synchronized or AtomicBoolean.compareAndSet would ensure exactly-once cleanup semantics, preventing potential duplicate recordCallEnd calls.

Low
Prevent double-free on batch transfer failure

The transferred flag pattern is fragile because sendResponseBatch may throw before
ownership transfer completes. If an exception occurs during sendResponseBatch, the
VectorSchemaRoot will be closed in the finally block, but the channel may have
already taken partial ownership of the buffers, leading to potential double-free or
use-after-free. Ensure the ownership transfer is atomic or use try-with-resources on
a wrapper that tracks transfer state.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/BackpressureProducerIT.java [294-326]

 private void handleStreamRequest(BackpressureTestRequest request, TransportChannel channel, Task task) throws IOException {
     try {
         for (int b = 0; b < request.batchCount; b++) {
             if (request.perBatchSleepMillis > 0) {
                 try {
                     Thread.sleep(request.perBatchSleepMillis);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw new IOException("interrupted", e);
                 }
             }
-            VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator);
-            boolean transferred = false;
-            try {
+            try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) {
                 IntVector v = (IntVector) root.getVector("value");
                 v.allocateNew(request.rowsPerBatch);
                 for (int i = 0; i < request.rowsPerBatch; i++) {
                     v.setSafe(i, i);
                 }
                 root.setRowCount(request.rowsPerBatch);
                 channel.sendResponseBatch(new BackpressureTestResponse(root));
-                transferred = true;
-            } finally {
-                if (!transferred) {
-                    root.close();
-                }
+                // Ownership transferred; suppress close
+                root.clear();
             }
         }
         channel.completeStream();
     } catch (Exception e) {
         channel.sendResponse(e);
     }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion raises a valid concern about ownership transfer semantics, but the proposed solution using root.clear() is incorrect and would break the transfer. The existing transferred flag pattern is a common idiom for ownership transfer. A better approach would require understanding the exact ownership contract of sendResponseBatch, which isn't clear from this PR alone.

Low
Synchronize cancellation checks with wait operation

The method performs multiple cancellation checks that race with concurrent state
changes. Between the initial check and the waitForListener call, or between
waitForListener returning and the READY case re-check, the stream could be
cancelled. Consider wrapping the entire method body in a synchronized block or using
a lock to ensure atomicity of the cancelled state observation and the wait
operation.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java [109-132]

 public void awaitReadyOrThrow() {
-    if (cancelled || serverStreamListener.isCancelled()) {
-        throw StreamException.cancelled("stream cancelled before back-pressure wait");
-    }
-    BackpressureStrategy.WaitResult result = bp.waitForListener(readyTimeoutMillis);
-    switch (result) {
-        case READY:
-            // Re-check cancelled state: Arrow's CallbackBackpressureStrategy returns
-            // READY when both isReady() and isCancelled() are true at wakeup. Consult
-            // the listener directly (the channel's own cancelled flag is set by the
-            // cancel callback, which races with the wakeup itself).
-            if (cancelled || serverStreamListener.isCancelled()) {
-                throw StreamException.cancelled("stream cancelled concurrently with ready");
-            }
-            return;
-        case CANCELLED:
-            throw StreamException.cancelled("stream cancelled while waiting for consumer");
-        case TIMEOUT:
-            throw new StreamException(StreamErrorCode.TIMED_OUT, "consumer not ready after " + readyTimeoutMillis + "ms");
-        default:
-            logger.warn("unexpected back-pressure wait result: {}", result);
-            throw new StreamException(StreamErrorCode.INTERNAL, "unexpected back-pressure wait result: " + result);
+    synchronized (this) {
+        if (cancelled || serverStreamListener.isCancelled()) {
+            throw StreamException.cancelled("stream cancelled before back-pressure wait");
+        }
+        BackpressureStrategy.WaitResult result = bp.waitForListener(readyTimeoutMillis);
+        switch (result) {
+            case READY:
+                if (cancelled || serverStreamListener.isCancelled()) {
+                    throw StreamException.cancelled("stream cancelled concurrently with ready");
+                }
+                return;
+            case CANCELLED:
+                throw StreamException.cancelled("stream cancelled while waiting for consumer");
+            case TIMEOUT:
+                throw new StreamException(StreamErrorCode.TIMED_OUT, "consumer not ready after " + readyTimeoutMillis + "ms");
+            default:
+                logger.warn("unexpected back-pressure wait result: {}", result);
+                throw new StreamException(StreamErrorCode.INTERNAL, "unexpected back-pressure wait result: " + result);
+        }
     }
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion identifies a theoretical race condition but misunderstands the design. The BackpressureStrategy.waitForListener is thread-safe and handles concurrent state changes internally. The double-check pattern after READY is intentional to handle Arrow's behavior. Adding synchronization would block concurrent producers unnecessarily and contradict the eventloop design.

Low
Suggestions up to commit 9834a0f
CategorySuggestion                                                                                                                                    Impact
Possible issue
Ensure idempotent cancel cleanup

The onChannelCancelled method is invoked from gRPC's OnCancelHandler callback, which
can fire concurrently from multiple threads if the strategy's cancel logic races
with other cleanup paths. The check-then-act pattern (if (!cancelled) { cancelled =
true; ... }) is not atomic and can result in multiple threads observing
cancelled==false simultaneously, leading to duplicate recordCallEnd calls or
double-close. Use compareAndSet to ensure idempotent cleanup runs exactly once.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java [135-141]

+private final AtomicBoolean cancelledOnce = new AtomicBoolean(false);
+
 private void onChannelCancelled() {
-    if (!cancelled) {
+    if (cancelledOnce.compareAndSet(false, true)) {
         cancelled = true;
         callTracker.recordCallEnd(StreamErrorCode.CANCELLED.name());
         close();
     }
 }
Suggestion importance[1-10]: 7

__

Why: The check-then-act pattern in onChannelCancelled is not thread-safe and could lead to duplicate cleanup calls under concurrent cancellation. Using AtomicBoolean.compareAndSet ensures idempotent execution, preventing potential double-close or duplicate metric recording issues.

Medium
General
Fail fast on timeout

The test verifies back-pressure by checking wall-clock elapsed time against a
minimum threshold (0.4x of expected consumer sleep budget). However, if the latch
times out after 90 seconds, failure.get() is checked but batchesReceived.get() and
elapsedMillis assertions still execute. Add an early return or guard after the
timeout assertion to prevent misleading assertion failures when the stream doesn't
complete.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/BackpressureProducerIT.java [188-202]

-assertTrue("Stream should finish within 90s", latch.await(90, TimeUnit.SECONDS));
+if (!latch.await(90, TimeUnit.SECONDS)) {
+    fail("Stream should finish within 90s");
+}
 assertNull("Producer must not surface any failure under back-pressure: " + failure.get(), failure.get());
 
 long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
 assertEquals("All batches must arrive successfully under back-pressure", BATCH_COUNT, batchesReceived.get());
 
-// Wall-clock must reflect consumer pacing — without back-pressure the producer
-// would race ahead and finish near-instantly relative to the consumer's sleep
-// budget. 0.4x absorbs startup jitter and CI variance without false negatives.
 long minExpectedMillis = (long) ((BATCH_COUNT * CONSUMER_SLEEP_MS) * 0.4);
 assertTrue(
     "Wall-clock " + elapsedMillis + "ms must reflect consumer pacing (>=" + minExpectedMillis + "ms)",
     elapsedMillis >= minExpectedMillis
 );
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that subsequent assertions execute even after a timeout, which could produce misleading failure messages. Using fail() instead of assertTrue() with early return improves test clarity and debugging.

Low
Remove unreachable default case

The default case in the switch statement is unreachable because WaitResult is an
enum with only three values (READY, CANCELLED, TIMEOUT). The warning log and
exception throw will never execute. Remove the default case to eliminate dead code
and clarify that all enum values are explicitly handled.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java [109-132]

 public void awaitReadyOrThrow() {
     if (cancelled || serverStreamListener.isCancelled()) {
         throw StreamException.cancelled("stream cancelled before back-pressure wait");
     }
     BackpressureStrategy.WaitResult result = bp.waitForListener(readyTimeoutMillis);
     switch (result) {
         case READY:
-            // Re-check cancelled state: Arrow's CallbackBackpressureStrategy returns
-            // READY when both isReady() and isCancelled() are true at wakeup. Consult
-            // the listener directly (the channel's own cancelled flag is set by the
-            // cancel callback, which races with the wakeup itself).
             if (cancelled || serverStreamListener.isCancelled()) {
                 throw StreamException.cancelled("stream cancelled concurrently with ready");
             }
             return;
         case CANCELLED:
             throw StreamException.cancelled("stream cancelled while waiting for consumer");
         case TIMEOUT:
             throw new StreamException(StreamErrorCode.TIMED_OUT, "consumer not ready after " + readyTimeoutMillis + "ms");
-        default:
-            logger.warn("unexpected back-pressure wait result: {}", result);
-            throw new StreamException(StreamErrorCode.INTERNAL, "unexpected back-pressure wait result: " + result);
     }
 }
Suggestion importance[1-10]: 4

__

Why: The default case is technically unreachable since WaitResult is an enum with only three values. However, removing it reduces defensive programming and may make the code less resilient to future enum changes. The impact is minor.

Low
Suggestions up to commit 10b8291
CategorySuggestion                                                                                                                                    Impact
Possible issue
Close BatchTask on gate failure

If awaitReadyOrThrow() throws a StreamException, the BatchTask is never closed,
causing a resource leak. The batch holds Arrow buffers that must be released. Wrap
the gate call in a try-catch that closes the task on failure before re-throwing.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [134-136]

 // Block the producer thread before queuing the batch so a slow consumer
 // throttles allocation rather than letting the eventloop's queue grow.
-flightChannel.awaitReadyOrThrow();
+try {
+    flightChannel.awaitReadyOrThrow();
+} catch (Exception e) {
+    task.close();
+    throw e;
+}
 
 flightChannel.getExecutor().execute(threadPool.getThreadContext().preserveContext(() -> {
     try (BatchTask ignored = task) {
         processBatchTask(task);
     } catch (Exception e) {
Suggestion importance[1-10]: 8

__

Why: Valid resource leak concern. If awaitReadyOrThrow() throws, the BatchTask (which holds Arrow buffers) is never closed, causing a memory leak. The suggestion correctly wraps the gate call in try-catch to ensure cleanup.

Medium
Use AtomicBoolean for thread-safe cancellation

The check-then-act pattern if (!cancelled) { cancelled = true; ... } is not atomic
even with volatile. Multiple threads invoking onChannelCancelled concurrently can
both pass the check before either sets the flag, causing recordCallEnd and close to
run multiple times. Use AtomicBoolean.compareAndSet to ensure idempotency.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java [135-141]

+private final AtomicBoolean cancelled = new AtomicBoolean(false);
+
 private void onChannelCancelled() {
-    if (!cancelled) {
-        cancelled = true;
+    if (cancelled.compareAndSet(false, true)) {
         callTracker.recordCallEnd(StreamErrorCode.CANCELLED.name());
         close();
     }
 }
Suggestion importance[1-10]: 7

__

Why: The check-then-act pattern with volatile is not atomic. While onChannelCancelled is invoked from gRPC's callback (likely single-threaded per stream), using AtomicBoolean.compareAndSet provides stronger guarantees and makes the idempotency explicit, preventing potential double-execution of cleanup logic.

Medium
General
Fail fast on latch timeout

If the latch times out, failure.get() and batchesReceived.get() are checked
regardless, potentially masking the root cause. Add an early return or fail-fast
after the latch timeout to surface the timeout as the primary failure rather than
downstream assertion noise.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/BackpressureProducerIT.java [188-192]

-assertTrue("Stream should finish within 90s", latch.await(90, TimeUnit.SECONDS));
+if (!latch.await(90, TimeUnit.SECONDS)) {
+    fail("Stream did not finish within 90s. Batches received: " + batchesReceived.get() + ", failure: " + failure.get());
+}
 assertNull("Producer must not surface any failure under back-pressure: " + failure.get(), failure.get());
 
 long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
 assertEquals("All batches must arrive successfully under back-pressure", BATCH_COUNT, batchesReceived.get());
Suggestion importance[1-10]: 6

__

Why: Improves test diagnostics by failing immediately on timeout rather than proceeding to assertions that may produce misleading error messages. The current assertTrue already fails on timeout, but the improved version provides clearer context.

Low
Suggestions up to commit dc7d386
CategorySuggestion                                                                                                                                    Impact
Possible issue
Add volatile to cancelled flag

The cancelled flag is accessed without synchronization from multiple threads
(producer thread in awaitReadyOrThrow and gRPC callback thread in
onChannelCancelled). This creates a race condition where the producer thread may
observe a stale value. Mark cancelled as volatile to ensure visibility across
threads.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java [62]

-public void awaitReadyOrThrow() {
-    if (cancelled) {
-        throw StreamException.cancelled("stream cancelled before back-pressure wait");
-    }
-    BackpressureStrategy.WaitResult result = bp.waitForListener(readyTimeoutMillis);
-    switch (result) {
-        case READY:
-            // Re-check cancelled: if onReady and onCancel fired concurrently and the
-            // onReady notify won the wakeup, the strategy returns READY even though
-            // the channel is in cancelled state. Treat that as cancelled to avoid
-            // shipping a batch on a stream that is tearing down.
-            if (cancelled) {
-                throw StreamException.cancelled("stream cancelled concurrently with ready");
-            }
-            return;
-        case CANCELLED:
-            throw StreamException.cancelled("stream cancelled while waiting for consumer");
-        case TIMEOUT:
-            throw new StreamException(StreamErrorCode.TIMED_OUT, "consumer not ready after " + readyTimeoutMillis + "ms");
-        default:
-            logger.warn("unexpected back-pressure wait result: {}", result);
-            throw new StreamException(StreamErrorCode.INTERNAL, "unexpected back-pressure wait result: " + result);
-    }
-}
+private volatile boolean cancelled = false;
Suggestion importance[1-10]: 10

__

Why: The cancelled flag is accessed from multiple threads without synchronization, creating a critical race condition. Making it volatile ensures visibility across threads and prevents the producer from observing stale values.

High
Prevent buffer leak on gate failure

If awaitReadyOrThrow throws a StreamException, the BatchTask is never closed,
leaking the Arrow buffers it holds. Wrap the gate call in a try-catch that closes
the task on failure before re-throwing, or use try-with-resources to ensure cleanup.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [134-141]

-// Block the producer thread before queuing the batch so a slow consumer
-// throttles allocation rather than letting the eventloop's queue grow.
-flightChannel.awaitReadyOrThrow();
+try (BatchTask ignored = task) {
+    flightChannel.awaitReadyOrThrow();
+    flightChannel.getExecutor().execute(threadPool.getThreadContext().preserveContext(() -> {
+        try {
+            processBatchTask(task);
+        } catch (Exception e) {
+            // existing error handling
+        }
+    }));
+} catch (StreamException e) {
+    messageListener.onResponseSent(requestId, action, e);
+    throw e;
+}
 
-flightChannel.getExecutor().execute(threadPool.getThreadContext().preserveContext(() -> {
-    try (BatchTask ignored = task) {
-        processBatchTask(task);
-    } catch (Exception e) {
-
Suggestion importance[1-10]: 10

__

Why: If awaitReadyOrThrow throws, the BatchTask is never closed, leaking Arrow buffers. This is a critical resource leak that must be fixed by ensuring the task is closed on all paths.

High
General
Stop batch loop after send failure

If channel.sendResponseBatch throws (e.g. back-pressure timeout), the
VectorSchemaRoot is closed in the finally block but the loop continues allocating
new batches. The stream is already failed; subsequent allocations waste resources
and may trigger spurious errors. Break the loop immediately after catching the
exception.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/BackpressureProducerIT.java [294-326]

-private void handleStreamRequest(BackpressureTestRequest request, TransportChannel channel, Task task) throws IOException {
-    try {
-        for (int b = 0; b < request.batchCount; b++) {
-            if (request.perBatchSleepMillis > 0) {
-                try {
-                    Thread.sleep(request.perBatchSleepMillis);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new IOException("interrupted", e);
-                }
-            }
-            VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator);
-            boolean transferred = false;
-            try {
-                IntVector v = (IntVector) root.getVector("value");
-                v.allocateNew(request.rowsPerBatch);
-                for (int i = 0; i < request.rowsPerBatch; i++) {
-                    v.setSafe(i, i);
-                }
-                root.setRowCount(request.rowsPerBatch);
-                channel.sendResponseBatch(new BackpressureTestResponse(root));
-                transferred = true;
-            } finally {
-                if (!transferred) {
-                    root.close();
-                }
+try {
+    for (int b = 0; b < request.batchCount; b++) {
+        ...
+        try {
+            ...
+            channel.sendResponseBatch(new BackpressureTestResponse(root));
+            transferred = true;
+        } finally {
+            if (!transferred) {
+                root.close();
             }
         }
-        channel.completeStream();
-    } catch (Exception e) {
-        channel.sendResponse(e);
     }
+    channel.completeStream();
+} catch (Exception e) {
+    channel.sendResponse(e);
+    return; // stop processing after failure
 }
Suggestion importance[1-10]: 7

__

Why: After sendResponseBatch throws, the loop continues allocating batches unnecessarily. Adding an early return after the catch block prevents wasted allocations and potential spurious errors, though the test would still function without it.

Medium
Suggestions up to commit 911cd6c
CategorySuggestion                                                                                                                                    Impact
General
Handle sendResponse failure after batch error

If sendResponseBatch throws (e.g., back-pressure timeout), the VectorSchemaRoot is
closed in the finally block but the exception is caught and sent via
channel.sendResponse(e). This may attempt to send an error on an already-failed
stream. Consider rethrowing or checking channel state before sending the error
response.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/BackpressureProducerIT.java [294-326]

 private void handleStreamRequest(BackpressureTestRequest request, TransportChannel channel, Task task) throws IOException {
     try {
         for (int b = 0; b < request.batchCount; b++) {
-            if (request.perBatchSleepMillis > 0) {
-                try {
-                    Thread.sleep(request.perBatchSleepMillis);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new IOException("interrupted", e);
-                }
-            }
+            ...
             VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator);
             boolean transferred = false;
             try {
                 IntVector v = (IntVector) root.getVector("value");
                 v.allocateNew(request.rowsPerBatch);
                 for (int i = 0; i < request.rowsPerBatch; i++) {
                     v.setSafe(i, i);
                 }
                 root.setRowCount(request.rowsPerBatch);
                 channel.sendResponseBatch(new BackpressureTestResponse(root));
                 transferred = true;
             } finally {
                 if (!transferred) {
                     root.close();
                 }
             }
         }
         channel.completeStream();
     } catch (Exception e) {
-        channel.sendResponse(e);
+        try {
+            channel.sendResponse(e);
+        } catch (Exception sendError) {
+            // Log but don't mask original exception
+        }
+        throw new IOException("stream request failed", e);
     }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion identifies a potential issue where sendResponse(e) might fail on an already-failed stream. However, this is test code (IT test) and the improved code doesn't match the existing code structure (missing the sleep logic). The concern is valid but the implementation needs refinement.

Low

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from a89c363 to d00fae0 Compare May 30, 2026 00:47
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d00fae0

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from d00fae0 to d904a1c Compare May 30, 2026 00:52
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d904a1c

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from d904a1c to 0429790 Compare May 30, 2026 00:54
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 0429790

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit db6dee1

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for db6dee1: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e07aa8d

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3582674

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit c907d8a.

PathLineSeverityDescription
plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/BackpressureProducerIT.java272lowTest allocator is created with Long.MAX_VALUE as the max allocation size. While bounded in practice by the parent pool cap (FLIGHT_POOL_CAP_BYTES = 64MB), using Long.MAX_VALUE as an explicit bound in test infrastructure is an unusual pattern that could mask misconfiguration. Plausible and common in Arrow child allocators, but worth noting.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c907d8a

@rishabhmaurya rishabhmaurya marked this pull request as ready for review May 31, 2026 04:15
@rishabhmaurya rishabhmaurya requested a review from a team as a code owner May 31, 2026 04:15
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 667c653

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 911cd6c

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit dc7d386

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from dc7d386 to 10b8291 Compare May 31, 2026 04:45
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 10b8291

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 10b8291: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from 10b8291 to 9834a0f Compare May 31, 2026 05:21
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 9834a0f

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch 3 times, most recently from 7f07f0b to 9d66030 Compare May 31, 2026 05:26
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 7f07f0b

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from 9d66030 to 57cc7f1 Compare May 31, 2026 05:27
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 9d66030

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 57cc7f1

@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from 57cc7f1 to 3757039 Compare May 31, 2026 05:29
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3757039

`FlightServerChannel.sendResponseBatch` now honours gRPC's `isReady()`
contract: the producer thread parks on `BackpressureStrategy.waitForListener`
before each batch is queued and resumes only after gRPC reports the
per-stream outbound buffer has drained below `setOnReadyThreshold`. Slow
consumers throttle producer wall-clock instead of OOMing the flight pool.

Context: 3.1 used `BaseFlightProducer` with a synchronous waitForListener
gate before every putNext. The move to the async/queue model
(flight-eventloop-N + BatchTask) matched the existing transport's
fire-and-forget shape and resolved concurrent-segment-search contention,
but in doing so dropped the isReady() gate. Without it the producer
ignored gRPC's flow-control signal and let buffers accumulate until the
allocator threw OutOfMemoryException. This change restores the gate
without giving up the async shape — awaitReadyOrThrow runs on the
producer thread before the BatchTask is submitted to the eventloop.

Changes:
- FlightServerChannel registers a CompositeBackpressureStrategy and
  exposes awaitReadyOrThrow(). The strategy's cancel callback runs the
  channel's onChannelCancelled cleanup before notifying parked threads.
- FlightOutboundHandler.sendResponseBatch calls awaitReadyOrThrow()
  before getExecutor().execute(...).
- New settings arrow.flight.channel.outbound_buffer_threshold (64 MiB
  default) wired to OSFlightServer.Builder.backpressureThreshold, and
  arrow.flight.channel.ready_timeout (60s default).
- awaitReadyOrThrow re-checks cancelled after waitForListener returns
  READY to close the cancel/ready notify race.

Operational caveat (documented in javadoc and docs/backpressure.md): the
producer thread parks under slow consumers, so N concurrent slow streams
hold N action-pool threads simultaneously and can starve a bounded pool.

Tests:
- Unit tests on CompositeBackpressureStrategy (100% coverage),
  FlightServerChannel.awaitReadyOrThrow (fast path, parks-until-ready,
  timeout, cancel-while-waiting, cancel-wakes-all-parked-producers,
  cancel/ready race, already-cancelled, post-cancel guards),
  FlightOutboundHandler gating path, ServerConfig setting bounds.
- BackpressureProducerIT exercises a slow consumer end-to-end; asserts
  stream completes cleanly and producer wall-clock reflects consumer
  pacing. Stable across multiple reruns.

Coverage on touched code: CompositeBackpressureStrategy 100%,
FlightServerChannel 82%, ArrowFlightProducer 77%, FlightOutboundHandler
70% (uncovered lines are pre-existing defensive paths).

Known limitation: the per-channel eventloop's queue is unbounded. A
producer that allocates batches significantly faster than gRPC drains
can pile up retained batches before isReady() flips false. A byte-aware
bounded queue would tighten the bound; out of scope here.

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
@rishabhmaurya rishabhmaurya force-pushed the flight-rpc-slow-consumer-it branch from 3757039 to cadf774 Compare May 31, 2026 05:34
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit cadf774

mch2 added a commit to mch2/OpenSearch that referenced this pull request May 31, 2026
@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for cadf774: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 31, 2026

Codecov Report

❌ Patch coverage is 79.31034% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.51%. Comparing base (dad63c0) to head (cadf774).
⚠️ Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
...ch/arrow/flight/transport/FlightServerChannel.java 68.42% 2 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##               main   #21899   +/-   ##
=========================================
  Coverage     73.51%   73.51%           
+ Complexity    75582    75580    -2     
=========================================
  Files          6034     6034           
  Lines        342661   342683   +22     
  Branches      49294    49296    +2     
=========================================
+ Hits         251918   251935   +17     
- Misses        70712    70716    +4     
- Partials      20031    20032    +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

if (cancelled || serverStreamListener.isCancelled()) {
throw StreamException.cancelled("stream cancelled before back-pressure wait");
}
BackpressureStrategy.WaitResult result = bp.waitForListener(readyTimeoutMillis);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in testing this diff, i'm hitting cause=java.lang.RuntimeException: Invalid state when waiting for listener. that causes one of the producers to fail immediately, and the failure isn't getting pushed to the consumer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants