1515import asyncio
1616from medimgkit .format_detection import GZIP_MIME_TYPES , DEFAULT_MIME_TYPE , guess_typez , guess_extension
1717from datamint .utils .env import ensure_asyncio_loop
18+ import os
1819
1920if TYPE_CHECKING :
2021 from datamint .api .client import Api
@@ -261,6 +262,7 @@ def _stream_request(self, method: str, endpoint: str, **kwargs):
261262 for chunk in response.iter_bytes():
262263 process_chunk(chunk)
263264 """
265+ self ._ensure_client_fresh ()
264266 url = endpoint .lstrip ('/' ) # Remove leading slash for httpx
265267
266268 try :
@@ -269,6 +271,36 @@ def _stream_request(self, method: str, endpoint: str, **kwargs):
269271 logger .error (f"Request error for streaming { method } { endpoint } : { e } " )
270272 raise
271273
def _ensure_client_fresh(self) -> None:
    """Recreate the HTTP client if the process has been forked.

    PyTorch DataLoader forks worker processes that inherit the parent's
    httpx.Client with its open connection pool. Those inherited connections
    are in a corrupt state (the server sees the TCP socket closed/reused
    and replies with chunked data the worker misinterprets as an HTTP
    status line). Detecting a PID change and recreating the client
    avoids this issue.

    The current PID is compared with ``self._pid``. When they match the
    call is a no-op. Otherwise the inherited client is closed (best
    effort), a new one is built from ``self.config``, ``self._pid`` is
    updated, and the aiohttp session/connector references are reset to
    ``None``.
    """
    current_pid = os.getpid()
    if current_pid == self._pid:
        # Same process that recorded the PID: the client is still valid.
        return

    logger.debug(
        "PID changed from %s to %s (DataLoader fork detected). "
        "Recreating httpx client.",
        self._pid,
        current_pid,
    )
    # Close the inherited client without going through our normal
    # close() path (which would also close the aiohttp session).
    try:
        self.client.close()
    except Exception as exc:
        # Best effort: a failure to close the stale client must not stop
        # us from replacing it, but leave a trace for debugging.
        logger.debug(
            "Ignoring error while closing inherited httpx client: %s", exc
        )
    self.client = BaseApi._create_client(self.config)
    self._owns_client = True
    self._pid = current_pid
    # Invalidate any inherited aiohttp session as well.
    self._aiohttp_session = None
    self._aiohttp_connector = None
302+
303+
272304 def _make_request (self , method : str , endpoint : str , ** kwargs ) -> httpx .Response :
273305 """Make HTTP request with error handling and retries.
274306
@@ -283,6 +315,7 @@ def _make_request(self, method: str, endpoint: str, **kwargs) -> httpx.Response:
283315 Raises:
284316 httpx.HTTPStatusError: If the request fails
285317 """
318+ self ._ensure_client_fresh ()
286319 url = endpoint .lstrip ('/' ) # Remove leading slash for httpx
287320
288321 if logger .isEnabledFor (logging .DEBUG ):
0 commit comments