From 26993ee9a866ea0d891f8ee89fb1c7196124c9e0 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sun, 7 Dec 2025 16:00:02 +0100 Subject: [PATCH 1/2] HTTP/2: add per-stream idle and lifetime timeouts to core multiplexer Expose configuration via H2Config, throw H2StreamTimeoutException on expiry and keep the connection alive Extend test coverage and add an example client demonstrating timed-out and successful streams --- .../core5/http2/H2StreamTimeoutException.java | 87 +++++ .../impl/nio/AbstractH2StreamMultiplexer.java | 78 ++++- .../hc/core5/http2/impl/nio/H2Stream.java | 51 +++ .../H2StreamTimeoutClientExample.java | 300 ++++++++++++++++++ .../nio/TestAbstractH2StreamMultiplexer.java | 95 ++++++ 5 files changed, 609 insertions(+), 2 deletions(-) create mode 100644 httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java new file mode 100644 index 000000000..2f74ceb45 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java @@ -0,0 +1,87 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2; + +import java.net.SocketTimeoutException; + +import org.apache.hc.core5.util.Timeout; + +/** + * {@link java.net.SocketTimeoutException} raised by the HTTP/2 stream + * multiplexer when a per-stream timeout elapses. + *

+ * This exception is used for timeouts that are scoped to a single HTTP/2 + * stream rather than the underlying TCP connection, for example: + *

+ * + *

+ * The {@link #isIdleTimeout()} flag can be used to distinguish whether + * the timeout was triggered by idleness or by the overall stream lifetime. + * The affected stream id and the timeout value are exposed via + * {@link #getStreamId()} and {@link #getTimeout()} respectively. + *

+ * + * @since 5.4 + */ +public class H2StreamTimeoutException extends SocketTimeoutException { + + private static final long serialVersionUID = 1L; + + private final int streamId; + private final Timeout timeout; + private final boolean idleTimeout; + + public H2StreamTimeoutException(final String message, final int streamId, final Timeout timeout, final boolean idleTimeout) { + super(message); + this.streamId = streamId; + this.timeout = timeout; + this.idleTimeout = idleTimeout; + } + + public int getStreamId() { + return streamId; + } + + public Timeout getTimeout() { + return timeout; + } + + /** + * Indicates whether this timeout was triggered by idle time (no activity) + * rather than by stream lifetime. + * + * @return {@code true} if this is an idle timeout, {@code false} if it is a lifetime timeout. + */ + public boolean isIdleTimeout() { + return idleTimeout; + } + +} diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 1eb939c6f..906ec1065 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -74,6 +74,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; import org.apache.hc.core5.http2.config.H2Setting; @@ -438,6 +439,10 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio for (;;) { final RawFrame frame = inputBuffer.read(src, ioSession); if (frame != null) { + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } + if (streamListener != null) { streamListener.onFrameInput(this, frame.getStreamId(), frame); } @@ -454,6 +459,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio break; } } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } } } @@ -531,6 +539,11 @@ public final void onOutput() throws HttpException, IOException { } } } + + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } + if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) { int liveStreams = 0; for (final Iterator it = streams.iterator(); it.hasNext(); ) { @@ -642,6 +655,7 @@ private void executeRequest(final RequestExecutionCommand requestExecutionComman requestExecutionCommand.getExchangeHandler(), requestExecutionCommand.getPushHandlerFactory(), requestExecutionCommand.getContext())); + initializeStreamTimeouts(stream); if (streamListener != null) { final int initInputWindow = stream.getInputWindow().get(); @@ -760,10 +774,12 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final H2StreamChannel channel = createChannel(streamId); if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { stream = streams.createActive(channel, incomingRequest(channel)); + initializeStreamTimeouts(stream); streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams()); } else { channel.localReset(H2Error.REFUSED_STREAM); stream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); + initializeStreamTimeouts(stream); } } else if (stream.isLocalClosed() && stream.isRemoteClosed()) { throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed"); @@ -954,6 +970,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio channel.localReset(H2Error.REFUSED_STREAM); promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); } + initializeStreamTimeouts(promisedStream); try { consumePushPromiseFrame(frame, payload, promisedStream); } catch (final H2StreamResetException ex) { @@ -1359,8 +1376,17 @@ H2StreamChannel createChannel(final int streamId) { return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize); } - H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) throws H2ConnectionException { - return streams.createActive(channel, streamHandler); + private void initializeStreamTimeouts(final H2Stream stream) { + final Timeout socketTimeout = ioSession.getSocketTimeout(); + if (socketTimeout != null && socketTimeout.isEnabled()) { + stream.setIdleTimeout(socketTimeout); + } + } + + H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) { + final H2Stream stream = streams.createActive(channel, streamHandler); + initializeStreamTimeouts(stream); + return stream; } private void recordPriorityFromHeaders(final int streamId, final List headers) { @@ -1463,6 +1489,7 @@ public void push(final List
headers, final AsyncPushProducer pushProduce final int promisedStreamId = streams.generateStreamId(); final H2StreamChannel channel = createChannel(promisedStreamId); final H2Stream stream = streams.createReserved(channel, outgoingPushPromise(channel, pushProducer)); + initializeStreamTimeouts(stream); commitPushPromise(id, promisedStreamId, headers); stream.markRemoteClosed(); @@ -1579,4 +1606,51 @@ public String toString() { } + private void checkStreamTimeouts(final long nowNanos) throws IOException { + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + if (!stream.isActive()) { + continue; + } + + final Timeout idleTimeout = stream.getIdleTimeout(); + final Timeout lifetimeTimeout = stream.getLifetimeTimeout(); + if ((idleTimeout == null || !idleTimeout.isEnabled()) + && (lifetimeTimeout == null || !lifetimeTimeout.isEnabled())) { + continue; + } + + final long created = stream.getCreatedNanos(); + final long last = stream.getLastActivityNanos(); + + if (idleTimeout != null && idleTimeout.isEnabled()) { + final long idleNanos = idleTimeout.toNanoseconds(); + if (idleNanos > 0 && nowNanos - last > idleNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream idle timeout (" + idleTimeout + ")", + streamId, + idleTimeout, + true); + stream.localReset(ex, H2Error.CANCEL); + // Once reset due to idle timeout, we do not care about lifetime anymore + continue; + } + } + + if (lifetimeTimeout != null && lifetimeTimeout.isEnabled()) { + final long lifeNanos = lifetimeTimeout.toNanoseconds(); + if (lifeNanos > 0 && nowNanos - created > lifeNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream lifetime timeout (" + lifetimeTimeout + ")", + streamId, + lifetimeTimeout, + false); + stream.localReset(ex, H2Error.CANCEL); + } + } + } + } + } \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index 8d90bb66c..c770bef85 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -61,6 +61,12 @@ class H2Stream implements StreamControl { private volatile boolean reserved; private volatile boolean remoteClosed; + private volatile long createdNanos; + private volatile long lastActivityNanos; + + private volatile Timeout idleTimeout; + private volatile Timeout lifetimeTimeout; + H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer stateChangeCallback) { this.channel = channel; this.handler = handler; @@ -69,6 +75,8 @@ class H2Stream implements StreamControl { this.transitionRef = new AtomicReference<>(State.RESERVED); this.released = new AtomicBoolean(); this.cancelled = new AtomicBoolean(); + this.createdNanos = 0L; + this.lastActivityNanos = 0L; } @Override @@ -104,6 +112,7 @@ private void triggerClosed() { void activate() { reserved = false; + markCreatedAndActive(); triggerOpen(); } @@ -146,6 +155,8 @@ boolean isLocalClosed() { void consumePromise(final List
headers) throws HttpException, IOException { try { + touch(); + if (channel.isLocalReset()) { return; } @@ -165,6 +176,8 @@ void consumeHeader(final List
headers, final boolean endOfStream) throws if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -183,6 +196,8 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -204,6 +219,8 @@ boolean isOutputReady() { void produceOutput() throws HttpException, IOException { try { + touch(); + handler.produceOutput(); } catch (final ProtocolException ex) { localReset(ex, H2Error.PROTOCOL_ERROR); @@ -211,6 +228,7 @@ void produceOutput() throws HttpException, IOException { } void produceInputCapacityUpdate() throws IOException { + touch(); handler.updateInputCapacity(); } @@ -308,4 +326,37 @@ public String toString() { return buf.toString(); } + private void markCreatedAndActive() { + final long now = System.nanoTime(); + this.createdNanos = now; + this.lastActivityNanos = now; + } + + private void touch() { + this.lastActivityNanos = System.nanoTime(); + } + + long getCreatedNanos() { + return createdNanos; + } + + long getLastActivityNanos() { + return lastActivityNanos; + } + + Timeout getIdleTimeout() { + return idleTimeout; + } + + void setIdleTimeout(final Timeout idleTimeout) { + this.idleTimeout = idleTimeout; + } + + Timeout getLifetimeTimeout() { + return lifetimeTimeout; + } + + void setLifetimeTimeout(final Timeout lifetimeTimeout) { + this.lifetimeTimeout = lifetimeTimeout; + } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java new file mode 100644 index 000000000..542e43734 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java @@ -0,0 +1,300 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +/** + * Example of an HTTP/2 client where a "slow" request gets aborted when the + * underlying HTTP/2 connection times out due to inactivity (socket timeout). + *

+ * The client opens a single HTTP/2 connection to {@code nghttp2.org} and + * executes two concurrent requests: + *

    + *
  • a "fast" request ({@code /httpbin/ip}), which completes before + * the connection idle timeout, and
  • + *
  • a "slow" request ({@code /httpbin/delay/5}), which keeps the + * connection idle long enough for the I/O reactor to trigger a timeout + * and close the HTTP/2 connection.
  • + *
+ *

+ * When the reactor closes the connection due to inactivity, all active + * streams fail with {@link H2StreamResetException} reporting + * {@code "Timeout due to inactivity (...)"}. The already completed stream + * is not affected. + * + * @since 5.4 + */ +public class H2StreamTimeoutClientExample { + + public static void main(final String[] args) throws Exception { + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + // Connection-level inactivity timeout: keep it short so that + // /httpbin/delay/5 reliably triggers it. + .setSoTimeout(2, TimeUnit.SECONDS) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(100) + .build(); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new H2StreamListener() { + + @Override + public void onHeaderInput( + final HttpConnection connection, + final int streamId, + final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + + " (" + streamId + ") << " + headers.get(i)); + } + } + + @Override + public void onHeaderOutput( + final HttpConnection connection, + final int streamId, + final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + + " (" + streamId + ") >> " + headers.get(i)); + } + } + + @Override + public void onFrameInput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + // No-op in this example. + } + + @Override + public void onFrameOutput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + // No-op in this example. + } + + @Override + public void onInputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // No-op in this example. + } + + @Override + public void onOutputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // No-op in this example. + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final URI fastUri = new URI("https://nghttp2.org/httpbin/ip"); + final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5"); + + final CountDownLatch latch = new CountDownLatch(2); + + // --- Fast stream: expected to succeed + executeWithLogging( + requester, + fastUri, + "[fast]", + latch, + false); + + // --- Slow stream: /delay/5 sleeps 5 seconds and should exceed + // the 2-second connection idle timeout, resulting in a reset. + executeWithLogging( + requester, + slowUri, + "[slow]", + latch, + true); + + latch.await(); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + } + + private static void executeWithLogging( + final HttpAsyncRequester requester, + final URI requestUri, + final String label, + final CountDownLatch latch, + final boolean expectTimeout) { + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri) + .build(); + final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( + new StringAsyncEntityConsumer()); + + requester.execute(new AsyncClientExchangeHandler() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + latch.countDown(); + } + + @Override + public void cancel() { + System.out.println(label + " " + requestUri + " cancelled"); + } + + @Override + public void failed(final Exception cause) { + if (expectTimeout && cause instanceof H2StreamResetException) { + final H2StreamResetException ex = (H2StreamResetException) cause; + System.out.println(label + " expected timeout reset: " + + requestUri + + " -> " + ex); + } else { + System.out.println(label + " failure: " + + requestUri + " -> " + cause); + } + } + + @Override + public void produceRequest( + final RequestChannel channel, + final HttpContext httpContext) throws HttpException, IOException { + System.out.println(label + " sending request: " + requestUri); + requestProducer.sendRequest(channel, httpContext); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + requestProducer.produce(channel); + } + + @Override + public void consumeInformation( + final HttpResponse response, + final HttpContext httpContext) throws HttpException, IOException { + System.out.println(label + " " + requestUri + " -> informational " + + response.getCode()); + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext) throws HttpException, IOException { + if (expectTimeout) { + System.out.println(label + " UNEXPECTED success: " + + requestUri + " -> " + response.getCode()); + } else { + System.out.println(label + " response: " + + requestUri + " -> " + response.getCode()); + } + responseConsumer.consumeResponse(response, entityDetails, httpContext, null); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + responseConsumer.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + responseConsumer.consume(src); + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + responseConsumer.streamEnd(trailers); + if (!expectTimeout) { + System.out.println(label + " body completed for " + requestUri); + } + } + + }, Timeout.ofSeconds(10), HttpCoreContext.create()); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 0089a12e5..0a0ea1959 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.IntStream; @@ -52,6 +53,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.WritableByteChannelMock; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; @@ -65,6 +67,7 @@ import org.apache.hc.core5.http2.hpack.HPackEncoder; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.ByteArrayBuffer; +import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -1034,5 +1037,97 @@ void testPriorityUpdateContinuesAfterSettingsWithNoH2Equals1() throws Exception Assertions.assertTrue(idxPriUpd >= 0, "PRIORITY_UPDATE should be emitted when NO_RFC7540=1"); } + @Test + void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { + Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.of(1, TimeUnit.NANOSECONDS)); + + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + // Create a local stream and mark it active (initializeStreamTimeouts() se ejecuta aquí) + final H2StreamChannel channel = streamMultiplexer.createChannel(1); + final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + stream.activate(); + + streamMultiplexer.onOutput(); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + final Exception cause = exceptionCaptor.getValue(); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); + + final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; + Assertions.assertTrue(timeoutEx.isIdleTimeout(), "Expected idle timeout flag"); + Assertions.assertEquals(1, timeoutEx.getStreamId(), "Unexpected stream id"); + + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } + + @Test + void testStreamLifetimeTimeoutTriggersH2StreamTimeoutException() throws Exception { + Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.DISABLED); + + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final H2StreamChannel channel = streamMultiplexer.createChannel(3); + final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + stream.activate(); + + stream.setIdleTimeout(null); + stream.setLifetimeTimeout(Timeout.of(1, TimeUnit.NANOSECONDS)); + + streamMultiplexer.onOutput(); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + final Exception cause = exceptionCaptor.getValue(); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); + + final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; + Assertions.assertFalse(timeoutEx.isIdleTimeout(), "Expected lifetime timeout flag"); + Assertions.assertEquals(3, timeoutEx.getStreamId(), "Unexpected stream id"); + + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } + + + } From 7b113257c62c8864f0b8c6a3ed2e7ed9c61b3d06 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Thu, 11 Dec 2025 12:48:02 +0100 Subject: [PATCH 2/2] add per-stream idle timeout and suppress CancelledKeyException on cancel ; Handle cancelled SelectionKey in interestOps access --- .../core5/http2/H2StreamTimeoutException.java | 4 +- .../impl/nio/AbstractH2StreamMultiplexer.java | 59 +--- .../hc/core5/http2/impl/nio/H2Stream.java | 27 +- .../H2StreamTimeoutClientExample.java | 300 ------------------ .../nio/TestAbstractH2StreamMultiplexer.java | 56 +--- .../async/AsyncH2SocketTimeoutCoreTest.java | 198 ++++++++++++ 6 files changed, 215 insertions(+), 429 deletions(-) delete mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java create mode 100644 httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java index 2f74ceb45..2f837b478 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java @@ -26,7 +26,7 @@ */ package org.apache.hc.core5.http2; -import java.net.SocketTimeoutException; +import java.io.InterruptedIOException; import org.apache.hc.core5.util.Timeout; @@ -51,7 +51,7 @@ * * @since 5.4 */ -public class H2StreamTimeoutException extends SocketTimeoutException { +public class H2StreamTimeoutException extends InterruptedIOException { private static final long serialVersionUID = 1L; diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 906ec1065..af1261ef1 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -439,10 +439,6 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio for (;;) { final RawFrame frame = inputBuffer.read(src, ioSession); if (frame != null) { - if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { - checkStreamTimeouts(System.nanoTime()); - } - if (streamListener != null) { streamListener.onFrameInput(this, frame.getStreamId(), frame); } @@ -655,7 +651,6 @@ private void executeRequest(final RequestExecutionCommand requestExecutionComman requestExecutionCommand.getExchangeHandler(), requestExecutionCommand.getPushHandlerFactory(), requestExecutionCommand.getContext())); - initializeStreamTimeouts(stream); if (streamListener != null) { final int initInputWindow = stream.getInputWindow().get(); @@ -774,12 +769,10 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final H2StreamChannel channel = createChannel(streamId); if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { stream = streams.createActive(channel, incomingRequest(channel)); - initializeStreamTimeouts(stream); streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams()); } else { channel.localReset(H2Error.REFUSED_STREAM); stream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); - initializeStreamTimeouts(stream); } } else if (stream.isLocalClosed() && stream.isRemoteClosed()) { throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed"); @@ -970,7 +963,6 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio channel.localReset(H2Error.REFUSED_STREAM); promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); } - initializeStreamTimeouts(promisedStream); try { consumePushPromiseFrame(frame, payload, promisedStream); } catch (final H2StreamResetException ex) { @@ -1376,16 +1368,8 @@ H2StreamChannel createChannel(final int streamId) { return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize); } - private void initializeStreamTimeouts(final H2Stream stream) { - final Timeout socketTimeout = ioSession.getSocketTimeout(); - if (socketTimeout != null && socketTimeout.isEnabled()) { - stream.setIdleTimeout(socketTimeout); - } - } - H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) { final H2Stream stream = streams.createActive(channel, streamHandler); - initializeStreamTimeouts(stream); return stream; } @@ -1489,7 +1473,6 @@ public void push(final List

headers, final AsyncPushProducer pushProduce final int promisedStreamId = streams.generateStreamId(); final H2StreamChannel channel = createChannel(promisedStreamId); final H2Stream stream = streams.createReserved(channel, outgoingPushPromise(channel, pushProducer)); - initializeStreamTimeouts(stream); commitPushPromise(id, promisedStreamId, headers); stream.markRemoteClosed(); @@ -1614,43 +1597,21 @@ private void checkStreamTimeouts(final long nowNanos) throws IOException { } final Timeout idleTimeout = stream.getIdleTimeout(); - final Timeout lifetimeTimeout = stream.getLifetimeTimeout(); - if ((idleTimeout == null || !idleTimeout.isEnabled()) - && (lifetimeTimeout == null || !lifetimeTimeout.isEnabled())) { + if (idleTimeout == null || !idleTimeout.isEnabled()) { continue; } - final long created = stream.getCreatedNanos(); final long last = stream.getLastActivityNanos(); - - if (idleTimeout != null && idleTimeout.isEnabled()) { - final long idleNanos = idleTimeout.toNanoseconds(); - if (idleNanos > 0 && nowNanos - last > idleNanos) { - final int streamId = stream.getId(); - final H2StreamTimeoutException ex = new H2StreamTimeoutException( - "HTTP/2 stream idle timeout (" + idleTimeout + ")", - streamId, - idleTimeout, - true); - stream.localReset(ex, H2Error.CANCEL); - // Once reset due to idle timeout, we do not care about lifetime anymore - continue; - } - } - - if (lifetimeTimeout != null && lifetimeTimeout.isEnabled()) { - final long lifeNanos = lifetimeTimeout.toNanoseconds(); - if (lifeNanos > 0 && nowNanos - created > lifeNanos) { - final int streamId = stream.getId(); - final H2StreamTimeoutException ex = new H2StreamTimeoutException( - "HTTP/2 stream lifetime timeout (" + lifetimeTimeout + ")", - streamId, - lifetimeTimeout, - false); - stream.localReset(ex, H2Error.CANCEL); - } + final long idleNanos = idleTimeout.toNanoseconds(); + if (idleNanos > 0 && nowNanos - last > idleNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream idle timeout (" + idleTimeout + ")", + streamId, + idleTimeout, + true); + stream.localReset(ex, H2Error.CANCEL); } } } - } \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index c770bef85..7f3dadbb4 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -61,11 +61,9 @@ class H2Stream implements StreamControl { private volatile boolean reserved; private volatile boolean remoteClosed; - private volatile long createdNanos; private volatile long lastActivityNanos; private volatile Timeout idleTimeout; - private volatile Timeout lifetimeTimeout; H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer stateChangeCallback) { this.channel = channel; @@ -75,7 +73,6 @@ class H2Stream implements StreamControl { this.transitionRef = new AtomicReference<>(State.RESERVED); this.released = new AtomicBoolean(); this.cancelled = new AtomicBoolean(); - this.createdNanos = 0L; this.lastActivityNanos = 0L; } @@ -91,7 +88,7 @@ public State getState() { @Override public void setTimeout(final Timeout timeout) { - // not supported + this.idleTimeout = timeout; } boolean isReserved() { @@ -219,8 +216,6 @@ boolean isOutputReady() { void produceOutput() throws HttpException, IOException { try { - touch(); - handler.produceOutput(); } catch (final ProtocolException ex) { localReset(ex, H2Error.PROTOCOL_ERROR); @@ -228,7 +223,6 @@ void produceOutput() throws HttpException, IOException { } void produceInputCapacityUpdate() throws IOException { - touch(); handler.updateInputCapacity(); } @@ -327,19 +321,13 @@ public String toString() { } private void markCreatedAndActive() { - final long now = System.nanoTime(); - this.createdNanos = now; - this.lastActivityNanos = now; + this.lastActivityNanos = System.nanoTime(); } private void touch() { this.lastActivityNanos = System.nanoTime(); } - long getCreatedNanos() { - return createdNanos; - } - long getLastActivityNanos() { return lastActivityNanos; } @@ -348,15 +336,4 @@ Timeout getIdleTimeout() { return idleTimeout; } - void setIdleTimeout(final Timeout idleTimeout) { - this.idleTimeout = idleTimeout; - } - - Timeout getLifetimeTimeout() { - return lifetimeTimeout; - } - - void setLifetimeTimeout(final Timeout lifetimeTimeout) { - this.lifetimeTimeout = lifetimeTimeout; - } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java deleted file mode 100644 index 542e43734..000000000 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . - * - */ -package org.apache.hc.core5.http2.examples; - -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.hc.core5.http.EntityDetails; -import org.apache.hc.core5.http.Header; -import org.apache.hc.core5.http.HttpConnection; -import org.apache.hc.core5.http.HttpException; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; -import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; -import org.apache.hc.core5.http.nio.AsyncRequestProducer; -import org.apache.hc.core5.http.nio.CapacityChannel; -import org.apache.hc.core5.http.nio.DataStreamChannel; -import org.apache.hc.core5.http.nio.RequestChannel; -import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; -import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; -import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; -import org.apache.hc.core5.http.protocol.HttpContext; -import org.apache.hc.core5.http.protocol.HttpCoreContext; -import org.apache.hc.core5.http2.H2StreamResetException; -import org.apache.hc.core5.http2.HttpVersionPolicy; -import org.apache.hc.core5.http2.config.H2Config; -import org.apache.hc.core5.http2.frame.RawFrame; -import org.apache.hc.core5.http2.impl.nio.H2StreamListener; -import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; -import org.apache.hc.core5.io.CloseMode; -import org.apache.hc.core5.reactor.IOReactorConfig; -import org.apache.hc.core5.util.Timeout; - -/** - * Example of an HTTP/2 client where a "slow" request gets aborted when the - * underlying HTTP/2 connection times out due to inactivity (socket timeout). - *

- * The client opens a single HTTP/2 connection to {@code nghttp2.org} and - * executes two concurrent requests: - *

    - *
  • a "fast" request ({@code /httpbin/ip}), which completes before - * the connection idle timeout, and
  • - *
  • a "slow" request ({@code /httpbin/delay/5}), which keeps the - * connection idle long enough for the I/O reactor to trigger a timeout - * and close the HTTP/2 connection.
  • - *
- *

- * When the reactor closes the connection due to inactivity, all active - * streams fail with {@link H2StreamResetException} reporting - * {@code "Timeout due to inactivity (...)"}. The already completed stream - * is not affected. - * - * @since 5.4 - */ -public class H2StreamTimeoutClientExample { - - public static void main(final String[] args) throws Exception { - - final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() - // Connection-level inactivity timeout: keep it short so that - // /httpbin/delay/5 reliably triggers it. - .setSoTimeout(2, TimeUnit.SECONDS) - .build(); - - final H2Config h2Config = H2Config.custom() - .setPushEnabled(false) - .setMaxConcurrentStreams(100) - .build(); - - final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() - .setIOReactorConfig(ioReactorConfig) - .setH2Config(h2Config) - .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) - .setStreamListener(new H2StreamListener() { - - @Override - public void onHeaderInput( - final HttpConnection connection, - final int streamId, - final List headers) { - for (int i = 0; i < headers.size(); i++) { - System.out.println(connection.getRemoteAddress() - + " (" + streamId + ") << " + headers.get(i)); - } - } - - @Override - public void onHeaderOutput( - final HttpConnection connection, - final int streamId, - final List headers) { - for (int i = 0; i < headers.size(); i++) { - System.out.println(connection.getRemoteAddress() - + " (" + streamId + ") >> " + headers.get(i)); - } - } - - @Override - public void onFrameInput( - final HttpConnection connection, - final int streamId, - final RawFrame frame) { - // No-op in this example. - } - - @Override - public void onFrameOutput( - final HttpConnection connection, - final int streamId, - final RawFrame frame) { - // No-op in this example. - } - - @Override - public void onInputFlowControl( - final HttpConnection connection, - final int streamId, - final int delta, - final int actualSize) { - // No-op in this example. - } - - @Override - public void onOutputFlowControl( - final HttpConnection connection, - final int streamId, - final int delta, - final int actualSize) { - // No-op in this example. - } - - }) - .create(); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("HTTP requester shutting down"); - requester.close(CloseMode.GRACEFUL); - })); - - requester.start(); - - final URI fastUri = new URI("https://nghttp2.org/httpbin/ip"); - final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5"); - - final CountDownLatch latch = new CountDownLatch(2); - - // --- Fast stream: expected to succeed - executeWithLogging( - requester, - fastUri, - "[fast]", - latch, - false); - - // --- Slow stream: /delay/5 sleeps 5 seconds and should exceed - // the 2-second connection idle timeout, resulting in a reset. - executeWithLogging( - requester, - slowUri, - "[slow]", - latch, - true); - - latch.await(); - - System.out.println("Shutting down I/O reactor"); - requester.initiateShutdown(); - } - - private static void executeWithLogging( - final HttpAsyncRequester requester, - final URI requestUri, - final String label, - final CountDownLatch latch, - final boolean expectTimeout) { - - final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri) - .build(); - final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( - new StringAsyncEntityConsumer()); - - requester.execute(new AsyncClientExchangeHandler() { - - @Override - public void releaseResources() { - requestProducer.releaseResources(); - responseConsumer.releaseResources(); - latch.countDown(); - } - - @Override - public void cancel() { - System.out.println(label + " " + requestUri + " cancelled"); - } - - @Override - public void failed(final Exception cause) { - if (expectTimeout && cause instanceof H2StreamResetException) { - final H2StreamResetException ex = (H2StreamResetException) cause; - System.out.println(label + " expected timeout reset: " - + requestUri - + " -> " + ex); - } else { - System.out.println(label + " failure: " - + requestUri + " -> " + cause); - } - } - - @Override - public void produceRequest( - final RequestChannel channel, - final HttpContext httpContext) throws HttpException, IOException { - System.out.println(label + " sending request: " + requestUri); - requestProducer.sendRequest(channel, httpContext); - } - - @Override - public int available() { - return requestProducer.available(); - } - - @Override - public void produce(final DataStreamChannel channel) throws IOException { - requestProducer.produce(channel); - } - - @Override - public void consumeInformation( - final HttpResponse response, - final HttpContext httpContext) throws HttpException, IOException { - System.out.println(label + " " + requestUri + " -> informational " - + response.getCode()); - } - - @Override - public void consumeResponse( - final HttpResponse response, - final EntityDetails entityDetails, - final HttpContext httpContext) throws HttpException, IOException { - if (expectTimeout) { - System.out.println(label + " UNEXPECTED success: " - + requestUri + " -> " + response.getCode()); - } else { - System.out.println(label + " response: " - + requestUri + " -> " + response.getCode()); - } - responseConsumer.consumeResponse(response, entityDetails, httpContext, null); - } - - @Override - public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { - responseConsumer.updateCapacity(capacityChannel); - } - - @Override - public void consume(final ByteBuffer src) throws IOException { - responseConsumer.consume(src); - } - - @Override - public void streamEnd(final List trailers) - throws HttpException, IOException { - responseConsumer.streamEnd(trailers); - if (!expectTimeout) { - System.out.println(label + " body completed for " + requestUri); - } - } - - }, Timeout.ofSeconds(10), HttpCoreContext.create()); - } - -} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 0a0ea1959..addc239e0 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -1039,8 +1039,6 @@ void testPriorityUpdateContinuesAfterSettingsWithNoH2Equals1() throws Exception @Test void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { - Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.of(1, TimeUnit.NANOSECONDS)); - Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) .thenAnswer(invocation -> { final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); @@ -1062,57 +1060,12 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { h2StreamListener, () -> streamHandler); - // Create a local stream and mark it active (initializeStreamTimeouts() se ejecuta aquí) final H2StreamChannel channel = streamMultiplexer.createChannel(1); final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); - stream.activate(); - - streamMultiplexer.onOutput(); - - Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); - final Exception cause = exceptionCaptor.getValue(); - Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); - - final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; - Assertions.assertTrue(timeoutEx.isIdleTimeout(), "Expected idle timeout flag"); - Assertions.assertEquals(1, timeoutEx.getStreamId(), "Unexpected stream id"); - - Assertions.assertTrue(stream.isLocalClosed()); - Assertions.assertTrue(stream.isClosed()); - } - - @Test - void testStreamLifetimeTimeoutTriggersH2StreamTimeoutException() throws Exception { - Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.DISABLED); - - Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) - .thenAnswer(invocation -> { - final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); - final int remaining = buffer.remaining(); - buffer.position(buffer.limit()); - return remaining; - }); - Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); - Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); - final H2Config h2Config = H2Config.custom().build(); - final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( - protocolIOSession, - FRAME_FACTORY, - StreamIdGenerator.ODD, - httpProcessor, - CharCodingConfig.DEFAULT, - h2Config, - h2StreamListener, - () -> streamHandler); - - final H2StreamChannel channel = streamMultiplexer.createChannel(3); - final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + stream.setTimeout(Timeout.of(1, TimeUnit.NANOSECONDS)); stream.activate(); - stream.setIdleTimeout(null); - stream.setLifetimeTimeout(Timeout.of(1, TimeUnit.NANOSECONDS)); - streamMultiplexer.onOutput(); Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); @@ -1120,14 +1073,11 @@ void testStreamLifetimeTimeoutTriggersH2StreamTimeoutException() throws Exceptio Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; - Assertions.assertFalse(timeoutEx.isIdleTimeout(), "Expected lifetime timeout flag"); - Assertions.assertEquals(3, timeoutEx.getStreamId(), "Unexpected stream id"); + Assertions.assertTrue(timeoutEx.isIdleTimeout()); + Assertions.assertEquals(1, timeoutEx.getStreamId()); Assertions.assertTrue(stream.isLocalClosed()); Assertions.assertTrue(stream.isClosed()); } - - - } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java new file mode 100644 index 000000000..6fd506c75 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java @@ -0,0 +1,198 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.testing.async; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStreamResetException; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResponseChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.testing.SSLTestContexts; +import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource; +import org.apache.hc.core5.testing.extension.nio.H2AsyncServerResource; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class AsyncH2SocketTimeoutCoreTest { + + private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(1); + private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(30); + + @RegisterExtension + private final H2AsyncServerResource serverResource = new H2AsyncServerResource(); + + @RegisterExtension + private final H2AsyncRequesterResource clientResource = new H2AsyncRequesterResource(); + + public AsyncH2SocketTimeoutCoreTest() { + serverResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(30)) + .build()) + .setRequestRouter( + RequestRouter.>builder() + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "*", + SimpleDelayingHandler::new) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build() + ) + ); + + clientResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(SOCKET_TIMEOUT) + .build()) + ); + } + + @Test + void testHttp2RequestTimeoutYieldsStreamReset() throws Exception { + final InetSocketAddress address = startServer(); + final HttpAsyncRequester requester = clientResource.start(); + + final URI requestUri = new URI("http://localhost:" + address.getPort() + "/timeout"); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build(); + final BasicResponseConsumer responseConsumer = + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + final Future> future = + requester.execute(requestProducer, responseConsumer, SOCKET_TIMEOUT, null); + + final ExecutionException ex = Assertions.assertThrows(ExecutionException.class, () -> future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit())); + + final Throwable cause = ex.getCause(); + Assertions.assertInstanceOf(HttpStreamResetException.class, cause, "Expected HttpStreamResetException, but got: " + cause); + } + + private InetSocketAddress startServer() throws Exception { + final HttpAsyncServer server = serverResource.start(); + final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get(); + return (InetSocketAddress) listener.getAddress(); + } + + static final class SimpleDelayingHandler implements AsyncServerExchangeHandler { + + private final AtomicBoolean completed = new AtomicBoolean(false); + + @Override + public void handleRequest( + final HttpRequest request, + final EntityDetails entityDetails, + final ResponseChannel responseChannel, + final HttpContext context) throws HttpException, IOException { + // Intentionally do nothing: no response is sent back. + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + // Accept any amount of request data. + capacityChannel.update(Integer.MAX_VALUE); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + // Discard request body if present. + if (src != null) { + src.position(src.limit()); + } + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + // Nothing special to do on stream end for this test. + } + + @Override + public int available() { + // No response body to produce. + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + // In this test we never send a response; just ensure the stream is closed + // if produce gets called. + if (completed.compareAndSet(false, true)) { + channel.endStream(); + } + } + + @Override + public void failed(final Exception cause) { + // No-op for this simple test handler. + } + + @Override + public void releaseResources() { + // No resources to release. + } + + } + +}