Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/pytest_data_loader/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
import warnings
from collections.abc import Generator
from collections.abc import Generator, Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

Expand Down Expand Up @@ -59,6 +59,8 @@ class DataLoaderFixture:
Call it with a file path (absolute, or relative to the nearest data directory) to load a single
file at test runtime. Accepts the same reader, onload, and open() read options as @load.
Repeated calls with the same arguments within a single test return the cached result without re-reading the file.
One-shot iterators (e.g. generators from a .jsonl reader) are intentionally not cached so each call
returns a fresh iterator.
"""

def __init__(
Expand Down Expand Up @@ -128,7 +130,9 @@ def __call__(
loaded = file_loader.load()
assert isinstance(loaded, LoadedData)
data = loaded.data
self._cache[cache_key] = data
if not isinstance(data, Iterator):
# Skip caching one-shot iterators: the same exhausted object would be returned on the next call.
self._cache[cache_key] = data
return data
except DataNotFound as e:
return self._handle_missing_data(cache_key, e)
Expand Down
6 changes: 3 additions & 3 deletions src/pytest_data_loader/loaders/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def get_content(self, key: CacheKey, on_miss: Callable[[], str | bytes]) -> str
# (includes interpreter overhead, varies with code-point width) — deliberate: exact for
# binary, close-enough for text, consistent cap enforcement across both.
byte_size = len(data) if isinstance(data, (bytes, bytearray)) else sys.getsizeof(data)
if byte_size <= self._max_content_bytes:
if self._max_content_bytes > 0 and byte_size <= self._max_content_bytes:
while self._content and self._content_bytes + byte_size > self._max_content_bytes:
_, (_, evicted_size) = self._content.popitem(last=False)
self._content_bytes -= evicted_size
Expand Down Expand Up @@ -125,14 +125,14 @@ def get_handle(self, key: CacheKey, on_miss: Callable[[], IO[Any]]) -> IO[Any]:
if key in self._handles:
del self._handles[key]

# New handle opened before evicting the LRU — pool briefly holds max+1 fds.
f = on_miss()
# Evict the LRU before opening so the pool never exceeds max_open_handles.
if len(self._handles) >= self._max_open_handles:
_, lru = self._handles.popitem(last=False)
try:
lru.close()
except Exception:
logger.exception("Failed to close evicted file handle")
f = on_miss()
self._handles[key] = f
return f

Expand Down
20 changes: 12 additions & 8 deletions src/pytest_data_loader/loaders/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,9 @@ def _load_now(self, skip_processor: bool = False, cache: bool = False) -> Loaded
data = self.load_attrs.process_func(gidx, self.path, data)
loaded_data = LoadedData(file_path=self.path, loaded_from=self.load_from, data=data, gidx=gidx)

if cache:
if cache and not (isinstance(loaded_data, LoadedData) and isinstance(loaded_data.data, Iterator)):
# Skip caching one-shot iterators: a generator returned by a file_reader cannot be replayed,
# so a second resolve() would receive an exhausted object.
self._loaded_data = loaded_data

return loaded_data
Expand Down Expand Up @@ -490,18 +492,17 @@ def _get_file_obj(self) -> IO[Any]:
# Use the same options as _scan_text_file (text mode, no explicit mode resolution) so
# that byte positions recorded during scanning remain valid for this handle.
read_options = self._effective_read_options()
return self._file_cache.get_handle(
f = self._file_cache.get_handle(
self._build_session_cache_key(read_options),
on_miss=lambda: compression_aware_open(self.path, **read_options),
)
elif self._file_handles and not self._file_handles[0].closed:
f = self._file_handles[0]
f.seek(0)
return f
else:
f = compression_aware_open(self.path, **self._effective_read_options())
self._file_handles[:] = [f]
return f
f.seek(0)
return f

@requires_loader(DataLoaderType.PARAMETRIZE)
def _scan_text_file(self) -> Generator[tuple[int, int, Any, Any]]:
Expand Down Expand Up @@ -618,8 +619,11 @@ def _effective_read_options(self, *, mode: str | None = None) -> dict[str, Any]:
:param mode: Optional explicit mode to merge into the returned options.
"""
options: dict[str, Any] = dict(self.read_options)
if mode is not None:
if mode:
options["mode"] = mode
# normalize "r" and "rt" to "r"
if options.get("mode") == "rt":
options["mode"] = "r"
effective_mode = options.get("mode") or "r"
if "b" not in effective_mode and "encoding" not in options:
options["encoding"] = self._effective_encoding
Expand Down Expand Up @@ -771,8 +775,8 @@ def _close_files(file_handlers: list[IO[Any]]) -> None:
for f in file_handlers:
try:
f.close()
except Exception as e:
logger.exception(e)
except Exception:
logger.exception("Failed to close file handle")
file_handlers.clear()


