Skip to content

Commit f87199d

Browse files
committed
Drop stale unavailable pool connections
1 parent 10a6582 commit f87199d

6 files changed

Lines changed: 206 additions & 0 deletions

File tree

httpcore/_async/connection_pool.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ async def handle_async_request(self, request: Request) -> Response:
242242
#
243243
# In this case we clear the connection and try again.
244244
pool_request.clear_connection()
245+
with self._optional_thread_lock:
246+
closing = []
247+
# If the connection still claims to be available then
248+
# it would be immediately assigned again, so drop it.
249+
if (
250+
connection in self._connections
251+
and connection.is_available()
252+
):
253+
self._connections.remove(connection)
254+
closing.append(connection)
255+
await self._close_connections(closing)
245256
else:
246257
break # pragma: nocover
247258

httpcore/_async/http2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
511511
def is_available(self) -> bool:
512512
return (
513513
self._state != HTTPConnectionState.CLOSED
514+
and self._connection_terminated is None
514515
and not self._connection_error
515516
and not self._used_all_stream_ids
516517
and not (

httpcore/_sync/connection_pool.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ def handle_request(self, request: Request) -> Response:
242242
#
243243
# In this case we clear the connection and try again.
244244
pool_request.clear_connection()
245+
with self._optional_thread_lock:
246+
closing = []
247+
# If the connection still claims to be available then
248+
# it would be immediately assigned again, so drop it.
249+
if (
250+
connection in self._connections
251+
and connection.is_available()
252+
):
253+
self._connections.remove(connection)
254+
closing.append(connection)
255+
self._close_connections(closing)
245256
else:
246257
break # pragma: nocover
247258

httpcore/_sync/http2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
511511
def is_available(self) -> bool:
512512
return (
513513
self._state != HTTPConnectionState.CLOSED
514+
and self._connection_terminated is None
514515
and not self._connection_error
515516
and not self._used_all_stream_ids
516517
and not (

tests/_async/test_connection_pool.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,97 @@ async def trace(name, kwargs):
451451
]
452452

453453

454+
@pytest.mark.anyio
455+
async def test_connection_pool_drops_stale_available_connection():
456+
"""
457+
If a connection claims to be available but then refuses a request,
458+
the pool should not keep assigning requests to that stale connection.
459+
"""
460+
461+
class StaleConnection:
462+
def __init__(self) -> None:
463+
self.closed = False
464+
465+
async def handle_async_request(self, request):
466+
raise httpcore.ConnectionNotAvailable()
467+
468+
def can_handle_request(self, origin):
469+
return True
470+
471+
def is_available(self):
472+
return True
473+
474+
def has_expired(self):
475+
return False
476+
477+
def is_idle(self):
478+
return True
479+
480+
def is_closed(self):
481+
return False
482+
483+
async def aclose(self):
484+
self.closed = True
485+
486+
def info(self):
487+
return "STALE"
488+
489+
class SuccessConnection:
490+
def __init__(self) -> None:
491+
self.closed = False
492+
493+
async def handle_async_request(self, request):
494+
async def content() -> typing.AsyncIterator[bytes]:
495+
yield b"ok"
496+
497+
return httpcore.Response(200, content=content())
498+
499+
def can_handle_request(self, origin):
500+
return True
501+
502+
def is_available(self):
503+
return True
504+
505+
def has_expired(self):
506+
return False
507+
508+
def is_idle(self):
509+
return True
510+
511+
def is_closed(self):
512+
return False
513+
514+
async def aclose(self):
515+
self.closed = True
516+
517+
def info(self):
518+
return "OK"
519+
520+
class Pool(httpcore.AsyncConnectionPool):
521+
def __init__(self) -> None:
522+
super().__init__()
523+
self.stale_connection = StaleConnection()
524+
self.success_connection = SuccessConnection()
525+
self._created = 0
526+
527+
def create_connection(self, origin):
528+
self._created += 1
529+
if self._created == 1:
530+
return self.stale_connection
531+
return self.success_connection
532+
533+
pool = Pool()
534+
async with pool:
535+
response = await pool.request("GET", "https://example.com/")
536+
537+
assert response.status == 200
538+
assert response.content == b"ok"
539+
assert pool.stale_connection.closed
540+
assert len(pool.connections) == 1
541+
connection = typing.cast(typing.Any, pool.connections[0])
542+
assert connection is pool.success_connection
543+
544+
454545
@pytest.mark.anyio
455546
async def test_connection_pool_with_immediate_expiry():
456547
"""

tests/_sync/test_connection_pool.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,97 @@ def trace(name, kwargs):
452452

453453

454454

455+
def test_connection_pool_drops_stale_available_connection():
456+
"""
457+
If a connection claims to be available but then refuses a request,
458+
the pool should not keep assigning requests to that stale connection.
459+
"""
460+
461+
class StaleConnection:
462+
def __init__(self) -> None:
463+
self.closed = False
464+
465+
def handle_request(self, request):
466+
raise httpcore.ConnectionNotAvailable()
467+
468+
def can_handle_request(self, origin):
469+
return True
470+
471+
def is_available(self):
472+
return True
473+
474+
def has_expired(self):
475+
return False
476+
477+
def is_idle(self):
478+
return True
479+
480+
def is_closed(self):
481+
return False
482+
483+
def close(self):
484+
self.closed = True
485+
486+
def info(self):
487+
return "STALE"
488+
489+
class SuccessConnection:
490+
def __init__(self) -> None:
491+
self.closed = False
492+
493+
def handle_request(self, request):
494+
def content() -> typing.Iterator[bytes]:
495+
yield b"ok"
496+
497+
return httpcore.Response(200, content=content())
498+
499+
def can_handle_request(self, origin):
500+
return True
501+
502+
def is_available(self):
503+
return True
504+
505+
def has_expired(self):
506+
return False
507+
508+
def is_idle(self):
509+
return True
510+
511+
def is_closed(self):
512+
return False
513+
514+
def close(self):
515+
self.closed = True
516+
517+
def info(self):
518+
return "OK"
519+
520+
class Pool(httpcore.ConnectionPool):
521+
def __init__(self) -> None:
522+
super().__init__()
523+
self.stale_connection = StaleConnection()
524+
self.success_connection = SuccessConnection()
525+
self._created = 0
526+
527+
def create_connection(self, origin):
528+
self._created += 1
529+
if self._created == 1:
530+
return self.stale_connection
531+
return self.success_connection
532+
533+
pool = Pool()
534+
with pool:
535+
response = pool.request("GET", "https://example.com/")
536+
537+
assert response.status == 200
538+
assert response.content == b"ok"
539+
assert pool.stale_connection.closed
540+
assert len(pool.connections) == 1
541+
connection = typing.cast(typing.Any, pool.connections[0])
542+
assert connection is pool.success_connection
543+
544+
545+
455546
def test_connection_pool_with_immediate_expiry():
456547
"""
457548
Connection pools with keepalive_expiry=0.0 should immediately expire

0 commit comments

Comments
 (0)