From 79997df5838749197c91e9bab77fdf4a36bb2d67 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Wed, 18 Mar 2026 17:07:22 -0700 Subject: [PATCH 01/15] [PP-883] Implement Lexile DB Metadata Service --- bin/lexile_db_update | 9 + src/palace/manager/celery/tasks/lexile.py | 254 ++++++++++++++++++ .../integration/metadata/lexile/__init__.py | 11 + .../integration/metadata/lexile/api.py | 97 +++++++ .../integration/metadata/lexile/service.py | 103 +++++++ .../integration/metadata/lexile/settings.py | 37 +++ src/palace/manager/scripts/lexile_db.py | 47 ++++ src/palace/manager/service/celery/celery.py | 5 + .../service/integration_registry/metadata.py | 2 + src/palace/manager/sqlalchemy/constants.py | 1 + .../sqlalchemy/model/classification.py | 1 + .../manager/sqlalchemy/model/datasource.py | 1 + .../controller/test_metadata_services.py | 32 ++- tests/manager/celery/tasks/test_lexile.py | 172 ++++++++++++ .../integration/metadata/lexile/__init__.py | 1 + .../integration/metadata/lexile/test_api.py | 199 ++++++++++++++ .../metadata/lexile/test_service.py | 112 ++++++++ 17 files changed, 1080 insertions(+), 4 deletions(-) create mode 100755 bin/lexile_db_update create mode 100644 src/palace/manager/celery/tasks/lexile.py create mode 100644 src/palace/manager/integration/metadata/lexile/__init__.py create mode 100644 src/palace/manager/integration/metadata/lexile/api.py create mode 100644 src/palace/manager/integration/metadata/lexile/service.py create mode 100644 src/palace/manager/integration/metadata/lexile/settings.py create mode 100644 src/palace/manager/scripts/lexile_db.py create mode 100644 tests/manager/celery/tasks/test_lexile.py create mode 100644 tests/manager/integration/metadata/lexile/__init__.py create mode 100644 tests/manager/integration/metadata/lexile/test_api.py create mode 100644 tests/manager/integration/metadata/lexile/test_service.py diff --git a/bin/lexile_db_update b/bin/lexile_db_update new file mode 100755 index 0000000000..d272579ddf --- 
/dev/null +++ b/bin/lexile_db_update @@ -0,0 +1,9 @@ +#!/usr/bin/env python +"""Run the Lexile DB update task. + +Use --force to update all ISBNs, including those that already have Lexile data +from Lexile DB (useful when Lexile may have updated their records). +""" +from palace.manager.scripts.lexile_db import LexileDBUpdateScript + +LexileDBUpdateScript().run() diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py new file mode 100644 index 0000000000..300c58244a --- /dev/null +++ b/src/palace/manager/celery/tasks/lexile.py @@ -0,0 +1,254 @@ +"""Celery tasks for the Lexile DB update.""" + +from __future__ import annotations + +from datetime import timedelta + +from celery import shared_task +from sqlalchemy import and_, exists, select +from sqlalchemy.orm import Session + +from palace.manager.celery.task import Task +from palace.manager.core.config import CannotLoadConfiguration +from palace.manager.integration.metadata.lexile.api import LexileDBAPI +from palace.manager.integration.metadata.lexile.service import LexileDBService +from palace.manager.service.celery.celery import QueueNames +from palace.manager.service.redis.models.lock import RedisLock +from palace.manager.service.redis.redis import Redis +from palace.manager.sqlalchemy.constants import DataSourceConstants +from palace.manager.sqlalchemy.model.classification import ( + Classification, + Subject, +) +from palace.manager.sqlalchemy.model.coverage import Timestamp +from palace.manager.sqlalchemy.model.datasource import DataSource +from palace.manager.sqlalchemy.model.identifier import Identifier +from palace.manager.sqlalchemy.util import get_one, get_one_or_create +from palace.manager.util.datetime_helpers import utc_now + +BATCH_SIZE = 10 +LOCK_TIMEOUT = timedelta(minutes=30) +SERVICE_NAME = "Lexile DB Update" + + +def _lexile_db_lock(redis_client: Redis, timestamp_id: int) -> RedisLock: + """Create a RedisLock for the Lexile DB update using timestamp_id 
as the lock value.""" + return RedisLock( + redis_client, + ["LexileDB", "Update"], + random_value=str(timestamp_id), + lock_timeout=LOCK_TIMEOUT, + ) + + +def _query_isbns_without_lexile( + session: Session, + offset: int, + limit: int, + force: bool, +) -> list[Identifier]: + """Query ISBN identifiers that need Lexile data. + + :param session: Database session. + :param offset: Offset for pagination. + :param limit: Maximum number of identifiers to return. + :param force: If True, also include ISBNs whose existing Lexile classification came + from Lexile DB. If False, only include ISBNs with no Lexile classification. + :return: List of Identifier objects. + """ + lexile_subject_exists = ( + select(Classification.id) + .where(Classification.identifier_id == Identifier.id) + .join(Subject, Classification.subject_id == Subject.id) + .where(Subject.type == Subject.LEXILE_SCORE) + ) + lexile_db_exists = ( + select(Classification.id) + .where(Classification.identifier_id == Identifier.id) + .join(Subject, Classification.subject_id == Subject.id) + .join(DataSource, Classification.data_source_id == DataSource.id) + .where( + and_( + DataSource.name == DataSourceConstants.LEXILE_DB, + Subject.type == Subject.LEXILE_SCORE, + ) + ) + ) + + query = select(Identifier).where(Identifier.type == Identifier.ISBN) + + if force: + # Force: process ISBNs that have no Lexile OR have Lexile from Lexile DB + query = query.where(~exists(lexile_subject_exists) | exists(lexile_db_exists)) + else: + # Default: only process ISBNs with no Lexile at all + query = query.where(~exists(lexile_subject_exists)) + + query = query.order_by(Identifier.id).offset(offset).limit(limit) + return list(session.execute(query).scalars().all()) + + +def _process_identifier( + session: Session, + identifier: Identifier, + api: LexileDBAPI, + data_source: DataSource, + force: bool, +) -> bool: + """Process a single identifier: fetch Lexile from API and update classification. 
+ + :return: True if the identifier was updated, False otherwise. + """ + isbn = identifier.identifier + lexile = api.fetch_lexile_for_isbn(isbn) + if lexile is None: + return False + + # For force mode: remove existing Lexile DB classification if present (in case value changed) + if force: + for classification in list(identifier.classifications): + if ( + classification.data_source.name == DataSourceConstants.LEXILE_DB + and classification.subject.type == Subject.LEXILE_SCORE + ): + session.delete(classification) + + identifier.classify( + data_source, + Subject.LEXILE_SCORE, + str(lexile), + None, + weight=Classification.TRUSTED_DISTRIBUTOR_WEIGHT, + ) + return True + + +@shared_task(queue=QueueNames.default, bind=True) +def run_lexile_db_update(task: Task) -> None: + """Orchestrator: check for Lexile DB config and launch worker if lock is available. + + Runs nightly via Celery beat. If a Lexile DB integration exists and no update + is currently running (lock not held), launches the worker task. + """ + with task.session() as session: + try: + LexileDBService.from_config(session) + except CannotLoadConfiguration as e: + task.log.info("Lexile DB update skipped: %s", e) + return + + redis_client = task.services.redis().client() + lock = RedisLock( + redis_client, + ["LexileDB", "Update"], + lock_timeout=LOCK_TIMEOUT, + ) + if lock.locked(): + task.log.info("Lexile DB update already in progress, skipping.") + return + + if not lock.acquire(): + task.log.info("Lexile DB update could not acquire lock, skipping.") + return + + lock.release() + lexile_db_update_task.delay(force=False) + task.log.info("Lexile DB update task queued.") + + +@shared_task(queue=QueueNames.default, bind=True) +def lexile_db_update_task( + task: Task, + force: bool = False, + offset: int = 0, + timestamp_id: int | None = None, +) -> None: + """Worker: process batches of ISBNs, fetching Lexile data from the API. + + Uses task.replace() to continue with the next batch. 
Each batch takes a lock + (using the Timestamp id as the lock value) and releases it when the batch ends, + so the lock is not held between replacements. + """ + with task.transaction() as session: + try: + service = LexileDBService.from_config(session) + except CannotLoadConfiguration as e: + task.log.info("Lexile DB update skipped: %s", e) + return + + if offset == 0: + stamp, _ = get_one_or_create( + session, + Timestamp, + service=SERVICE_NAME, + service_type=Timestamp.TASK_TYPE, + collection=None, + ) + timestamp_id = stamp.id + stamp.start = utc_now() + stamp.finish = None + stamp.achievements = None + stamp.exception = None + session.commit() + elif timestamp_id is None: + task.log.error("Lexile DB update: timestamp_id required when offset > 0") + return + + redis_client = task.services.redis().client() + lock = _lexile_db_lock(redis_client, timestamp_id) + if not lock.acquire(): + task.log.info("Lexile DB update could not acquire lock, skipping.") + return + + identifiers: list[Identifier] = [] + try: + with task.transaction() as session: + data_source = DataSource.lookup( + session, DataSourceConstants.LEXILE_DB, autocreate=True + ) + if not data_source: + task.log.error("Lexile DB data source not found") + return + + api = LexileDBAPI(service._settings) + identifiers = _query_isbns_without_lexile( + session, offset, BATCH_SIZE, force + ) + + updated = 0 + for identifier in identifiers: + if _process_identifier(session, identifier, api, data_source, force): + updated += 1 + + run_stamp = get_one( + session, + Timestamp, + service=SERVICE_NAME, + service_type=Timestamp.TASK_TYPE, + collection=None, + ) + if run_stamp is not None: + run_stamp.update( + finish=utc_now(), + achievements=( + f"Processed {len(identifiers)} identifiers, " + f"updated {updated} with Lexile data (offset={offset})" + ), + ) + + if len(identifiers) == BATCH_SIZE: + raise task.replace( + lexile_db_update_task.s( + force=force, + offset=offset + BATCH_SIZE, + timestamp_id=timestamp_id, + ) + ) + finally: + 
lock.release() + + task.log.info( + "Lexile DB update complete. Processed %d identifiers at offset %d.", + len(identifiers), + offset, + ) diff --git a/src/palace/manager/integration/metadata/lexile/__init__.py b/src/palace/manager/integration/metadata/lexile/__init__.py new file mode 100644 index 0000000000..ae5a8d4ab5 --- /dev/null +++ b/src/palace/manager/integration/metadata/lexile/__init__.py @@ -0,0 +1,11 @@ +"""MetaMetrics Lexile DB integration for augmenting Lexile scores.""" + +from palace.manager.integration.metadata.lexile.api import LexileDBAPI +from palace.manager.integration.metadata.lexile.service import LexileDBService +from palace.manager.integration.metadata.lexile.settings import LexileDBSettings + +__all__ = [ + "LexileDBAPI", + "LexileDBService", + "LexileDBSettings", +] diff --git a/src/palace/manager/integration/metadata/lexile/api.py b/src/palace/manager/integration/metadata/lexile/api.py new file mode 100644 index 0000000000..ad1ac9d69b --- /dev/null +++ b/src/palace/manager/integration/metadata/lexile/api.py @@ -0,0 +1,97 @@ +"""Client for the MetaMetrics Lexile Titles Database API.""" + +from __future__ import annotations + +import logging +from typing import Any + +from palace.manager.core.exceptions import IntegrationException +from palace.manager.integration.metadata.lexile.settings import LexileDBSettings +from palace.manager.util.http.http import HTTP +from palace.manager.util.log import LoggerMixin + + +class LexileDBAPI(LoggerMixin): + """Client for fetching Lexile measures from the MetaMetrics Lexile DB API.""" + + def __init__(self, settings: LexileDBSettings): + """Initialize the API client with credentials and base URL.""" + self._settings = settings + self._log = logging.getLogger(self.__class__.__name__) + + def fetch_lexile_for_isbn( + self, isbn: str, *, raise_on_error: bool = False + ) -> int | None: + """Fetch the Lexile measure for a book by ISBN. + + :param isbn: 10 or 13 digit ISBN. 
+ :param raise_on_error: If True, raise IntegrationException on HTTP errors + (e.g. 401, 403) instead of returning None. Used for self-tests. + :return: The Lexile measure (e.g. 650) or None if not found or on error. + """ + isbn = isbn.strip().replace("-", "") + if not isbn: + return None + + # Use ISBN or ISBN13 parameter based on length + param = "ISBN13" if len(isbn) == 13 else "ISBN" + url = f"{self._settings.base_url.rstrip('/')}/api/fab/v3/book/?format=json&{param}={isbn}" + + try: + response = HTTP.get_with_timeout( + url, + timeout=30, + auth=(self._settings.username, self._settings.password), + ) + except Exception as e: + self.log.warning("Lexile API request failed for ISBN %s: %s", isbn, e) + if raise_on_error: + raise IntegrationException( + "Lexile API request failed", + str(e), + ) from e + return None + + if response.status_code != 200: + self.log.warning( + "Lexile API returned %s for ISBN %s", response.status_code, isbn + ) + if raise_on_error: + if response.status_code in (401, 403): + raise IntegrationException( + "Lexile API authentication failed", + f"HTTP {response.status_code}. 
Check username and password.", + ) + raise IntegrationException( + "Lexile API request failed", + f"HTTP {response.status_code} for ISBN {isbn}", + ) + return None + + try: + data: dict[str, Any] = response.json() + except ValueError as e: + self.log.warning("Lexile API invalid JSON for ISBN %s: %s", isbn, e) + return None + + meta = data.get("meta", {}) + total_count = meta.get("total_count", 0) + if total_count == 0: + return None + + objects = data.get("objects", []) + if not objects: + return None + + first = objects[0] + lexile = first.get("lexile") + if lexile is None: + return None + + try: + return int(lexile) + except (TypeError, ValueError): + self.log.warning( + "Lexile API returned non-numeric lexile %r for ISBN %s", lexile, isbn + ) + return None diff --git a/src/palace/manager/integration/metadata/lexile/service.py b/src/palace/manager/integration/metadata/lexile/service.py new file mode 100644 index 0000000000..541c1c2728 --- /dev/null +++ b/src/palace/manager/integration/metadata/lexile/service.py @@ -0,0 +1,103 @@ +"""MetaMetrics Lexile DB Service - metadata integration for augmenting Lexile scores.""" + +from __future__ import annotations + +from collections.abc import Generator + +from sqlalchemy.orm import Session + +from palace.manager.core.config import CannotLoadConfiguration +from palace.manager.core.selftest import HasSelfTests, SelfTestResult +from palace.manager.integration.goals import Goals +from palace.manager.integration.metadata.base import ( + MetadataService, +) +from palace.manager.integration.metadata.lexile.api import LexileDBAPI +from palace.manager.integration.metadata.lexile.settings import ( + DEFAULT_SAMPLE_ISBN, + LexileDBSettings, +) +from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration +from palace.manager.sqlalchemy.util import get_one +from palace.manager.util.log import LoggerMixin + + +class LexileDBService( + MetadataService[LexileDBSettings], + HasSelfTests, + LoggerMixin, +): + 
"""Augment Lexile scores from the authoritative MetaMetrics Lexile Titles Database. + + This integration fetches Lexile measures from the Lexile DB API and adds them + as classifications. Lexile scores from this source are treated as high-quality + and will override scores from other sources (e.g. Overdrive) when both exist. + """ + + def __init__( + self, + _db: Session, + settings: LexileDBSettings, + ) -> None: + """Initialize the service. + + :param _db: Database session (required for HasSelfTests compatibility). + :param settings: Lexile DB configuration. + """ + self._settings = settings + + @classmethod + def label(cls) -> str: + return "MetaMetrics Lexile DB Service" + + @classmethod + def description(cls) -> str: + return ( + "Augments Lexile reading measures from the authoritative MetaMetrics " + "Lexile Titles Database. A nightly task processes ISBNs that lack " + "Lexile data and adds scores from this high-quality source." + ) + + @classmethod + def settings_class(cls) -> type[LexileDBSettings]: + return LexileDBSettings + + @classmethod + def multiple_services_allowed(cls) -> bool: + return False + + @classmethod + def integration(cls, _db: Session) -> IntegrationConfiguration | None: + """Get the Lexile DB integration configuration if one exists.""" + return get_one( + _db, + IntegrationConfiguration, + goal=Goals.METADATA_GOAL, + protocol=cls.protocols()[0], + ) + + @classmethod + def from_config(cls, _db: Session) -> LexileDBService: + """Load the Lexile DB service from configuration.""" + integration = cls.integration(_db) + if not integration: + raise CannotLoadConfiguration("No Lexile DB integration configured.") + settings = cls.settings_load(integration) + return cls(_db, settings) + + def _run_self_tests(self, _db: Session) -> Generator[SelfTestResult]: + """Run a self-test by fetching Lexile data for the sample ISBN.""" + isbn = ( + self._settings.sample_identifier.strip() + if self._settings.sample_identifier + else DEFAULT_SAMPLE_ISBN + ) + 
+ def test_lookup() -> str: + api = LexileDBAPI(self._settings) + lexile = api.fetch_lexile_for_isbn(isbn, raise_on_error=True) + if lexile is not None: + return f"Successfully retrieved Lexile measure {lexile} for ISBN {isbn}" + return f"No Lexile data found for ISBN {isbn} (API connection succeeded)" + + yield self.run_test(f"Looking up Lexile for ISBN {isbn}", test_lookup) diff --git a/src/palace/manager/integration/metadata/lexile/settings.py b/src/palace/manager/integration/metadata/lexile/settings.py new file mode 100644 index 0000000000..5e9a50fb82 --- /dev/null +++ b/src/palace/manager/integration/metadata/lexile/settings.py @@ -0,0 +1,37 @@ +"""Settings for the MetaMetrics Lexile DB integration.""" + +from typing import Annotated + +from palace.manager.integration.metadata.base import MetadataServiceSettings +from palace.manager.integration.settings import FormMetadata + +# Default sample ISBN for self-test: "The Hobbit" - widely available in Lexile DB +DEFAULT_SAMPLE_ISBN = "9780547928227" + + +class LexileDBSettings(MetadataServiceSettings): + """Settings for the MetaMetrics Lexile DB API.""" + + username: Annotated[ + str, + FormMetadata(label="Username"), + ] + password: Annotated[ + str, + FormMetadata(label="Password"), + ] + base_url: Annotated[ + str, + FormMetadata( + label="Base URL", + description="The Lexile API base URL provided by MetaMetrics (e.g. https://api.example.com)", + ), + ] + sample_identifier: Annotated[ + str, + FormMetadata( + label="Sample ISBN for self-test", + description="ISBN used when running the connection self-test. 
" + "Leave blank to use the default (9780547928227).", + ), + ] = DEFAULT_SAMPLE_ISBN diff --git a/src/palace/manager/scripts/lexile_db.py b/src/palace/manager/scripts/lexile_db.py new file mode 100644 index 0000000000..484e01d729 --- /dev/null +++ b/src/palace/manager/scripts/lexile_db.py @@ -0,0 +1,47 @@ +"""Script to kick off the Lexile DB update task.""" + +from __future__ import annotations + +import argparse +from typing import Any + +from sqlalchemy.orm import Session + +from palace.manager.celery.tasks.lexile import lexile_db_update_task +from palace.manager.scripts.base import Script + + +class LexileDBUpdateScript(Script): + """Kick off the Lexile DB update task.""" + + name = "Lexile DB Update" + + def __init__( + self, + force: bool = False, + _db: Session | None = None, + **kwargs: Any, + ) -> None: + super().__init__(_db=_db, **kwargs) + self._force = force + + @classmethod + def arg_parser(cls, _db: Session) -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Run the Lexile DB update task to augment Lexile scores." 
+ ) + parser.add_argument( + "--force", + action="store_true", + help="Update all ISBNs, including those with existing Lexile DB data", + ) + return parser + + def do_run(self, *args: Any, **kwargs: Any) -> None: + parsed = self.parse_command_line(self._db, *args, **kwargs) + force = getattr(parsed, "force", self._force) + lexile_db_update_task.delay(force=force) + self.log.info( + "Successfully queued lexile_db_update_task (force=%s)", + force, + ) diff --git a/src/palace/manager/service/celery/celery.py b/src/palace/manager/service/celery/celery.py index 81647414fd..d101161815 100644 --- a/src/palace/manager/service/celery/celery.py +++ b/src/palace/manager/service/celery/celery.py @@ -39,6 +39,7 @@ def beat_schedule() -> dict[str, Any]: """ from palace.manager.celery.tasks import ( boundless, + lexile, marc, notifications, novelist, @@ -223,6 +224,10 @@ def beat_schedule() -> dict[str, Any]: hour="3", ), # Every morning at 3:30 am. }, + "lexile_db_update": { + "task": lexile.run_lexile_db_update.name, + "schedule": crontab(minute="0", hour="3"), # Every day at 3:00 AM + }, "update_novelists_for_all_libraries": { "task": novelist.update_novelists_for_all_libraries.name, "schedule": crontab( diff --git a/src/palace/manager/service/integration_registry/metadata.py b/src/palace/manager/service/integration_registry/metadata.py index bfd4fb1ae6..5ed80c7df0 100644 --- a/src/palace/manager/service/integration_registry/metadata.py +++ b/src/palace/manager/service/integration_registry/metadata.py @@ -1,5 +1,6 @@ from palace.manager.integration.goals import Goals from palace.manager.integration.metadata.base import MetadataServiceType +from palace.manager.integration.metadata.lexile.service import LexileDBService from palace.manager.integration.metadata.novelist import NoveListAPI from palace.manager.integration.metadata.nyt import NYTBestSellerAPI from palace.manager.service.integration_registry.base import IntegrationRegistry @@ -11,3 +12,4 @@ def __init__(self) -> None: 
self.register(NYTBestSellerAPI, canonical="New York Times") self.register(NoveListAPI, canonical="NoveList Select") + self.register(LexileDBService, canonical="MetaMetrics Lexile DB Service") diff --git a/src/palace/manager/sqlalchemy/constants.py b/src/palace/manager/sqlalchemy/constants.py index d69c969305..bb92b9fb1a 100644 --- a/src/palace/manager/sqlalchemy/constants.py +++ b/src/palace/manager/sqlalchemy/constants.py @@ -41,6 +41,7 @@ class DataSourceConstants: FEEDBOOKS = "FeedBooks" BIBBLIO = "Bibblio" PROQUEST = "ProQuest" + LEXILE_DB = "Lexile DB" DEPRECATED_NAMES: frozenbidict[str, str] = frozenbidict({}) diff --git a/src/palace/manager/sqlalchemy/model/classification.py b/src/palace/manager/sqlalchemy/model/classification.py index 4c56ffc784..25ee4cf122 100644 --- a/src/palace/manager/sqlalchemy/model/classification.py +++ b/src/palace/manager/sqlalchemy/model/classification.py @@ -426,6 +426,7 @@ def scaled_weight(self) -> float: # to use reading level as a proxy for age appropriateness in a # pinch. (But not outside of a pinch.) 
(DataSourceConstants.OVERDRIVE, Subject.GRADE_LEVEL): 0.35, + (DataSourceConstants.LEXILE_DB, Subject.LEXILE_SCORE): 0.95, Subject.LEXILE_SCORE: 0.1, Subject.ATOS_SCORE: 0.1, } diff --git a/src/palace/manager/sqlalchemy/model/datasource.py b/src/palace/manager/sqlalchemy/model/datasource.py index 96f40d2385..02d628d858 100644 --- a/src/palace/manager/sqlalchemy/model/datasource.py +++ b/src/palace/manager/sqlalchemy/model/datasource.py @@ -320,6 +320,7 @@ def well_known_sources(cls, _db: Session) -> Generator[DataSource]: None, ), (cls.PROQUEST, True, False, IdentifierConstants.PROQUEST_ID, None), + (cls.LEXILE_DB, False, True, IdentifierConstants.ISBN, None), ): obj = DataSource.lookup( _db, diff --git a/tests/manager/api/admin/controller/test_metadata_services.py b/tests/manager/api/admin/controller/test_metadata_services.py index 72ce1c232d..e3ffb54e7f 100644 --- a/tests/manager/api/admin/controller/test_metadata_services.py +++ b/tests/manager/api/admin/controller/test_metadata_services.py @@ -25,6 +25,7 @@ UNKNOWN_PROTOCOL, ) from palace.manager.integration.goals import Goals +from palace.manager.integration.metadata.lexile.service import LexileDBService from palace.manager.integration.metadata.novelist import ( NoveListAPI, NoveListApiSettings, @@ -55,6 +56,10 @@ def __init__( assert nyt_protocol is not None self.nyt_protocol = nyt_protocol + lexile_protocol = self.registry.get_protocol(LexileDBService) + assert lexile_protocol is not None + self.lexile_protocol = lexile_protocol + self.controller = MetadataServicesController(db.session, self.registry) self.db = db @@ -127,16 +132,35 @@ def test_process_get_with_no_services( response_content = response.json assert isinstance(response_content, dict) assert response_content.get("metadata_services") == [] - [nyt, novelist] = response_content.get("protocols", []) - - assert novelist.get("name") == metadata_services_fixture.novelist_protocol + protocols = response_content.get("protocols", []) + protocol_names = 
[p.get("name") for p in protocols] + assert metadata_services_fixture.lexile_protocol in protocol_names + assert metadata_services_fixture.novelist_protocol in protocol_names + assert metadata_services_fixture.nyt_protocol in protocol_names + + novelist = next( + p + for p in protocols + if p.get("name") == metadata_services_fixture.novelist_protocol + ) assert "settings" in novelist assert novelist.get("sitewide") is False - assert nyt.get("name") == metadata_services_fixture.nyt_protocol + nyt = next( + p + for p in protocols + if p.get("name") == metadata_services_fixture.nyt_protocol + ) assert "settings" in nyt assert nyt.get("sitewide") is True + lexile = next( + p + for p in protocols + if p.get("name") == metadata_services_fixture.lexile_protocol + ) + assert "settings" in lexile + def test_process_get_with_one_service( self, metadata_services_fixture: MetadataServicesFixture, diff --git a/tests/manager/celery/tasks/test_lexile.py b/tests/manager/celery/tasks/test_lexile.py new file mode 100644 index 0000000000..628309e864 --- /dev/null +++ b/tests/manager/celery/tasks/test_lexile.py @@ -0,0 +1,172 @@ +"""Tests for the Lexile DB Celery tasks.""" + +from __future__ import annotations + +from pytest import LogCaptureFixture + +from palace.manager.celery.tasks import lexile +from palace.manager.integration.goals import Goals +from palace.manager.integration.metadata.lexile.service import LexileDBService +from palace.manager.integration.metadata.lexile.settings import LexileDBSettings +from palace.manager.sqlalchemy.constants import DataSourceConstants +from palace.manager.sqlalchemy.model.classification import Classification, Subject +from palace.manager.sqlalchemy.model.coverage import Timestamp +from palace.manager.sqlalchemy.model.datasource import DataSource +from palace.manager.sqlalchemy.model.identifier import Identifier +from palace.manager.sqlalchemy.util import get_one +from tests.fixtures.celery import CeleryFixture +from tests.fixtures.database 
import DatabaseTransactionFixture +from tests.fixtures.http import MockHttpClientFixture + + +class TestLexileDBUpdate: + """Tests for the Lexile DB update tasks.""" + + def test_run_lexile_db_update_skipped_when_not_configured( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + caplog: LogCaptureFixture, + ) -> None: + """Orchestrator skips when no Lexile DB integration exists.""" + lexile.run_lexile_db_update.delay().wait() + assert "Lexile DB update skipped" in caplog.text + + def test_run_lexile_db_update_queues_worker_when_configured( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + caplog: LogCaptureFixture, + ) -> None: + """Orchestrator queues worker when integration exists and lock is free.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + lexile.run_lexile_db_update.delay().wait() + assert "Lexile DB update task queued" in caplog.text + + def test_lexile_db_update_task_adds_classification( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + http_client: MockHttpClientFixture, + ) -> None: + """Worker fetches Lexile from API and adds classification.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + identifier = db.identifier( + identifier_type=Identifier.ISBN, foreign_id="9780123456789" + ) + + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 650}], + }, + ) + + lexile.lexile_db_update_task.delay(force=False).wait() + + db.session.refresh(identifier) + classifications = [ + c + for c in identifier.classifications + if c.subject.type == Subject.LEXILE_SCORE + ] + assert len(classifications) == 1 + assert 
classifications[0].identifier == "650" + assert classifications[0].data_source.name == DataSourceConstants.LEXILE_DB + + def test_lexile_db_update_task_force_mode_replaces_existing( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + http_client: MockHttpClientFixture, + ) -> None: + """Force mode replaces existing Lexile DB classification with new value.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + data_source = DataSource.lookup( + db.session, DataSourceConstants.LEXILE_DB, autocreate=True + ) + identifier = db.identifier( + identifier_type=Identifier.ISBN, foreign_id="9780123456789" + ) + identifier.classify( + data_source, + Subject.LEXILE_SCORE, + "500", + None, + weight=Classification.TRUSTED_DISTRIBUTOR_WEIGHT, + ) + db.session.commit() + + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 720}], + }, + ) + + lexile.lexile_db_update_task.delay(force=True).wait() + + db.session.refresh(identifier) + classifications = [ + c + for c in identifier.classifications + if c.subject.type == Subject.LEXILE_SCORE + ] + assert len(classifications) == 1 + assert classifications[0].identifier == "720" + + def test_lexile_db_update_task_creates_timestamp( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + http_client: MockHttpClientFixture, + ) -> None: + """Worker creates Timestamp for run status.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + + lexile.lexile_db_update_task.delay(force=False).wait() + + stamp = get_one( + db.session, + Timestamp, + service="Lexile DB Update", + service_type=Timestamp.TASK_TYPE, + collection=None, + ) + assert stamp is not 
None + assert stamp.finish is not None + assert "Processed" in (stamp.achievements or "") diff --git a/tests/manager/integration/metadata/lexile/__init__.py b/tests/manager/integration/metadata/lexile/__init__.py new file mode 100644 index 0000000000..3b17606055 --- /dev/null +++ b/tests/manager/integration/metadata/lexile/__init__.py @@ -0,0 +1 @@ +"""Tests for the Lexile DB metadata integration.""" diff --git a/tests/manager/integration/metadata/lexile/test_api.py b/tests/manager/integration/metadata/lexile/test_api.py new file mode 100644 index 0000000000..3fee044e26 --- /dev/null +++ b/tests/manager/integration/metadata/lexile/test_api.py @@ -0,0 +1,199 @@ +"""Tests for the Lexile DB API client.""" + +from __future__ import annotations + +import pytest + +from palace.manager.core.exceptions import IntegrationException +from palace.manager.integration.metadata.lexile.api import LexileDBAPI +from palace.manager.integration.metadata.lexile.settings import LexileDBSettings +from tests.fixtures.http import MockHttpClientFixture + + +class TestLexileDBAPI: + """Tests for LexileDBAPI.""" + + def test_fetch_lexile_for_isbn_returns_lexile( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns Lexile measure when book is found.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 650}], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result == 650 + assert len(http_client.requests) == 1 + assert "ISBN13=9780123456789" in http_client.requests[0] + + def test_fetch_lexile_for_isbn_10_digit( + self, http_client: MockHttpClientFixture + ) -> None: + """API uses ISBN param for 10-digit ISBNs.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 720}], + }, + ) + settings = 
LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("0123456789") + + assert result == 720 + assert "ISBN=0123456789" in http_client.requests[0] + + def test_fetch_lexile_for_isbn_strips_hyphens( + self, http_client: MockHttpClientFixture + ) -> None: + """ISBN hyphens are stripped before request.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 500}], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("978-0-12-345678-9") + + assert result == 500 + assert "ISBN13=9780123456789" in http_client.requests[0] + + def test_fetch_lexile_for_isbn_not_found( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None when book has no Lexile data.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 0}, + "objects": [], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result is None + + def test_fetch_lexile_for_isbn_empty_objects( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None when objects list is empty despite total_count.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result is None + + def test_fetch_lexile_for_isbn_null_lexile( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None when lexile field is null.""" + http_client.queue_response( 
+ 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": None}], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result is None + + def test_fetch_lexile_for_isbn_http_error( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None on HTTP error.""" + http_client.queue_response(404, content="") + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result is None + + def test_fetch_lexile_for_isbn_empty_string( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None for empty ISBN.""" + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("") + + assert result is None + assert len(http_client.requests) == 0 + + def test_fetch_lexile_for_isbn_raise_on_error_403( + self, http_client: MockHttpClientFixture + ) -> None: + """API raises IntegrationException on 403 when raise_on_error=True.""" + http_client.queue_response(403, content="Forbidden") + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + with pytest.raises(IntegrationException) as excinfo: + api.fetch_lexile_for_isbn("9780123456789", raise_on_error=True) + + assert "authentication" in str(excinfo.value).lower() diff --git a/tests/manager/integration/metadata/lexile/test_service.py b/tests/manager/integration/metadata/lexile/test_service.py new file mode 100644 index 0000000000..83b3854451 --- /dev/null +++ b/tests/manager/integration/metadata/lexile/test_service.py @@ -0,0 +1,112 @@ +"""Tests for the Lexile DB service.""" 
+ +from __future__ import annotations + +from unittest.mock import MagicMock + +from palace.manager.integration.metadata.lexile.service import LexileDBService +from palace.manager.integration.metadata.lexile.settings import ( + DEFAULT_SAMPLE_ISBN, + LexileDBSettings, +) +from tests.fixtures.http import MockHttpClientFixture + + +class TestLexileDBService: + """Tests for LexileDBService.""" + + def test_run_self_tests_success_with_lexile_data( + self, http_client: MockHttpClientFixture + ) -> None: + """Self-test succeeds when API returns Lexile data.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 650}], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + service = LexileDBService(MagicMock(), settings) + + results = list(service._run_self_tests(MagicMock())) + + assert len(results) == 1 + assert results[0].success is True + assert "650" in str(results[0].result) + assert DEFAULT_SAMPLE_ISBN in str(results[0].result) + + def test_run_self_tests_success_no_lexile_data( + self, http_client: MockHttpClientFixture + ) -> None: + """Self-test succeeds when API returns no data (book not in DB).""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 0}, + "objects": [], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + service = LexileDBService(MagicMock(), settings) + + results = list(service._run_self_tests(MagicMock())) + + assert len(results) == 1 + assert results[0].success is True + assert "No Lexile data found" in str(results[0].result) + assert "API connection succeeded" in str(results[0].result) + + def test_run_self_tests_uses_custom_sample_identifier( + self, http_client: MockHttpClientFixture + ) -> None: + """Self-test uses sample_identifier from settings when provided.""" + http_client.queue_response( + 200, + content={ + "meta": 
{"total_count": 1}, + "objects": [{"lexile": 720}], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + sample_identifier="9780123456789", + ) + service = LexileDBService(MagicMock(), settings) + + results = list(service._run_self_tests(MagicMock())) + + assert len(results) == 1 + assert results[0].success is True + assert "9780123456789" in str(results[0].name) + assert "720" in str(results[0].result) + assert "9780123456789" in http_client.requests[0] + + def test_run_self_tests_fails_on_auth_error( + self, http_client: MockHttpClientFixture + ) -> None: + """Self-test fails when API returns 403.""" + http_client.queue_response(403, content="Forbidden") + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + service = LexileDBService(MagicMock(), settings) + + results = list(service._run_self_tests(MagicMock())) + + assert len(results) == 1 + assert results[0].success is False + assert results[0].exception is not None + assert "authentication" in str(results[0].exception).lower() From 5a9baa77a4ecb5d44a8f45cb4c852751ea5ff4c8 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 09:53:59 -0700 Subject: [PATCH 02/15] Fix broken tests. 
--- src/palace/manager/celery/tasks/lexile.py | 2 +- tests/manager/celery/tasks/test_lexile.py | 13 +++++++++++-- tests/manager/sqlalchemy/model/test_datasource.py | 6 ++++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index 300c58244a..55fb9816b3 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -85,7 +85,7 @@ def _query_isbns_without_lexile( query = query.where(~exists(lexile_subject_exists)) query = query.order_by(Identifier.id).offset(offset).limit(limit) - return list(session.execute(query).scalars().all()) + return list(session.execute(query).unique().scalars().all()) def _process_identifier( diff --git a/tests/manager/celery/tasks/test_lexile.py b/tests/manager/celery/tasks/test_lexile.py index 628309e864..71c0bab43b 100644 --- a/tests/manager/celery/tasks/test_lexile.py +++ b/tests/manager/celery/tasks/test_lexile.py @@ -8,6 +8,7 @@ from palace.manager.integration.goals import Goals from palace.manager.integration.metadata.lexile.service import LexileDBService from palace.manager.integration.metadata.lexile.settings import LexileDBSettings +from palace.manager.service.logging.configuration import LogLevel from palace.manager.sqlalchemy.constants import DataSourceConstants from palace.manager.sqlalchemy.model.classification import Classification, Subject from palace.manager.sqlalchemy.model.coverage import Timestamp @@ -17,6 +18,7 @@ from tests.fixtures.celery import CeleryFixture from tests.fixtures.database import DatabaseTransactionFixture from tests.fixtures.http import MockHttpClientFixture +from tests.fixtures.redis import RedisFixture class TestLexileDBUpdate: @@ -26,9 +28,11 @@ def test_run_lexile_db_update_skipped_when_not_configured( self, db: DatabaseTransactionFixture, celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, caplog: LogCaptureFixture, ) -> None: """Orchestrator skips 
when no Lexile DB integration exists.""" + caplog.set_level(LogLevel.info) lexile.run_lexile_db_update.delay().wait() assert "Lexile DB update skipped" in caplog.text @@ -36,6 +40,7 @@ def test_run_lexile_db_update_queues_worker_when_configured( self, db: DatabaseTransactionFixture, celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, caplog: LogCaptureFixture, ) -> None: """Orchestrator queues worker when integration exists and lock is free.""" @@ -48,6 +53,7 @@ def test_run_lexile_db_update_queues_worker_when_configured( base_url="https://api.example.com", ), ) + caplog.set_level(LogLevel.info) lexile.run_lexile_db_update.delay().wait() assert "Lexile DB update task queued" in caplog.text @@ -55,6 +61,7 @@ def test_lexile_db_update_task_adds_classification( self, db: DatabaseTransactionFixture, celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, http_client: MockHttpClientFixture, ) -> None: """Worker fetches Lexile from API and adds classification.""" @@ -88,13 +95,14 @@ def test_lexile_db_update_task_adds_classification( if c.subject.type == Subject.LEXILE_SCORE ] assert len(classifications) == 1 - assert classifications[0].identifier == "650" + assert classifications[0].subject.identifier == "650" assert classifications[0].data_source.name == DataSourceConstants.LEXILE_DB def test_lexile_db_update_task_force_mode_replaces_existing( self, db: DatabaseTransactionFixture, celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, http_client: MockHttpClientFixture, ) -> None: """Force mode replaces existing Lexile DB classification with new value.""" @@ -139,12 +147,13 @@ def test_lexile_db_update_task_force_mode_replaces_existing( if c.subject.type == Subject.LEXILE_SCORE ] assert len(classifications) == 1 - assert classifications[0].identifier == "720" + assert classifications[0].subject.identifier == "720" def test_lexile_db_update_task_creates_timestamp( self, db: DatabaseTransactionFixture, celery_fixture: CeleryFixture, + 
redis_fixture: RedisFixture, http_client: MockHttpClientFixture, ) -> None: """Worker creates Timestamp for run status.""" diff --git a/tests/manager/sqlalchemy/model/test_datasource.py b/tests/manager/sqlalchemy/model/test_datasource.py index 66f0874c25..743c1846f0 100644 --- a/tests/manager/sqlalchemy/model/test_datasource.py +++ b/tests/manager/sqlalchemy/model/test_datasource.py @@ -79,12 +79,14 @@ def test_lookup_with_autocreate(self, db: DatabaseTransactionFixture): def test_metadata_sources_for(self, db: DatabaseTransactionFixture): content_cafe = DataSource.lookup(db.session, DataSource.CONTENT_CAFE) + lexile_db = DataSource.lookup(db.session, DataSource.LEXILE_DB) isbn_metadata_sources = DataSource.metadata_sources_for( db.session, Identifier.ISBN ) - assert 1 == len(isbn_metadata_sources) - assert [content_cafe] == isbn_metadata_sources + assert 2 == len(isbn_metadata_sources) + assert content_cafe in isbn_metadata_sources + assert lexile_db in isbn_metadata_sources def test_license_source_for(self, db: DatabaseTransactionFixture): identifier = db.identifier(Identifier.OVERDRIVE_ID) From 07a8728669220228e8a9925b26e62fed7dce669c Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 10:01:19 -0700 Subject: [PATCH 03/15] Address race condition. 
--- src/palace/manager/celery/tasks/lexile.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index 55fb9816b3..0cd7782c58 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -147,11 +147,6 @@ def run_lexile_db_update(task: Task) -> None: task.log.info("Lexile DB update already in progress, skipping.") return - if not lock.acquire(): - task.log.info("Lexile DB update could not acquire lock, skipping.") - return - - lock.release() lexile_db_update_task.delay(force=False) task.log.info("Lexile DB update task queued.") From 48fb7551a3944209bc9e6c79349a84fff7d02ecf Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 10:10:08 -0700 Subject: [PATCH 04/15] Expose settings as public property. --- src/palace/manager/celery/tasks/lexile.py | 2 +- src/palace/manager/integration/metadata/lexile/service.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index 0cd7782c58..0bd244d9af 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -205,7 +205,7 @@ def lexile_db_update_task( task.log.error("Lexile DB data source not found") return - api = LexileDBAPI(service._settings) + api = LexileDBAPI(service.settings) identifiers = _query_isbns_without_lexile( session, offset, BATCH_SIZE, force ) diff --git a/src/palace/manager/integration/metadata/lexile/service.py b/src/palace/manager/integration/metadata/lexile/service.py index 541c1c2728..212c6da953 100644 --- a/src/palace/manager/integration/metadata/lexile/service.py +++ b/src/palace/manager/integration/metadata/lexile/service.py @@ -46,6 +46,11 @@ def __init__( """ self._settings = settings + @property + def settings(self) -> LexileDBSettings: + """The Lexile DB configuration for this service instance.""" + 
return self._settings + @classmethod def label(cls) -> str: return "MetaMetrics Lexile DB Service" From 82cd35e5a46c98e5a7a44fa43a50e5b755ad67e6 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 10:14:46 -0700 Subject: [PATCH 05/15] rationalize the lock timeout. --- src/palace/manager/celery/tasks/lexile.py | 5 +++-- src/palace/manager/integration/metadata/lexile/api.py | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index 0bd244d9af..ae344c6c7c 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -10,7 +10,7 @@ from palace.manager.celery.task import Task from palace.manager.core.config import CannotLoadConfiguration -from palace.manager.integration.metadata.lexile.api import LexileDBAPI +from palace.manager.integration.metadata.lexile.api import API_TIMEOUT, LexileDBAPI from palace.manager.integration.metadata.lexile.service import LexileDBService from palace.manager.service.celery.celery import QueueNames from palace.manager.service.redis.models.lock import RedisLock @@ -27,7 +27,8 @@ from palace.manager.util.datetime_helpers import utc_now BATCH_SIZE = 10 -LOCK_TIMEOUT = timedelta(minutes=30) +# 2× the worst-case sequential time for a full batch (BATCH_SIZE requests × API_TIMEOUT each). 
+LOCK_TIMEOUT = timedelta(seconds=BATCH_SIZE * API_TIMEOUT * 2) SERVICE_NAME = "Lexile DB Update" diff --git a/src/palace/manager/integration/metadata/lexile/api.py b/src/palace/manager/integration/metadata/lexile/api.py index ad1ac9d69b..c1b7f1e2b3 100644 --- a/src/palace/manager/integration/metadata/lexile/api.py +++ b/src/palace/manager/integration/metadata/lexile/api.py @@ -10,6 +10,8 @@ from palace.manager.util.http.http import HTTP from palace.manager.util.log import LoggerMixin +API_TIMEOUT = 30 # seconds per request + class LexileDBAPI(LoggerMixin): """Client for fetching Lexile measures from the MetaMetrics Lexile DB API.""" @@ -40,7 +42,7 @@ def fetch_lexile_for_isbn( try: response = HTTP.get_with_timeout( url, - timeout=30, + timeout=API_TIMEOUT, auth=(self._settings.username, self._settings.password), ) except Exception as e: From 75840f049e55ce8dede825855a3eaaba80f8118d Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 10:19:51 -0700 Subject: [PATCH 06/15] Clarify sample isbn messaging. 
--- src/palace/manager/integration/metadata/lexile/service.py | 6 +----- src/palace/manager/integration/metadata/lexile/settings.py | 6 +++--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/palace/manager/integration/metadata/lexile/service.py b/src/palace/manager/integration/metadata/lexile/service.py index 212c6da953..78ef0622f3 100644 --- a/src/palace/manager/integration/metadata/lexile/service.py +++ b/src/palace/manager/integration/metadata/lexile/service.py @@ -92,11 +92,7 @@ def from_config(cls, _db: Session) -> LexileDBService: def _run_self_tests(self, _db: Session) -> Generator[SelfTestResult]: """Run a self-test by fetching Lexile data for the sample ISBN.""" - isbn = ( - self._settings.sample_identifier.strip() - if self._settings.sample_identifier - else DEFAULT_SAMPLE_ISBN - ) + isbn = (self._settings.sample_identifier or DEFAULT_SAMPLE_ISBN).strip() def test_lookup() -> str: api = LexileDBAPI(self._settings) diff --git a/src/palace/manager/integration/metadata/lexile/settings.py b/src/palace/manager/integration/metadata/lexile/settings.py index 5e9a50fb82..56120d39ee 100644 --- a/src/palace/manager/integration/metadata/lexile/settings.py +++ b/src/palace/manager/integration/metadata/lexile/settings.py @@ -28,10 +28,10 @@ class LexileDBSettings(MetadataServiceSettings): ), ] sample_identifier: Annotated[ - str, + str | None, FormMetadata( label="Sample ISBN for self-test", description="ISBN used when running the connection self-test. 
" - "Leave blank to use the default (9780547928227).", + f"Defaults to {DEFAULT_SAMPLE_ISBN} if not set.", ), - ] = DEFAULT_SAMPLE_ISBN + ] = None From 9dc59db0bf1e0feb76ff7ea1cdb5b028f87266a4 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 10:24:15 -0700 Subject: [PATCH 07/15] Address redundancy, unused vars, and clarify in comment semantics of "force" --- src/palace/manager/celery/tasks/lexile.py | 8 ++++++-- src/palace/manager/integration/metadata/lexile/api.py | 2 -- src/palace/manager/integration/metadata/lexile/service.py | 3 ++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index ae344c6c7c..2d56a0f53c 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -79,10 +79,14 @@ def _query_isbns_without_lexile( query = select(Identifier).where(Identifier.type == Identifier.ISBN) if force: - # Force: process ISBNs that have no Lexile OR have Lexile from Lexile DB + # Force: process ISBNs that have no Lexile at all, OR that already have a + # Lexile DB record (to pick up updated values from MetaMetrics). + # ISBNs that have a Lexile from another source only (e.g. Overdrive) are + # intentionally excluded — we don't overwrite third-party scores unless we + # have our own authoritative record to replace. query = query.where(~exists(lexile_subject_exists) | exists(lexile_db_exists)) else: - # Default: only process ISBNs with no Lexile at all + # Default: only process ISBNs with no Lexile classification from any source. 
query = query.where(~exists(lexile_subject_exists)) query = query.order_by(Identifier.id).offset(offset).limit(limit) diff --git a/src/palace/manager/integration/metadata/lexile/api.py b/src/palace/manager/integration/metadata/lexile/api.py index c1b7f1e2b3..4456b0b7a1 100644 --- a/src/palace/manager/integration/metadata/lexile/api.py +++ b/src/palace/manager/integration/metadata/lexile/api.py @@ -2,7 +2,6 @@ from __future__ import annotations -import logging from typing import Any from palace.manager.core.exceptions import IntegrationException @@ -19,7 +18,6 @@ class LexileDBAPI(LoggerMixin): def __init__(self, settings: LexileDBSettings): """Initialize the API client with credentials and base URL.""" self._settings = settings - self._log = logging.getLogger(self.__class__.__name__) def fetch_lexile_for_isbn( self, isbn: str, *, raise_on_error: bool = False diff --git a/src/palace/manager/integration/metadata/lexile/service.py b/src/palace/manager/integration/metadata/lexile/service.py index 78ef0622f3..23a3b08d86 100644 --- a/src/palace/manager/integration/metadata/lexile/service.py +++ b/src/palace/manager/integration/metadata/lexile/service.py @@ -41,9 +41,10 @@ def __init__( ) -> None: """Initialize the service. - :param _db: Database session (required for HasSelfTests compatibility). + :param _db: Database session. :param settings: Lexile DB configuration. 
""" + self._db = _db self._settings = settings @property From 42c8307bd89ed5d6d9fd90303bf65c37d00381f1 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 10:40:05 -0700 Subject: [PATCH 08/15] Address test coverage gaps --- tests/manager/celery/tasks/test_lexile.py | 92 +++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/tests/manager/celery/tasks/test_lexile.py b/tests/manager/celery/tasks/test_lexile.py index 71c0bab43b..5fc4a4d12c 100644 --- a/tests/manager/celery/tasks/test_lexile.py +++ b/tests/manager/celery/tasks/test_lexile.py @@ -2,6 +2,9 @@ from __future__ import annotations +from unittest.mock import patch + +import pytest from pytest import LogCaptureFixture from palace.manager.celery.tasks import lexile @@ -57,6 +60,41 @@ def test_run_lexile_db_update_queues_worker_when_configured( lexile.run_lexile_db_update.delay().wait() assert "Lexile DB update task queued" in caplog.text + def test_run_lexile_db_update_skipped_when_lock_already_held( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + caplog: LogCaptureFixture, + ) -> None: + """Orchestrator skips when lock is already held (worker in progress).""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + caplog.set_level(LogLevel.info) + with patch.object(lexile.RedisLock, "locked", return_value=True): + lexile.run_lexile_db_update.delay().wait() + assert "Lexile DB update already in progress, skipping." 
in caplog.text + assert "Lexile DB update task queued" not in caplog.text + + def test_lexile_db_update_task_skipped_when_not_configured( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + caplog: LogCaptureFixture, + ) -> None: + """Worker skips when no Lexile DB integration exists.""" + caplog.set_level(LogLevel.info) + lexile.lexile_db_update_task.delay(force=False).wait() + assert "Lexile DB update skipped" in caplog.text + def test_lexile_db_update_task_adds_classification( self, db: DatabaseTransactionFixture, @@ -179,3 +217,57 @@ def test_lexile_db_update_task_creates_timestamp( assert stamp is not None assert stamp.finish is not None assert "Processed" in (stamp.achievements or "") + + def test_lexile_db_update_task_continues_to_next_batch( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + http_client: MockHttpClientFixture, + ) -> None: + """Worker calls task.replace() with correct args when full batch returned.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + # Create exactly BATCH_SIZE (10) ISBNs so the task replaces to continue. 
+ for i in range(lexile.BATCH_SIZE): + db.identifier( + identifier_type=Identifier.ISBN, + foreign_id=f"9780123456{i:03d}", + ) + + for i in range(lexile.BATCH_SIZE): + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 600 + i}], + }, + ) + + replace_calls: list[tuple] = [] + + def capture_replace(*args: object, **kwargs: object) -> None: + replace_calls.append((args, kwargs)) + raise RuntimeError("Replace captured (avoid actual replacement)") + + with patch.object( + lexile.lexile_db_update_task, "replace", side_effect=capture_replace + ): + with pytest.raises(RuntimeError, match="Replace captured"): + lexile.lexile_db_update_task.delay(force=False).wait() + + assert len(replace_calls) == 1 + (args, kwargs) = replace_calls[0] + assert len(args) == 1 + sig = args[0] + assert sig.kwargs.get("force") is False + assert sig.kwargs.get("offset") == lexile.BATCH_SIZE + assert "timestamp_id" in sig.kwargs + assert sig.kwargs["timestamp_id"] is not None From 827b163894481e12902bd27cd0a64dcd5054b8ab Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 14:00:25 -0700 Subject: [PATCH 09/15] Add test for tests/manager/scripts/test_lexile_db.py --- tests/manager/scripts/test_lexile_db.py | 44 +++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 tests/manager/scripts/test_lexile_db.py diff --git a/tests/manager/scripts/test_lexile_db.py b/tests/manager/scripts/test_lexile_db.py new file mode 100644 index 0000000000..9a940c9295 --- /dev/null +++ b/tests/manager/scripts/test_lexile_db.py @@ -0,0 +1,44 @@ +"""Tests for the Lexile DB update script.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from palace.manager.scripts.lexile_db import LexileDBUpdateScript +from palace.manager.service.logging.configuration import LogLevel +from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.services import ServicesFixture + 
+ +class TestLexileDBUpdateScript: + """Tests for LexileDBUpdateScript.""" + + @pytest.mark.parametrize( + "force", + [ + pytest.param(True, id="force"), + pytest.param(False, id="no force"), + ], + ) + def test_do_run( + self, + force: bool, + db: DatabaseTransactionFixture, + services_fixture: ServicesFixture, + caplog: pytest.LogCaptureFixture, + ) -> None: + """do_run queues lexile_db_update_task with correct force flag and logs.""" + caplog.set_level(LogLevel.info) + with patch( + "palace.manager.scripts.lexile_db.lexile_db_update_task" + ) as mock_task: + command_args = ["--force"] if force else [] + LexileDBUpdateScript( + _db=db.session, + services=services_fixture.services, + ).do_run(command_args) + mock_task.delay.assert_called_once_with(force=force) + assert "Successfully queued lexile_db_update_task" in caplog.text + assert f"force={force}" in caplog.text From 5acc61356feb9b45bd4fbffff64d92fbc6e2513c Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 15:41:05 -0700 Subject: [PATCH 10/15] Add complete test coverage for lexile_db.py and ensure fstrings are used throughout. Also use PalaceValueError rather than generic ValueError. 
--- src/palace/manager/celery/tasks/lexile.py | 8 ++- .../integration/metadata/lexile/api.py | 8 +-- src/palace/manager/scripts/lexile_db.py | 3 +- tests/manager/scripts/test_lexile_db.py | 54 +++++++++++++++++++ 4 files changed, 62 insertions(+), 11 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index 2d56a0f53c..097d372e42 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -139,7 +139,7 @@ def run_lexile_db_update(task: Task) -> None: try: LexileDBService.from_config(session) except CannotLoadConfiguration as e: - task.log.info("Lexile DB update skipped: %s", e) + task.log.info(f"Lexile DB update skipped: {e}") return redis_client = task.services.redis().client() @@ -173,7 +173,7 @@ def lexile_db_update_task( try: service = LexileDBService.from_config(session) except CannotLoadConfiguration as e: - task.log.info("Lexile DB update skipped: %s", e) + task.log.info(f"Lexile DB update skipped: {e}") return if offset == 0: @@ -248,7 +248,5 @@ def lexile_db_update_task( lock.release() task.log.info( - "Lexile DB update complete. Processed %d identifiers at offset %d.", - len(identifiers), - offset, + f"Lexile DB update complete. 
Processed {len(identifiers)} identifiers at offset {offset}.", ) diff --git a/src/palace/manager/integration/metadata/lexile/api.py b/src/palace/manager/integration/metadata/lexile/api.py index 4456b0b7a1..89e8da71e4 100644 --- a/src/palace/manager/integration/metadata/lexile/api.py +++ b/src/palace/manager/integration/metadata/lexile/api.py @@ -44,7 +44,7 @@ def fetch_lexile_for_isbn( auth=(self._settings.username, self._settings.password), ) except Exception as e: - self.log.warning("Lexile API request failed for ISBN %s: %s", isbn, e) + self.log.warning(f"Lexile API request failed for ISBN {isbn}: {e}") if raise_on_error: raise IntegrationException( "Lexile API request failed", @@ -54,7 +54,7 @@ def fetch_lexile_for_isbn( if response.status_code != 200: self.log.warning( - "Lexile API returned %s for ISBN %s", response.status_code, isbn + f"Lexile API returned {response.status_code} for ISBN {isbn}" ) if raise_on_error: if response.status_code in (401, 403): @@ -71,7 +71,7 @@ def fetch_lexile_for_isbn( try: data: dict[str, Any] = response.json() except ValueError as e: - self.log.warning("Lexile API invalid JSON for ISBN %s: %s", isbn, e) + self.log.warning(f"Lexile API invalid JSON for ISBN {isbn}: {e}") return None meta = data.get("meta", {}) @@ -92,6 +92,6 @@ def fetch_lexile_for_isbn( return int(lexile) except (TypeError, ValueError): self.log.warning( - "Lexile API returned non-numeric lexile %r for ISBN %s", lexile, isbn + f"Lexile API returned non-numeric lexile {lexile} for ISBN {isbn}" ) return None diff --git a/src/palace/manager/scripts/lexile_db.py b/src/palace/manager/scripts/lexile_db.py index 484e01d729..fa9b7f2c8d 100644 --- a/src/palace/manager/scripts/lexile_db.py +++ b/src/palace/manager/scripts/lexile_db.py @@ -42,6 +42,5 @@ def do_run(self, *args: Any, **kwargs: Any) -> None: force = getattr(parsed, "force", self._force) lexile_db_update_task.delay(force=force) self.log.info( - "Successfully queued lexile_db_update_task (force=%s)", - force, 
+ f"Successfully queued lexile_db_update_task (force={force})", ) diff --git a/tests/manager/scripts/test_lexile_db.py b/tests/manager/scripts/test_lexile_db.py index 9a940c9295..92617bd9a5 100644 --- a/tests/manager/scripts/test_lexile_db.py +++ b/tests/manager/scripts/test_lexile_db.py @@ -2,6 +2,7 @@ from __future__ import annotations +from types import SimpleNamespace from unittest.mock import patch import pytest @@ -15,6 +16,33 @@ class TestLexileDBUpdateScript: """Tests for LexileDBUpdateScript.""" + def test_script_name( + self, + db: DatabaseTransactionFixture, + services_fixture: ServicesFixture, + ) -> None: + """Script has correct name.""" + script = LexileDBUpdateScript( + _db=db.session, + services=services_fixture.services, + ) + assert script.script_name == "Lexile DB Update" + + def test_arg_parser( + self, + db: DatabaseTransactionFixture, + ) -> None: + """arg_parser returns parser with --force and description.""" + parser = LexileDBUpdateScript.arg_parser(db.session) + args = parser.parse_args([]) + assert args.force is False + + args = parser.parse_args(["--force"]) + assert args.force is True + + assert "Lexile" in parser.description + assert "augment" in parser.description.lower() + @pytest.mark.parametrize( "force", [ @@ -42,3 +70,29 @@ def test_do_run( mock_task.delay.assert_called_once_with(force=force) assert "Successfully queued lexile_db_update_task" in caplog.text assert f"force={force}" in caplog.text + + def test_do_run_uses_constructor_force_when_parsed_missing_force( + self, + db: DatabaseTransactionFixture, + services_fixture: ServicesFixture, + caplog: pytest.LogCaptureFixture, + ) -> None: + """do_run uses self._force when parsed namespace has no force attribute.""" + caplog.set_level(LogLevel.info) + with ( + patch( + "palace.manager.scripts.lexile_db.lexile_db_update_task" + ) as mock_task, + patch.object( + LexileDBUpdateScript, + "parse_command_line", + return_value=SimpleNamespace(), + ), + ): + script = 
LexileDBUpdateScript( + _db=db.session, + services=services_fixture.services, + force=True, + ) + script.do_run([]) + mock_task.delay.assert_called_once_with(force=True) From 94d8b37bd23f074a2ab00eee7f9a54034eb057e9 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 19 Mar 2026 16:21:20 -0700 Subject: [PATCH 11/15] Fix mypy --- tests/manager/celery/tasks/test_lexile.py | 3 ++- tests/manager/scripts/test_lexile_db.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/manager/celery/tasks/test_lexile.py b/tests/manager/celery/tasks/test_lexile.py index 5fc4a4d12c..43f5af3f33 100644 --- a/tests/manager/celery/tasks/test_lexile.py +++ b/tests/manager/celery/tasks/test_lexile.py @@ -12,6 +12,7 @@ from palace.manager.integration.metadata.lexile.service import LexileDBService from palace.manager.integration.metadata.lexile.settings import LexileDBSettings from palace.manager.service.logging.configuration import LogLevel +from palace.manager.service.redis.models.lock import RedisLock from palace.manager.sqlalchemy.constants import DataSourceConstants from palace.manager.sqlalchemy.model.classification import Classification, Subject from palace.manager.sqlalchemy.model.coverage import Timestamp @@ -78,7 +79,7 @@ def test_run_lexile_db_update_skipped_when_lock_already_held( ), ) caplog.set_level(LogLevel.info) - with patch.object(lexile.RedisLock, "locked", return_value=True): + with patch.object(RedisLock, "locked", return_value=True): lexile.run_lexile_db_update.delay().wait() assert "Lexile DB update already in progress, skipping." 
in caplog.text assert "Lexile DB update task queued" not in caplog.text diff --git a/tests/manager/scripts/test_lexile_db.py b/tests/manager/scripts/test_lexile_db.py index 92617bd9a5..ac4eff9c4b 100644 --- a/tests/manager/scripts/test_lexile_db.py +++ b/tests/manager/scripts/test_lexile_db.py @@ -40,6 +40,7 @@ def test_arg_parser( args = parser.parse_args(["--force"]) assert args.force is True + assert parser.description is not None assert "Lexile" in parser.description assert "augment" in parser.description.lower() From 9a242053df659be8c2d11d1b77cbe296ad09e788 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Fri, 20 Mar 2026 09:56:42 -0700 Subject: [PATCH 12/15] Improve test coverage. --- .../integration/metadata/lexile/test_api.py | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/tests/manager/integration/metadata/lexile/test_api.py b/tests/manager/integration/metadata/lexile/test_api.py index 3fee044e26..9c578d9ae9 100644 --- a/tests/manager/integration/metadata/lexile/test_api.py +++ b/tests/manager/integration/metadata/lexile/test_api.py @@ -2,11 +2,14 @@ from __future__ import annotations +from unittest.mock import patch + import pytest from palace.manager.core.exceptions import IntegrationException from palace.manager.integration.metadata.lexile.api import LexileDBAPI from palace.manager.integration.metadata.lexile.settings import LexileDBSettings +from palace.manager.util.http.http import HTTP from tests.fixtures.http import MockHttpClientFixture @@ -197,3 +200,120 @@ def test_fetch_lexile_for_isbn_raise_on_error_403( api.fetch_lexile_for_isbn("9780123456789", raise_on_error=True) assert "authentication" in str(excinfo.value).lower() + + def test_fetch_lexile_for_isbn_http_exception_returns_none( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None when HTTP request raises an exception.""" + with patch.object( + HTTP, "get_with_timeout", side_effect=Exception("Connection refused") + ): + settings = 
LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result is None + + def test_fetch_lexile_for_isbn_http_exception_raise_on_error( + self, http_client: MockHttpClientFixture + ) -> None: + """API raises IntegrationException when HTTP request fails and raise_on_error=True.""" + with patch.object( + HTTP, "get_with_timeout", side_effect=Exception("Connection refused") + ): + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + with pytest.raises(IntegrationException) as excinfo: + api.fetch_lexile_for_isbn("9780123456789", raise_on_error=True) + + assert excinfo.value.message == "Lexile API request failed" + assert excinfo.value.debug_message == "Connection refused" + + def test_fetch_lexile_for_isbn_non_401_403_raise_on_error( + self, http_client: MockHttpClientFixture + ) -> None: + """API raises IntegrationException on 500 when raise_on_error=True.""" + http_client.queue_response(500, content="Internal Server Error") + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + with pytest.raises(IntegrationException) as excinfo: + api.fetch_lexile_for_isbn("9780123456789", raise_on_error=True) + + assert excinfo.value.message == "Lexile API request failed" + assert "500" in (excinfo.value.debug_message or "") + + def test_fetch_lexile_for_isbn_invalid_json( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None when response body is invalid JSON.""" + http_client.queue_response(200, content="not valid json") + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result is None + 
+ def test_fetch_lexile_for_isbn_non_numeric_lexile( + self, http_client: MockHttpClientFixture + ) -> None: + """API returns None when lexile field is non-numeric.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": "abc"}], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result is None + + def test_fetch_lexile_for_isbn_base_url_trailing_slash( + self, http_client: MockHttpClientFixture + ) -> None: + """API constructs URL correctly when base_url has trailing slash.""" + http_client.queue_response( + 200, + content={ + "meta": {"total_count": 1}, + "objects": [{"lexile": 650}], + }, + ) + settings = LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com/", + ) + api = LexileDBAPI(settings) + + result = api.fetch_lexile_for_isbn("9780123456789") + + assert result == 650 + assert "https://api.example.com/api/fab/v3/book/" in http_client.requests[0] From c6b1a478f81916241c60503e0b5c8fc39dbe59f3 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 23 Mar 2026 10:10:52 -0700 Subject: [PATCH 13/15] Align docstring with code. --- src/palace/manager/celery/tasks/lexile.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index 097d372e42..f329d3ba42 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -53,8 +53,8 @@ def _query_isbns_without_lexile( :param session: Database session. :param offset: Offset for pagination. :param limit: Maximum number of identifiers to return. - :param force: If True, include all ISBNs (including those with Lexile from other - sources). If False, only include ISBNs with no Lexile classification. 
+ :param force: If True, include all ISBNs (including those that already have lexile scores from + Lexile DB). If False, only include ISBNs without Lexile classification. :return: List of Identifier objects. """ lexile_subject_exists = ( From 66ffe8de32c3c5dd19c83183c0a2498b188c26ba Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 23 Mar 2026 11:23:28 -0700 Subject: [PATCH 14/15] Lexile DB: address code review findings Bug and logic fixes: - Remove unreachable `if not data_source:` check after lookup with autocreate=True - Prevent orphaned Timestamp when lock acquisition fails by creating the stamp only after acquiring the lock (use provisional uuid4 for offset 0) - Use shared LEXILE_DB_LOCK_KEY and _lexile_db_lock in run_lexile_db_update - Clarify force mode semantics and Overdrive-only exclusion in docstrings Test coverage: - Add test for timestamp_id=None when offset > 0 - Assert lexile_db_update_task.delay() is called in orchestrator test - Add tests that Overdrive-only Lexile ISBNs are excluded in default and force modes Dead and redundant code: - Remove unused self._db from LexileDBService.__init__ - Remove LoggerMixin from LexileDBService (inherited via HasSelfTests) Code quality: - Add lexile_settings pytest fixture in test_api.py - Replace N+1 classification loop with single DELETE in _process_identifier - Add synchronize_session=False for delete with subquery - Add comment documenting Celery replace idiom for task chaining --- src/palace/manager/celery/tasks/lexile.py | 102 ++++++----- .../integration/metadata/lexile/service.py | 5 +- tests/manager/celery/tasks/test_lexile.py | 121 ++++++++++++- .../integration/metadata/lexile/test_api.py | 167 +++++++----------- 4 files changed, 245 insertions(+), 150 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index f329d3ba42..6a240fd7da 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ 
-3,9 +3,10 @@ from __future__ import annotations from datetime import timedelta +from uuid import uuid4 from celery import shared_task -from sqlalchemy import and_, exists, select +from sqlalchemy import and_, delete, exists, select from sqlalchemy.orm import Session from palace.manager.celery.task import Task @@ -30,14 +31,19 @@ # 2× the worst-case sequential time for a full batch (BATCH_SIZE requests × API_TIMEOUT each). LOCK_TIMEOUT = timedelta(seconds=BATCH_SIZE * API_TIMEOUT * 2) SERVICE_NAME = "Lexile DB Update" +LEXILE_DB_LOCK_KEY: tuple[str, str] = ("LexileDB", "Update") -def _lexile_db_lock(redis_client: Redis, timestamp_id: int) -> RedisLock: - """Create a RedisLock for the Lexile DB update using timestamp_id as the lock value.""" +def _lexile_db_lock(redis_client: Redis, lock_value: int | str) -> RedisLock: + """Create a RedisLock for the Lexile DB update. + + :param lock_value: The lock value (timestamp_id for worker batches, or uuid for + the initial batch before a Timestamp exists). + """ return RedisLock( redis_client, - ["LexileDB", "Update"], - random_value=str(timestamp_id), + list(LEXILE_DB_LOCK_KEY), + random_value=str(lock_value), lock_timeout=LOCK_TIMEOUT, ) @@ -53,9 +59,14 @@ def _query_isbns_without_lexile( :param session: Database session. :param offset: Offset for pagination. :param limit: Maximum number of identifiers to return. - :param force: If True, include all ISBNs (including those that already have lexile scores from - Lexile DB). If False, only include ISBNs without Lexile classification. + :param force: If True, include ISBNs that have no Lexile or already have a Lexile DB record + (to refresh from MetaMetrics). If False, only include ISBNs with no Lexile from any source. :return: List of Identifier objects. + + Note: In force mode, ISBNs that have a Lexile score only from a third-party source (e.g. + Overdrive) are excluded. 
We only process ISBNs where we either have no Lexile data at all, or + we already have our own Lexile DB record to refresh. This avoids overwriting third-party + scores with a new Lexile DB lookup when we have never had authoritative data for that ISBN. """ lexile_subject_exists = ( select(Classification.id) @@ -79,11 +90,8 @@ def _query_isbns_without_lexile( query = select(Identifier).where(Identifier.type == Identifier.ISBN) if force: - # Force: process ISBNs that have no Lexile at all, OR that already have a - # Lexile DB record (to pick up updated values from MetaMetrics). - # ISBNs that have a Lexile from another source only (e.g. Overdrive) are - # intentionally excluded — we don't overwrite third-party scores unless we - # have our own authoritative record to replace. + # Force: no Lexile at all, OR already have Lexile DB record (refresh from MetaMetrics). + # Third-party-only Lexiles (e.g. Overdrive) excluded; see docstring. query = query.where(~exists(lexile_subject_exists) | exists(lexile_db_exists)) else: # Default: only process ISBNs with no Lexile classification from any source. 
@@ -111,12 +119,19 @@ def _process_identifier( # For force mode: remove existing Lexile DB classification if present (in case value changed) if force: - for classification in list(identifier.classifications): - if ( - classification.data_source.name == DataSourceConstants.LEXILE_DB - and classification.subject.type == Subject.LEXILE_SCORE - ): - session.delete(classification) + lexile_db_ids = ( + select(Classification.id) + .where( + Classification.identifier_id == identifier.id, + Classification.data_source_id == data_source.id, + ) + .join(Subject, Classification.subject_id == Subject.id) + .where(Subject.type == Subject.LEXILE_SCORE) + ) + session.execute( + delete(Classification).where(Classification.id.in_(lexile_db_ids)), + execution_options={"synchronize_session": False}, + ) identifier.classify( data_source, @@ -143,11 +158,7 @@ def run_lexile_db_update(task: Task) -> None: return redis_client = task.services.redis().client() - lock = RedisLock( - redis_client, - ["LexileDB", "Update"], - lock_timeout=LOCK_TIMEOUT, - ) + lock = _lexile_db_lock(redis_client, "orchestrator") if lock.locked(): task.log.info("Lexile DB update already in progress, skipping.") return @@ -169,6 +180,10 @@ def lexile_db_update_task( replacements using the Timestamp id as the lock value so replacement tasks can extend the lock. 
""" + if offset > 0 and timestamp_id is None: + task.log.error("Lexile DB update: timestamp_id required when offset > 0") + return + with task.transaction() as session: try: service = LexileDBService.from_config(session) @@ -176,26 +191,10 @@ def lexile_db_update_task( task.log.info(f"Lexile DB update skipped: {e}") return - if offset == 0: - stamp, _ = get_one_or_create( - session, - Timestamp, - service=SERVICE_NAME, - service_type=Timestamp.TASK_TYPE, - collection=None, - ) - timestamp_id = stamp.id - stamp.start = utc_now() - stamp.finish = None - stamp.achievements = None - stamp.exception = None - session.commit() - elif timestamp_id is None: - task.log.error("Lexile DB update: timestamp_id required when offset > 0") - return - + # Use a provisional lock value for the initial batch; later batches use timestamp_id. + lock_value: int | str = timestamp_id if timestamp_id is not None else str(uuid4()) redis_client = task.services.redis().client() - lock = _lexile_db_lock(redis_client, timestamp_id) + lock = _lexile_db_lock(redis_client, lock_value) if not lock.acquire(): task.log.info("Lexile DB update could not acquire lock, skipping.") return @@ -203,13 +202,23 @@ def lexile_db_update_task( identifiers: list[Identifier] = [] try: with task.transaction() as session: + if offset == 0: + stamp, _ = get_one_or_create( + session, + Timestamp, + service=SERVICE_NAME, + service_type=Timestamp.TASK_TYPE, + collection=None, + ) + timestamp_id = stamp.id + stamp.start = utc_now() + stamp.finish = None + stamp.achievements = None + stamp.exception = None + data_source = DataSource.lookup( session, DataSourceConstants.LEXILE_DB, autocreate=True ) - if not data_source: - task.log.error("Lexile DB data source not found") - return - api = LexileDBAPI(service.settings) identifiers = _query_isbns_without_lexile( session, offset, BATCH_SIZE, force @@ -237,6 +246,7 @@ def lexile_db_update_task( ) if len(identifiers) == BATCH_SIZE: + # Celery expects replace() to be raised as an 
exception to trigger task chaining. raise task.replace( lexile_db_update_task.s( force=force, diff --git a/src/palace/manager/integration/metadata/lexile/service.py b/src/palace/manager/integration/metadata/lexile/service.py index 23a3b08d86..c3d322d23e 100644 --- a/src/palace/manager/integration/metadata/lexile/service.py +++ b/src/palace/manager/integration/metadata/lexile/service.py @@ -19,13 +19,11 @@ ) from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration from palace.manager.sqlalchemy.util import get_one -from palace.manager.util.log import LoggerMixin class LexileDBService( MetadataService[LexileDBSettings], HasSelfTests, - LoggerMixin, ): """Augment Lexile scores from the authoritative MetaMetrics Lexile Titles Database. @@ -41,10 +39,9 @@ def __init__( ) -> None: """Initialize the service. - :param _db: Database session. + :param _db: Database session (required by the framework but not stored). :param settings: Lexile DB configuration. """ - self._db = _db self._settings = settings @property diff --git a/tests/manager/celery/tasks/test_lexile.py b/tests/manager/celery/tasks/test_lexile.py index 43f5af3f33..c686d89401 100644 --- a/tests/manager/celery/tasks/test_lexile.py +++ b/tests/manager/celery/tasks/test_lexile.py @@ -58,7 +58,13 @@ def test_run_lexile_db_update_queues_worker_when_configured( ), ) caplog.set_level(LogLevel.info) - lexile.run_lexile_db_update.delay().wait() + with patch.object( + lexile.lexile_db_update_task, + "delay", + wraps=lexile.lexile_db_update_task.delay, + ) as mock_delay: + lexile.run_lexile_db_update.delay().wait() + mock_delay.assert_called_once_with(force=False) assert "Lexile DB update task queued" in caplog.text def test_run_lexile_db_update_skipped_when_lock_already_held( @@ -96,6 +102,29 @@ def test_lexile_db_update_task_skipped_when_not_configured( lexile.lexile_db_update_task.delay(force=False).wait() assert "Lexile DB update skipped" in caplog.text + def 
test_lexile_db_update_task_errors_when_timestamp_id_missing_with_offset( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + caplog: LogCaptureFixture, + ) -> None: + """Worker logs error and returns when offset > 0 but timestamp_id is None.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + caplog.set_level(LogLevel.error) + lexile.lexile_db_update_task.delay( + force=False, offset=5, timestamp_id=None + ).wait() + assert "timestamp_id required when offset > 0" in caplog.text + def test_lexile_db_update_task_adds_classification( self, db: DatabaseTransactionFixture, @@ -272,3 +301,93 @@ def capture_replace(*args: object, **kwargs: object) -> None: assert sig.kwargs.get("offset") == lexile.BATCH_SIZE assert "timestamp_id" in sig.kwargs assert sig.kwargs["timestamp_id"] is not None + + def test_lexile_db_update_task_excludes_overdrive_only_lexile_default_mode( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + http_client: MockHttpClientFixture, + ) -> None: + """Default mode does not process ISBNs that have Lexile only from Overdrive.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + overdrive_source = DataSource.lookup( + db.session, DataSourceConstants.OVERDRIVE, autocreate=True + ) + identifier = db.identifier( + identifier_type=Identifier.ISBN, foreign_id="9780123456789" + ) + identifier.classify( + overdrive_source, + Subject.LEXILE_SCORE, + "600", + None, + weight=Classification.TRUSTED_DISTRIBUTOR_WEIGHT, + ) + db.session.commit() + + lexile.lexile_db_update_task.delay(force=False).wait() + + db.session.refresh(identifier) + 
lexile_classifications = [ + c + for c in identifier.classifications + if c.subject.type == Subject.LEXILE_SCORE + ] + assert len(lexile_classifications) == 1 + assert ( + lexile_classifications[0].data_source.name == DataSourceConstants.OVERDRIVE + ) + + def test_lexile_db_update_task_excludes_overdrive_only_lexile_force_mode( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + http_client: MockHttpClientFixture, + ) -> None: + """Force mode does not process ISBNs that have Lexile only from Overdrive.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + overdrive_source = DataSource.lookup( + db.session, DataSourceConstants.OVERDRIVE, autocreate=True + ) + identifier = db.identifier( + identifier_type=Identifier.ISBN, foreign_id="9780123456789" + ) + identifier.classify( + overdrive_source, + Subject.LEXILE_SCORE, + "600", + None, + weight=Classification.TRUSTED_DISTRIBUTOR_WEIGHT, + ) + db.session.commit() + + lexile.lexile_db_update_task.delay(force=True).wait() + + db.session.refresh(identifier) + lexile_classifications = [ + c + for c in identifier.classifications + if c.subject.type == Subject.LEXILE_SCORE + ] + assert len(lexile_classifications) == 1 + assert ( + lexile_classifications[0].data_source.name == DataSourceConstants.OVERDRIVE + ) diff --git a/tests/manager/integration/metadata/lexile/test_api.py b/tests/manager/integration/metadata/lexile/test_api.py index 9c578d9ae9..ffcb6a0e7b 100644 --- a/tests/manager/integration/metadata/lexile/test_api.py +++ b/tests/manager/integration/metadata/lexile/test_api.py @@ -13,11 +13,23 @@ from tests.fixtures.http import MockHttpClientFixture +@pytest.fixture +def lexile_settings() -> LexileDBSettings: + """Default test settings for Lexile DB API.""" + return LexileDBSettings( + username="user", + 
password="pass", + base_url="https://api.example.com", + ) + + class TestLexileDBAPI: """Tests for LexileDBAPI.""" def test_fetch_lexile_for_isbn_returns_lexile( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns Lexile measure when book is found.""" http_client.queue_response( @@ -27,12 +39,7 @@ def test_fetch_lexile_for_isbn_returns_lexile( "objects": [{"lexile": 650}], }, ) - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("9780123456789") @@ -41,7 +48,9 @@ def test_fetch_lexile_for_isbn_returns_lexile( assert "ISBN13=9780123456789" in http_client.requests[0] def test_fetch_lexile_for_isbn_10_digit( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API uses ISBN param for 10-digit ISBNs.""" http_client.queue_response( @@ -51,12 +60,7 @@ def test_fetch_lexile_for_isbn_10_digit( "objects": [{"lexile": 720}], }, ) - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("0123456789") @@ -64,7 +68,9 @@ def test_fetch_lexile_for_isbn_10_digit( assert "ISBN=0123456789" in http_client.requests[0] def test_fetch_lexile_for_isbn_strips_hyphens( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """ISBN hyphens are stripped before request.""" http_client.queue_response( @@ -74,12 +80,7 @@ def test_fetch_lexile_for_isbn_strips_hyphens( "objects": [{"lexile": 500}], }, ) - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = 
LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("978-0-12-345678-9") @@ -87,7 +88,9 @@ def test_fetch_lexile_for_isbn_strips_hyphens( assert "ISBN13=9780123456789" in http_client.requests[0] def test_fetch_lexile_for_isbn_not_found( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None when book has no Lexile data.""" http_client.queue_response( @@ -97,19 +100,16 @@ def test_fetch_lexile_for_isbn_not_found( "objects": [], }, ) - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("9780123456789") assert result is None def test_fetch_lexile_for_isbn_empty_objects( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None when objects list is empty despite total_count.""" http_client.queue_response( @@ -119,19 +119,16 @@ def test_fetch_lexile_for_isbn_empty_objects( "objects": [], }, ) - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("9780123456789") assert result is None def test_fetch_lexile_for_isbn_null_lexile( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None when lexile field is null.""" http_client.queue_response( @@ -141,43 +138,32 @@ def test_fetch_lexile_for_isbn_null_lexile( "objects": [{"lexile": None}], }, ) - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result 
= api.fetch_lexile_for_isbn("9780123456789") assert result is None def test_fetch_lexile_for_isbn_http_error( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None on HTTP error.""" http_client.queue_response(404, content="") - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("9780123456789") assert result is None def test_fetch_lexile_for_isbn_empty_string( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None for empty ISBN.""" - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("") @@ -185,16 +171,13 @@ def test_fetch_lexile_for_isbn_empty_string( assert len(http_client.requests) == 0 def test_fetch_lexile_for_isbn_raise_on_error_403( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API raises IntegrationException on 403 when raise_on_error=True.""" http_client.queue_response(403, content="Forbidden") - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) with pytest.raises(IntegrationException) as excinfo: api.fetch_lexile_for_isbn("9780123456789", raise_on_error=True) @@ -202,36 +185,30 @@ def test_fetch_lexile_for_isbn_raise_on_error_403( assert "authentication" in str(excinfo.value).lower() def test_fetch_lexile_for_isbn_http_exception_returns_none( - self, http_client: MockHttpClientFixture + self, + http_client: 
MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None when HTTP request raises an exception.""" with patch.object( HTTP, "get_with_timeout", side_effect=Exception("Connection refused") ): - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("9780123456789") assert result is None def test_fetch_lexile_for_isbn_http_exception_raise_on_error( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API raises IntegrationException when HTTP request fails and raise_on_error=True.""" with patch.object( HTTP, "get_with_timeout", side_effect=Exception("Connection refused") ): - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) with pytest.raises(IntegrationException) as excinfo: api.fetch_lexile_for_isbn("9780123456789", raise_on_error=True) @@ -240,16 +217,13 @@ def test_fetch_lexile_for_isbn_http_exception_raise_on_error( assert excinfo.value.debug_message == "Connection refused" def test_fetch_lexile_for_isbn_non_401_403_raise_on_error( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API raises IntegrationException on 500 when raise_on_error=True.""" http_client.queue_response(500, content="Internal Server Error") - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) with pytest.raises(IntegrationException) as excinfo: api.fetch_lexile_for_isbn("9780123456789", raise_on_error=True) @@ -258,23 +232,22 @@ def 
test_fetch_lexile_for_isbn_non_401_403_raise_on_error( assert "500" in (excinfo.value.debug_message or "") def test_fetch_lexile_for_isbn_invalid_json( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None when response body is invalid JSON.""" http_client.queue_response(200, content="not valid json") - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("9780123456789") assert result is None def test_fetch_lexile_for_isbn_non_numeric_lexile( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, + lexile_settings: LexileDBSettings, ) -> None: """API returns None when lexile field is non-numeric.""" http_client.queue_response( @@ -284,19 +257,15 @@ def test_fetch_lexile_for_isbn_non_numeric_lexile( "objects": [{"lexile": "abc"}], }, ) - settings = LexileDBSettings( - username="user", - password="pass", - base_url="https://api.example.com", - ) - api = LexileDBAPI(settings) + api = LexileDBAPI(lexile_settings) result = api.fetch_lexile_for_isbn("9780123456789") assert result is None def test_fetch_lexile_for_isbn_base_url_trailing_slash( - self, http_client: MockHttpClientFixture + self, + http_client: MockHttpClientFixture, ) -> None: """API constructs URL correctly when base_url has trailing slash.""" http_client.queue_response( From 17a204adcd34ae959ad9641683a987db1bca508f Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Mon, 23 Mar 2026 12:35:22 -0700 Subject: [PATCH 15/15] Implement workflow lock for Lexile DB update task Replace the per-batch acquire/release lock pattern with a workflow-level Redis lock (2-hour TTL) that persists across all batches in a paginated run. 
A UUID is generated on the first batch and passed to every replacement task via task.replace(), allowing each subsequent batch to extend the same lock. The lock() context manager is configured with ignored_exceptions=(Ignore,) so that Celery's Ignore exception (raised by task.replace()) does not trigger a lock release mid-workflow. Also adds lock_value parameter validation and three new tests covering the workflow lock configuration, the first-batch skip-when-locked path, and the missing-lock_value guard. --- src/palace/manager/celery/tasks/lexile.py | 78 +++++++++++++------ tests/manager/celery/tasks/test_lexile.py | 92 +++++++++++++++++++++++ 2 files changed, 146 insertions(+), 24 deletions(-) diff --git a/src/palace/manager/celery/tasks/lexile.py b/src/palace/manager/celery/tasks/lexile.py index 6a240fd7da..4e27330297 100644 --- a/src/palace/manager/celery/tasks/lexile.py +++ b/src/palace/manager/celery/tasks/lexile.py @@ -6,12 +6,13 @@ from uuid import uuid4 from celery import shared_task +from celery.exceptions import Ignore from sqlalchemy import and_, delete, exists, select from sqlalchemy.orm import Session from palace.manager.celery.task import Task from palace.manager.core.config import CannotLoadConfiguration -from palace.manager.integration.metadata.lexile.api import API_TIMEOUT, LexileDBAPI +from palace.manager.integration.metadata.lexile.api import LexileDBAPI from palace.manager.integration.metadata.lexile.service import LexileDBService from palace.manager.service.celery.celery import QueueNames from palace.manager.service.redis.models.lock import RedisLock @@ -28,23 +29,26 @@ from palace.manager.util.datetime_helpers import utc_now BATCH_SIZE = 10 -# 2× the worst-case sequential time for a full batch (BATCH_SIZE requests × API_TIMEOUT each). -LOCK_TIMEOUT = timedelta(seconds=BATCH_SIZE * API_TIMEOUT * 2) +# Workflow lock TTL: 2 hours, so a failed run will eventually unblock. 
+WORKFLOW_LOCK_TIMEOUT = timedelta(hours=2) SERVICE_NAME = "Lexile DB Update" LEXILE_DB_LOCK_KEY: tuple[str, str] = ("LexileDB", "Update") -def _lexile_db_lock(redis_client: Redis, lock_value: int | str) -> RedisLock: - """Create a RedisLock for the Lexile DB update. +def _lexile_workflow_lock(redis_client: Redis, lock_value: str) -> RedisLock: + """Create a workflow-level RedisLock for the Lexile DB update. - :param lock_value: The lock value (timestamp_id for worker batches, or uuid for - the initial batch before a Timestamp exists). + The lock is held across all batches of a single run. ``lock_value`` is a UUID + generated on the first batch and passed to every replacement task, allowing + re-acquisition (extend) on each subsequent batch. + + :param lock_value: UUID string identifying this workflow run. """ return RedisLock( redis_client, list(LEXILE_DB_LOCK_KEY), - random_value=str(lock_value), - lock_timeout=LOCK_TIMEOUT, + random_value=lock_value, + lock_timeout=WORKFLOW_LOCK_TIMEOUT, ) @@ -158,8 +162,8 @@ def run_lexile_db_update(task: Task) -> None: return redis_client = task.services.redis().client() - lock = _lexile_db_lock(redis_client, "orchestrator") - if lock.locked(): + # Check with a sentinel value — we only need to know if any lock is held. + if _lexile_workflow_lock(redis_client, "sentinel").locked(): task.log.info("Lexile DB update already in progress, skipping.") return @@ -173,16 +177,28 @@ def lexile_db_update_task( force: bool = False, offset: int = 0, timestamp_id: int | None = None, + lock_value: str | None = None, ) -> None: """Worker: process batches of ISBNs, fetching Lexile data from the API. - Uses task.replace() to continue with the next batch. Holds a lock across - replacements using the Timestamp id as the lock value so replacement tasks - can extend the lock. + Uses task.replace() to continue with the next batch. 
A workflow-level Redis + lock is held across all batches: acquired on the first batch (when lock_value + is None) and extended on each subsequent batch via re-acquisition with the same + UUID. The lock() context manager's ignored_exceptions=(Ignore,) ensures that the + lock is not released when task.replace() hands off to the next batch. + + :param force: If True, reprocess ISBNs that already have a Lexile DB record. + :param offset: Pagination offset for the current batch. + :param timestamp_id: ID of the Timestamp DB record for this run. Required when offset > 0. + :param lock_value: UUID identifying this workflow run's lock. Generated on the first batch + and passed to every replacement task. Required when offset > 0. """ if offset > 0 and timestamp_id is None: task.log.error("Lexile DB update: timestamp_id required when offset > 0") return + if offset > 0 and lock_value is None: + task.log.error("Lexile DB update: lock_value required when offset > 0") + return with task.transaction() as session: try: @@ -191,16 +207,29 @@ def lexile_db_update_task( task.log.info(f"Lexile DB update skipped: {e}") return - # Use a provisional lock value for the initial batch; later batches use timestamp_id. - lock_value: int | str = timestamp_id if timestamp_id is not None else str(uuid4()) + # is_first_batch is True only when no lock_value was passed in (fresh run). + is_first_batch = lock_value is None + if lock_value is None: + lock_value = str(uuid4()) + redis_client = task.services.redis().client() - lock = _lexile_db_lock(redis_client, lock_value) - if not lock.acquire(): - task.log.info("Lexile DB update could not acquire lock, skipping.") - return + workflow_lock = _lexile_workflow_lock(redis_client, lock_value) + + # Ignore is raised by task.replace() — it must not release the lock when chaining + # to the next batch, so the next batch can extend it with the same lock_value. 
+ with workflow_lock.lock( + raise_when_not_acquired=False, + ignored_exceptions=(Ignore,), + ) as lock_acquired: + if not lock_acquired and is_first_batch: + task.log.info("Lexile DB update could not acquire lock, skipping.") + return + if not lock_acquired and not is_first_batch: + task.log.warning( + "Lexile DB update: workflow lock expired between batches; continuing." + ) - identifiers: list[Identifier] = [] - try: + identifiers: list[Identifier] = [] with task.transaction() as session: if offset == 0: stamp, _ = get_one_or_create( @@ -247,15 +276,16 @@ def lexile_db_update_task( if len(identifiers) == BATCH_SIZE: # Celery expects replace() to be raised as an exception to trigger task chaining. + # Raising Ignore (via task.replace) is listed in ignored_exceptions above, so the + # workflow lock is NOT released here — the next batch task will extend it. raise task.replace( lexile_db_update_task.s( force=force, offset=offset + BATCH_SIZE, timestamp_id=timestamp_id, + lock_value=lock_value, ) ) - finally: - lock.release() task.log.info( f"Lexile DB update complete. 
Processed {len(identifiers)} identifiers at offset {offset}.", diff --git a/tests/manager/celery/tasks/test_lexile.py b/tests/manager/celery/tasks/test_lexile.py index c686d89401..9f528da266 100644 --- a/tests/manager/celery/tasks/test_lexile.py +++ b/tests/manager/celery/tasks/test_lexile.py @@ -5,6 +5,7 @@ from unittest.mock import patch import pytest +from celery.exceptions import Ignore from pytest import LogCaptureFixture from palace.manager.celery.tasks import lexile @@ -125,6 +126,51 @@ def test_lexile_db_update_task_errors_when_timestamp_id_missing_with_offset( ).wait() assert "timestamp_id required when offset > 0" in caplog.text + def test_lexile_db_update_task_errors_when_lock_value_missing_with_offset( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + caplog: LogCaptureFixture, + ) -> None: + """Worker logs error and returns when offset > 0 but lock_value is None.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + caplog.set_level(LogLevel.error) + lexile.lexile_db_update_task.delay( + force=False, offset=5, timestamp_id=42, lock_value=None + ).wait() + assert "lock_value required when offset > 0" in caplog.text + + def test_lexile_db_update_task_skipped_when_lock_already_held_first_batch( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + caplog: LogCaptureFixture, + ) -> None: + """First batch skips when workflow lock is already held by another run.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + caplog.set_level(LogLevel.info) + with patch.object(RedisLock, "acquire", return_value=False): + 
lexile.lexile_db_update_task.delay(force=False).wait() + assert "could not acquire lock, skipping" in caplog.text + def test_lexile_db_update_task_adds_classification( self, db: DatabaseTransactionFixture, @@ -301,6 +347,52 @@ def capture_replace(*args: object, **kwargs: object) -> None: assert sig.kwargs.get("offset") == lexile.BATCH_SIZE assert "timestamp_id" in sig.kwargs assert sig.kwargs["timestamp_id"] is not None + assert "lock_value" in sig.kwargs + assert isinstance(sig.kwargs["lock_value"], str) + + def test_lexile_db_update_task_workflow_lock_configured_to_survive_replace( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + http_client: MockHttpClientFixture, + ) -> None: + """Workflow lock is configured with ignored_exceptions=(Ignore,) so it is + NOT released when task.replace() raises Ignore to chain the next batch.""" + db.integration_configuration( + protocol=LexileDBService, + goal=Goals.METADATA_GOAL, + settings=LexileDBSettings( + username="user", + password="pass", + base_url="https://api.example.com", + ), + ) + + captured_ignored_exceptions: list[tuple[type[BaseException], ...]] = [] + original_lock = RedisLock.lock + + def spy_lock( + self_lock: RedisLock, + raise_when_not_acquired: bool = True, + release_on_error: bool = True, + release_on_exit: bool = True, + ignored_exceptions: tuple[type[BaseException], ...] = (), + ) -> object: + captured_ignored_exceptions.append(ignored_exceptions) + return original_lock( + self_lock, + raise_when_not_acquired=raise_when_not_acquired, + release_on_error=release_on_error, + release_on_exit=release_on_exit, + ignored_exceptions=ignored_exceptions, + ) + + with patch.object(RedisLock, "lock", spy_lock): + lexile.lexile_db_update_task.delay(force=False).wait() + + assert len(captured_ignored_exceptions) == 1 + assert Ignore in captured_ignored_exceptions[0] def test_lexile_db_update_task_excludes_overdrive_only_lexile_default_mode( self,