From a927da1643b764bccfab91a0a683697b21333c05 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Wed, 3 Jun 2026 16:43:12 +0100 Subject: [PATCH] Fix pipelined request after websocket rejection failing (#12796) --- aiohttp/web_protocol.py | 12 +++- tests/test_web_websocket_functional.py | 97 ++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 3 deletions(-) diff --git a/aiohttp/web_protocol.py b/aiohttp/web_protocol.py index 479ad931a3c..1e297111bef 100644 --- a/aiohttp/web_protocol.py +++ b/aiohttp/web_protocol.py @@ -452,7 +452,7 @@ def data_received(self, data: bytes) -> None: upgraded = False tail = b"" - for msg, payload in messages or (): + for msg, payload in messages: self._request_count += 1 self._messages.append((msg, payload)) @@ -731,8 +731,14 @@ async def finish_response( self._parser.set_upgraded(False) self._upgraded = False if self._message_tail: - self._parser.feed_data(self._message_tail) - self._message_tail = b"" + messages, _upgraded, tail = self._parser.feed_data(self._message_tail) + self._message_tail = tail + for msg, payload in messages: + self._request_count += 1 + self._messages.append((msg, payload)) + # This shouldn't be possible. If a future refactor results in this + # failing, then the code may need to be updated to set the waiter. + assert self._waiter is None try: prepare_meth = resp.prepare except AttributeError: diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index d4a1b402249..e2e76ee1776 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -30,6 +30,103 @@ async def handler(request: web.Request) -> NoReturn: assert resp.status == 426 +async def test_pipelined_request_after_failed_websocket_upgrade( + aiohttp_server: AiohttpServer, +) -> None: + """Pipelined HTTP request runs after a declined websocket upgrade. + + The parser flips into upgraded mode when it sees the ``Upgrade`` + header and buffers any trailing bytes in ``_message_tail``. If the + handler declines the upgrade, ``finish_response()`` must replay the + tail through the parser so the pipelined request is dispatched + instead of stalling until the keep-alive timeout fires. + """ + + async def upgrade_handler(request: web.Request) -> NoReturn: + raise web.HTTPUpgradeRequired() + + async def second_handler(request: web.Request) -> web.Response: + return web.Response(text="second-ok") + + app = web.Application() + app.router.add_route("GET", "/", upgrade_handler) + app.router.add_route("GET", "/second", second_handler) + server = await aiohttp_server(app) + + # Need to use a raw writer in order to send the pipelined request. + reader, writer = await asyncio.open_connection(server.host, server.port) + try: + writer.write( + b"GET / HTTP/1.1\r\n" + b"Host: localhost\r\n" + b"Upgrade: websocket\r\n" + b"Connection: Upgrade\r\n" + b"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + b"Sec-WebSocket-Version: 13\r\n" + b"\r\n" + b"GET /second HTTP/1.1\r\n" + b"Host: localhost\r\n" + b"\r\n" + ) + await writer.drain() + + # Without the fix the second request is dropped and this read hangs + data = await asyncio.wait_for(reader.readuntil(b"second-ok"), timeout=5) + finally: + writer.close() + await writer.wait_closed() + + assert b"426" in data + + +async def test_partial_pipelined_request_after_failed_websocket_upgrade( + aiohttp_server: AiohttpServer, +) -> None: + """Partial pipelined bytes are preserved across finish_response. + + Only part of the second request rides along with the upgrade, so + feed_data() in finish_response() returns no messages and the parser + keeps the partial bytes in its internal buffer. When the remainder + arrives via data_received() the request is completed and dispatched. + """ + + async def upgrade_handler(request: web.Request) -> NoReturn: + raise web.HTTPUpgradeRequired() + + async def second_handler(request: web.Request) -> web.Response: + return web.Response(text="second-ok") + + app = web.Application() + app.router.add_route("GET", "/", upgrade_handler) + app.router.add_route("GET", "/second", second_handler) + server = await aiohttp_server(app) + + reader, writer = await asyncio.open_connection(server.host, server.port) + try: + writer.write( + b"GET / HTTP/1.1\r\n" + b"Host: localhost\r\n" + b"Upgrade: websocket\r\n" + b"Connection: Upgrade\r\n" + b"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + b"Sec-WebSocket-Version: 13\r\n" + b"\r\n" + b"GET /second HTT" # truncated mid-request + ) + await writer.drain() + + first = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=5) + assert b"426" in first + + writer.write(b"P/1.1\r\nHost: localhost\r\n\r\n") + await writer.drain() + + await asyncio.wait_for(reader.readuntil(b"second-ok"), timeout=5) + finally: + writer.close() + await writer.wait_closed() + + async def test_handshake_connection_header_substring_not_a_token( aiohttp_client: AiohttpClient, ) -> None: