[Arrow Flight RPC] Producer-side back-pressure under slow consumers#21899
[Arrow Flight RPC] Producer-side back-pressure under slow consumers#21899rishabhmaurya wants to merge 1 commit into
Conversation
PR Reviewer Guide 🔍(Review updated until commit cadf774)Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Latest suggestions up to cadf774 Explore these optional code suggestions:
Previous suggestionsSuggestions up to commit 3757039
Suggestions up to commit 9834a0f
Suggestions up to commit 10b8291
Suggestions up to commit dc7d386
Suggestions up to commit 911cd6c
|
a89c363 to
d00fae0
Compare
|
Persistent review updated to latest commit d00fae0 |
d00fae0 to
d904a1c
Compare
|
Persistent review updated to latest commit d904a1c |
d904a1c to
0429790
Compare
|
Persistent review updated to latest commit 0429790 |
|
Persistent review updated to latest commit db6dee1 |
|
❌ Gradle check result for db6dee1: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
Persistent review updated to latest commit e07aa8d |
|
Persistent review updated to latest commit 3582674 |
PR Code Analyzer ❗AI-powered 'Code-Diff-Analyzer' found issues on commit c907d8a.
The table above displays the top 10 most important findings. Pull Requests Author(s): Please update your Pull Request according to the report above. Repository Maintainer(s): You can Thanks. |
|
Persistent review updated to latest commit c907d8a |
|
Persistent review updated to latest commit 667c653 |
|
Persistent review updated to latest commit 911cd6c |
|
Persistent review updated to latest commit dc7d386 |
dc7d386 to
10b8291
Compare
|
Persistent review updated to latest commit 10b8291 |
|
❌ Gradle check result for 10b8291: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
10b8291 to
9834a0f
Compare
|
Persistent review updated to latest commit 9834a0f |
7f07f0b to
9d66030
Compare
|
Persistent review updated to latest commit 7f07f0b |
9d66030 to
57cc7f1
Compare
|
Persistent review updated to latest commit 9d66030 |
|
Persistent review updated to latest commit 57cc7f1 |
57cc7f1 to
3757039
Compare
|
Persistent review updated to latest commit 3757039 |
`FlightServerChannel.sendResponseBatch` now honours gRPC's `isReady()` contract: the producer thread parks on `BackpressureStrategy.waitForListener` before each batch is queued and resumes only after gRPC reports the per-stream outbound buffer has drained below `setOnReadyThreshold`. Slow consumers throttle producer wall-clock instead of OOMing the flight pool. Context: 3.1 used `BaseFlightProducer` with a synchronous waitForListener gate before every putNext. The move to the async/queue model (flight-eventloop-N + BatchTask) matched the existing transport's fire-and-forget shape and resolved concurrent-segment-search contention, but in doing so dropped the isReady() gate. Without it the producer ignored gRPC's flow-control signal and let buffers accumulate until the allocator threw OutOfMemoryException. This change restores the gate without giving up the async shape — awaitReadyOrThrow runs on the producer thread before the BatchTask is submitted to the eventloop. Changes: - FlightServerChannel registers a CompositeBackpressureStrategy and exposes awaitReadyOrThrow(). The strategy's cancel callback runs the channel's onChannelCancelled cleanup before notifying parked threads. - FlightOutboundHandler.sendResponseBatch calls awaitReadyOrThrow() before getExecutor().execute(...). - New settings arrow.flight.channel.outbound_buffer_threshold (64 MiB default) wired to OSFlightServer.Builder.backpressureThreshold, and arrow.flight.channel.ready_timeout (60s default). - awaitReadyOrThrow re-checks cancelled after waitForListener returns READY to close the cancel/ready notify race. Operational caveat (documented in javadoc and docs/backpressure.md): the producer thread parks under slow consumers, so N concurrent slow streams hold N action-pool threads simultaneously and can starve a bounded pool. Tests: - Unit tests on CompositeBackpressureStrategy (100% coverage), FlightServerChannel.awaitReadyOrThrow (fast path, parks-until-ready, timeout, cancel-while-waiting, cancel-wakes-all-parked-producers, cancel/ready race, already-cancelled, post-cancel guards), FlightOutboundHandler gating path, ServerConfig setting bounds. - BackpressureProducerIT exercises a slow consumer end-to-end; asserts stream completes cleanly and producer wall-clock reflects consumer pacing. Stable across multiple reruns. Coverage on touched code: CompositeBackpressureStrategy 100%, FlightServerChannel 82%, ArrowFlightProducer 77%, FlightOutboundHandler 70% (uncovered lines are pre-existing defensive paths). Known limitation: the per-channel eventloop's queue is unbounded. A producer that allocates batches significantly faster than gRPC drains can pile up retained batches before isReady() flips false. A byte-aware bounded queue would tighten the bound; out of scope here. Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
3757039 to
cadf774
Compare
|
Persistent review updated to latest commit cadf774 |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #21899 +/- ##
=========================================
Coverage 73.51% 73.51%
+ Complexity 75582 75580 -2
=========================================
Files 6034 6034
Lines 342661 342683 +22
Branches 49294 49296 +2
=========================================
+ Hits 251918 251935 +17
- Misses 70712 70716 +4
- Partials 20031 20032 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| if (cancelled || serverStreamListener.isCancelled()) { | ||
| throw StreamException.cancelled("stream cancelled before back-pressure wait"); | ||
| } | ||
| BackpressureStrategy.WaitResult result = bp.waitForListener(readyTimeoutMillis); |
There was a problem hiding this comment.
in testing this diff, i'm hitting cause=java.lang.RuntimeException: Invalid state when waiting for listener. that causes one of the producers to fail immediately, and the failure isn't getting pushed to the consumer.
Summary
FlightServerChannel.sendResponseBatchnow honours gRPC'sisReady()contract: the producer thread parks onBackpressureStrategy.waitForListenerbefore each batch is queued, and resumes only after gRPC reports the per-stream outbound buffer has drained belowsetOnReadyThreshold. Slow consumers throttle producer wall-clock instead of OOMing the flight pool.Context
3.1 used
BaseFlightProducerwith a synchronousBackpressureStrategy.waitForListenergate before everyputNext. When we moved to the async/queue model (flight-eventloop-N+FlightOutboundHandler.BatchTask), the goal was to match the existing OpenSearch transport's fire-and-forget shape — producer enqueues quickly and returns, ordering is enforced by a single-threaded eventloop. That solved the concurrent-segment-search contention problem (multiple producer threads writing to one channel without serialising on a per-channel mutex), but in doing so we dropped theisReady()gate. Without it, the unguarded path ignored gRPC's flow-control signal and let buffers accumulate until the flight pool allocator threwOutOfMemoryException.This PR restores the gate without giving up the async fire-and-forget shape.
awaitReadyOrThrowruns on the producer thread before theBatchTaskis submitted to the eventloop. The eventloop and its single-threaded ordering guarantees are unchanged.What changed
FlightServerChannelregisters aCompositeBackpressureStrategyand exposesawaitReadyOrThrow(). The strategy's cancel callback runs the channel's existingonChannelCancelledcleanup before notifying parked threads, so a thread waking fromwaitForListeneralways observes the cancelled state.FlightOutboundHandler.sendResponseBatchcallsawaitReadyOrThrow()beforegetExecutor().execute(...).arrow.flight.channel.outbound_buffer_threshold(default 64 MiB) andarrow.flight.channel.ready_timeout(default 60s, 100ms minimum). The threshold is wired through toOSFlightServer.Builder.backpressureThreshold.Settings
arrow.flight.channel.ready_timeout60sStreamErrorCode.TIMED_OUT.arrow.flight.channel.outbound_buffer_threshold64mbnative.allocator.pool.flight.max.Tuning and operational concerns
Sizing the flight pool
outbound_buffer_thresholdis per stream;native.allocator.pool.flight.maxis per node, shared across all streams.The threshold is a watermark, not a max-message-size — a single batch larger than the threshold ships in one shot, with the producer parking on the next
sendResponseBatch. See docs/backpressure.md for full sizing guidance.Thread-pool exhaustion under slow consumers
The producer thread parks under a slow consumer (up to
ready_timeout). The action handler runs on whichever thread pool it was registered against; N concurrent slow streams hold N threads parked simultaneously, which can starve the pool.Mitigations:
ready_timeoutto fail unresponsive streams within an acceptable window.Tests
CompositeBackpressureStrategy(100% coverage),FlightServerChannel.awaitReadyOrThrow(fast path, parks-until-ready, timeout, cancel-during-wait, already-cancelled, handler registration),FlightOutboundHandlergating path, andServerConfigsettings parsing / bounds.BackpressureProducerITexercises a slow consumer; asserts the stream completes cleanly, all batches arrive, and producer wall-clock reflects consumer pacing (proves the producer was actually throttled).assertBusyonThread.State.TIMED_WAITINGrather than fixed sleeps to avoid flakiness. Stable across multiple reruns.Coverage on touched code:
CompositeBackpressureStrategy100%,FlightServerChannel82%,ArrowFlightProducer77%,FlightOutboundHandler70%. Uncovered lines are pre-existing defensive paths.Test plan
./gradlew :plugins:arrow-flight-rpc:test./gradlew :plugins:arrow-flight-rpc:check -x internalClusterTest./gradlew :plugins:arrow-flight-rpc:internalClusterTest --tests "*BackpressureProducerIT"(stable across reruns)./gradlew :plugins:arrow-flight-rpc:jacocoTestReport