Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/tagstudio/core/library/alchemy/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class SortingModeEnum(enum.Enum):
DATE_ADDED = "file.date_added"
FILE_NAME = "generic.filename"
PATH = "file.path"
SIZE = "file.size"
RANDOM = "sorting.mode.random"


Expand Down
67 changes: 56 additions & 11 deletions src/tagstudio/core/library/alchemy/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,9 @@ def search_library(
assert self.library_dir

with Session(unwrap(self.engine), expire_on_commit=False) as session:
if page_size:
is_size_sort = search.sorting_mode == SortingModeEnum.SIZE

if page_size and not is_size_sort:
statement = (
select(Entry.id, func.count().over())
.offset(search.page_index * page_size)
Expand All @@ -1054,15 +1056,16 @@ def search_library(
statement = statement.distinct(Entry.id)

sort_on: ColumnExpressionArgument = Entry.id
match search.sorting_mode:
case SortingModeEnum.DATE_ADDED:
sort_on = Entry.id
case SortingModeEnum.FILE_NAME:
sort_on = func.lower(Entry.filename)
case SortingModeEnum.PATH:
sort_on = func.lower(Entry.path)
case SortingModeEnum.RANDOM:
sort_on = func.sin(Entry.id * search.random_seed)
if not is_size_sort:
match search.sorting_mode:
case SortingModeEnum.DATE_ADDED:
sort_on = Entry.id
case SortingModeEnum.FILE_NAME:
sort_on = func.lower(Entry.filename)
case SortingModeEnum.PATH:
sort_on = func.lower(Entry.path)
case SortingModeEnum.RANDOM:
sort_on = func.sin(Entry.id * search.random_seed)

statement = statement.order_by(asc(sort_on) if search.ascending else desc(sort_on))

Expand All @@ -1073,7 +1076,7 @@ def search_library(
)

start_time = time.time()
if page_size:
if page_size and not is_size_sort:
rows = session.execute(statement).fetchall()
ids = []
total_count = 0
Expand All @@ -1086,6 +1089,12 @@ def search_library(
end_time = time.time()
logger.info(f"SQL Execution finished ({format_timespan(end_time - start_time)})")

if is_size_sort:
ids = self._sort_ids_by_file_size(ids, search.ascending)
if page_size:
start = search.page_index * page_size
ids = ids[start : start + page_size]

Comment on lines +1092 to +1097
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks when the query has a LIMIT due to the page size.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the sort logic to handle the query having page size.

res = SearchResult(
total_count=total_count,
ids=ids,
Expand All @@ -1095,6 +1104,42 @@ def search_library(

return res

def _sort_ids_by_file_size(self, ids: list[int], ascending: bool) -> list[int]:
    """Sort entry IDs by their file size on disk.

    Entries whose files cannot be stat-ed (unlinked or missing) are
    assigned a sentinel size of -1 and sort to the low end.

    Args:
        ids: Entry IDs to sort.
        ascending: If True, sort smallest first.

    Returns:
        The same IDs re-ordered by file size.
    """
    if not ids:
        return ids

    library_dir = unwrap(self.library_dir)

    # Fetch paths in batches: a single IN (...) clause over every ID can
    # exceed SQLite's bound-parameter limit (999 on older builds) once a
    # library grows into the thousands of entries.
    batch_size = 900
    id_to_path: dict[int, Path] = {}
    with Session(unwrap(self.engine)) as session:
        for start in range(0, len(ids), batch_size):
            batch = ids[start : start + batch_size]
            rows = session.execute(
                select(Entry.id, Entry.path).where(Entry.id.in_(batch))
            ).fetchall()
            id_to_path.update((row[0], row[1]) for row in rows)

    def get_size(entry_id: int) -> int:
        # -1 orders missing/unreadable files before every real file size.
        path = id_to_path.get(entry_id)
        if path is None:
            return -1
        try:
            return (library_dir / path).stat().st_size
        except OSError:
            return -1

    # sorted() is stable, so equal-size entries keep their incoming order.
    return sorted(ids, key=get_size, reverse=not ascending)

def search_tags(self, name: str | None, limit: int = 100) -> list[set[Tag]]:
"""Return a list of Tag records matching the query."""
with Session(self.engine) as session:
Expand Down
128 changes: 127 additions & 1 deletion tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
# Created for TagStudio: https://github.com/CyanVoxel/TagStudio


from pathlib import Path
from tempfile import TemporaryDirectory

import pytest
import structlog

from tagstudio.core.library.alchemy.enums import BrowsingState
from tagstudio.core.library.alchemy.enums import BrowsingState, SortingModeEnum
from tagstudio.core.library.alchemy.library import Library
from tagstudio.core.library.alchemy.models import Entry
from tagstudio.core.query_lang.util import ParsingError
from tagstudio.core.utils.types import unwrap

logger = structlog.get_logger()

Expand Down Expand Up @@ -146,3 +151,124 @@ def test_parent_tags(search_library: Library, query: str, count: int):
def test_syntax(search_library: Library, invalid_query: str):
with pytest.raises(ParsingError) as e_info: # noqa: F841 # pyright: ignore[reportUnusedVariable]
search_library.search_library(BrowsingState.from_search_query(invalid_query), page_size=500)


def _make_size_library(files: list[tuple[str, bytes]]) -> tuple[Library, TemporaryDirectory]:
    """Create a temporary library with files of known sizes.

    Args:
        files: List of (relative path, content) pairs.

    Returns:
        A tuple of (open Library, TemporaryDirectory) — caller must close the tempdir.
    """
    tmp = TemporaryDirectory()
    try:
        lib_path = Path(tmp.name)

        lib = Library()
        status = lib.open_library(lib_path)
        assert status.success

        folder = unwrap(lib.folder)
        entries = []
        for rel_path, content in files:
            full = lib_path / rel_path
            full.parent.mkdir(parents=True, exist_ok=True)
            full.write_bytes(content)
            entries.append(Entry(folder=folder, path=Path(rel_path), fields=lib.default_fields))

        lib.add_entries(entries)
    except BaseException:
        # Don't leak the tempdir when setup fails partway through — the
        # caller never receives `tmp` and could not clean it up itself.
        tmp.cleanup()
        raise
    return lib, tmp


def test_sort_by_size_ascending():
    """Entries are returned smallest-first when sorting by size ascending."""
    lib, tmp = _make_size_library(
        [
            ("large.bin", b"x" * 300),
            ("small.bin", b"x" * 100),
            ("medium.bin", b"x" * 200),
        ]
    )
    try:
        results = lib.search_library(
            BrowsingState(sorting_mode=SortingModeEnum.SIZE, ascending=True),
            page_size=None,
        )
        assert results.total_count == 3

        lib_dir = unwrap(lib.library_dir)
        sizes: list[int] = []
        for eid in results.ids:
            entry = lib.get_entry(eid)
            assert entry is not None
            sizes.append((lib_dir / entry.path).stat().st_size)

        assert sizes == sorted(sizes), f"Expected ascending order, got sizes: {sizes}"
    finally:
        tmp.cleanup()


def test_sort_by_size_descending():
    """Entries are returned largest-first when sorting by size descending."""
    lib, tmp = _make_size_library(
        [
            ("large.bin", b"x" * 300),
            ("small.bin", b"x" * 100),
            ("medium.bin", b"x" * 200),
        ]
    )
    try:
        results = lib.search_library(
            BrowsingState(sorting_mode=SortingModeEnum.SIZE, ascending=False),
            page_size=None,
        )
        assert results.total_count == 3

        lib_dir = unwrap(lib.library_dir)
        sizes: list[int] = []
        for eid in results.ids:
            entry = lib.get_entry(eid)
            assert entry is not None
            sizes.append((lib_dir / entry.path).stat().st_size)

        assert sizes == sorted(sizes, reverse=True), (
            f"Expected descending order, got sizes: {sizes}"
        )
    finally:
        tmp.cleanup()


def test_sort_by_size_empty_result():
    """Sorting an empty result set returns an empty list without error."""
    lib, tmp = _make_size_library([("placeholder.bin", b"x")])
    try:
        browse = BrowsingState(
            sorting_mode=SortingModeEnum.SIZE,
            ascending=True,
            query="tag:nonexistent_tag_xyz",
        )
        res = lib.search_library(browse, page_size=None)

        assert res.total_count == 0
        assert res.ids == []
    finally:
        tmp.cleanup()


def test_sort_by_size_missing_file_sorts_to_start_ascending():
    """Entries with missing files (size=-1) sort to the start when ascending."""
    lib, tmp = _make_size_library([("exists.bin", b"x" * 200)])
    try:
        # Register an entry whose backing file was never written to disk.
        missing = Entry(
            folder=unwrap(lib.folder),
            path=Path("ghost.bin"),
            fields=lib.default_fields,
        )
        lib.add_entries([missing])

        res = lib.search_library(
            BrowsingState(sorting_mode=SortingModeEnum.SIZE, ascending=True),
            page_size=None,
        )
        assert res.total_count == 2

        # stat() fails for ghost.bin, so its sentinel size of -1 places it first.
        first = lib.get_entry(res.ids[0])
        assert first is not None
        assert first.path == Path("ghost.bin")
    finally:
        tmp.cleanup()