diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java
new file mode 100644
index 000000000..2f837b478
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java
@@ -0,0 +1,87 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http2;
+
+import java.io.InterruptedIOException;
+
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * {@link java.net.SocketTimeoutException} raised by the HTTP/2 stream
+ * multiplexer when a per-stream timeout elapses.
+ *
+ * This exception is used for timeouts that are scoped to a single HTTP/2
+ * stream rather than the underlying TCP connection, for example:
+ *
+ *
+ * - an idle timeout where no activity has been observed on the stream, or
+ * - a lifetime timeout where the total age of the stream exceeds
+ * the configured limit.
+ *
+ *
+ * The {@link #isIdleTimeout()} flag can be used to distinguish whether
+ * the timeout was triggered by idleness or by the overall stream lifetime.
+ * The affected stream id and the timeout value are exposed via
+ * {@link #getStreamId()} and {@link #getTimeout()} respectively.
+ *
+ *
+ * @since 5.4
+ */
+public class H2StreamTimeoutException extends InterruptedIOException {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int streamId;
+ private final Timeout timeout;
+ private final boolean idleTimeout;
+
+ public H2StreamTimeoutException(final String message, final int streamId, final Timeout timeout, final boolean idleTimeout) {
+ super(message);
+ this.streamId = streamId;
+ this.timeout = timeout;
+ this.idleTimeout = idleTimeout;
+ }
+
+ public int getStreamId() {
+ return streamId;
+ }
+
+ public Timeout getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Indicates whether this timeout was triggered by idle time (no activity)
+ * rather than by stream lifetime.
+ *
+ * @return {@code true} if this is an idle timeout, {@code false} if it is a lifetime timeout.
+ */
+ public boolean isIdleTimeout() {
+ return idleTimeout;
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
index 1eb939c6f..af1261ef1 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
@@ -74,6 +74,7 @@
import org.apache.hc.core5.http2.H2ConnectionException;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.http2.H2StreamTimeoutException;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.config.H2Param;
import org.apache.hc.core5.http2.config.H2Setting;
@@ -454,6 +455,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
break;
}
}
+ if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
+ checkStreamTimeouts(System.nanoTime());
+ }
}
}
@@ -531,6 +535,11 @@ public final void onOutput() throws HttpException, IOException {
}
}
}
+
+ if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
+ checkStreamTimeouts(System.nanoTime());
+ }
+
if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
int liveStreams = 0;
for (final Iterator it = streams.iterator(); it.hasNext(); ) {
@@ -1359,8 +1368,9 @@ H2StreamChannel createChannel(final int streamId) {
return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize);
}
- H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) throws H2ConnectionException {
- return streams.createActive(channel, streamHandler);
+ H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) {
+ final H2Stream stream = streams.createActive(channel, streamHandler);
+ return stream;
}
private void recordPriorityFromHeaders(final int streamId, final List extends Header> headers) {
@@ -1579,4 +1589,29 @@ public String toString() {
}
+ private void checkStreamTimeouts(final long nowNanos) throws IOException {
+ for (final Iterator it = streams.iterator(); it.hasNext(); ) {
+ final H2Stream stream = it.next();
+ if (!stream.isActive()) {
+ continue;
+ }
+
+ final Timeout idleTimeout = stream.getIdleTimeout();
+ if (idleTimeout == null || !idleTimeout.isEnabled()) {
+ continue;
+ }
+
+ final long last = stream.getLastActivityNanos();
+ final long idleNanos = idleTimeout.toNanoseconds();
+ if (idleNanos > 0 && nowNanos - last > idleNanos) {
+ final int streamId = stream.getId();
+ final H2StreamTimeoutException ex = new H2StreamTimeoutException(
+ "HTTP/2 stream idle timeout (" + idleTimeout + ")",
+ streamId,
+ idleTimeout,
+ true);
+ stream.localReset(ex, H2Error.CANCEL);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java
index 8d90bb66c..7f3dadbb4 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java
@@ -61,6 +61,10 @@ class H2Stream implements StreamControl {
private volatile boolean reserved;
private volatile boolean remoteClosed;
+ private volatile long lastActivityNanos;
+
+ private volatile Timeout idleTimeout;
+
H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer stateChangeCallback) {
this.channel = channel;
this.handler = handler;
@@ -69,6 +73,7 @@ class H2Stream implements StreamControl {
this.transitionRef = new AtomicReference<>(State.RESERVED);
this.released = new AtomicBoolean();
this.cancelled = new AtomicBoolean();
+ this.lastActivityNanos = 0L;
}
@Override
@@ -83,7 +88,7 @@ public State getState() {
@Override
public void setTimeout(final Timeout timeout) {
- // not supported
+ this.idleTimeout = timeout;
}
boolean isReserved() {
@@ -104,6 +109,7 @@ private void triggerClosed() {
void activate() {
reserved = false;
+ markCreatedAndActive();
triggerOpen();
}
@@ -146,6 +152,8 @@ boolean isLocalClosed() {
void consumePromise(final List headers) throws HttpException, IOException {
try {
+ touch();
+
if (channel.isLocalReset()) {
return;
}
@@ -165,6 +173,8 @@ void consumeHeader(final List headers, final boolean endOfStream) throws
if (endOfStream) {
remoteClosed = true;
}
+ touch();
+
if (channel.isLocalReset()) {
return;
}
@@ -183,6 +193,8 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc
if (endOfStream) {
remoteClosed = true;
}
+ touch();
+
if (channel.isLocalReset()) {
return;
}
@@ -308,4 +320,20 @@ public String toString() {
return buf.toString();
}
+ private void markCreatedAndActive() {
+ this.lastActivityNanos = System.nanoTime();
+ }
+
+ private void touch() {
+ this.lastActivityNanos = System.nanoTime();
+ }
+
+ long getLastActivityNanos() {
+ return lastActivityNanos;
+ }
+
+ Timeout getIdleTimeout() {
+ return idleTimeout;
+ }
+
}
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java
index 0089a12e5..addc239e0 100644
--- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java
@@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.stream.IntStream;
@@ -52,6 +53,7 @@
import org.apache.hc.core5.http2.H2ConnectionException;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.http2.H2StreamTimeoutException;
import org.apache.hc.core5.http2.WritableByteChannelMock;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.config.H2Param;
@@ -65,6 +67,7 @@
import org.apache.hc.core5.http2.hpack.HPackEncoder;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.util.ByteArrayBuffer;
+import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -1034,5 +1037,47 @@ void testPriorityUpdateContinuesAfterSettingsWithNoH2Equals1() throws Exception
Assertions.assertTrue(idxPriUpd >= 0, "PRIORITY_UPDATE should be emitted when NO_RFC7540=1");
}
+ @Test
+ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception {
+ Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
+ .thenAnswer(invocation -> {
+ final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class);
+ final int remaining = buffer.remaining();
+ buffer.position(buffer.limit());
+ return remaining;
+ });
+ Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
+ Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
+
+ final H2Config h2Config = H2Config.custom().build();
+ final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl(
+ protocolIOSession,
+ FRAME_FACTORY,
+ StreamIdGenerator.ODD,
+ httpProcessor,
+ CharCodingConfig.DEFAULT,
+ h2Config,
+ h2StreamListener,
+ () -> streamHandler);
+
+ final H2StreamChannel channel = streamMultiplexer.createChannel(1);
+ final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler);
+
+ stream.setTimeout(Timeout.of(1, TimeUnit.NANOSECONDS));
+ stream.activate();
+
+ streamMultiplexer.onOutput();
+
+ Mockito.verify(streamHandler).failed(exceptionCaptor.capture());
+ final Exception cause = exceptionCaptor.getValue();
+ Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause);
+
+ final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause;
+ Assertions.assertTrue(timeoutEx.isIdleTimeout());
+ Assertions.assertEquals(1, timeoutEx.getStreamId());
+
+ Assertions.assertTrue(stream.isLocalClosed());
+ Assertions.assertTrue(stream.isClosed());
+ }
}
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java
new file mode 100644
index 000000000..6fd506c75
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java
@@ -0,0 +1,198 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.testing.async;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStreamResetException;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.impl.routing.RequestRouter;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
+import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.testing.SSLTestContexts;
+import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource;
+import org.apache.hc.core5.testing.extension.nio.H2AsyncServerResource;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class AsyncH2SocketTimeoutCoreTest {
+
+ private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(1);
+ private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(30);
+
+ @RegisterExtension
+ private final H2AsyncServerResource serverResource = new H2AsyncServerResource();
+
+ @RegisterExtension
+ private final H2AsyncRequesterResource clientResource = new H2AsyncRequesterResource();
+
+ public AsyncH2SocketTimeoutCoreTest() {
+ serverResource.configure(bootstrap -> bootstrap
+ .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
+ .setTlsStrategy(new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext()))
+ .setIOReactorConfig(
+ IOReactorConfig.custom()
+ .setSoTimeout(Timeout.ofSeconds(30))
+ .build())
+ .setRequestRouter(
+ RequestRouter.>builder()
+ .addRoute(
+ RequestRouter.LOCAL_AUTHORITY,
+ "*",
+ SimpleDelayingHandler::new)
+ .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER)
+ .build()
+ )
+ );
+
+ clientResource.configure(bootstrap -> bootstrap
+ .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
+ .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
+ .setIOReactorConfig(
+ IOReactorConfig.custom()
+ .setSoTimeout(SOCKET_TIMEOUT)
+ .build())
+ );
+ }
+
+ @Test
+ void testHttp2RequestTimeoutYieldsStreamReset() throws Exception {
+ final InetSocketAddress address = startServer();
+ final HttpAsyncRequester requester = clientResource.start();
+
+ final URI requestUri = new URI("http://localhost:" + address.getPort() + "/timeout");
+
+ final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build();
+ final BasicResponseConsumer responseConsumer =
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer());
+
+ final Future> future =
+ requester.execute(requestProducer, responseConsumer, SOCKET_TIMEOUT, null);
+
+ final ExecutionException ex = Assertions.assertThrows(ExecutionException.class, () -> future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
+
+ final Throwable cause = ex.getCause();
+ Assertions.assertInstanceOf(HttpStreamResetException.class, cause, "Expected HttpStreamResetException, but got: " + cause);
+ }
+
+ private InetSocketAddress startServer() throws Exception {
+ final HttpAsyncServer server = serverResource.start();
+ final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get();
+ return (InetSocketAddress) listener.getAddress();
+ }
+
+ static final class SimpleDelayingHandler implements AsyncServerExchangeHandler {
+
+ private final AtomicBoolean completed = new AtomicBoolean(false);
+
+ @Override
+ public void handleRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel,
+ final HttpContext context) throws HttpException, IOException {
+ // Intentionally do nothing: no response is sent back.
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ // Accept any amount of request data.
+ capacityChannel.update(Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) throws IOException {
+ // Discard request body if present.
+ if (src != null) {
+ src.position(src.limit());
+ }
+ }
+
+ @Override
+ public void streamEnd(final List extends Header> trailers)
+ throws HttpException, IOException {
+ // Nothing special to do on stream end for this test.
+ }
+
+ @Override
+ public int available() {
+ // No response body to produce.
+ return 0;
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ // In this test we never send a response; just ensure the stream is closed
+ // if produce gets called.
+ if (completed.compareAndSet(false, true)) {
+ channel.endStream();
+ }
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ // No-op for this simple test handler.
+ }
+
+ @Override
+ public void releaseResources() {
+ // No resources to release.
+ }
+
+ }
+
+}