Skip to content

Commit 8d7ca08

Browse files
committed
Drop stale unavailable pool connections
1 parent 10a6582 commit 8d7ca08

6 files changed

Lines changed: 200 additions & 0 deletions

File tree

httpcore/_async/connection_pool.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ async def handle_async_request(self, request: Request) -> Response:
242242
#
243243
# In this case we clear the connection and try again.
244244
pool_request.clear_connection()
245+
with self._optional_thread_lock:
246+
closing = []
247+
# If the connection still claims to be available then
248+
# it would be immediately assigned again, so drop it.
249+
if (
250+
connection in self._connections
251+
and connection.is_available()
252+
):
253+
self._connections.remove(connection)
254+
closing.append(connection)
255+
await self._close_connections(closing)
245256
else:
246257
break # pragma: nocover
247258

httpcore/_async/http2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
511511
def is_available(self) -> bool:
512512
return (
513513
self._state != HTTPConnectionState.CLOSED
514+
and self._connection_terminated is None
514515
and not self._connection_error
515516
and not self._used_all_stream_ids
516517
and not (

httpcore/_sync/connection_pool.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ def handle_request(self, request: Request) -> Response:
242242
#
243243
# In this case we clear the connection and try again.
244244
pool_request.clear_connection()
245+
with self._optional_thread_lock:
246+
closing = []
247+
# If the connection still claims to be available then
248+
# it would be immediately assigned again, so drop it.
249+
if (
250+
connection in self._connections
251+
and connection.is_available()
252+
):
253+
self._connections.remove(connection)
254+
closing.append(connection)
255+
self._close_connections(closing)
245256
else:
246257
break # pragma: nocover
247258

httpcore/_sync/http2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
511511
def is_available(self) -> bool:
512512
return (
513513
self._state != HTTPConnectionState.CLOSED
514+
and self._connection_terminated is None
514515
and not self._connection_error
515516
and not self._used_all_stream_ids
516517
and not (

tests/_async/test_connection_pool.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,96 @@ async def trace(name, kwargs):
451451
]
452452

453453

454+
@pytest.mark.anyio
455+
async def test_connection_pool_drops_stale_available_connection():
456+
"""
457+
If a connection claims to be available but then refuses a request,
458+
the pool should not keep assigning requests to that stale connection.
459+
"""
460+
461+
class StaleConnection:
462+
def __init__(self) -> None:
463+
self.closed = False
464+
465+
async def handle_async_request(self, request):
466+
raise httpcore.ConnectionNotAvailable()
467+
468+
def can_handle_request(self, origin):
469+
return True
470+
471+
def is_available(self):
472+
return True
473+
474+
def has_expired(self):
475+
return False
476+
477+
def is_idle(self):
478+
return True
479+
480+
def is_closed(self):
481+
return False
482+
483+
async def aclose(self):
484+
self.closed = True
485+
486+
def info(self):
487+
return "STALE"
488+
489+
class SuccessConnection:
490+
def __init__(self) -> None:
491+
self.closed = False
492+
493+
async def handle_async_request(self, request):
494+
async def content() -> typing.AsyncIterator[bytes]:
495+
yield b"ok"
496+
497+
return httpcore.Response(200, content=content())
498+
499+
def can_handle_request(self, origin):
500+
return True
501+
502+
def is_available(self):
503+
return True
504+
505+
def has_expired(self):
506+
return False
507+
508+
def is_idle(self):
509+
return True
510+
511+
def is_closed(self):
512+
return False
513+
514+
async def aclose(self):
515+
self.closed = True
516+
517+
def info(self):
518+
return "OK"
519+
520+
class Pool(httpcore.AsyncConnectionPool):
521+
def __init__(self) -> None:
522+
super().__init__()
523+
self.stale_connection = StaleConnection()
524+
self.success_connection = SuccessConnection()
525+
self._created = 0
526+
527+
def create_connection(self, origin):
528+
self._created += 1
529+
if self._created == 1:
530+
return self.stale_connection
531+
return self.success_connection
532+
533+
pool = Pool()
534+
async with pool:
535+
response = await pool.request("GET", "https://example.com/")
536+
537+
assert response.status == 200
538+
assert response.content == b"ok"
539+
assert pool.stale_connection.closed
540+
assert len(pool.connections) == 1
541+
assert pool.connections[0] is pool.success_connection
542+
543+
454544
@pytest.mark.anyio
455545
async def test_connection_pool_with_immediate_expiry():
456546
"""

tests/_sync/test_connection_pool.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,92 @@ def trace(name, kwargs):
451451
]
452452

453453

454+
def test_connection_pool_drops_stale_available_connection():
455+
"""
456+
If a connection claims to be available but then refuses a request,
457+
the pool should not keep assigning requests to that stale connection.
458+
"""
459+
460+
class StaleConnection:
461+
def __init__(self) -> None:
462+
self.closed = False
463+
464+
def handle_request(self, request):
465+
raise httpcore.ConnectionNotAvailable()
466+
467+
def can_handle_request(self, origin):
468+
return True
469+
470+
def is_available(self):
471+
return True
472+
473+
def has_expired(self):
474+
return False
475+
476+
def is_idle(self):
477+
return True
478+
479+
def is_closed(self):
480+
return False
481+
482+
def close(self):
483+
self.closed = True
484+
485+
def info(self):
486+
return "STALE"
487+
488+
class SuccessConnection:
489+
def __init__(self) -> None:
490+
self.closed = False
491+
492+
def handle_request(self, request):
493+
return httpcore.Response(200, content=[b"ok"])
494+
495+
def can_handle_request(self, origin):
496+
return True
497+
498+
def is_available(self):
499+
return True
500+
501+
def has_expired(self):
502+
return False
503+
504+
def is_idle(self):
505+
return True
506+
507+
def is_closed(self):
508+
return False
509+
510+
def close(self):
511+
self.closed = True
512+
513+
def info(self):
514+
return "OK"
515+
516+
class Pool(httpcore.ConnectionPool):
517+
def __init__(self) -> None:
518+
super().__init__()
519+
self.stale_connection = StaleConnection()
520+
self.success_connection = SuccessConnection()
521+
self._created = 0
522+
523+
def create_connection(self, origin):
524+
self._created += 1
525+
if self._created == 1:
526+
return self.stale_connection
527+
return self.success_connection
528+
529+
pool = Pool()
530+
with pool:
531+
response = pool.request("GET", "https://example.com/")
532+
533+
assert response.status == 200
534+
assert response.content == b"ok"
535+
assert pool.stale_connection.closed
536+
assert len(pool.connections) == 1
537+
assert pool.connections[0] is pool.success_connection
538+
539+
454540

455541
def test_connection_pool_with_immediate_expiry():
456542
"""

0 commit comments

Comments
 (0)