Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,20 @@ extension NIOHTTPServer {
metadata: ["error": "\(error)"]
)
}

// The `multiplexer.inbound` iteration exits when our task is cancelled, or when the HTTP/2 stream
// multiplexer finishes or throws. In any case, we are done with this connection here, so tear it down.
do {
try await connectionChannel.close()
} catch ChannelError.alreadyClosed {
// We swallow the error here because the connection channel may already have closed at this point, e.g.
// if the client sent a TCP FIN or a TLS CLOSE_NOTIFY that the event loop processed before we got here.
} catch {
self.logger.error(
"Error thrown while closing the HTTP/2 connection channel",
metadata: ["error": "\(error)"]
)
}
}
}

Expand Down
86 changes: 86 additions & 0 deletions Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import NIOConcurrencyHelpers
import NIOCore
import NIOHTTPTypes
import NIOPosix
import NIOSSL
import ServiceLifecycle
import ServiceLifecycleTestKit
import Testing
Expand Down Expand Up @@ -119,6 +120,91 @@ struct NIOHTTPServiceLifecycleTests {
}
}

@Test(
"Server closes active connection upon forceful shutdown",
arguments: [HTTPVersion.http1_1, HTTPVersion.http2]
)
@available(anyAppleOS 26.0, *)
func testServerClosesActiveConnectionOnForcefulShutdown(httpVersion: HTTPVersion) async throws {
let (server, serverChain) = try NIOHTTPServerTests.makeSecureUpgradeServer(logger: self.serverLogger)

// This promise will be fulfilled when the server receives the first part of the request body. Once this
// happens, we cancel the server task and test whether the client's socket channel has closed.
let elg = MultiThreadedEventLoopGroup.singletonMultiThreadedEventLoopGroup
let firstChunkReadPromise = elg.any().makePromise(of: Void.self)

let serverService = ClosureService {
await #expect(throws: CancellationError.self) {
try await server.serve { request, requestContext, requestReader, responseSender in
var requestReader = requestReader
// Read the first chunk, signal `firstChunkReadPromise`, then try to read the second chunk.
let error = try await #require(throws: EitherError<Error, Never>.self) {
try await requestReader.read { _, _ in }

firstChunkReadPromise.succeed()

// The following call will block: the client will never send a request end part. This is
// intentional because we want to keep the connection alive.
try await requestReader.read { _, _ in }
}
#expect(throws: CancellationError.self) { try error.unwrap() }
}
}
}

try await withThrowingTaskGroup { group in
let serviceGroup = ServiceGroup(services: [serverService], logger: self.serviceGroupLogger)
group.addTask { try await serviceGroup.run() }

let serverAddress = try await server.listeningAddresses.first!

let tlsConfig = try TLSConfiguration.makeTestClientConfiguration(
testTrustRoots: serverChain.chain,
applicationProtocol: httpVersion.alpnIdentifier
)

let (clientConnectionChannel, alpnResultFuture) =
try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup).connect(
to: try .init(ipAddress: serverAddress.host, port: serverAddress.port)
) { socketChannel in
socketChannel.configureTestClientSSLPipeline(tlsConfig: tlsConfig).flatMap {
socketChannel.configureTestSecureUpgradeClientPipeline().map { connectionChannel in
(socketChannel, connectionChannel)
}
}
}

let alpnResult = try await alpnResultFuture.get()
let clientRequestChannel = try await NegotiatedClientConnection(negotiationResult: alpnResult)
.unwrapChannel(expectedHTTPVersion: httpVersion)

try await clientRequestChannel.executeThenClose { inbound, outbound in
try await outbound.write(Self.reqHead)

// Write the first body part.
try await outbound.write(Self.reqBody)

// Wait until the server has received the first body part.
try await firstChunkReadPromise.futureResult.get()

// Cancel the server task.
group.cancelAll()
// Wait for the server to shut down.
try await group.waitForAll()

// Wait for the client channel to be fully closed.
try await clientRequestChannel.channel.closeFuture.get()

// We shouldn't be able to complete our request; the server should have shut down.
await #expect(throws: ChannelError.ioOnClosedChannel) {
try await outbound.write(Self.reqBody)
}
}

try await clientConnectionChannel.closeFuture.get()
}
}

@Test(
"Active connection forcefully shutdown when server task cancelled",
arguments: [HTTPVersion.http1_1, HTTPVersion.http2]
Expand Down
Loading