diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java new file mode 100644 index 000000000..2f837b478 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java @@ -0,0 +1,87 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2; + +import java.io.InterruptedIOException; + +import org.apache.hc.core5.util.Timeout; + +/** + * {@link java.net.SocketTimeoutException} raised by the HTTP/2 stream + * multiplexer when a per-stream timeout elapses. + *

+ * This exception is used for timeouts that are scoped to a single HTTP/2 + * stream rather than the underlying TCP connection, for example: + *

+ * + *

+ * The {@link #isIdleTimeout()} flag can be used to distinguish whether + * the timeout was triggered by idleness or by the overall stream lifetime. + * The affected stream id and the timeout value are exposed via + * {@link #getStreamId()} and {@link #getTimeout()} respectively. + *

+ * + * @since 5.4 + */ +public class H2StreamTimeoutException extends InterruptedIOException { + + private static final long serialVersionUID = 1L; + + private final int streamId; + private final Timeout timeout; + private final boolean idleTimeout; + + public H2StreamTimeoutException(final String message, final int streamId, final Timeout timeout, final boolean idleTimeout) { + super(message); + this.streamId = streamId; + this.timeout = timeout; + this.idleTimeout = idleTimeout; + } + + public int getStreamId() { + return streamId; + } + + public Timeout getTimeout() { + return timeout; + } + + /** + * Indicates whether this timeout was triggered by idle time (no activity) + * rather than by stream lifetime. + * + * @return {@code true} if this is an idle timeout, {@code false} if it is a lifetime timeout. + */ + public boolean isIdleTimeout() { + return idleTimeout; + } + +} diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 1eb939c6f..af1261ef1 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -74,6 +74,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; import org.apache.hc.core5.http2.config.H2Setting; @@ -454,6 +455,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio break; } } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } } } @@ -531,6 +535,11 @@ public final void onOutput() throws HttpException, IOException { } } } + + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } + if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) { int liveStreams = 0; for (final Iterator it = streams.iterator(); it.hasNext(); ) { @@ -1359,8 +1368,9 @@ H2StreamChannel createChannel(final int streamId) { return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize); } - H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) throws H2ConnectionException { - return streams.createActive(channel, streamHandler); + H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) { + final H2Stream stream = streams.createActive(channel, streamHandler); + return stream; } private void recordPriorityFromHeaders(final int streamId, final List headers) { @@ -1579,4 +1589,29 @@ public String toString() { } + private void checkStreamTimeouts(final long nowNanos) throws IOException { + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + if (!stream.isActive()) { + continue; + } + + final Timeout idleTimeout = stream.getIdleTimeout(); + if (idleTimeout == null || !idleTimeout.isEnabled()) { + continue; + } + + final long last = stream.getLastActivityNanos(); + final long idleNanos = idleTimeout.toNanoseconds(); + if (idleNanos > 0 && nowNanos - last > idleNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream idle timeout (" + idleTimeout + ")", + streamId, + idleTimeout, + true); + stream.localReset(ex, H2Error.CANCEL); + } + } + } } \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index 8d90bb66c..7f3dadbb4 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -61,6 +61,10 @@ class H2Stream implements StreamControl { private volatile boolean reserved; private volatile boolean remoteClosed; + private volatile long lastActivityNanos; + + private volatile Timeout idleTimeout; + H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer stateChangeCallback) { this.channel = channel; this.handler = handler; @@ -69,6 +73,7 @@ class H2Stream implements StreamControl { this.transitionRef = new AtomicReference<>(State.RESERVED); this.released = new AtomicBoolean(); this.cancelled = new AtomicBoolean(); + this.lastActivityNanos = 0L; } @Override @@ -83,7 +88,7 @@ public State getState() { @Override public void setTimeout(final Timeout timeout) { - // not supported + this.idleTimeout = timeout; } boolean isReserved() { @@ -104,6 +109,7 @@ private void triggerClosed() { void activate() { reserved = false; + markCreatedAndActive(); triggerOpen(); } @@ -146,6 +152,8 @@ boolean isLocalClosed() { void consumePromise(final List
headers) throws HttpException, IOException { try { + touch(); + if (channel.isLocalReset()) { return; } @@ -165,6 +173,8 @@ void consumeHeader(final List
headers, final boolean endOfStream) throws if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -183,6 +193,8 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -308,4 +320,20 @@ public String toString() { return buf.toString(); } + private void markCreatedAndActive() { + this.lastActivityNanos = System.nanoTime(); + } + + private void touch() { + this.lastActivityNanos = System.nanoTime(); + } + + long getLastActivityNanos() { + return lastActivityNanos; + } + + Timeout getIdleTimeout() { + return idleTimeout; + } + } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 0089a12e5..addc239e0 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.IntStream; @@ -52,6 +53,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.WritableByteChannelMock; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; @@ -65,6 +67,7 @@ import org.apache.hc.core5.http2.hpack.HPackEncoder; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.ByteArrayBuffer; +import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -1034,5 +1037,47 @@ void testPriorityUpdateContinuesAfterSettingsWithNoH2Equals1() throws Exception Assertions.assertTrue(idxPriUpd >= 0, "PRIORITY_UPDATE should be emitted when NO_RFC7540=1"); } + @Test + void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final H2StreamChannel channel = streamMultiplexer.createChannel(1); + final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + + stream.setTimeout(Timeout.of(1, TimeUnit.NANOSECONDS)); + stream.activate(); + + streamMultiplexer.onOutput(); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + final Exception cause = exceptionCaptor.getValue(); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); + + final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; + Assertions.assertTrue(timeoutEx.isIdleTimeout()); + Assertions.assertEquals(1, timeoutEx.getStreamId()); + + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java new file mode 100644 index 000000000..6fd506c75 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java @@ -0,0 +1,198 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.testing.async; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStreamResetException; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResponseChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.testing.SSLTestContexts; +import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource; +import org.apache.hc.core5.testing.extension.nio.H2AsyncServerResource; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class AsyncH2SocketTimeoutCoreTest { + + private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(1); + private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(30); + + @RegisterExtension + private final H2AsyncServerResource serverResource = new H2AsyncServerResource(); + + @RegisterExtension + private final H2AsyncRequesterResource clientResource = new H2AsyncRequesterResource(); + + public AsyncH2SocketTimeoutCoreTest() { + serverResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(30)) + .build()) + .setRequestRouter( + RequestRouter.>builder() + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "*", + SimpleDelayingHandler::new) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build() + ) + ); + + clientResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(SOCKET_TIMEOUT) + .build()) + ); + } + + @Test + void testHttp2RequestTimeoutYieldsStreamReset() throws Exception { + final InetSocketAddress address = startServer(); + final HttpAsyncRequester requester = clientResource.start(); + + final URI requestUri = new URI("http://localhost:" + address.getPort() + "/timeout"); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build(); + final BasicResponseConsumer responseConsumer = + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + final Future> future = + requester.execute(requestProducer, responseConsumer, SOCKET_TIMEOUT, null); + + final ExecutionException ex = Assertions.assertThrows(ExecutionException.class, () -> future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit())); + + final Throwable cause = ex.getCause(); + Assertions.assertInstanceOf(HttpStreamResetException.class, cause, "Expected HttpStreamResetException, but got: " + cause); + } + + private InetSocketAddress startServer() throws Exception { + final HttpAsyncServer server = serverResource.start(); + final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get(); + return (InetSocketAddress) listener.getAddress(); + } + + static final class SimpleDelayingHandler implements AsyncServerExchangeHandler { + + private final AtomicBoolean completed = new AtomicBoolean(false); + + @Override + public void handleRequest( + final HttpRequest request, + final EntityDetails entityDetails, + final ResponseChannel responseChannel, + final HttpContext context) throws HttpException, IOException { + // Intentionally do nothing: no response is sent back. + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + // Accept any amount of request data. + capacityChannel.update(Integer.MAX_VALUE); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + // Discard request body if present. + if (src != null) { + src.position(src.limit()); + } + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + // Nothing special to do on stream end for this test. + } + + @Override + public int available() { + // No response body to produce. + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + // In this test we never send a response; just ensure the stream is closed + // if produce gets called. + if (completed.compareAndSet(false, true)) { + channel.endStream(); + } + } + + @Override + public void failed(final Exception cause) { + // No-op for this simple test handler. + } + + @Override + public void releaseResources() { + // No resources to release. + } + + } + +}