Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/interrupt-on-disconnect-fastapi.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@e2b/code-interpreter-template': patch
---

Interrupt the kernel when the HTTP client disconnects mid-execution so the per-context lock is released and subsequent executions aren't blocked (#213). On the latest FastAPI (0.136.3) / Starlette (1.2.1), `StreamingResponse` no longer cancels the response body iterator on `http.disconnect` (ASGI spec 2.4+), so the server now detects the disconnect itself by polling `request.is_disconnected()` while streaming and interrupts the kernel.
1 change: 1 addition & 0 deletions template/server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def post_execute(request: Request, exec_request: ExecutionRequest):
exec_request.code,
env_vars=exec_request.env_vars,
access_token=request.headers.get("X-Access-Token", None),
request=request,
)
)

Expand Down
57 changes: 36 additions & 21 deletions template/server/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Union,
)
from pydantic import StrictStr
from starlette.requests import Request
from websockets.client import WebSocketClientProtocol, connect
from websockets.exceptions import (
ConnectionClosedError,
Expand All @@ -39,6 +40,10 @@
KEEPALIVE_INTERVAL = 5 # seconds between keepalive pings during streaming


class _ClientDisconnected(Exception):
    """Raised internally when the HTTP client disconnects mid-execution (#213).

    ``_wait_for_result`` raises this when ``request.is_disconnected()``
    reports that the client has gone away. ``execute`` catches it,
    interrupts the kernel, and returns instead of re-raising.
    """


class Execution:
def __init__(self, in_background: bool = False):
self.queue = Queue[
Expand Down Expand Up @@ -251,27 +256,29 @@ async def _cleanup_env_vars(self, env_vars: Dict[StrictStr, str]):
finally:
del self._executions[message_id]

async def _wait_for_result(self, message_id: str):
async def _wait_for_result(
self, message_id: str, request: Optional[Request] = None
):
queue = self._executions[message_id].queue

# Use a timeout on queue.get() to periodically send keepalives.
# Without keepalives, the generator blocks indefinitely waiting for
# kernel output. If the client silently disappears (e.g. network
# failure), uvicorn can only detect the broken connection when it
# tries to write — so we force a write every KEEPALIVE_INTERVAL
# seconds. This ensures timely disconnect detection and kernel
# interrupt for abandoned executions (see #213).
# Wait with a timeout so that, even when the kernel emits no output, we
# periodically poll for client disconnects and write a keepalive. The
# latest Starlette no longer cancels this generator on disconnect, so
# an orphaned execution would otherwise keep holding self._lock (#213).
while True:
try:
output = await asyncio.wait_for(queue.get(), timeout=KEEPALIVE_INTERVAL)
except asyncio.TimeoutError:
# Yield a keepalive so Starlette writes to the socket.
# If the client has disconnected, the write fails and
# uvicorn delivers http.disconnect, which cancels this
# generator via CancelledError.
if request is not None and await request.is_disconnected():
raise _ClientDisconnected()
yield {"type": "keepalive"}
continue

# Also check before forwarding output, in case the client left
# while the kernel was actively streaming.
if request is not None and await request.is_disconnected():
raise _ClientDisconnected()

if output.type == OutputType.END_OF_EXECUTION:
break

Expand Down Expand Up @@ -320,6 +327,7 @@ async def execute(
code: Union[str, StrictStr],
env_vars: Dict[StrictStr, str],
access_token: str,
request: Optional[Request] = None,
):
if self._ws is None:
raise Exception("WebSocket not connected")
Expand Down Expand Up @@ -368,10 +376,12 @@ async def execute(
logger.info(
f"Sending code for the execution ({message_id}): {complete_code}"
)
request = self._get_execute_request(
# Don't rebind `request`: it holds the Starlette Request
# we poll for disconnects below (#213).
execute_request = self._get_execute_request(
message_id, complete_code, False
)
await self._ws.send(request)
await self._ws.send(execute_request)
break
except (ConnectionClosedError, WebSocketException) as e:
# Keep the last result, even if error
Expand All @@ -392,22 +402,27 @@ async def execute(
)
await execution.queue.put(UnexpectedEndOfExecution())

# Stream the results.
# If the client disconnects (Starlette cancels the task), we
# interrupt the kernel so the next execution isn't blocked (#213).
# Stream the results. On client disconnect we interrupt the kernel
# so the lock is released and the next execution isn't blocked
# (#213). The disconnect surfaces either as _ClientDisconnected
# (latest Starlette, raised by _wait_for_result) or as
# CancelledError/GeneratorExit (older Starlette / generator teardown).
try:
async for item in self._wait_for_result(message_id):
async for item in self._wait_for_result(message_id, request=request):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve the FastAPI request for disconnect polling

For any execution that produces no output for KEEPALIVE_INTERVAL (for example time.sleep(10)), the local request has already been rebound above to the JSON websocket payload string, so this call passes a str into _wait_for_result. On the first timeout _wait_for_result calls await request.is_disconnected(), which raises AttributeError and aborts the stream instead of continuing or interrupting on an actual disconnect. Keep the FastAPI Request in a separate variable from the Jupyter execute payload.

Useful? React with 👍 / 👎.

Comment thread
cursor[bot] marked this conversation as resolved.
Comment on lines 403 to +411
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Critical: The new request: Optional[Request] = None parameter on execute() is shadowed by the local assignment at line 415 — request = self._get_execute_request(message_id, complete_code, False) — which returns a JSON str. When self._wait_for_result(message_id, request=request) then runs, request.is_disconnected() on the string raises AttributeError on the first 5s keepalive tick. AttributeError isn't in execute()'s except (CancelledError, GeneratorExit, _ClientDisconnected) tuple, so it propagates, the kernel is never interrupted, and the very test_subsequent_execution_works_after_client_timeout test this PR exists to fix will still fail. Fix: rename the local payload variable (e.g. exec_payload).

Extended reasoning...

What the bug is

The PR adds a new request: Optional[Request] = None parameter to ContextWebSocket.execute() (messaging.py:341) so that _wait_for_result can poll request.is_disconnected() and raise _ClientDisconnected to interrupt the kernel. The problem is that inside execute()'s retry loop at lines 411-415, the local variable name request is reused for the JSON payload to send over the WebSocket:

request = self._get_execute_request(message_id, complete_code, False)
await self._ws.send(request)

_get_execute_request is annotated -> str and returns json.dumps(...). This rebinds the local name request to a string, shadowing the Request parameter for the remainder of the function.

How it manifests

At line 421, async for item in self._wait_for_result(message_id, request=request): then passes the JSON string (not the original Starlette Request) into _wait_for_result. Inside _wait_for_result (lines 295-296):

if request is not None and await request.is_disconnected():
    raise _ClientDisconnected()

A non-empty string is truthy, so Python evaluates request.is_disconnected — strings have no such attribute, so Python raises AttributeError: 'str' object has no attribute 'is_disconnected' before await even runs.

Why existing code doesn't prevent it

execute()'s exception handler only catches (asyncio.CancelledError, GeneratorExit, _ClientDisconnected) (line 423). AttributeError is not in that tuple, so it escapes the generator and propagates up through StreamingListJsonResponse, breaking the streaming response entirely. The kernel-interrupt path on line 427 never runs.

Impact

The KEEPALIVE_INTERVAL is 5 seconds, so any execution that takes longer than 5 seconds will trigger the first timeout and immediately crash with AttributeError. This means:

  1. The disconnect-detection mechanism this PR adds is dead on arrival — _ClientDisconnected is never raised in normal flow.
  2. The keepalive mechanism itself is now broken for any execution > 5s.
  3. The kernel is never interrupted on client disconnect, leaving the failing-test scenario (test_subsequent_execution_works_after_client_timeout, which sleeps 300s) unfixed.

Step-by-step proof

  1. Client POST /execute with code that runs long enough to require keepalives (e.g. time.sleep(300)).
  2. post_execute (main.py:126) calls ws.execute(..., request=request), where request is the Starlette Request.
  3. Inside execute(), the for-loop body executes request = self._get_execute_request(message_id, complete_code, False) at line 415. request is now '{"header": {...}, ...}' (a JSON string).
  4. await self._ws.send(request) at line 416 sends that string, then break exits the loop with request still bound to the string.
  5. Line 421 calls self._wait_for_result(message_id, request=request) — passing the string.
  6. After 5 seconds with no kernel output, asyncio.wait_for(queue.get(), timeout=5) raises TimeoutError.
  7. The handler evaluates `if request is not None and await request.is_disconnected():`. The string is non-None, so it accesses `request.is_disconnected` → `AttributeError`.
  8. The AttributeError propagates out of _wait_for_result into execute()'s try block.
  9. The except (CancelledError, GeneratorExit, _ClientDisconnected) clause does not match, so it propagates out of execute(), breaking the streaming response. self.interrupt() is never called.
  10. Result: client gets a broken stream, kernel keeps running, and the next POST /execute blocks waiting for the lock (the same regression that the fix for issue #213, "asyncio.Lock in messaging.py not released on client disconnect → cascading timeouts", was meant to address).

How to fix

Rename the local payload variable so the Request parameter is preserved. Smallest patch is at lines 411-416:

exec_payload = self._get_execute_request(message_id, complete_code, False)
await self._ws.send(exec_payload)

(_cleanup_env_vars and change_current_directory also use a local named request for the JSON payload, but they don't take a Request parameter, so the shadowing only matters in execute().)

yield item
except (asyncio.CancelledError, GeneratorExit):
except (asyncio.CancelledError, GeneratorExit, _ClientDisconnected) as e:
logger.warning(
f"Client disconnected during execution ({message_id}), interrupting kernel"
)
# Shield the interrupt from the ongoing cancellation so
# the HTTP request to the kernel actually completes.
# Shield so the interrupt completes even if we're being cancelled.
try:
await asyncio.shield(self.interrupt())
except asyncio.CancelledError:
pass
# We detected the disconnect ourselves: unwind cleanly so the
# lock releases. A real cancellation/teardown must propagate.
if isinstance(e, _ClientDisconnected):
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write disconnect not handled

Medium Severity

The streaming disconnect handler only catches CancelledError, GeneratorExit, and internal _ClientDisconnected. On Starlette ≥ 1.0 a dead client can also surface as connection errors (including Starlette’s ClientDisconnect) while flushing a yielded chunk, so those paths skip interrupt() even though the finally block drops the execution entry.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit cf6255b. Configure here.

raise
finally:
if message_id in self._executions:
Expand Down
2 changes: 1 addition & 1 deletion template/server/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
fastapi==0.111.0
fastapi==0.136.3
httpx==0.28.1
websockets==12.0
uvicorn[standard]==0.30.1
Expand Down
Loading