Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 54 additions & 41 deletions src/httpcore2/httpcore2/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,61 +261,74 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
Any closing connections are returned, allowing the I/O for closing
those connections to be handled separately.
"""
closing_connections = []
closing_connections: list[AsyncConnectionInterface] = []
retained_connections: list[AsyncConnectionInterface] = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
# First we handle cleaning up any connections that are closed
# or have expired their keep-alive, in a single pass.
for connection in self._connections:
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
continue
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)
else:
retained_connections.append(connection)

# Then we close any surplus idle connections, to enforce the
# max_keepalive_connections setting.
idle_surplus = (
sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections
)
if idle_surplus > 0:
kept: list[AsyncConnectionInterface] = []
for connection in retained_connections:
if idle_surplus > 0 and connection.is_idle():
closing_connections.append(connection)
idle_surplus -= 1
else:
kept.append(connection)
retained_connections = kept

self._connections = retained_connections

# Snapshot the set of reusable connections once, rather than rebuilding
# it per queued request — this is what brings the loop from O(N*M) to
# O(N+M) in the common case.
available_connections = [connection for connection in self._connections if connection.is_available()]
new_connection_budget = self._max_connections - len(self._connections)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in self._requests:
if not pool_request.is_queued():
continue
origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [connection for connection in self._connections if connection.is_idle()]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
for connection in available_connections:
if connection.can_handle_request(origin):
pool_request.assign_to_connection(connection)
break
else:
if new_connection_budget > 0:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
new_connection_budget -= 1
continue
for idx, connection in enumerate(available_connections):
if connection.is_idle():
del available_connections[idx]
self._connections.remove(connection)
closing_connections.append(connection)
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
break

return closing_connections

Expand Down
95 changes: 54 additions & 41 deletions src/httpcore2/httpcore2/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,61 +261,74 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
Any closing connections are returned, allowing the I/O for closing
those connections to be handled separately.
"""
closing_connections = []
closing_connections: list[ConnectionInterface] = []
retained_connections: list[ConnectionInterface] = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
# First we handle cleaning up any connections that are closed
# or have expired their keep-alive, in a single pass.
for connection in self._connections:
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
continue
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)
else:
retained_connections.append(connection)

# Then we close any surplus idle connections, to enforce the
# max_keepalive_connections setting.
idle_surplus = (
sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections
)
if idle_surplus > 0:
kept: list[ConnectionInterface] = []
for connection in retained_connections:
if idle_surplus > 0 and connection.is_idle():
closing_connections.append(connection)
idle_surplus -= 1
else:
kept.append(connection)
retained_connections = kept

self._connections = retained_connections

# Snapshot the set of reusable connections once, rather than rebuilding
# it per queued request — this is what brings the loop from O(N*M) to
# O(N+M) in the common case.
available_connections = [connection for connection in self._connections if connection.is_available()]
new_connection_budget = self._max_connections - len(self._connections)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in self._requests:
if not pool_request.is_queued():
continue
origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [connection for connection in self._connections if connection.is_idle()]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
for connection in available_connections:
if connection.can_handle_request(origin):
pool_request.assign_to_connection(connection)
break
else:
if new_connection_budget > 0:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
new_connection_budget -= 1
continue
for idx, connection in enumerate(available_connections):
if connection.is_idle():
del available_connections[idx]
self._connections.remove(connection)
closing_connections.append(connection)
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
break

return closing_connections

Expand Down
37 changes: 37 additions & 0 deletions tests/httpcore2/_async/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,43 @@ async def test_connection_pool_with_no_keepalive_connections_allowed() -> None:
assert info == []


@pytest.mark.anyio
async def test_connection_pool_closes_idle_connection_for_different_origin() -> None:
"""
When the pool is at 'max_connections' and an incoming request is for an
origin with no reusable connection, an IDLE connection to a different
origin is closed to make room for the new connection.
"""
network_backend = httpcore2.AsyncMockBackend(
[
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 13\r\n",
b"\r\n",
b"Hello, world!",
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 13\r\n",
b"\r\n",
b"Hello, world!",
]
)

async with httpcore2.AsyncConnectionPool(network_backend=network_backend, max_connections=1) as pool:
# An initial request to a.com leaves a single IDLE connection in the pool.
response = await pool.request("GET", "https://a.com/")
assert response.status == 200
info = [repr(c) for c in pool.connections]
assert info == ["<AsyncHTTPConnection ['https://a.com:443', HTTP/1.1, IDLE, Request Count: 1]>"]

# A request to b.com cannot reuse the a.com connection and the pool is full,
# so the IDLE a.com connection is closed and replaced with a b.com connection.
response = await pool.request("GET", "https://b.com/")
assert response.status == 200
info = [repr(c) for c in pool.connections]
assert info == ["<AsyncHTTPConnection ['https://b.com:443', HTTP/1.1, IDLE, Request Count: 1]>"]


@pytest.mark.trio
async def test_connection_pool_concurrency() -> None:
"""
Expand Down
37 changes: 37 additions & 0 deletions tests/httpcore2/_sync/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,43 @@ def test_connection_pool_with_no_keepalive_connections_allowed() -> None:



def test_connection_pool_closes_idle_connection_for_different_origin() -> None:
"""
When the pool is at 'max_connections' and an incoming request is for an
origin with no reusable connection, an IDLE connection to a different
origin is closed to make room for the new connection.
"""
network_backend = httpcore2.MockBackend(
[
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 13\r\n",
b"\r\n",
b"Hello, world!",
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 13\r\n",
b"\r\n",
b"Hello, world!",
]
)

with httpcore2.ConnectionPool(network_backend=network_backend, max_connections=1) as pool:
# An initial request to a.com leaves a single IDLE connection in the pool.
response = pool.request("GET", "https://a.com/")
assert response.status == 200
info = [repr(c) for c in pool.connections]
assert info == ["<HTTPConnection ['https://a.com:443', HTTP/1.1, IDLE, Request Count: 1]>"]

# A request to b.com cannot reuse the a.com connection and the pool is full,
# so the IDLE a.com connection is closed and replaced with a b.com connection.
response = pool.request("GET", "https://b.com/")
assert response.status == 200
info = [repr(c) for c in pool.connections]
assert info == ["<HTTPConnection ['https://b.com:443', HTTP/1.1, IDLE, Request Count: 1]>"]



def test_connection_pool_concurrency() -> None:
"""
HTTP/1.1 requests made in concurrency must not ever exceed the maximum number
Expand Down
Loading