diff --git a/src/python/controller/auto_queue.py b/src/python/controller/auto_queue.py index 2ecb9f82..0e2a120a 100644 --- a/src/python/controller/auto_queue.py +++ b/src/python/controller/auto_queue.py @@ -2,6 +2,7 @@ import fnmatch import json +import threading from abc import ABC, abstractmethod from collections.abc import Callable @@ -66,31 +67,47 @@ class AutoQueuePersist(Persist): __KEY_PATTERNS = "patterns" def __init__(self): + # Guards __patterns and __listeners. add/remove run on the web thread + # while patterns/to_str are read on the controller thread, so the list + # must be lock-guarded (mirrors common/path_pairs_config.py). RLock is + # reentrant so from_str -> add_pattern does not self-deadlock. + self.__lock = threading.RLock() self.__patterns: list[AutoQueuePattern] = [] self.__listeners: list[IAutoQueuePersistListener] = [] @property def patterns(self) -> set[AutoQueuePattern]: - return set(self.__patterns) + with self.__lock: + return set(self.__patterns) def add_pattern(self, pattern: AutoQueuePattern): # Check values if not pattern.pattern.strip(): raise ValueError("Cannot add blank pattern") - if pattern not in self.__patterns: + with self.__lock: + if pattern in self.__patterns: + return self.__patterns.append(pattern) - for listener in self.__listeners: - listener.pattern_added(pattern) + listeners = list(self.__listeners) + # Notify outside the lock so the persist lock and any listener lock are + # never held simultaneously. + for listener in listeners: + listener.pattern_added(pattern) def remove_pattern(self, pattern: AutoQueuePattern): - if pattern in self.__patterns: + with self.__lock: + if pattern not in self.__patterns: + return self.__patterns.remove(pattern) - for listener in self.__listeners: - listener.pattern_removed(pattern) + listeners = list(self.__listeners) + # Notify outside the lock (see add_pattern). + for listener in listeners: + listener.pattern_removed(pattern) def add_listener(self, listener: IAutoQueuePersistListener): - self.__listeners.append(listener) + with self.__lock: + self.__listeners.append(listener) @classmethod @overrides(Persist) @@ -108,7 +125,8 @@ def from_str(cls: type["AutoQueuePersist"], content: str) -> "AutoQueuePersist": @overrides(Persist) def to_str(self) -> str: dct: dict[str, list[str]] = {} - dct[AutoQueuePersist.__KEY_PATTERNS] = [p.to_str() for p in self.__patterns] + with self.__lock: + dct[AutoQueuePersist.__KEY_PATTERNS] = [p.to_str() for p in self.__patterns] return json.dumps(dct, indent=Constants.JSON_PRETTY_PRINT_INDENT) @@ -133,27 +151,51 @@ def file_removed(self, file: ModelFile): class AutoQueuePersistListener(IAutoQueuePersistListener): - """Keeps track of newly added patterns""" + """Keeps track of newly added patterns. + + pattern_added/pattern_removed run on the web thread while the controller + thread drains (drain_new_patterns) the set. The lock keeps the + controller-thread read from racing a web-thread mutation, which would + otherwise raise 'RuntimeError: Set changed size during iteration'. + """ def __init__(self): + self.__lock = threading.RLock() self.new_patterns: set[AutoQueuePattern] = set() @overrides(IAutoQueuePersistListener) def pattern_added(self, pattern: AutoQueuePattern): - self.new_patterns.add(pattern) + with self.__lock: + self.new_patterns.add(pattern) @overrides(IAutoQueuePersistListener) def pattern_removed(self, pattern: AutoQueuePattern): - if pattern in self.new_patterns: - self.new_patterns.remove(pattern) + with self.__lock: + self.new_patterns.discard(pattern) + + def drain_new_patterns(self) -> set[AutoQueuePattern]: + """Atomically return a copy of new_patterns and clear it in one locked + step. + + Draining (snapshot + clear together) closes a race that a separate + snapshot-then-clear leaves open: a pattern added by a concurrent + pattern_added() between the snapshot and the clear would be wiped + without ever being processed. With an atomic drain, such a pattern + simply stays in the set for the next cycle. + """ + with self.__lock: + drained = set(self.new_patterns) + self.new_patterns.clear() + return drained class AutoQueue: """ Implements auto-queue functionality by sending commands to controller - as matching files are discovered - AutoQueue is in the same thread as Controller, so no synchronization is - needed for now + as matching files are discovered. + AutoQueue runs on the Controller thread, but pattern add/remove happens on + the web handler thread; shared state in AutoQueuePersist and + AutoQueuePersistListener is therefore lock-guarded. """ def __init__(self, context: Context, persist: AutoQueuePersist, controller: Controller): @@ -205,9 +247,17 @@ def process(self): if not any_auto_feature: return - files_to_queue = self.__gather_queue_candidates() if self.__enabled else [] - files_to_extract = self.__gather_extract_candidates() if self.__auto_extract_enabled else [] - files_to_delete_remote = self.__gather_delete_remote_candidates() if self.__auto_delete_remote_enabled else [] + # Drain the newly-added patterns once for this whole cycle. Draining + # (snapshot + clear atomically) instead of snapshotting per-gather and + # clearing separately at the end ensures a pattern added concurrently + # mid-cycle is retained for the next cycle rather than cleared unprocessed. + new_patterns = self.__persist_listener.drain_new_patterns() + + files_to_queue = self.__gather_queue_candidates(new_patterns) if self.__enabled else [] + files_to_extract = self.__gather_extract_candidates(new_patterns) if self.__auto_extract_enabled else [] + files_to_delete_remote = ( + self.__gather_delete_remote_candidates(new_patterns) if self.__auto_delete_remote_enabled else [] + ) # Send commands (order matters: queue, extract, then delete remote) self.__send_commands(files_to_queue, Controller.Command.Action.QUEUE, "Auto queueing") @@ -218,13 +268,11 @@ def process(self): if self.__auto_delete_remote_enabled: self.__retry_delete_remote(files_to_delete_remote) - # Clear the processed files + # Clear the processed files (new patterns were already drained above) self.__model_listener.new_files.clear() self.__model_listener.modified_files.clear() - # Clear the new patterns - self.__persist_listener.new_patterns.clear() - def __gather_queue_candidates(self) -> list[_Candidate]: + def __gather_queue_candidates(self, new_patterns: set[AutoQueuePattern]) -> list[_Candidate]: candidates: list[ModelFile] = list(self.__model_listener.new_files) for old_file, new_file in self.__model_listener.modified_files: if old_file.remote_size != new_file.remote_size: @@ -236,9 +284,10 @@ def __gather_queue_candidates(self) -> list[_Candidate]: and f.state == ModelFile.State.DEFAULT and self._is_auto_queue_enabled_for_file(f) ), + new_patterns=new_patterns, ) - def __gather_extract_candidates(self) -> list[_Candidate]: + def __gather_extract_candidates(self, new_patterns: set[AutoQueuePattern]) -> list[_Candidate]: candidates: list[ModelFile] = list(self.__model_listener.new_files) # Candidate modified files that just became DOWNLOADED # But not files that went EXTRACTING -> DOWNLOADED (failed extraction) @@ -257,9 +306,10 @@ def __gather_extract_candidates(self) -> list[_Candidate]: and f.local_size > 0 and f.is_extractable ), + new_patterns=new_patterns, ) - def __gather_delete_remote_candidates(self) -> list[_Candidate]: + def __gather_delete_remote_candidates(self, new_patterns: set[AutoQueuePattern]) -> list[_Candidate]: candidates: list[ModelFile] = list(self.__model_listener.new_files) for old_file, new_file in self.__model_listener.modified_files: if old_file.state != new_file.state and new_file.state in ( @@ -279,6 +329,7 @@ def __gather_delete_remote_candidates(self) -> list[_Candidate]: ) ) ), + new_patterns=new_patterns, ) def __send_commands( @@ -328,7 +379,12 @@ def _is_auto_queue_enabled_for_file(self, file: ModelFile) -> bool: return self.__pair_auto_queue.get(file.pair_id, False) return True - def __filter_candidates(self, candidates: list[ModelFile], accept: Callable[[ModelFile], bool]) -> list[_Candidate]: + def __filter_candidates( + self, + candidates: list[ModelFile], + accept: Callable[[ModelFile], bool], + new_patterns: set[AutoQueuePattern], + ) -> list[_Candidate]: """ Given a list of candidate files, filter out those that match the accept criteria Also takes into consideration new patterns that were added @@ -336,6 +392,8 @@ def __filter_candidates(self, candidates: list[ModelFile], accept: Callable[[Mod new patterns :param candidates: :param accept: + :param new_patterns: patterns added since the last cycle (drained once by + the caller), matched retroactively against all existing model files :return: list of (filename, pair_id, pattern) tuples """ # Files accepted and matched, (name, pair_id) -> pattern map @@ -353,10 +411,11 @@ def __filter_candidates(self, candidates: list[ModelFile], accept: Callable[[Mod elif accept(file): files_matched[(file.name, file.pair_id)] = None - # Step 2: run new pattern through all the files - if self.__persist_listener.new_patterns: + # Step 2: run new patterns (drained once by process() for this cycle) + # through all the files + if new_patterns: model_files = self.__controller.get_model_files() - for new_pattern in self.__persist_listener.new_patterns: + for new_pattern in new_patterns: for file in model_files: if accept(file) and self.__match(new_pattern, file): files_matched[(file.name, file.pair_id)] = new_pattern diff --git a/src/python/controller/command_pipeline.py b/src/python/controller/command_pipeline.py index 10fef22a..f3b25c98 100644 --- a/src/python/controller/command_pipeline.py +++ b/src/python/controller/command_pipeline.py @@ -18,7 +18,7 @@ if TYPE_CHECKING: from .controller import Controller -from common import AppError, Context, MultiprocessingLogger +from common import AppError, AppProcess, Context, MultiprocessingLogger from lftp import LftpError from model import ModelError, ModelFile @@ -78,6 +78,10 @@ def __init__( # Track files with pending validation so extraction-completion doesn't race the move self.pending_validation_keys: set[str] = set() + # Worker ids already reported as dead, so a permanently-dead extract/ + # validate worker is surfaced once at ERROR rather than every cycle. + self.__reported_dead_workers: set[int] = set() + def queue(self, command: Controller.Command) -> None: """Put a command on the queue for processing.""" self.command_queue.put(command) @@ -391,16 +395,44 @@ def cleanup(self): if move_process.is_alive(): still_active_moves.append(move_process) else: - try: - move_process.propagate_exception() - except Exception: - self._logger.warning("Move process failed: %s", move_process.name, exc_info=True) - move_key = persist_key(move_process.pair_id, move_process.file_name) - self.moved_file_keys.discard(move_key) - for pc in self._pair_contexts: - pc.local_scan_process.force_scan() + self._finalize_move_process(move_process) + # Reap the finished process now (join + close its queues) so its + # FDs are released immediately. It is dropped from + # active_move_processes below, so Controller.exit() never sees it + # and would otherwise leak its queue FDs until shutdown. + move_process.join() + move_process.close_queues() self.active_move_processes = still_active_moves + def _finalize_move_process(self, move_process: MoveProcess) -> None: + """Finalize a completed move: surface failures, discard key on failure, rescan. + + A move can fail two ways: by raising (propagate_exception) or by + reporting a MoveFailedResult on its failed queue (silent return paths + such as a vanished source or a size mismatch). In both cases the moved + key is discarded so the next force_scan re-spawns the move (retry). + """ + failed = False + try: + move_process.propagate_exception() + except Exception: + self._logger.warning("Move process failed: %s", move_process.name, exc_info=True) + failed = True + for result in move_process.pop_failed(): + self._logger.error(f"Move failed for '{result.name}': {result.error_message}") + failed = True + if failed: + move_key = persist_key(move_process.pair_id, move_process.file_name) + self.moved_file_keys.discard(move_key) + # The move only changed the owning pair's local_path, so rescan just that + # pair; fall back to all pairs only if the owner can't be located. + owner = next((pc for pc in self._pair_contexts if pc.pair_id == move_process.pair_id), None) + if owner is not None: + owner.local_scan_process.force_scan() + else: + for pc in self._pair_contexts: + pc.local_scan_process.force_scan() + def propagate_exceptions(self): """ Propagate any exceptions from child processes/threads to this thread @@ -419,8 +451,45 @@ def propagate_exceptions(self): pc.local_scan_process.propagate_exception() pc.remote_scan_process.propagate_exception() self._mp_logger.propagate_exception() - self._extract_process.propagate_exception() - self._validate_process.propagate_exception() + # A fault in an extract/validate worker that surfaces outside its + # per-task guard (OOM, queue corruption, run_init/run_cleanup bug) + # must degrade that feature only, not kill the whole controller. + # Isolate each worker so one dead worker doesn't stop the other or + # halt all downloads. + # NOTE: this only ISOLATES the fault — the dead worker is not yet + # recreated, so extract/validate stays disabled until the service is + # restarted. Automatic worker recreation is tracked in #511's follow-up. + try: + self._extract_process.propagate_exception() + except Exception: + self._logger.warning("Extract worker process failed: %s", self._extract_process.name, exc_info=True) + try: + self._validate_process.propagate_exception() + except Exception: + self._logger.warning("Validate worker process failed: %s", self._validate_process.name, exc_info=True) + # propagate_exception() consumes the queued fault once, so without this a + # permanently-dead worker would go silent after the first cycle. Surface + # the dead state once at ERROR so the degradation stays visible. + self.__report_if_worker_dead(self._extract_process, "Extract") + self.__report_if_worker_dead(self._validate_process, "Validate") + + def __report_if_worker_dead(self, worker: AppProcess, feature: str) -> None: + worker_id = id(worker) + if worker_id in self.__reported_dead_workers: + return + try: + alive = worker.is_alive() + except (ValueError, AssertionError): + # Never started or already closed — treat as not-running. + alive = False + if not alive: + self.__reported_dead_workers.add(worker_id) + self._logger.error( + "%s worker process %s has died; %s is disabled until the service is restarted.", + feature, + worker.name, + feature.lower(), + ) def spawn_deferred_move(self, pair_id: str | None, file_name: str): """Spawn the staging->final move for a file whose validation just finished. @@ -459,11 +528,15 @@ def spawn_move_process(self, file_name: str, pc: PairContext): self.moved_file_keys.add(move_key) return - self.moved_file_keys.add(move_key) process = MoveProcess(source_path=staging_source, dest_path=dest_path, file_name=file_name, pair_id=pair_id) process.set_mp_log_queue(self._mp_logger.queue, self._mp_logger.log_level) - self.active_move_processes.append(process) + # Start before publishing bookkeeping: if start() raises we must not leave + # a stale move_key (which would block every future retry of this move) or + # a never-started process in active_move_processes (cleanup() would then + # join()/close_queues() it and raise AssertionError). process.start() + self.moved_file_keys.add(move_key) + self.active_move_processes.append(process) self._logger.info(f"Spawned move process for {file_name} (staging -> local)") def _get_pair_context_for_command(self, command: Controller.Command) -> PairContext | None: diff --git a/src/python/controller/controller.py b/src/python/controller/controller.py index 975e25c9..d513284f 100644 --- a/src/python/controller/controller.py +++ b/src/python/controller/controller.py @@ -5,14 +5,14 @@ import os import threading from abc import ABC, abstractmethod -from collections.abc import Callable +from collections.abc import Callable, Iterator from enum import Enum from typing import TYPE_CHECKING if TYPE_CHECKING: pass -from common import AppOneShotProcess, Constants, Context, MultiprocessingLogger +from common import AppOneShotProcess, AppProcess, Constants, Context, MultiprocessingLogger from lftp import Lftp from model import IModelListener, Model, ModelFile @@ -309,6 +309,11 @@ def start(self): :return: """ self.logger.debug("Starting controller") + # Mark started before launching children so that a partial failure here + # still lets exit() tear down whatever did start (exit() is best-effort). + # Otherwise exit() would early-return on __started=False and leak the + # already-started processes and their queue FDs. + self.__started = True for pc in self.__pair_contexts: pc.active_scan_process.start() pc.local_scan_process.start() @@ -316,7 +321,6 @@ def start(self): self.__extract_process.start() self.__validate_process.start() self.__mp_logger.start() - self.__started = True def request_lftp_reconfigure(self) -> None: """Signal that LFTP tuning settings have changed and should be reapplied. @@ -343,51 +347,85 @@ def process(self): self.__pipeline.step() self.__updater.update() + def __shutdown_lftp_best_effort(self): + # A hung/dead lftp PTY can raise here; guard each call so one failure + # does not abort teardown of the remaining pairs or the worker + # terminate/join/close_queues phases in exit() (otherwise orphaned + # processes leak FDs on every restart). + for pc in self.__pair_contexts: + try: + pc.lftp.exit() + except Exception: + self.logger.exception( + "Error shutting down lftp for pair %s; continuing teardown", + getattr(pc, "pair_id", None), + ) + + def __iter_worker_processes(self) -> Iterator[AppProcess]: + """All terminable worker processes, in teardown order.""" + for pc in self.__pair_contexts: + yield pc.active_scan_process + yield pc.local_scan_process + yield pc.remote_scan_process + yield self.__extract_process + yield self.__validate_process + for cp in self.__pipeline.active_command_processes: + yield cp.process + yield from self.__pipeline.active_move_processes + + # Bound each worker join during teardown: a worker stuck in uninterruptible + # I/O (e.g. a dead NAS mount that ignores the SIGTERM from terminate()) would + # otherwise hang join() — and thus exit() — forever, wedging ServiceRestart. + __JOIN_TIMEOUT_IN_SECS = 2 + + def __safe_teardown(self, description: str, action: Callable[[], object]) -> None: + # Teardown is best-effort: a raise from any single terminate/join/ + # close_queues call (a hung or already-closed worker) must not skip the + # remaining reaping or the FD-releasing close_queues phase, nor propagate + # out of exit(). Otherwise each restart leaks queue FDs until the OS + # limit (OSError: [Errno 24] No file descriptors available) is hit. + try: + action() + except Exception: + self.logger.exception("Error during controller teardown: %s", description) + + def __bounded_join(self, p: AppProcess) -> None: + self.__safe_teardown("join", lambda: p.join(self.__JOIN_TIMEOUT_IN_SECS)) + try: + still_alive = p.is_alive() + except (ValueError, AssertionError): + still_alive = False + if still_alive: + self.logger.warning( + "Worker %s did not exit within %ss; continuing teardown", + getattr(p, "name", "?"), + self.__JOIN_TIMEOUT_IN_SECS, + ) + def exit(self): self.logger.debug("Exiting controller") - if self.__started: - for pc in self.__pair_contexts: - pc.lftp.exit() - pc.active_scan_process.terminate() - pc.local_scan_process.terminate() - pc.remote_scan_process.terminate() - self.__extract_process.terminate() - self.__validate_process.terminate() - for cp in self.__pipeline.active_command_processes: - cp.process.terminate() - for mp in self.__pipeline.active_move_processes: - mp.terminate() - for pc in self.__pair_contexts: - pc.active_scan_process.join() - pc.local_scan_process.join() - pc.remote_scan_process.join() - self.__extract_process.join() - self.__validate_process.join() - for cp in self.__pipeline.active_command_processes: - cp.process.join() - for mp in self.__pipeline.active_move_processes: - mp.join() - self.__mp_logger.stop() - - # Close multiprocessing queues to release file descriptors. - # Without this, each restart cycle leaks FDs until the OS limit - # is exhausted (OSError: [Errno 24] No file descriptors available). - for pc in self.__pair_contexts: - pc.active_scan_process.close_queues() - pc.local_scan_process.close_queues() - pc.remote_scan_process.close_queues() - pc.active_scanner.close() - self.__extract_process.close_queues() - self.__validate_process.close_queues() - for cp in self.__pipeline.active_command_processes: - cp.process.close_queues() - for mp in self.__pipeline.active_move_processes: - mp.close_queues() - self.__pipeline.active_command_processes.clear() - self.__pipeline.active_move_processes.clear() - - self.__started = False - self.logger.info("Exited controller") + if not self.__started: + return + + self.__shutdown_lftp_best_effort() + processes = list(self.__iter_worker_processes()) + for p in processes: + self.__safe_teardown("terminate", p.terminate) + for p in processes: + self.__bounded_join(p) + self.__safe_teardown("stop mp_logger", self.__mp_logger.stop) + + # Close multiprocessing queues to release file descriptors; this must run + # even if a terminate()/join() above failed, or each restart cycle leaks + # FDs until the OS limit is exhausted. + for p in processes: + self.__safe_teardown("close_queues", p.close_queues) + for pc in self.__pair_contexts: + self.__safe_teardown("close active_scanner", pc.active_scanner.close) + self.__pipeline.active_command_processes.clear() + self.__pipeline.active_move_processes.clear() + self.__started = False + self.logger.info("Exited controller") def get_model_files(self) -> list[ModelFile]: return self.__registry.get_files() diff --git a/src/python/controller/move/__init__.py b/src/python/controller/move/__init__.py index ebfe9323..4eb6d289 100644 --- a/src/python/controller/move/__init__.py +++ b/src/python/controller/move/__init__.py @@ -1,3 +1,6 @@ # Copyright 2017, Inderpreet Singh, All rights reserved. -from .move_process import MoveProcess as MoveProcess +from .move_process import ( + MoveFailedResult as MoveFailedResult, + MoveProcess as MoveProcess, +) diff --git a/src/python/controller/move/move_process.py b/src/python/controller/move/move_process.py index 4312ddd2..52c61df4 100644 --- a/src/python/controller/move/move_process.py +++ b/src/python/controller/move/move_process.py @@ -1,10 +1,31 @@ # Copyright 2017, Inderpreet Singh, All rights reserved. +from __future__ import annotations + +import datetime import errno +import multiprocessing import os +import queue import shutil -from common import AppOneShotProcess +from common import AppOneShotProcess, overrides + + +class MoveFailedResult: + """Result reported when a staging->final move fails.""" + + def __init__( + self, + timestamp: datetime.datetime, + name: str, + pair_id: str | None = None, + error_message: str | None = None, + ): + self.timestamp = timestamp + self.name = name + self.pair_id = pair_id + self.error_message = error_message class MoveProcess(AppOneShotProcess): @@ -21,6 +42,7 @@ def __init__(self, source_path: str, dest_path: str, file_name: str, pair_id: st self.__dest_path = dest_path self.__file_name = file_name self._pair_id = pair_id + self.__failed_result_queue: multiprocessing.Queue[MoveFailedResult] = multiprocessing.Queue() @property def file_name(self) -> str: @@ -43,12 +65,43 @@ def _get_total_size(path: str) -> int: total += os.path.getsize(fp) return total + @staticmethod + def _get_copied_size(src: str, dst: str) -> int: + """Sum the sizes at dst of only the files present in the src subtree. + + This deliberately ignores any pre-existing files under dst so that + merging into a non-empty destination (dirs_exist_ok=True) does not + produce a false size mismatch (issue #510). + """ + if os.path.isfile(src): + return os.path.getsize(dst) if os.path.exists(dst) else 0 + total = 0 + for dirpath, _dirnames, filenames in os.walk(src): + rel_dir = os.path.relpath(dirpath, src) + for f in filenames: + dst_file = os.path.join(dst, rel_dir, f) if rel_dir != "." else os.path.join(dst, f) + if os.path.exists(dst_file): + total += os.path.getsize(dst_file) + return total + + def _report_failure(self, message: str) -> None: + """Log and publish a move failure so the controller can surface it.""" + self.logger.error(message) + self.__failed_result_queue.put( + MoveFailedResult( + timestamp=datetime.datetime.now(), + name=self.__file_name, + pair_id=self._pair_id, + error_message=message, + ) + ) + def run_once(self): src = os.path.join(self.__source_path, self.__file_name) dst = os.path.join(self.__dest_path, self.__file_name) if not os.path.exists(src): - self.logger.error(f"Move failed: source does not exist: {src}") + self._report_failure(f"Move failed: source does not exist: {src}") return self.logger.info(f"Moving {src} -> {dst}") @@ -64,6 +117,10 @@ def run_once(self): except OSError as e: if e.errno == errno.EXDEV: self.logger.debug("Cross-device move detected, falling back to copy+delete") + elif e.errno in (errno.ENOTEMPTY, errno.EEXIST): + # Destination already exists as a non-empty directory; rename + # cannot merge, so fall back to copy+delete which merges trees. + self.logger.debug("Destination not empty, falling back to copy+delete") else: raise @@ -75,12 +132,13 @@ def run_once(self): else: shutil.copytree(src, dst, dirs_exist_ok=True) - dest_size = self._get_total_size(dst) + # Verify only the copied subtree (ignore pre-existing files under dst) + copied_size = self._get_copied_size(src, dst) - if source_size != dest_size: - self.logger.error( + if source_size != copied_size: + self._report_failure( f"Move size verification failed for {self.__file_name}: " - f"source={source_size} dest={dest_size}. Source NOT deleted." + f"source={source_size} copied={copied_size}. Source NOT deleted." ) return @@ -91,3 +149,19 @@ def run_once(self): shutil.rmtree(src) self.logger.info(f"Move completed via copy+delete: {self.__file_name}") + + def pop_failed(self) -> list[MoveFailedResult]: + """Process-safe method to retrieve any move failures.""" + failed: list[MoveFailedResult] = [] + try: + while True: + failed.append(self.__failed_result_queue.get(block=False)) + except queue.Empty: + pass + return failed + + @overrides(AppOneShotProcess) + def close_queues(self): + self.__failed_result_queue.close() + self.__failed_result_queue.join_thread() + super().close_queues() diff --git a/src/python/controller/validate/validate_process.py b/src/python/controller/validate/validate_process.py index a2ee3d17..cab8381a 100644 --- a/src/python/controller/validate/validate_process.py +++ b/src/python/controller/validate/validate_process.py @@ -7,12 +7,11 @@ import multiprocessing import os import queue -import shlex import time from enum import Enum from typing import TYPE_CHECKING -from common import AppProcess, overrides +from common import AppProcess, escape_remote_path_double, escape_remote_path_single, overrides if TYPE_CHECKING: from ssh import Sshcp @@ -249,8 +248,7 @@ def _validate_directory(self, req: ValidateRequest, local_dir: str, sshcp: Sshcp # Build remote file set via SSH find remote_dir = os.path.join(req.remote_path, req.name) - quoted_dir = shlex.quote(remote_dir) - find_cmd = f"find {quoted_dir} -type f" + find_cmd = f"find {self._escape_remote(remote_dir)} -type f" try: find_output = sshcp.shell(find_cmd) remote_abs_paths = find_output.decode().strip().split("\n") @@ -258,8 +256,7 @@ def _validate_directory(self, req: ValidateRequest, local_dir: str, sshcp: Sshcp for abs_path in remote_abs_paths: abs_path = abs_path.strip() if abs_path: - rel = os.path.relpath(abs_path, req.remote_path) - remote_rel_paths.add(rel) + remote_rel_paths.add(self._remote_key(abs_path, req.name)) except Exception as e: raise ValueError(f"Failed to list remote directory {remote_dir}: {e!s}") from e @@ -320,6 +317,39 @@ def _create_ssh(self, req: ValidateRequest): sshcp.set_base_logger(self.logger) return sshcp + @staticmethod + def _escape_remote(path: str) -> str: + """Quote a remote path, expanding a leading ~ to $HOME (mirrors scanner/delete).""" + if path.startswith("~"): + return escape_remote_path_double(path) + return escape_remote_path_single(path) + + @staticmethod + def _remote_key(abs_path: str, name: str) -> str: + """Derive a `name/subpath` key from a remote `find` result. + + The find command is rooted at a directory whose final component is `name`, + so every emitted path looks like `...//`. Anchoring on the + last `` component makes the key independent of whether the remote + shell expanded a leading ~ to $HOME (issue #519), so remote keys always + match the local keys (`relpath(local_file, local_path)` == `name/subpath`). + + Known limitation: this is a heuristic. If the tree contains a *nested* + directory whose name equals `name` (e.g. `.../mydir/mydir/a.txt` for + name="mydir"), last-occurrence anchoring yields `mydir/a.txt` while the + local key is `mydir/mydir/a.txt`, producing a false (non-checksum) + mismatch. No name-based anchor is fully robust without the shell-expanded + find root; the robust fix (capture the expanded base via `cd ; + pwd; find .` and derive keys from it) is tracked as a #519 follow-up. + """ + parts = abs_path.split("/") + # Find the last occurrence of the directory leaf to anchor the key. + for idx in range(len(parts) - 1, -1, -1): + if parts[idx] == name: + return "/".join(parts[idx:]) + # Fallback: leaf not found (unexpected); use the basename under name. + return os.path.join(name, os.path.basename(abs_path)) + @staticmethod def _build_hash_cmd(algorithm: str, quoted_file: str) -> str: """Build the remote hash command for the given algorithm.""" @@ -334,7 +364,7 @@ def _build_hash_cmd(algorithm: str, quoted_file: str) -> str: def _hash_remote_file(self, req: ValidateRequest, rel_path: str, algorithm: str, sshcp: Sshcp) -> str: """Compute hash of a remote file via SSH.""" remote_file = os.path.join(req.remote_path, rel_path) - quoted_file = shlex.quote(remote_file) + quoted_file = self._escape_remote(remote_file) cmd = self._build_hash_cmd(algorithm, quoted_file) output = sshcp.shell(cmd) diff --git a/src/python/lftp/job_status_parser.py b/src/python/lftp/job_status_parser.py index 68ad36ad..61cb7281 100644 --- a/src/python/lftp/job_status_parser.py +++ b/src/python/lftp/job_status_parser.py @@ -26,7 +26,7 @@ class LftpJobStatusParser: r"m|mb|mib|M|Mb|MB|MiB|Mib|" r"g|gb|gib|G|Gb|GB|GiB|Gib" ) - __TIME_UNITS_REGEX = r"(?P\d*d)?(?P\d*h)?(?P\d*m)?(?P\d*s)?" + __TIME_UNITS_REGEX = r"(?P\d+d)?(?P\d+h)?(?P\d+m)?(?P\d+s)?" __QUOTED_FILE_NAME_REGEX = r"`(?P.*)'" @@ -101,9 +101,12 @@ def parse(self, output: str) -> list[LftpJobStatus]: # pexpect echoes the command back, and when lftp is writing status data # simultaneously, the echo can be interleaved mid-line, splitting # filenames across lines. Stripping before splitting reconstructs them. - # Order matters: first strip echo+newline pairs, then any remaining echo. + # The echo only ever appears at a line boundary, so anchor the removal + # there: strip echo+newline pairs (rejoins a filename split across the + # break) and standalone echo lines. This avoids deleting a literal + # 'jobs -v' that is part of a real filename (e.g. 'My.jobs -v.Release'). output = output.replace("jobs -v\n", "") - output = output.replace("jobs -v", "") + output = re.sub(r"(?m)^[ \t]*jobs -v[ \t]*$", "", output) lines = [s.strip() for s in output.splitlines()] lines = list(filter(None, lines)) # remove blank lines # remove any remaining log line @@ -563,12 +566,15 @@ def __parse_queue(lines: list[str]) -> list[LftpJobStatus]: # noqa: C901 — co if not lines: raise ValueError("Missing queue status") - # Look for 'Now executing' lines - line = lines.pop(0) - if re.match("Queue is stopped.", line): + # Look for 'Now executing' lines. Peek rather than unconditionally + # pop: if neither status line is present (e.g. 'Commands queued:' + # follows the header directly), the status line must be left in + # place for the 'Commands queued:' check below. + if lines and re.match("Queue is stopped.", lines[0]): # Nothing to do - pass - elif re.match("Now executing:", line): + lines.pop(0) + elif lines and re.match("Now executing:", lines[0]): + lines.pop(0) # Remove any more lines associated with 'now executing' while lines and re.match(r"^-\[\d+\]", lines[0]): lines.pop(0) diff --git a/src/python/seedsync.py b/src/python/seedsync.py index cdf180eb..49f9686e 100644 --- a/src/python/seedsync.py +++ b/src/python/seedsync.py @@ -231,7 +231,7 @@ def run(self): now = datetime.now() if (now - prev_persist_timestamp).total_seconds() > Constants.MIN_PERSIST_TO_FILE_INTERVAL_IN_SECS: prev_persist_timestamp = now - self.persist() + self._persist_periodic() # Propagate exceptions from child threads # Any exception here exits the main loop for clean shutdown @@ -245,8 +245,8 @@ def run(self): # Nothing else to do time.sleep(Constants.MAIN_THREAD_SLEEP_INTERVAL_IN_SECS) - except Exception: - self.context.logger.info("Exiting Seedsync") + except Exception as e: + self._log_shutdown_cause(e) # This sleep is important to allow the jobs to finish setup before we terminate them # If we kill too early, the jobs may leave lingering threads around @@ -268,8 +268,9 @@ def run(self): webhook_notifier.shutdown() arr_notifier.shutdown() - # Last persist - self.persist() + # Last persist; guarded so a write failure cannot mask the original + # in-flight exception that is re-raised below. + self._final_persist() # Raise any exceptions so they can be logged properly # Note: ServiceRestart and ServiceExit will be caught and handled @@ -285,6 +286,34 @@ def persist(self): self.context.path_pairs_config.to_file(self.path_pairs_path) self.context.integrations_config.to_file(self.integrations_path) + def _persist_periodic(self) -> None: + # A transient write failure (disk full, read-only mount, permission change) + # during a routine periodic save must never terminate the service. Catch + # broad Exception (not BaseException, so KeyboardInterrupt/SystemExit still + # propagate), log the traceback at ERROR, and let the supervisor loop continue. + try: + self.persist() + except Exception: + self.context.logger.exception("Periodic persist failed; continuing") + + def _log_shutdown_cause(self, e: BaseException) -> None: + # Intentional exit/restart stay friendly at INFO; genuine crashes surface + # at ERROR with a traceback so an abnormal shutdown is visible (not + # conflated with a normal exit). Call from within the handling `except` + # so logger.exception() captures the live traceback. + if isinstance(e, (ServiceExit, ServiceRestart)): + self.context.logger.info("Exiting Seedsync") + else: + self.context.logger.exception("Seedsync exiting due to unexpected error") + + def _final_persist(self) -> None: + # Last persist during shutdown. Guard it so a write failure here cannot + # mask the original in-flight exception that run() re-raises afterwards. + try: + self.persist() + except Exception: + self.context.logger.exception("Final persist during shutdown failed") + def signal(self, signum: int, _: FrameType | None) -> None: # noinspection PyUnresolvedReferences # Signals is a generated enum diff --git a/src/python/tests/integration/test_web/test_handler/test_auto_queue.py b/src/python/tests/integration/test_web/test_handler/test_auto_queue.py index cf59b131..825f8256 100644 --- a/src/python/tests/integration/test_web/test_handler/test_auto_queue.py +++ b/src/python/tests/integration/test_web/test_handler/test_auto_queue.py @@ -4,6 +4,7 @@ from urllib.parse import quote from controller import AutoQueuePattern, AutoQueuePersist +from controller.auto_queue import AutoQueuePersistListener from tests.integration.test_web.test_web_app import BaseTestWebApp @@ -142,10 +143,39 @@ def test_add_returns_500_when_persistence_fails(self): resp = self.test_app.get("/server/autoqueue/add/onepattern", expect_errors=True) self.assertEqual(500, resp.status_int) self.assertEqual("Failed to persist auto-queue", resp.text) - # Contract: the in-memory mutation is *not* rolled back on persistence - # failure. The 500 signals "disk and memory may diverge — retry"; the - # pattern stays in memory so a subsequent retry persists the same state. - self.assertIn(AutoQueuePattern("onepattern"), self.auto_queue_persist.patterns) + # Contract (#518): the in-memory mutation *is* rolled back on persistence + # failure. add_pattern fires a listener side effect that auto-queues + # matching files this session; leaving the pattern live while it never + # reached disk causes "phantom" queueing that contradicts the 500. The + # handler removes the pattern again so disk and memory stay consistent. + self.assertNotIn(AutoQueuePattern("onepattern"), self.auto_queue_persist.patterns) + + def test_add_500_rolls_back_listener_side_effect(self): + # The core of #518: rolling back __patterns is not enough — the + # pattern_added side effect (which drives the controller's queue replay) + # must also be undone. Attach a real listener and prove its new_patterns + # is cleared after the failed add. + listener = AutoQueuePersistListener() + self.auto_queue_persist.add_listener(listener) + with patch.object(AutoQueuePersist, "to_file", side_effect=OSError("disk full")): + resp = self.test_app.get("/server/autoqueue/add/onepattern", expect_errors=True) + self.assertEqual(500, resp.status_int) + self.assertEqual("Failed to persist auto-queue", resp.text) + self.assertNotIn(AutoQueuePattern("onepattern"), self.auto_queue_persist.patterns) + self.assertNotIn(AutoQueuePattern("onepattern"), listener.new_patterns) + + def test_add_500_rolls_back_on_non_oserror_persist_failure(self): + # Review (#537): rollback must fire for ANY persist failure, not just + # OSError (e.g. a to_str()/serialization error), or the listener side + # effect leaks while nothing reached disk. + listener = AutoQueuePersistListener() + self.auto_queue_persist.add_listener(listener) + with patch.object(AutoQueuePersist, "to_file", side_effect=RuntimeError("serialize boom")): + resp = self.test_app.get("/server/autoqueue/add/onepattern", expect_errors=True) + self.assertEqual(500, resp.status_int) + self.assertEqual("Failed to persist auto-queue", resp.text) + self.assertNotIn(AutoQueuePattern("onepattern"), self.auto_queue_persist.patterns) + self.assertNotIn(AutoQueuePattern("onepattern"), listener.new_patterns) def test_remove_returns_500_when_persistence_fails(self): self.auto_queue_persist.add_pattern(AutoQueuePattern(pattern="onepattern")) @@ -153,5 +183,7 @@ def test_remove_returns_500_when_persistence_fails(self): resp = self.test_app.get("/server/autoqueue/remove/onepattern", expect_errors=True) self.assertEqual(500, resp.status_int) self.assertEqual("Failed to persist auto-queue", resp.text) - # Contract: the in-memory removal is *not* rolled back on persistence failure. - self.assertNotIn(AutoQueuePattern("onepattern"), self.auto_queue_persist.patterns) + # Contract (#518): the in-memory removal is rolled back on persistence + # failure. The pattern was persisted on disk before removal, so re-adding + # it keeps in-memory state consistent with the unchanged on-disk file. + self.assertIn(AutoQueuePattern("onepattern"), self.auto_queue_persist.patterns) diff --git a/src/python/tests/integration/test_web/test_handler/test_config.py b/src/python/tests/integration/test_web/test_handler/test_config.py index cbc88ca5..74012427 100644 --- a/src/python/tests/integration/test_web/test_handler/test_config.py +++ b/src/python/tests/integration/test_web/test_handler/test_config.py @@ -135,6 +135,22 @@ def test_set_persistence_failure_rolls_back(self): self.assertEqual("INFO", self.context.config.general.log_level) self.controller.request_lftp_reconfigure.assert_not_called() + def test_set_persistence_failure_rolls_back_on_non_oserror(self): + """A non-OSError persist failure must also revert in-memory state. + + Regression guard for #507 Part 2: serialization can raise non-OSError + exceptions (e.g. configparser.Error). Those must still trigger the + rollback rather than escaping and leaving the new value live but never + persisted. + """ + self.context.config.general.log_level = "INFO" + with patch.object(Config, "to_file", side_effect=RuntimeError("serialize boom")): + resp = self.test_app.get("/server/config/set/general/log_level/DEBUG", expect_errors=True) + self.assertEqual(500, resp.status_int) + self.assertIn("Failed to persist config general.log_level", str(resp.html)) + self.assertEqual("INFO", self.context.config.general.log_level) + self.controller.request_lftp_reconfigure.assert_not_called() + def test_set_persistence_failure_rolls_back_lftp_tuning_key(self): """A failed write on a hot-reload key must not fire the LFTP callback.""" self.context.config.lftp.num_max_parallel_downloads = 3 diff --git a/src/python/tests/integration/test_web/test_handler/test_controller.py b/src/python/tests/integration/test_web/test_handler/test_controller.py index adc77e37..a1dfb038 100644 --- a/src/python/tests/integration/test_web/test_handler/test_controller.py +++ b/src/python/tests/integration/test_web/test_handler/test_controller.py @@ -1,8 +1,10 @@ # Copyright 2017, Inderpreet Singh, All rights reserved. -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from urllib.parse import quote +import timeout_decorator + from controller import Controller from tests.integration.test_web.test_web_app import BaseTestWebApp @@ -239,3 +241,62 @@ def test_path_traversal_rejected(self): resp = self.test_app.get("/server/command/queue/" + uri, expect_errors=True) self.assertEqual(400, resp.status_int) self.controller.queue_command.assert_not_called() + + +class TestControllerHandlerTimeout(BaseTestWebApp): + """Tests that a stalled controller does not hang the request thread (issue #526).""" + + # Patch the bound down so the timeout path resolves quickly and the test + # never relies on real-time waits beyond a fraction of a second. + @timeout_decorator.timeout(5) + @patch("web.handler.controller._ACTION_TIMEOUT_IN_SECS", 0.05) + def test_action_timeout_returns_504(self): + # queue_command never invokes any callback -> event is never set + self.controller.queue_command = MagicMock() + + resp = self.test_app.get("/server/command/queue/test1", expect_errors=True) + self.assertEqual(504, resp.status_int) + # The command was still dispatched to the controller + self.controller.queue_command.assert_called_once() + command = self.controller.queue_command.call_args[0][0] + self.assertEqual(Controller.Command.Action.QUEUE, command.action) + self.assertEqual("test1", command.filename) + + @timeout_decorator.timeout(5) + @patch("web.handler.controller._ACTION_TIMEOUT_IN_SECS", 0.05) + def test_delete_remote_timeout_returns_504(self): + # Confirms all endpoints share the timeout behavior via __dispatch_command + self.controller.queue_command = MagicMock() + + resp = self.test_app.get("/server/command/delete_remote/test1", expect_errors=True) + self.assertEqual(504, resp.status_int) + self.controller.queue_command.assert_called_once() + command = self.controller.queue_command.call_args[0][0] + self.assertEqual(Controller.Command.Action.DELETE_REMOTE, command.action) + + @timeout_decorator.timeout(5) + @patch("web.handler.controller._ACTION_TIMEOUT_IN_SECS", 0.05) + def test_success_unaffected_by_timeout_bound(self): + # When the controller responds in time, the 200 path is unchanged + def side_effect(cmd: Controller.Command): + cmd.callbacks[0].on_success() + + self.controller.queue_command = MagicMock() + self.controller.queue_command.side_effect = side_effect + + resp = self.test_app.get("/server/command/queue/test1") + self.assertEqual(200, resp.status_int) + + @timeout_decorator.timeout(5) + @patch("web.handler.controller._ACTION_TIMEOUT_IN_SECS", 0.05) + def test_failure_unaffected_by_timeout_bound(self): + # When the controller signals failure in time, the 400 path is unchanged + def side_effect(cmd: Controller.Command): + cmd.callbacks[0].on_failure("boom") + + self.controller.queue_command = MagicMock() + self.controller.queue_command.side_effect = side_effect + + resp = self.test_app.get("/server/command/queue/test1", expect_errors=True) + self.assertEqual(400, resp.status_int) + self.assertIn("boom", resp.text) diff --git a/src/python/tests/unittests/test_controller/test_auto_queue.py b/src/python/tests/unittests/test_controller/test_auto_queue.py index e3748a22..4e01bab0 100644 --- a/src/python/tests/unittests/test_controller/test_auto_queue.py +++ b/src/python/tests/unittests/test_controller/test_auto_queue.py @@ -3,11 +3,13 @@ import json import logging import sys +import threading import unittest from unittest.mock import MagicMock from common import Config, PersistError, overrides from controller import AutoQueue, AutoQueuePattern, AutoQueuePersist, Controller, IAutoQueuePersistListener +from controller.auto_queue import AutoQueuePersistListener from model import IModelListener, ModelFile @@ -252,6 +254,113 @@ def test_persist_read_error(self): with self.assertRaises(PersistError): AutoQueuePersist.from_str(content) + def test_add_pattern_visible_atomically(self): + # A pattern added via add_pattern is immediately and fully present in + # both patterns and the subsequent to_str() output (no torn snapshot). + persist = AutoQueuePersist() + pattern = AutoQueuePattern(pattern="one") + persist.add_pattern(pattern) + self.assertIn(pattern, persist.patterns) + round_tripped = AutoQueuePersist.from_str(persist.to_str()) + self.assertIn(pattern, round_tripped.patterns) + + def _assert_blocks_until_lock_released(self, lock, op, op_name): + """Deterministically prove `op` acquires `lock`: while this thread holds + the lock, a second thread running `op` must NOT complete; once released, + it must. Fails on unguarded code (op completes immediately), passes on + the locked code — with no reliance on GIL timing. + """ + started = threading.Event() + done = threading.Event() + errors: list[BaseException] = [] + + def run(): + started.set() + try: + op() + except BaseException as e: + errors.append(e) + finally: + done.set() + + thread = threading.Thread(target=run) + # Daemon so a hung op() (a regression where the lock isn't acquired and + # something else blocks) can't keep the test process alive. + thread.daemon = True + with lock: + thread.start() + self.assertTrue(started.wait(2), f"{op_name}: worker thread never started") + # The op cannot finish while we hold the shared lock -> it acquires it. + self.assertFalse(done.wait(0.2), f"{op_name} did not acquire the shared lock") + thread.join(2) + self.assertTrue(done.is_set(), f"{op_name} did not complete after lock release") + self.assertEqual([], errors, f"{op_name} raised: {errors}") + + def test_persist_mutators_and_readers_acquire_lock(self): + """add_pattern / remove_pattern / to_str / patterns all run inside the + shared RLock, so a web-thread mutation can never interleave with a + controller-thread serialize (the torn-snapshot / set-changed-size + regression). Proven deterministically via the shared lock.""" + persist = AutoQueuePersist() + persist.add_pattern(AutoQueuePattern(pattern="seed")) + lock = persist._AutoQueuePersist__lock + + cases = [ + ("add_pattern", lambda: persist.add_pattern(AutoQueuePattern(pattern="new"))), + ("remove_pattern", lambda: persist.remove_pattern(AutoQueuePattern(pattern="seed"))), + ("to_str", persist.to_str), + ("patterns", lambda: persist.patterns), + ] + for op_name, op in cases: + with self.subTest(op=op_name): + self._assert_blocks_until_lock_released(lock, op, op_name) + + def test_listener_new_patterns_operations_acquire_lock(self): + """The listener's new_patterns set is mutated on the web thread and + read on the controller thread, so every accessor must take the listener + lock. Proven deterministically via the shared lock.""" + listener = AutoQueuePersistListener() + listener.pattern_added(AutoQueuePattern(pattern="seed")) + lock = listener._AutoQueuePersistListener__lock + + cases = [ + ("pattern_added", lambda: listener.pattern_added(AutoQueuePattern(pattern="new"))), + ("pattern_removed", lambda: listener.pattern_removed(AutoQueuePattern(pattern="seed"))), + ("drain_new_patterns", listener.drain_new_patterns), + ] + for op_name, op in cases: + with self.subTest(op=op_name): + self._assert_blocks_until_lock_released(lock, op, op_name) + + def test_drain_new_patterns_returns_copy_and_clears(self): + """drain returns the current set and empties it in one locked step, so + the controller gets a private, safely-iterable copy and the listener + starts the next cycle empty.""" + listener = AutoQueuePersistListener() + listener.pattern_added(AutoQueuePattern(pattern="a")) + + drained = listener.drain_new_patterns() + + self.assertEqual({AutoQueuePattern(pattern="a")}, drained) + # The set is now empty, and the returned copy is decoupled from the + # listener: mutating the listener (and iterating the copy meanwhile) + # must neither change nor raise on the copy. + for _ in drained: + listener.pattern_added(AutoQueuePattern(pattern="b")) + self.assertEqual({AutoQueuePattern(pattern="a")}, drained) + + def test_drain_does_not_drop_pattern_added_after_drain(self): + """Regression for the snapshot-then-clear race: a pattern added after the + drain is retained for the next cycle, not silently cleared unprocessed.""" + listener = AutoQueuePersistListener() + listener.pattern_added(AutoQueuePattern(pattern="first")) + + self.assertEqual({AutoQueuePattern(pattern="first")}, listener.drain_new_patterns()) + + # A pattern added after the drain must survive to the next cycle. + listener.pattern_added(AutoQueuePattern(pattern="second")) + self.assertEqual({AutoQueuePattern(pattern="second")}, listener.drain_new_patterns()) + class TestAutoQueue(unittest.TestCase): def setUp(self): diff --git a/src/python/tests/unittests/test_controller/test_command_pipeline.py b/src/python/tests/unittests/test_controller/test_command_pipeline.py index e169069b..fee0eb1d 100644 --- a/src/python/tests/unittests/test_controller/test_command_pipeline.py +++ b/src/python/tests/unittests/test_controller/test_command_pipeline.py @@ -2,9 +2,12 @@ import os import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch +from common import AppError from controller.command_pipeline import CommandPipeline +from controller.persist_keys import persist_key +from lftp import LftpError from model import ModelFile @@ -132,3 +135,222 @@ def test_queue_puts_command_on_queue(self): self.assertFalse(pipeline.command_queue.empty()) self.assertIs(command, pipeline.command_queue.get()) + + # --- cleanup: move process failure handling (#510) --- + + def _make_move_process(self, pair_id, file_name, *, failed_results=None): + """Create a fake finished MoveProcess for cleanup tests.""" + move_process = MagicMock() + move_process.is_alive.return_value = False + move_process.pair_id = pair_id + move_process.file_name = file_name + move_process.name = "MoveProcess" + # No raised exception by default + move_process.propagate_exception.return_value = None + move_process.pop_failed.return_value = failed_results or [] + return move_process + + def test_cleanup_discards_key_when_move_reports_failure(self): + """A move that reports a failure via pop_failed must discard its moved key + and force a rescan so the move is retried.""" + pc = self._make_pair_context("pair-1") + pipeline = self._make_pipeline([pc]) + + move_key = persist_key("pair-1", "file.txt") + pipeline.moved_file_keys.add(move_key) + + failure = MagicMock() + failure.name = "file.txt" + failure.error_message = "source does not exist" + move_process = self._make_move_process("pair-1", "file.txt", failed_results=[failure]) + pipeline.active_move_processes.append(move_process) + + pipeline.cleanup() + + # Key discarded -> next force_scan re-spawns the move (retry) + self.assertNotIn(move_key, pipeline.moved_file_keys) + pc.local_scan_process.force_scan.assert_called_once() + # Finished process removed from the active list + self.assertEqual([], pipeline.active_move_processes) + + def test_cleanup_keeps_key_when_move_succeeds(self): + """A move that reports no failure must keep its moved key (no retry).""" + pc = self._make_pair_context("pair-1") + pipeline = self._make_pipeline([pc]) + + move_key = persist_key("pair-1", "file.txt") + pipeline.moved_file_keys.add(move_key) + + move_process = self._make_move_process("pair-1", "file.txt", failed_results=[]) + pipeline.active_move_processes.append(move_process) + + pipeline.cleanup() + + # Successful move keeps the key so it isn't re-spawned + self.assertIn(move_key, pipeline.moved_file_keys) + # A rescan still happens to pick up the moved file + pc.local_scan_process.force_scan.assert_called_once() + self.assertEqual([], pipeline.active_move_processes) + + def test_cleanup_discards_key_when_move_raises(self): + """A move that raises (propagate_exception) must also discard its key.""" + pc = self._make_pair_context("pair-1") + pipeline = self._make_pipeline([pc]) + + move_key = persist_key("pair-1", "file.txt") + pipeline.moved_file_keys.add(move_key) + + move_process = self._make_move_process("pair-1", "file.txt") + move_process.propagate_exception.side_effect = RuntimeError("boom") + pipeline.active_move_processes.append(move_process) + + pipeline.cleanup() + + self.assertNotIn(move_key, pipeline.moved_file_keys) + pc.local_scan_process.force_scan.assert_called_once() + + def test_cleanup_keeps_alive_move_process(self): + """A still-running move process must remain in the active list untouched.""" + pc = self._make_pair_context("pair-1") + pipeline = self._make_pipeline([pc]) + + move_process = MagicMock() + move_process.is_alive.return_value = True + pipeline.active_move_processes.append(move_process) + + pipeline.cleanup() + + self.assertIn(move_process, pipeline.active_move_processes) + move_process.pop_failed.assert_not_called() + # An alive process must not be joined/closed (#537 review). + move_process.join.assert_not_called() + move_process.close_queues.assert_not_called() + + def test_cleanup_reaps_finished_move_process(self): + """A finished move is joined and its queues closed in cleanup() so its FDs + are released immediately — it's dropped from the active list, so + Controller.exit() never sees it to reap later (#537 review).""" + pc = self._make_pair_context("pair-1") + pipeline = self._make_pipeline([pc]) + move_process = self._make_move_process("pair-1", "file.txt", failed_results=[]) + pipeline.active_move_processes.append(move_process) + + pipeline.cleanup() + + move_process.join.assert_called_once() + move_process.close_queues.assert_called_once() + self.assertEqual([], pipeline.active_move_processes) + + def test_finalize_rescans_only_owning_pair(self): + """A move only changes the owning pair's local_path, so only that pair is + rescanned, not every pair (#537 review).""" + owner = self._make_pair_context("pair-1") + other = self._make_pair_context("pair-2") + pipeline = self._make_pipeline([owner, other]) + move_process = self._make_move_process("pair-1", "file.txt", failed_results=[]) + pipeline.active_move_processes.append(move_process) + + pipeline.cleanup() + + owner.local_scan_process.force_scan.assert_called_once() + other.local_scan_process.force_scan.assert_not_called() + + def test_finalize_rescans_all_pairs_when_owner_not_found(self): + """If the move's pair_id matches no context, fall back to rescanning all.""" + pc1 = self._make_pair_context("pair-1") + pc2 = self._make_pair_context("pair-2") + pipeline = self._make_pipeline([pc1, pc2]) + move_process = self._make_move_process("ghost-pair", "file.txt", failed_results=[]) + pipeline.active_move_processes.append(move_process) + + pipeline.cleanup() + + pc1.local_scan_process.force_scan.assert_called_once() + pc2.local_scan_process.force_scan.assert_called_once() + + @patch("controller.command_pipeline.os.path.exists", return_value=True) + @patch("controller.command_pipeline.MoveProcess") + def test_spawn_move_process_start_failure_leaves_no_stale_state(self, mock_move_cls, _mock_exists): + """If MoveProcess.start() raises, no stale move_key (which would block all + retries) or never-started process (which cleanup() would join() and raise + on) is published (#537 review).""" + pc = self._make_pair_context("pair-1") + pc.pair_id = "pair-1" + pc.local_path = "/local/pair-1" + pipeline = self._make_pipeline([pc]) + pipeline._context.config.controller.use_staging = True + pipeline._context.config.controller.staging_path = "/tmp/staging" + mock_move_cls.return_value.start.side_effect = OSError("cannot fork") + + with self.assertRaises(OSError): + pipeline.spawn_move_process("file.txt", pc) + + self.assertNotIn(persist_key("pair-1", "file.txt"), pipeline.moved_file_keys) + self.assertEqual([], pipeline.active_move_processes) + + # --- propagate_exceptions: worker-fault isolation (#511) --- + + def test_propagate_exceptions_clean_when_no_worker_errors(self): + """With no worker faults, all worker propagation calls run and nothing raises.""" + pipeline = self._make_pipeline([]) + + pipeline.propagate_exceptions() + + pipeline._mp_logger.propagate_exception.assert_called_once() + pipeline._extract_process.propagate_exception.assert_called_once() + pipeline._validate_process.propagate_exception.assert_called_once() + + def test_propagate_exceptions_survives_extract_worker_crash(self): + """A dead extract worker is logged and swallowed; the validate worker is + still polled and the controller cycle is not killed.""" + pipeline = self._make_pipeline([]) + pipeline._extract_process.propagate_exception.side_effect = RuntimeError("worker OOM") + + # Must not raise out of propagate_exceptions -> controller keeps running + pipeline.propagate_exceptions() + + # Isolation: the sibling worker is still polled (per-worker try/except, + # not one shared guard around both). + pipeline._validate_process.propagate_exception.assert_called_once() + pipeline._logger.warning.assert_called() + + def test_propagate_exceptions_survives_validate_worker_crash(self): + """A dead validate worker is logged and swallowed; the extract worker is + still polled and the controller cycle is not killed.""" + pipeline = self._make_pipeline([]) + pipeline._validate_process.propagate_exception.side_effect = RuntimeError("worker OOM") + + pipeline.propagate_exceptions() + + pipeline._extract_process.propagate_exception.assert_called_once() + pipeline._logger.warning.assert_called() + + def test_propagate_exceptions_reports_dead_worker_once(self): + """A worker whose process has died is surfaced once at ERROR (not every + cycle), so the degraded-until-restart state stays visible after the + queued fault is consumed. The live sibling is not reported.""" + pipeline = self._make_pipeline([]) + pipeline._extract_process.is_alive.return_value = False + pipeline._validate_process.is_alive.return_value = True + + pipeline.propagate_exceptions() + pipeline.propagate_exceptions() + + error_messages = [c.args[0] % c.args[1:] for c in pipeline._logger.error.call_args_list] + dead_reports = [m for m in error_messages if "has died" in m] + self.assertEqual(1, len(dead_reports), error_messages) + self.assertIn("Extract", dead_reports[0]) + self.assertIn("restarted", dead_reports[0]) + # The live validate worker is never reported dead. + self.assertFalse(any("Validate worker" in m for m in error_messages)) + + def test_propagate_exceptions_reraises_permanent_lftp_error(self): + """A permanent lftp credential failure must still raise AppError so the + engine stops (this engine-stopping behavior is unaffected by the new + worker guards).""" + pc = self._make_pair_context("pair-1") + pc.lftp.raise_pending_error.side_effect = LftpError("Login failed: 530") + pipeline = self._make_pipeline([pc]) + + with self.assertRaises(AppError): + pipeline.propagate_exceptions() diff --git a/src/python/tests/unittests/test_controller/test_controller_exit.py b/src/python/tests/unittests/test_controller/test_controller_exit.py new file mode 100644 index 00000000..aec03a66 --- /dev/null +++ b/src/python/tests/unittests/test_controller/test_controller_exit.py @@ -0,0 +1,258 @@ +# Copyright 2017, Inderpreet Singh, All rights reserved. + +"""Tests for Controller.exit() teardown robustness. + +Regression test for https://github.com/nitrobass24/seedsync/issues/508 + +When the lftp PTY is hung/dead (network or seedbox loss), `pc.lftp.exit()` +raises. Before the fix, that exception propagated out of `Controller.exit()` +and skipped every later teardown phase (worker terminate/join, +`__mp_logger.stop()`, and the `close_queues()` block), orphaning worker +subprocesses and leaking their multiprocessing.Queue file descriptors. On +repeated ServiceRestart this eventually exhausted the FD limit +(OSError: [Errno 24] No file descriptors available). + +These tests build the Controller via `__new__` (no `__init__`) and inject the +name-mangled private attrs that `exit()` reads, using MagicMock children. They +verify that a hung lftp no longer aborts teardown and that all reaping/queue +cleanup still runs. +""" + +import unittest +from unittest.mock import MagicMock + +import timeout_decorator + +from controller.controller import Controller + + +def _make_worker_process(): + """A worker AppProcess-like mock with terminate/join/close_queues.""" + proc = MagicMock() + proc.terminate = MagicMock() + proc.join = MagicMock() + proc.close_queues = MagicMock() + # Reaped cleanly by default: dead after join (so the bounded join doesn't + # log a spurious "did not exit" warning). + proc.is_alive.return_value = False + return proc + + +def _make_pair_context(pair_id): + pc = MagicMock() + pc.pair_id = pair_id + pc.lftp = MagicMock() + pc.lftp.exit = MagicMock() + pc.active_scan_process = _make_worker_process() + pc.local_scan_process = _make_worker_process() + pc.remote_scan_process = _make_worker_process() + pc.active_scanner = MagicMock() + pc.active_scanner.close = MagicMock() + return pc + + +def _make_command_wrapper(): + """Mirrors active_command_processes entries: wrapper.process is the proc.""" + wrapper = MagicMock() + wrapper.process = _make_worker_process() + return wrapper + + +class TestControllerExit(unittest.TestCase): + def setUp(self): + self.controller = Controller.__new__(Controller) + self.controller.logger = MagicMock() + self.controller._Controller__started = True + + self.pc1 = _make_pair_context("pair1") + self.pc2 = _make_pair_context("pair2") + self.controller._Controller__pair_contexts = [self.pc1, self.pc2] + + self.extract_process = _make_worker_process() + self.validate_process = _make_worker_process() + self.controller._Controller__extract_process = self.extract_process + self.controller._Controller__validate_process = self.validate_process + + self.mp_logger = MagicMock() + self.mp_logger.stop = MagicMock() + self.controller._Controller__mp_logger = self.mp_logger + + self.command_wrapper = _make_command_wrapper() + self.move_process = _make_worker_process() + self.pipeline = MagicMock() + self.pipeline.active_command_processes = [self.command_wrapper] + self.pipeline.active_move_processes = [self.move_process] + self.controller._Controller__pipeline = self.pipeline + + def _all_pair_processes(self): + for pc in (self.pc1, self.pc2): + yield pc.active_scan_process + yield pc.local_scan_process + yield pc.remote_scan_process + + def _assert_full_teardown(self): + """Every worker was terminated, joined, and had its queues closed, + the mp logger stopped, and pipeline process lists were cleared.""" + for proc in self._all_pair_processes(): + proc.terminate.assert_called_once() + proc.join.assert_called_once() + proc.close_queues.assert_called_once() + for proc in (self.extract_process, self.validate_process): + proc.terminate.assert_called_once() + proc.join.assert_called_once() + proc.close_queues.assert_called_once() + for pc in (self.pc1, self.pc2): + pc.active_scanner.close.assert_called_once() + + self.command_wrapper.process.terminate.assert_called_once() + self.command_wrapper.process.join.assert_called_once() + self.command_wrapper.process.close_queues.assert_called_once() + self.move_process.terminate.assert_called_once() + self.move_process.join.assert_called_once() + self.move_process.close_queues.assert_called_once() + + self.mp_logger.stop.assert_called_once() + self.assertEqual(self.pipeline.active_command_processes, []) + self.assertEqual(self.pipeline.active_move_processes, []) + + @timeout_decorator.timeout(5) + def test_exit_continues_teardown_when_lftp_exit_raises(self): + """Core regression: a hung lftp must not abort teardown (AC #1/#2/#3).""" + self.pc1.lftp.exit.side_effect = RuntimeError("hung PTY") + + # Must not raise. + self.controller.exit() + + # The failure was logged (not silently swallowed). + self.controller.logger.exception.assert_called() + + # All reaping and queue-cleanup phases still ran. + self._assert_full_teardown() + + # Controller re-arms for a subsequent start(). + self.assertFalse(self.controller._Controller__started) + + @timeout_decorator.timeout(5) + def test_exit_terminates_other_pairs_when_one_lftp_hangs(self): + """One pair's lftp hanging must not skip the other pair's lftp.exit() + or any pair's worker reaping (AC #4).""" + self.pc1.lftp.exit.side_effect = RuntimeError("hung PTY") + + self.controller.exit() + + # Both pairs' lftp.exit() were attempted despite pair1 raising. + self.pc1.lftp.exit.assert_called_once() + self.pc2.lftp.exit.assert_called_once() + + # Both pairs' worker processes were terminated/joined/closed. + for proc in self._all_pair_processes(): + proc.terminate.assert_called_once() + proc.join.assert_called_once() + proc.close_queues.assert_called_once() + + @timeout_decorator.timeout(5) + def test_exit_happy_path_calls_all_phases(self): + """No side effects: every phase runs and started flips to False.""" + self.controller.exit() + + self.pc1.lftp.exit.assert_called_once() + self.pc2.lftp.exit.assert_called_once() + self.controller.logger.exception.assert_not_called() + self._assert_full_teardown() + self.assertFalse(self.controller._Controller__started) + + @timeout_decorator.timeout(5) + def test_exit_noop_when_not_started(self): + """exit() while not started does no teardown and does not raise.""" + self.controller._Controller__started = False + + self.controller.exit() + + self.pc1.lftp.exit.assert_not_called() + self.extract_process.terminate.assert_not_called() + self.mp_logger.stop.assert_not_called() + + @timeout_decorator.timeout(5) + def test_exit_closes_queues_when_worker_terminate_raises(self): + """AC #2 (literal): a raise from a worker terminate()/join() — not just + lftp — must not skip the remaining reaping or the FD-releasing + close_queues phase, and must not propagate out of exit().""" + self.extract_process.terminate.side_effect = ValueError("process already closed") + self.move_process.join.side_effect = RuntimeError("join failed") + + # Must not raise despite the worker-phase failures. + self.controller.exit() + + # The failures were logged, and every phase still ran for every worker + # (close_queues in particular, so no FD leak). + self.controller.logger.exception.assert_called() + self._assert_full_teardown() + self.assertFalse(self.controller._Controller__started) + + @timeout_decorator.timeout(5) + def test_partial_start_failure_leaves_started_true_so_exit_cleans_up(self): + """If start() fails partway, __started must be True so the job's cleanup + path (exit()) tears down the children that did start, instead of + early-returning on __started=False and leaking them (#537 review).""" + self.controller._Controller__started = False + # extract_process.start() raises after the pair scan processes started. + self.extract_process.start.side_effect = RuntimeError("boom") + + with self.assertRaises(RuntimeError): + self.controller.start() + + # Marked started despite the partial failure, so exit() proceeds. + self.assertTrue(self.controller._Controller__started) + self.controller.exit() + for proc in self._all_pair_processes(): + proc.terminate.assert_called_once() + proc.join.assert_called_once() + proc.close_queues.assert_called_once() + self.assertFalse(self.controller._Controller__started) + + @timeout_decorator.timeout(5) + def test_exit_continues_when_worker_join_times_out(self): + """A worker still alive after the bounded join (stuck in uninterruptible + I/O, ignoring SIGTERM) must not hang exit(): a warning is logged and + teardown still completes for every worker (#537 review).""" + self.extract_process.is_alive.return_value = True + + # Must not hang (timeout_decorator) and must not raise. + self.controller.exit() + + # The stuck worker was reported, and all phases still ran. + self.controller.logger.warning.assert_called() + self._assert_full_teardown() + self.assertFalse(self.controller._Controller__started) + + @timeout_decorator.timeout(5) + def test_repeated_exit_with_hung_lftp_does_not_leak(self): + """Repeated ServiceRestart with a hung lftp must close queues every + cycle and never raise (AC #3 / AC #6).""" + self.pc1.lftp.exit.side_effect = RuntimeError("hung PTY") + + for cycle in range(2): + # Re-arm and reset per-cycle worker mocks to count this cycle only. + self.controller._Controller__started = True + for proc in self._all_pair_processes(): + proc.reset_mock() + for pc in (self.pc1, self.pc2): + pc.active_scanner.close.reset_mock() + for proc in (self.extract_process, self.validate_process): + proc.reset_mock() + self.command_wrapper.process.reset_mock() + self.move_process.reset_mock() + self.mp_logger.stop.reset_mock() + self.pipeline.active_command_processes = [self.command_wrapper] + self.pipeline.active_move_processes = [self.move_process] + + # Must not raise on any cycle. + self.controller.exit() + + with self.subTest(cycle=cycle): + self._assert_full_teardown() + self.assertFalse(self.controller._Controller__started) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/python/tests/unittests/test_controller/test_move_process.py b/src/python/tests/unittests/test_controller/test_move_process.py index 20a96731..bfa46051 100644 --- a/src/python/tests/unittests/test_controller/test_move_process.py +++ b/src/python/tests/unittests/test_controller/test_move_process.py @@ -1,22 +1,48 @@ # Copyright 2017, Inderpreet Singh, All rights reserved. +import errno import os +import queue import shutil import tempfile import unittest +from unittest import mock -from controller.move.move_process import MoveProcess +from controller.move.move_process import MoveFailedResult, MoveProcess + + +class _SyncQueue(queue.Queue): + """A queue.Queue with close/join_thread stubs so it can replace + multiprocessing.Queue in single-process tests (no feeder-thread race).""" + + def close(self): + pass + + def join_thread(self): + pass class TestMoveProcess(unittest.TestCase): def setUp(self): self.src_dir = tempfile.mkdtemp() self.dst_dir = tempfile.mkdtemp() + self._processes = [] def tearDown(self): + for process in self._processes: + process.close_queues() shutil.rmtree(self.src_dir, ignore_errors=True) shutil.rmtree(self.dst_dir, ignore_errors=True) + def _make_process(self, **kwargs): + # Replace multiprocessing.Queue with a synchronous queue so that + # pop_failed() in these single-process tests never races the feeder + # thread (mirrors the validate/extract process test pattern). + with mock.patch("controller.move.move_process.multiprocessing.Queue", _SyncQueue): + process = MoveProcess(**kwargs) + self._processes.append(process) + return process + def _run_process(self, process): """Helper to run a MoveProcess synchronously via run_once().""" process.run_once() @@ -28,7 +54,7 @@ def test_move_single_file(self): with open(src_file, "w") as f: f.write("hello world") - process = MoveProcess(source_path=self.src_dir, dest_path=self.dst_dir, file_name="test.txt") + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="test.txt") self._run_process(process) # Source should be gone, dest should exist @@ -48,7 +74,7 @@ def test_move_directory(self): with open(os.path.join(src_subdir, "subdir", "b.txt"), "w") as f: f.write("file_b") - process = MoveProcess(source_path=self.src_dir, dest_path=self.dst_dir, file_name="mydir") + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="mydir") self._run_process(process) # Source should be gone @@ -64,7 +90,7 @@ def test_move_directory(self): def test_move_nonexistent_source(self): """Moving a nonexistent source should log error and not crash""" - process = MoveProcess(source_path=self.src_dir, dest_path=self.dst_dir, file_name="does_not_exist") + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="does_not_exist") # Should not raise self._run_process(process) @@ -78,7 +104,7 @@ def test_move_creates_dest_parent(self): with open(src_file, "w") as f: f.write("content") - process = MoveProcess(source_path=self.src_dir, dest_path=nested_dst, file_name="test.txt") + process = self._make_process(source_path=self.src_dir, dest_path=nested_dst, file_name="test.txt") self._run_process(process) self.assertFalse(os.path.exists(src_file)) @@ -110,7 +136,7 @@ def test_move_preserves_file_content(self): with open(src_file, "w") as f: f.write(content) - process = MoveProcess(source_path=self.src_dir, dest_path=self.dst_dir, file_name="big.txt") + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="big.txt") self._run_process(process) dst_file = os.path.join(self.dst_dir, "big.txt") @@ -122,8 +148,134 @@ def test_move_empty_directory(self): src_empty = os.path.join(self.src_dir, "emptydir") os.makedirs(src_empty) - process = MoveProcess(source_path=self.src_dir, dest_path=self.dst_dir, file_name="emptydir") + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="emptydir") self._run_process(process) self.assertFalse(os.path.exists(src_empty)) self.assertTrue(os.path.isdir(os.path.join(self.dst_dir, "emptydir"))) + # Happy path must not report any failure + self.assertEqual([], process.pop_failed()) + + # --- Failure reporting (#510) --- + + def test_move_nonexistent_source_reports_failure(self): + """A vanished source must be reported on the failed queue, not silently dropped""" + process = self._make_process( + source_path=self.src_dir, dest_path=self.dst_dir, file_name="does_not_exist", pair_id="pair-1" + ) + self._run_process(process) + + failures = process.pop_failed() + self.assertEqual(1, len(failures)) + result = failures[0] + self.assertIsInstance(result, MoveFailedResult) + self.assertEqual("does_not_exist", result.name) + self.assertEqual("pair-1", result.pair_id) + self.assertIn("does not exist", result.error_message) + + def test_move_size_mismatch_reports_failure_and_keeps_source(self): + """A forced size mismatch must report failure and leave the source intact""" + src_subdir = os.path.join(self.src_dir, "mydir") + os.makedirs(src_subdir) + with open(os.path.join(src_subdir, "a.txt"), "w") as f: + f.write("aaaa") + + process = self._make_process( + source_path=self.src_dir, dest_path=self.dst_dir, file_name="mydir", pair_id="pair-2" + ) + + # Force the copied-size check to disagree with the source size. + real_total = MoveProcess._get_total_size + with ( + mock.patch.object(MoveProcess, "_get_copied_size", staticmethod(lambda src, dst: real_total(src) + 1)), + mock.patch("controller.move.move_process.os.rename", side_effect=OSError(errno.EXDEV, "cross-device")), + ): + self._run_process(process) + + failures = process.pop_failed() + self.assertEqual(1, len(failures)) + self.assertEqual("mydir", failures[0].name) + self.assertEqual("pair-2", failures[0].pair_id) + # Source must NOT be deleted when verification fails + self.assertTrue(os.path.exists(src_subdir)) + + def test_move_into_nonempty_dest_succeeds(self): + """Merging into a pre-existing non-empty destination must succeed without false mismatch""" + # Source tree + src_subdir = os.path.join(self.src_dir, "mydir") + os.makedirs(src_subdir) + with open(os.path.join(src_subdir, "new.txt"), "w") as f: + f.write("brand new content") + + # Pre-existing destination tree with an unrelated, larger file already present + dst_subdir = os.path.join(self.dst_dir, "mydir") + os.makedirs(dst_subdir) + with open(os.path.join(dst_subdir, "old.txt"), "w") as f: + f.write("x" * 9999) # larger than the source so whole-dir size would mismatch + + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="mydir") + # Force the cross-device copy path so the size verification runs. + with mock.patch("controller.move.move_process.os.rename", side_effect=OSError(errno.EXDEV, "cross-device")): + self._run_process(process) + + # No false mismatch -- move succeeds and source is removed + self.assertEqual([], process.pop_failed()) + self.assertFalse(os.path.exists(src_subdir)) + self.assertTrue(os.path.exists(os.path.join(dst_subdir, "new.txt"))) + # Pre-existing destination file untouched + self.assertTrue(os.path.exists(os.path.join(dst_subdir, "old.txt"))) + + def test_move_rename_enotempty_falls_back_to_copy(self): + """ENOTEMPTY from os.rename must fall back to copy+delete, not re-raise""" + src_subdir = os.path.join(self.src_dir, "mydir") + os.makedirs(src_subdir) + with open(os.path.join(src_subdir, "a.txt"), "w") as f: + f.write("payload") + + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="mydir") + with mock.patch( + "controller.move.move_process.os.rename", + side_effect=OSError(errno.ENOTEMPTY, "directory not empty"), + ): + # Must not raise + self._run_process(process) + + self.assertEqual([], process.pop_failed()) + self.assertFalse(os.path.exists(src_subdir)) + self.assertTrue(os.path.exists(os.path.join(self.dst_dir, "mydir", "a.txt"))) + + def test_move_rename_unexpected_oserror_reraises(self): + """An unexpected OSError from os.rename must still propagate""" + src_file = os.path.join(self.src_dir, "test.txt") + with open(src_file, "w") as f: + f.write("content") + + process = self._make_process(source_path=self.src_dir, dest_path=self.dst_dir, file_name="test.txt") + with mock.patch( + "controller.move.move_process.os.rename", + side_effect=OSError(errno.EACCES, "permission denied"), + ): + with self.assertRaises(OSError): + self._run_process(process) + + def test_get_copied_size_ignores_preexisting_dest_files(self): + """_get_copied_size sums only files that exist in the source subtree""" + src_subdir = os.path.join(self.src_dir, "mydir") + os.makedirs(os.path.join(src_subdir, "inner")) + with open(os.path.join(src_subdir, "a.txt"), "w") as f: + f.write("aaa") # 3 + with open(os.path.join(src_subdir, "inner", "b.txt"), "w") as f: + f.write("bb") # 2 + + dst_subdir = os.path.join(self.dst_dir, "mydir") + os.makedirs(os.path.join(dst_subdir, "inner")) + # Mirror the source files + with open(os.path.join(dst_subdir, "a.txt"), "w") as f: + f.write("aaa") # 3 + with open(os.path.join(dst_subdir, "inner", "b.txt"), "w") as f: + f.write("bb") # 2 + # Plus an unrelated pre-existing file that must NOT be counted + with open(os.path.join(dst_subdir, "preexisting.txt"), "w") as f: + f.write("ignored entirely") + + self.assertEqual(5, MoveProcess._get_copied_size(src_subdir, dst_subdir)) diff --git a/src/python/tests/unittests/test_controller/test_validate/test_validate_process.py b/src/python/tests/unittests/test_controller/test_validate/test_validate_process.py index 7f77a227..b643be3b 100644 --- a/src/python/tests/unittests/test_controller/test_validate/test_validate_process.py +++ b/src/python/tests/unittests/test_controller/test_validate/test_validate_process.py @@ -302,6 +302,109 @@ def test_directory_validation_succeeds_when_all_match( failed = self.process.pop_failed() self.assertEqual(0, len(failed)) + @patch("controller.validate.validate_process.os.path.exists", return_value=True) + @patch("controller.validate.validate_process.os.path.isdir", return_value=True) + @patch("controller.validate.validate_process.os.walk") + @patch.object(ValidateProcess, "_create_ssh") + @patch.object(ValidateProcess, "_hash_remote_file", return_value="abc123") + @patch.object(ValidateProcess, "_hash_local_file", return_value="abc123") + def test_directory_validation_tilde_remote_path_matches( + self, mock_local_hash, mock_remote_hash, mock_ssh, mock_walk, mock_isdir, mock_exists + ): + """A tilde-based remote_path must still match local keys (regression for #519). + + With remote_path='~/downloads', the remote shell expands ~ to $HOME, so + find emits absolute paths like /home/user/downloads/mydir/a.txt. The fixed + key derivation roots relpath at remote_dir and prefixes req.name, so remote + keys (mydir/a.txt) match local keys despite the tilde base mismatch. + """ + mock_walk.return_value = [ + ("/local/mydir", [], ["a.txt", "b.txt"]), + ] + ssh_mock = MagicMock() + # Remote shell output after $HOME expansion (not literal ~) + ssh_mock.shell.return_value = b"/home/user/downloads/mydir/a.txt\n/home/user/downloads/mydir/b.txt\n" + mock_ssh.return_value = ssh_mock + + req = self._make_request(name="mydir", is_dir=True, remote_path="~/downloads") + self.process.validate(req) + self.process.run_loop() + + completed = self.process.pop_completed() + self.assertEqual(1, len(completed)) + self.assertEqual("mydir", completed[0].name) + + failed = self.process.pop_failed() + self.assertEqual(0, len(failed)) + + @patch("controller.validate.validate_process.os.path.exists", return_value=True) + @patch("controller.validate.validate_process.os.path.isdir", return_value=True) + @patch("controller.validate.validate_process.os.walk") + @patch.object(ValidateProcess, "_create_ssh") + @patch.object(ValidateProcess, "_hash_remote_file", return_value="abc123") + @patch.object(ValidateProcess, "_hash_local_file", return_value="abc123") + def test_directory_find_command_expands_tilde( + self, mock_local_hash, mock_remote_hash, mock_ssh, mock_walk, mock_isdir, mock_exists + ): + """The find command must expand a leading ~ to $HOME (double-quoted), not single-quote it.""" + mock_walk.return_value = [ + ("/local/mydir", [], ["a.txt"]), + ] + ssh_mock = MagicMock() + ssh_mock.shell.return_value = b"/home/user/downloads/mydir/a.txt\n" + mock_ssh.return_value = ssh_mock + + req = self._make_request(name="mydir", is_dir=True, remote_path="~/downloads") + self.process.validate(req) + self.process.run_loop() + + find_calls = [c.args[0] for c in ssh_mock.shell.call_args_list if c.args[0].startswith("find ")] + self.assertEqual(1, len(find_calls)) + find_cmd = find_calls[0] + self.assertIn('"$HOME/downloads/mydir"', find_cmd) + self.assertNotIn("'~", find_cmd) + + @patch.object(ValidateProcess, "_create_ssh") + def test_hash_remote_file_expands_tilde(self, mock_ssh): + """_hash_remote_file must expand a leading ~ to $HOME before issuing the hash command.""" + ssh_mock = MagicMock() + ssh_mock.shell.return_value = b"abc123 file\n" + + req = self._make_request(name="mydir", is_dir=True, remote_path="~/downloads") + result = self.process._hash_remote_file(req, "mydir/a.txt", "md5", ssh_mock) + + self.assertEqual("abc123", result) + cmd = ssh_mock.shell.call_args.args[0] + self.assertIn('"$HOME/downloads/mydir/a.txt"', cmd) + self.assertNotIn("'~", cmd) + + @patch("controller.validate.validate_process.os.path.exists", return_value=True) + @patch("controller.validate.validate_process.os.path.isdir", return_value=True) + @patch("controller.validate.validate_process.os.walk") + @patch.object(ValidateProcess, "_create_ssh") + @patch.object(ValidateProcess, "_hash_remote_file", return_value="abc123") + @patch.object(ValidateProcess, "_hash_local_file", return_value="abc123") + def test_directory_find_command_single_quotes_absolute( + self, mock_local_hash, mock_remote_hash, mock_ssh, mock_walk, mock_isdir, mock_exists + ): + """An absolute remote_path must be single-quoted (no $HOME expansion).""" + mock_walk.return_value = [ + ("/local/mydir", [], ["a.txt"]), + ] + ssh_mock = MagicMock() + ssh_mock.shell.return_value = b"/remote/mydir/a.txt\n" + mock_ssh.return_value = ssh_mock + + req = self._make_request(name="mydir", is_dir=True) + self.process.validate(req) + self.process.run_loop() + + find_calls = [c.args[0] for c in ssh_mock.shell.call_args_list if c.args[0].startswith("find ")] + self.assertEqual(1, len(find_calls)) + find_cmd = find_calls[0] + self.assertIn("'/remote/mydir'", find_cmd) + self.assertNotIn("$HOME", find_cmd) + def test_close_queues_releases_resources(self): # close_queues is also called in tearDown; calling it twice should be safe self.process.run_loop() diff --git a/src/python/tests/unittests/test_lftp/test_job_status_parser.py b/src/python/tests/unittests/test_lftp/test_job_status_parser.py index 3f2a7190..d9d25fc4 100644 --- a/src/python/tests/unittests/test_lftp/test_job_status_parser.py +++ b/src/python/tests/unittests/test_lftp/test_job_status_parser.py @@ -41,6 +41,14 @@ def test_eta_to_seconds(self): self.assertEqual(1 * 60 * 60 + 1 * 60 + 1, LftpJobStatusParser._eta_to_seconds("1h1m1s")) self.assertEqual(1 * 60 + 1, LftpJobStatusParser._eta_to_seconds("1m1s")) + def test_eta_to_seconds_digitless(self): + """Digit-less ETA tokens (e.g. 'm', 'd') must yield 0 instead of raising ValueError.""" + self.assertEqual(0, LftpJobStatusParser._eta_to_seconds("d")) + self.assertEqual(0, LftpJobStatusParser._eta_to_seconds("h")) + self.assertEqual(0, LftpJobStatusParser._eta_to_seconds("m")) + self.assertEqual(0, LftpJobStatusParser._eta_to_seconds("s")) + self.assertEqual(0, LftpJobStatusParser._eta_to_seconds("")) + def test_empty_output_1(self): output = "" parser = LftpJobStatusParser() @@ -1613,3 +1621,68 @@ def test_truly_unrecognized_line_outside_job_raises(self): parser = LftpJobStatusParser() with self.assertRaises(LftpJobStatusParserError): parser.parse(output) + + def test_filename_containing_jobs_v_literal(self): + """A filename containing the literal 'jobs -v' must not be corrupted by echo stripping. + + Regression for #517 BUG 1: the unconditional global replace('jobs -v', '') + deleted every 'jobs -v' substring, so a queued mirror of 'My.jobs -v.Release' + parsed as 'My..Release' and never matched scanner entries. + """ + output = ( + "jobs -v\n" + "[0] queue (sftp://someone:@localhost)\n" + "sftp://someone:@localhost/home/someone\n" + "Queue is stopped.\n" + "Commands queued:\n" + " 1. mirror -c /tmp/test_lftp/remote/My.jobs -v.Release /tmp/test_lftp/local/\n" + ) + parser = LftpJobStatusParser() + statuses = parser.parse(output) + self.assertEqual(1, len(statuses)) + self.assertEqual("My.jobs -v.Release", statuses[0].name) + self.assertEqual(LftpJobStatus.State.QUEUED, statuses[0].state) + + def test_eta_with_digitless_unit_does_not_raise(self): + """A malformed ETA unit (e.g. 'eta:m') must not abort the whole status parse. + + Regression for #517 BUG 2: a digit-less ETA token previously raised + ValueError -> LftpJobStatusParserError, suppressing all status. + """ + output = ( + "jobs -v\n" + "[1] pget -c /tmp/test_lftp/remote/c -o /tmp/test_lftp/local/\n" + "sftp://someone:@localhost/home/someone\n" + "`/tmp/test_lftp/remote/c' at 4585 (3%) 1.2K/s eta:m [Receiving data]\n" + ) + parser = LftpJobStatusParser() + statuses = parser.parse(output) + self.assertEqual(1, len(statuses)) + self.assertEqual("c", statuses[0].name) + + def test_queue_header_immediately_followed_by_commands_queued(self): + """'Commands queued:' directly after the second sftp header (no status line). + + Regression for #517 BUG 3: the unconditional pop consumed the + 'Commands queued:' line, the queued block was never parsed, and the + leftover numbered lines raised 'First line is not a matching header'. + """ + output = """ + [0] queue (sftp://someone:@localhost) + sftp://someone:@localhost/home/someone + Commands queued: + 1. mirror -c /tmp/test_lftp/remote/a /tmp/test_lftp/local/ + 2. pget -c /tmp/test_lftp/remote/c -o /tmp/test_lftp/local/ + """ + parser = LftpJobStatusParser() + statuses = parser.parse(output) + golden = [ + LftpJobStatus( + job_id=1, job_type=LftpJobStatus.Type.MIRROR, state=LftpJobStatus.State.QUEUED, name="a", flags="-c" + ), + LftpJobStatus( + job_id=2, job_type=LftpJobStatus.Type.PGET, state=LftpJobStatus.State.QUEUED, name="c", flags="-c" + ), + ] + self.assertEqual(len(golden), len(statuses)) + self.assertEqual(golden, statuses) diff --git a/src/python/tests/unittests/test_seedsync.py b/src/python/tests/unittests/test_seedsync.py index b7155af0..b301c82e 100644 --- a/src/python/tests/unittests/test_seedsync.py +++ b/src/python/tests/unittests/test_seedsync.py @@ -4,8 +4,9 @@ import os import tempfile import unittest +from unittest.mock import MagicMock -from common import Config, IntegrationsConfig, PathPairsConfig +from common import Config, IntegrationsConfig, PathPairsConfig, ServiceExit, ServiceRestart from seedsync import Seedsync @@ -342,3 +343,107 @@ def test_corrupt_file_backs_up_and_migrates(self): # Backup file should exist backup_files = [f for f in os.listdir(self.tmpdir) if f.endswith(".bak")] self.assertTrue(len(backup_files) > 0, "Expected a backup file to be created") + + +class TestPersistResilience(unittest.TestCase): + """Tests for issue #512: a transient persist() write failure must not kill the service. + + run() builds Controller/WebApp/real threads and loops forever, so it cannot be + exercised directly. We test the extracted _persist_periodic() helper on a bare + instance via Seedsync.__new__ with a mocked context/logger. (The need for + __new__ here itself signals that run()'s shutdown/persist coupling is a future + extraction candidate.) + """ + + def _make_bare_seedsync(self): + s = Seedsync.__new__(Seedsync) + s.context = MagicMock() + return s + + def test_periodic_persist_failure_is_swallowed_and_logged(self): + """A transient OSError (e.g. ENOSPC) during periodic persist must not propagate.""" + s = self._make_bare_seedsync() + s.persist = MagicMock(side_effect=OSError("ENOSPC")) + + # Must NOT raise — the supervisor loop has to keep running. + s._persist_periodic() + + s.persist.assert_called_once() + s.context.logger.exception.assert_called_once() + + def test_periodic_persist_success_no_error_log(self): + """On a successful persist, no error is logged.""" + s = self._make_bare_seedsync() + s.persist = MagicMock() + + s._persist_periodic() + + s.persist.assert_called_once() + s.context.logger.exception.assert_not_called() + + def test_periodic_persist_does_not_swallow_keyboardinterrupt(self): + """The guard is `except Exception`, so BaseException-derived signals propagate.""" + s = self._make_bare_seedsync() + s.persist = MagicMock(side_effect=KeyboardInterrupt()) + + with self.assertRaises(KeyboardInterrupt): + s._persist_periodic() + + def test_periodic_persist_does_not_swallow_systemexit(self): + """SystemExit (also BaseException) must propagate, not be swallowed.""" + s = self._make_bare_seedsync() + s.persist = MagicMock(side_effect=SystemExit()) + + with self.assertRaises(SystemExit): + s._persist_periodic() + + def test_final_persist_failure_is_swallowed_and_logged(self): + """AC3: a write failure during the shutdown persist must not propagate, + or it would mask the original in-flight exception run() re-raises.""" + s = self._make_bare_seedsync() + s.persist = MagicMock(side_effect=OSError("ENOSPC")) + + s._final_persist() # must NOT raise + + s.persist.assert_called_once() + s.context.logger.exception.assert_called_once() + + def test_final_persist_success_no_error_log(self): + s = self._make_bare_seedsync() + s.persist = MagicMock() + + s._final_persist() + + s.persist.assert_called_once() + s.context.logger.exception.assert_not_called() + + def test_shutdown_cause_intentional_exit_logged_at_info(self): + """AC2: an intentional ServiceExit/ServiceRestart is logged at INFO, not + surfaced as an error.""" + for exc in (ServiceExit(), ServiceRestart()): + with self.subTest(exc=type(exc).__name__): + s = self._make_bare_seedsync() + try: + raise exc + except (ServiceExit, ServiceRestart) as e: + s._log_shutdown_cause(e) + s.context.logger.info.assert_called_once() + s.context.logger.exception.assert_not_called() + + def test_shutdown_cause_unexpected_error_logged_at_exception(self): + """AC2: a genuine crash is surfaced at ERROR/exception with a traceback, + not the friendly INFO line — so an abnormal shutdown is visible.""" + s = self._make_bare_seedsync() + try: + raise RuntimeError("child thread blew up") + except RuntimeError as e: + s._log_shutdown_cause(e) + s.context.logger.exception.assert_called_once() + s.context.logger.info.assert_not_called() + + def test_service_exit_and_restart_are_app_errors_caught_by_shutdown_handler(self): + """ServiceExit/ServiceRestart subclass Exception, so run()'s `except Exception` + still catches them and the bare `raise` re-propagates them to the outer loop. + This guards the invariant the fix must preserve.""" + self.assertIsInstance(ServiceExit(), Exception) + self.assertIsInstance(ServiceRestart(), Exception) diff --git a/src/python/web/handler/auto_queue.py b/src/python/web/handler/auto_queue.py index 0de87e7a..f28766af 100644 --- a/src/python/web/handler/auto_queue.py +++ b/src/python/web/handler/auto_queue.py @@ -1,6 +1,7 @@ # Copyright 2017, Inderpreet Singh, All rights reserved. import logging +import threading from urllib.parse import unquote from bottle import HTTPResponse @@ -19,6 +20,14 @@ def __init__(self, auto_queue_persist: AutoQueuePersist, persist_path: str): self.__auto_queue_persist = auto_queue_persist self.__persist_path = persist_path self.__logger = logging.getLogger(self.__class__.__name__) + # Serializes the mutate → persist → rollback sequence in the add/remove + # handlers (mirrors ConfigHandler). add_pattern/remove_pattern fire + # listener side effects (AutoQueuePersistListener.new_patterns drives a + # queue replay on the next controller cycle), so a persist failure must + # roll back both __patterns and the listener state atomically. Without + # the lock, two concurrent writers could interleave mutate/persist/ + # rollback and leave on-disk and in-memory state diverged. + self.__write_lock = threading.Lock() @overrides(IHandler) def add_routes(self, web_app: WebApp): @@ -38,32 +47,48 @@ def __handle_add_autoqueue(self, pattern: str): aqp = AutoQueuePattern(pattern=pattern) - if aqp in self.__auto_queue_persist.patterns: - return HTTPResponse( - body=f"Auto-queue pattern '{pattern}' already exists.", - status=400, - content_type="text/plain", - headers=self._NOSNIFF_HEADERS, - ) - try: - self.__auto_queue_persist.add_pattern(aqp) - except ValueError as e: - return HTTPResponse( - body=str(e), - status=400, - content_type="text/plain", - headers=self._NOSNIFF_HEADERS, - ) - try: - self.__auto_queue_persist.to_file(self.__persist_path) - except OSError: - self.__logger.exception("Failed to persist auto-queue after adding pattern %r", pattern) - return HTTPResponse( - body="Failed to persist auto-queue", - status=500, - content_type="text/plain", - headers=self._NOSNIFF_HEADERS, - ) + # Hold __write_lock across the existence check AND the mutate → persist → + # rollback sequence so the test-and-set is atomic: two concurrent adds of + # the same pattern can't both pass the "already exists" check. add_pattern + # fires pattern_added, which the AutoQueuePersistListener records in + # new_patterns and the controller replays against all model files on the + # next cycle (auto-queueing matching files). If persisting fails we MUST + # undo that side effect, otherwise the pattern would auto-queue files this + # session yet be gone on restart — "phantom" queueing that contradicts the + # 500 (see #518). + with self.__write_lock: + if aqp in self.__auto_queue_persist.patterns: + return HTTPResponse( + body=f"Auto-queue pattern '{pattern}' already exists.", + status=400, + content_type="text/plain", + headers=self._NOSNIFF_HEADERS, + ) + try: + self.__auto_queue_persist.add_pattern(aqp) + except ValueError as e: + return HTTPResponse( + body=str(e), + status=400, + content_type="text/plain", + headers=self._NOSNIFF_HEADERS, + ) + try: + self.__auto_queue_persist.to_file(self.__persist_path) + except Exception: + # Roll back on ANY persist failure (not just OSError — e.g. a + # to_str()/serialization error). remove_pattern fires + # pattern_removed, which discards the pattern from the listener's + # new_patterns, undoing the side effect above so disk and memory + # stay consistent. + self.__auto_queue_persist.remove_pattern(aqp) + self.__logger.exception("Failed to persist auto-queue after adding pattern %r", pattern) + return HTTPResponse( + body="Failed to persist auto-queue", + status=500, + content_type="text/plain", + headers=self._NOSNIFF_HEADERS, + ) return HTTPResponse( body=f"Added auto-queue pattern '{pattern}'.", content_type="text/plain", @@ -76,24 +101,31 @@ def __handle_remove_autoqueue(self, pattern: str): aqp = AutoQueuePattern(pattern=pattern) - if aqp not in self.__auto_queue_persist.patterns: - return HTTPResponse( - body=f"Auto-queue pattern '{pattern}' doesn't exist.", - status=400, - content_type="text/plain", - headers=self._NOSNIFF_HEADERS, - ) - self.__auto_queue_persist.remove_pattern(aqp) - try: - self.__auto_queue_persist.to_file(self.__persist_path) - except OSError: - self.__logger.exception("Failed to persist auto-queue after removing pattern %r", pattern) - return HTTPResponse( - body="Failed to persist auto-queue", - status=500, - content_type="text/plain", - headers=self._NOSNIFF_HEADERS, - ) + # Hold __write_lock across the existence check AND the mutate → persist → + # rollback sequence so the test-and-set is atomic (see __handle_add_autoqueue). + with self.__write_lock: + if aqp not in self.__auto_queue_persist.patterns: + return HTTPResponse( + body=f"Auto-queue pattern '{pattern}' doesn't exist.", + status=400, + content_type="text/plain", + headers=self._NOSNIFF_HEADERS, + ) + self.__auto_queue_persist.remove_pattern(aqp) + try: + self.__auto_queue_persist.to_file(self.__persist_path) + except Exception: + # Roll back on ANY persist failure: re-add the pattern so + # in-memory state matches the on-disk file (the pattern was + # persisted before this removal). + self.__auto_queue_persist.add_pattern(aqp) + self.__logger.exception("Failed to persist auto-queue after removing pattern %r", pattern) + return HTTPResponse( + body="Failed to persist auto-queue", + status=500, + content_type="text/plain", + headers=self._NOSNIFF_HEADERS, + ) return HTTPResponse( body=f"Removed auto-queue pattern '{pattern}'.", content_type="text/plain", diff --git a/src/python/web/handler/config.py b/src/python/web/handler/config.py index edeb8ce9..0ea18a8d 100644 --- a/src/python/web/handler/config.py +++ b/src/python/web/handler/config.py @@ -84,7 +84,7 @@ def __handle_set_config(self, section: str, key: str, value: str): # Hold __write_lock across the mutate → persist → rollback sequence so # writers can't interleave. With the lock, the rollback is unconditional: # no other writer can have changed the value between set_property and - # the OSError handler. + # the persist handler. with self.__write_lock: old_value = getattr(inner_config, key) try: @@ -93,7 +93,13 @@ def __handle_set_config(self, section: str, key: str, value: str): return HTTPResponse(body=str(e), status=400) try: self.__config.to_file(self.__config_path) - except OSError: + except Exception: + # Restore the in-memory value on ANY persist failure (not just + # OSError) so runtime state never diverges from disk. A non-OSError + # such as configparser.Error from serialization would otherwise + # escape and leave the new value live without ever being persisted + # (see #507). old_value is the prior valid value, so the rollback + # set_property cannot raise ConfigError. inner_config.set_property(key, old_value) self.__logger.exception("Failed to persist config %s.%s", section, key) return HTTPResponse(body=f"Failed to persist config {section}.{key}", status=500) diff --git a/src/python/web/handler/controller.py b/src/python/web/handler/controller.py index 50e89997..bbae8db2 100644 --- a/src/python/web/handler/controller.py +++ b/src/python/web/handler/controller.py @@ -11,6 +11,13 @@ from ..web_app import IHandler, WebApp +# Upper bound (in seconds) on how long a file-action request will block waiting +# for the controller thread to signal the callback. The healthy path resolves in +# well under a second (the controller processes commands every ~0.5s), so this is +# a generous margin that still prevents a stalled controller from hanging the +# WSGI request thread indefinitely. +_ACTION_TIMEOUT_IN_SECS = 30 + def _validate_filename(file_name: str) -> bool: """ @@ -75,8 +82,12 @@ def on_success(self): self.success = True self.__event.set() - def wait(self): - self.__event.wait() + def wait(self) -> bool: + """Block until the controller signals the callback or the timeout elapses. + + Returns True if the event fired (success/failure recorded), False on timeout. + """ + return self.__event.wait(timeout=_ACTION_TIMEOUT_IN_SECS) class ControllerHandler(IHandler): @@ -105,7 +116,8 @@ def __dispatch_command(self, file_name: str, action: Controller.Command.Action, callback = WebResponseActionCallback() command.add_callback(callback) self.__controller.queue_command(command) - callback.wait() + if not callback.wait(): + return HTTPResponse(body="Controller did not respond in time", status=504) if callback.success: return HTTPResponse(body=success_msg.format(decoded)) return HTTPResponse(body=callback.error or "Unknown error", status=400)