Expand Down
21 changes: 21 additions & 0 deletions tests/tests_plugin/test_data_loader_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,27 @@ def test_load(data_loader):
else:
assert "UserWarning: DataNotFound:" not in str(result.stdout)

def test_jsonl_repeated_call_returns_fresh_generator(self, pytester: Pytester, data_dir: Path) -> None:
"""Test that calling data_loader twice for the same JSONL file yields a fresh iterator each time.

The DataLoaderFixture._cache must not store a one-shot generator: the second call would return
the already-exhausted object and list(data) would be empty.
"""
(data_dir / "file.jsonl").write_text('{"k": 1}\n{"k": 2}\n')

pytester.makepyfile("""
def test_load(data_loader):
data1 = data_loader("file.jsonl")
items1 = list(data1)
data2 = data_loader("file.jsonl") # must not be the same exhausted generator
items2 = list(data2)
assert items1 == [{"k": 1}, {"k": 2}], f"First call: {items1!r}"
assert items2 == [{"k": 1}, {"k": 2}], f"Second call: {items2!r}"
""")
result = pytester.runpytest("-v")
assert result.ret == ExitCode.OK
result.assert_outcomes(passed=1)

def test_on_missing_warn_no_duplicate_warning(self, pytester: Pytester, data_dir: Path) -> None:
"""Test that calling data_loader twice for the same missing path emits only one warning."""
path = str(data_dir / "does_not_exist.txt")
Expand Down
30 changes: 30 additions & 0 deletions tests/tests_plugin/test_lazy_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,33 @@ def pytest_terminal_summary():
f"Eager loading should use at least the data size. "
f"total_param_size={total_param_size}, data_size_bytes={data_size_bytes}"
)


class TestGeneratorCaching:
"""Test that one-shot iterator values from file readers are not cached across resolve calls."""

def test_jsonl_load_with_stacked_parametrize_all_items_get_full_data(self, pytester: Pytester) -> None:
"""Test that all stacked-parametrize items each receive the full JSONL iterator.

When a LazyLoadedData produced by @load is shared across N test items (cartesian product with a
stacked @pytest.mark.parametrize), each item's resolve() must yield a fresh generator — not a
replayed exhausted one from the _loaded_data cache.
"""
data_dir = pytester.mkdir("data")
(data_dir / "file.jsonl").write_text('{"k": 1}\n{"k": 2}\n')

pytester.makepyfile("""
import pytest
from pytest_data_loader import load

@load("data", "file.jsonl")
@pytest.mark.parametrize("x", [1, 2, 3])
def test_load_stacked(data, x):
items = list(data)
assert len(items) == 2, f"Expected 2 items for x={x}, got {items!r}"
assert items[0] == {"k": 1}
assert items[1] == {"k": 2}
""")
result = pytester.runpytest("-v")
assert result.ret == ExitCode.OK
result.assert_outcomes(passed=3)
4 changes: 1 addition & 3 deletions tests/tests_plugin/test_loader_stacking.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,13 @@ def test_func(columns, row, item):
...

def test_open_counts():
# @load: opened exactly once at first test setup (subsequent tests reuse it via lru_cache)
# @load: opened exactly once at first test setup
assert counter.get("columns.txt", 0) == 1

# @parametrize: 1 scan open (collection) + 1 lazy-load open
# (subsequent tests reuse it via _cached_file_objects)
assert counter.get("rows.txt", 0) == 2

# @parametrize_dir: Each dir file is opened exactly once at first test setup
# (subsequent tests reuse it via lru_cache)
for file in ("a.txt", "b.txt"):
assert counter.get(file, 0) == 1
""")
Expand Down
63 changes: 60 additions & 3 deletions tests/tests_unit/test_file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections.abc import Callable
from pathlib import Path
from typing import Any
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

Expand Down Expand Up @@ -402,8 +402,12 @@ def test_lazy_loading_cache_state_transitions(self, loader: DataLoader, path: Pa
else:
assert isinstance(lazy_loaded_data, LazyLoadedData)
lazy_loaded_data.resolve()
# @load and @parametrize_dir: resolve calls _load_now with cache=True, populating _loaded_data.
assert file_loader._loaded_data is not None
# @load and @parametrize_dir: resolve calls _load_now(cache=True), which populates _loaded_data
# unless the file reader returns a one-shot iterator (e.g. .jsonl), which is intentionally skipped.
if get_effective_suffix(abs_file_path) == ".jsonl":
assert file_loader._loaded_data is None
else:
assert file_loader._loaded_data is not None

# clear_cache resets _loaded_data and closes the open file handle.
file_loader.clear_cache()
Expand Down Expand Up @@ -747,3 +751,56 @@ def test_compressed_uppercase_suffix_is_routed(self, tmp_path: Path, ext: str) -
loaded = file_loader.load()
assert isinstance(loaded, LoadedData)
assert loaded.data == payload


class TestEffectiveReadOptionsNormalization:
"""Tests for FileLoader._effective_read_options mode normalization."""

@pytest.fixture
def txt_loader(self, tmp_path: Path) -> FileLoader:
"""Return a plain text FileLoader with no explicit read options."""
p = tmp_path / "f.txt"
p.write_text("hello")
load_attrs = DataLoaderLoadAttrs(
loader=load,
search_from=Path(__file__),
fixture_names=("data",),
path=p,
lazy_loading=False,
)
return FileLoader(p, load_attrs)

def test_mode_rt_normalizes_to_r(self, txt_loader: FileLoader) -> None:
"""Test that _effective_read_options normalizes mode 'rt' to 'r' in the returned dict.

