Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ final class BidirectionalStreamingURLSessionDelegate: NSObject, URLSessionTaskDe
responseContinuation?.resume(throwing: error)
responseContinuation = nil
}

// Propagate the error to the stream bridge.
requestStream?.cancel(error: error)
requestStream = nil
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ final class HTTPBodyOutputStreamBridge: NSObject, StreamDelegate {
break
}
}

func cancel(error: (any Error)? = nil) {
debug("Output stream received cancellation request.")
Self.streamQueue.async { [self] in
switch state {
case .initial, .waitingForBytes, .haveBytes, .needBytes:
self.performAction(state.errorOccurred(error ?? CancellationError()))
case .closed: break

}
}
}
}

extension HTTPBodyOutputStreamBridge {
Expand Down Expand Up @@ -186,10 +198,7 @@ extension HTTPBodyOutputStreamBridge {

mutating func errorOccurred(_ error: any Error) -> Action {
switch self {
case .initial:
self = .closed(error)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change necessary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is. In the initializer, the output stream is opened and the state is set to initial. If URLSession receives an error before the state updates then the output stream is never closed.

return .none
case .waitingForBytes(_):
case .initial, .waitingForBytes(_):
self = .closed(error)
return .closeStream
case .haveBytes(_, _, let producerContinuation):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,102 @@ class HTTPBodyOutputStreamBridgeTests: XCTestCase {
}
await fulfillment(of: [closeExpectation], timeout: 0.1)
}

func testCancelClosesOutputStreamFromInitialState() async throws {
let requestBody = HTTPBody(
MockAsyncSequence(elementsToVend: [[UInt8]([0])], gatingProduction: true),
length: .known(1),
iterationBehavior: .single
)

var inputStream: InputStream?
var outputStream: OutputStream?
Stream.getBoundStreams(withBufferSize: 8, inputStream: &inputStream, outputStream: &outputStream)
guard let inputStream, let outputStream else { fatalError("getBoundStreams did not return non-nil streams") }
_ = inputStream // Held to keep the bound pair from tearing itself down.

let bridge = HTTPBodyOutputStreamBridge(outputStream, requestBody)

// Drive cancel without ever opening the input side; the bridge state stays
// in `.initial` because `.openCompleted` has not been delivered.
bridge.cancel(error: CancellationError())

let closeExpectation = expectation(description: "output stream closes after cancel() from .initial")
HTTPBodyOutputStreamBridge.streamQueue.asyncAfter(deadline: .now() + .milliseconds(10)) {
XCTAssertEqual(bridge.outputStream.streamStatus, .closed)
switch bridge.state {
case .closed: break
default: XCTFail("Expected bridge state .closed, got \(bridge.state)")
}
closeExpectation.fulfill()
}
await fulfillment(of: [closeExpectation], timeout: 1.0)
}

func testCancelReleasedWriterTaskReferenceWhenSuspended() async throws {
let chunkSize = 8
let streamBufferSize = 4 // Smaller than a single chunk.
let requestBytes = Array(repeating: UInt8(0xAB), count: 32)
let requestChunks = requestBytes.chunks(of: chunkSize)
let requestByteSequence = MockAsyncSequence(elementsToVend: requestChunks, gatingProduction: true)
let requestBody = HTTPBody(
requestByteSequence,
length: .known(Int64(requestBytes.count)),
iterationBehavior: .single
)

var inputStream: InputStream?
var outputStream: OutputStream?
Stream.getBoundStreams(withBufferSize: streamBufferSize, inputStream: &inputStream, outputStream: &outputStream)
guard let inputStreamUnwrapped = inputStream, let outputStream else {
fatalError("getBoundStreams did not return non-nil streams")
}

// Keep weak reference to bridge to check for deinit.
weak var weakBridge: HTTPBodyOutputStreamBridge?

try await {
let bridge = HTTPBodyOutputStreamBridge(outputStream, requestBody)
weakBridge = bridge

// Simulate URLSession opening the input stream so the bridge starts
// its writer Task.
inputStreamUnwrapped.open()

// Let the writer Task pull one chunk and suspend in `.haveBytes`.
requestByteSequence.openGate(for: 1)

try await waitForCondition("Bridge should enter haveBytes/needBytes state") {
switch bridge.state {
case .haveBytes, .needBytes: return true
default: return false
}
}

// Simulate the URLSession failure: drop the input stream reference without
// close(). Cross-half propagation does not fire — output stays `.open`.
inputStream = nil

// Explicitly cancel the bridge
bridge.cancel(error: CancellationError())
}()

try await waitForCondition("Bridge should dealloc when writer Task is cancelled") { weakBridge == nil }
try await waitForCondition("Bride cleanup should close output stream") { outputStream.streamStatus == .closed }

}

// Wait for given condition to be true within wait duration.
private func waitForCondition(_ label: String, within waitDuration: TimeInterval = 1.0, condition: () -> Bool)
async throws
{
let deadline = Date().addingTimeInterval(waitDuration)
while Date() < deadline {
if condition() { return }
try await Task.sleep(nanoseconds: 5_000_000)
}
XCTFail("waitForCondition timed out: \(label)")
}
}

#endif // canImport(Darwin)
Loading