From 35cb4f3286c9926b9db49c51ac5985a6d87b9b9b Mon Sep 17 00:00:00 2001 From: Adam Budde Date: Thu, 18 Jun 2026 17:06:09 -0700 Subject: [PATCH] Propagate URLSession errors to HTTPBodyOutputStreamBridge --- ...rectionalStreamingURLSessionDelegate.swift | 4 + .../HTTPBodyOutputStreamBridge.swift | 17 +++- .../HTTPBodyOutputStreamTests.swift | 96 +++++++++++++++++++ 3 files changed, 113 insertions(+), 4 deletions(-) diff --git a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift index f457582..09323c4 100644 --- a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift +++ b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift @@ -178,6 +178,10 @@ final class BidirectionalStreamingURLSessionDelegate: NSObject, URLSessionTaskDe responseContinuation?.resume(throwing: error) responseContinuation = nil } + + // Propagate the error to the stream bridge. + requestStream?.cancel(error: error) + requestStream = nil } } } diff --git a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift index b416296..f542958 100644 --- a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift +++ b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift @@ -120,6 +120,18 @@ final class HTTPBodyOutputStreamBridge: NSObject, StreamDelegate { break } } + + func cancel(error: (any Error)? = nil) { + debug("Output stream received cancellation request.") + Self.streamQueue.async { [self] in + switch state { + case .initial, .waitingForBytes, .haveBytes, .needBytes: + self.performAction(state.errorOccurred(error ?? CancellationError())) + case .closed: break + + } + } + } } extension HTTPBodyOutputStreamBridge { @@ -186,10 +198,7 @@ extension HTTPBodyOutputStreamBridge { mutating func errorOccurred(_ error: any Error) -> Action { switch self { - case .initial: - self = .closed(error) - return .none - case .waitingForBytes(_): + case .initial, .waitingForBytes(_): self = .closed(error) return .closeStream case .haveBytes(_, _, let producerContinuation): diff --git a/Tests/OpenAPIURLSessionTests/URLSessionBidirectionalStreamingTests/HTTPBodyOutputStreamTests.swift b/Tests/OpenAPIURLSessionTests/URLSessionBidirectionalStreamingTests/HTTPBodyOutputStreamTests.swift index bfeac2f..631ee65 100644 --- a/Tests/OpenAPIURLSessionTests/URLSessionBidirectionalStreamingTests/HTTPBodyOutputStreamTests.swift +++ b/Tests/OpenAPIURLSessionTests/URLSessionBidirectionalStreamingTests/HTTPBodyOutputStreamTests.swift @@ -291,6 +291,102 @@ class HTTPBodyOutputStreamBridgeTests: XCTestCase { } await fulfillment(of: [closeExpectation], timeout: 0.1) } + + func testCancelClosesOutputStreamFromInitialState() async throws { + let requestBody = HTTPBody( + MockAsyncSequence(elementsToVend: [[UInt8]([0])], gatingProduction: true), + length: .known(1), + iterationBehavior: .single + ) + + var inputStream: InputStream? + var outputStream: OutputStream? + Stream.getBoundStreams(withBufferSize: 8, inputStream: &inputStream, outputStream: &outputStream) + guard let inputStream, let outputStream else { fatalError("getBoundStreams did not return non-nil streams") } + _ = inputStream // Held to keep the bound pair from tearing itself down. + + let bridge = HTTPBodyOutputStreamBridge(outputStream, requestBody) + + // Drive cancel without ever opening the input side; the bridge state stays + // in `.initial` because `.openCompleted` has not been delivered. + bridge.cancel(error: CancellationError()) + + let closeExpectation = expectation(description: "output stream closes after cancel() from .initial") + HTTPBodyOutputStreamBridge.streamQueue.asyncAfter(deadline: .now() + .milliseconds(10)) { + XCTAssertEqual(bridge.outputStream.streamStatus, .closed) + switch bridge.state { + case .closed: break + default: XCTFail("Expected bridge state .closed, got \(bridge.state)") + } + closeExpectation.fulfill() + } + await fulfillment(of: [closeExpectation], timeout: 1.0) + } + + func testCancelReleasedWriterTaskReferenceWhenSuspended() async throws { + let chunkSize = 8 + let streamBufferSize = 4 // Smaller than a single chunk. + let requestBytes = Array(repeating: UInt8(0xAB), count: 32) + let requestChunks = requestBytes.chunks(of: chunkSize) + let requestByteSequence = MockAsyncSequence(elementsToVend: requestChunks, gatingProduction: true) + let requestBody = HTTPBody( + requestByteSequence, + length: .known(Int64(requestBytes.count)), + iterationBehavior: .single + ) + + var inputStream: InputStream? + var outputStream: OutputStream? + Stream.getBoundStreams(withBufferSize: streamBufferSize, inputStream: &inputStream, outputStream: &outputStream) + guard let inputStreamUnwrapped = inputStream, let outputStream else { + fatalError("getBoundStreams did not return non-nil streams") + } + + // Keep weak reference to bridge to check for deinit. + weak var weakBridge: HTTPBodyOutputStreamBridge? + + try await { + let bridge = HTTPBodyOutputStreamBridge(outputStream, requestBody) + weakBridge = bridge + + // Simulate URLSession opening the input stream so the bridge starts + // its writer Task. + inputStreamUnwrapped.open() + + // Let the writer Task pull one chunk and suspend in `.haveBytes`. + requestByteSequence.openGate(for: 1) + + try await waitForCondition("Bridge should enter haveBytes/needBytes state") { + switch bridge.state { + case .haveBytes, .needBytes: return true + default: return false + } + } + + // Simulate the URLSession failure: drop the input stream reference without + // close(). Cross-half propagation does not fire — output stays `.open`. + inputStream = nil + + // Explicitly cancel the bridge + bridge.cancel(error: CancellationError()) + }() + + try await waitForCondition("Bridge should dealloc when writer Task is cancelled") { weakBridge == nil } + try await waitForCondition("Bride cleanup should close output stream") { outputStream.streamStatus == .closed } + + } + + // Wait for given condition to be true within wait duration. + private func waitForCondition(_ label: String, within waitDuration: TimeInterval = 1.0, condition: () -> Bool) + async throws + { + let deadline = Date().addingTimeInterval(waitDuration) + while Date() < deadline { + if condition() { return } + try await Task.sleep(nanoseconds: 5_000_000) + } + XCTFail("waitForCondition timed out: \(label)") + } } #endif // canImport(Darwin)