Regression: before normalization, explicit mode='rt' and auto-detected mode='r' produced
different cache keys for the same file even though both open() calls are identical.
"""
opts = txt_loader._effective_read_options(mode="rt")
assert opts.get("mode") == "r", f"Expected mode='r', got {opts.get('mode')!r}"

def test_mode_rt_and_mode_r_produce_same_session_cache_key(self, tmp_path: Path) -> None:
"""Test that loaders with read_options mode='rt' and mode='r' share a content cache entry."""
p = tmp_path / "f.txt"
p.write_text("hello")

def make_loader(read_options: dict) -> FileLoader:
attrs = DataLoaderLoadAttrs(
loader=load,
search_from=Path(__file__),
fixture_names=("data",),
path=p,
lazy_loading=False,
read_options=HashableDict(read_options),
)
return FileLoader(p, attrs)

fl_rt = make_loader({"mode": "rt"})
fl_r = make_loader({"mode": "r"})

cache = SessionFileCache()
on_miss = MagicMock(return_value="hello")
# _read_file passes the resolved mode; simulate that with explicit mode="r" for both
cache.get_content(fl_rt._build_session_cache_key(fl_rt._effective_read_options(mode="rt")), on_miss)
cache.get_content(fl_r._build_session_cache_key(fl_r._effective_read_options(mode="r")), on_miss)
assert on_miss.call_count == 1, "'rt' and 'r' must resolve to the same content cache key"
38 changes: 38 additions & 0 deletions tests/tests_unit/test_session_file_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,20 @@ def test_content_bytes_tracked_correctly(self) -> None:
cache.get_content(_content_key(path="/b.txt"), lambda: "world!")
assert cache._content_bytes == sys.getsizeof("hello") + sys.getsizeof("world!")

def test_max_content_bytes_zero_does_not_cache_empty_bytes(self) -> None:
"""Test that max_content_bytes=0 never stores anything, including zero-byte content.

Regression: len(b"") == 0, so the guard ``byte_size <= max_content_bytes`` was True when
max_content_bytes=0, causing an empty binary file to be cached despite caching being disabled.
"""
cache = SessionFileCache(max_content_bytes=0)
key = _content_key(read_options={"mode": "rb"})
on_miss = MagicMock(return_value=b"")
cache.get_content(key, on_miss)
cache.get_content(key, on_miss)
assert on_miss.call_count == 2, "on_miss must be called every time when caching is disabled"
assert cache._content == {}, "Nothing should be stored when max_content_bytes=0"


class TestSessionFileCacheGetHandle:
"""Tests for SessionFileCache.get_handle()"""
Expand Down Expand Up @@ -220,6 +234,30 @@ def test_different_keys_produce_independent_handles(self) -> None:
assert r1 is h1
assert r2 is h2

def test_pool_never_exceeds_max_open_handles(self) -> None:
"""Test that the pool never holds more than max_open_handles simultaneously-open handles.

Regression: the old implementation opened the new handle before evicting the LRU,
transiently holding max+1 open fds and risking EMFILE near ulimit.
"""
max_handles = 3
cache = SessionFileCache(max_open_handles=max_handles)
open_counts: list[int] = []
handles: list[StringIO] = []

def opener(name: str) -> StringIO:
h = StringIO(name)
handles.append(h)
open_counts.append(sum(1 for hh in handles if not hh.closed))
return h

for i in range(max_handles + 3):
cache.get_handle((f"/{i}.txt", 1, 10, HashableDict()), lambda n=str(i): opener(n))

assert max(open_counts) <= max_handles, (
f"Pool exceeded max_open_handles={max_handles}: peak was {max(open_counts)}"
)

def test_eviction_closes_lru_handle_when_pool_full(self) -> None:
"""Test that LRU handle is closed and removed when pool capacity is exceeded."""
cache = SessionFileCache(max_open_handles=2)
Expand Down
Loading