Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6e98254
Roll back config on any persist failure, not just OSError (#507)
nitrobass24 Jun 2, 2026
8448ae6
Harden periodic and shutdown persist against transient write failures…
nitrobass24 Jun 2, 2026
22c31fd
Fix job status parser robustness bugs (#517)
nitrobass24 Jun 2, 2026
6221513
Fix tilde expansion in remote validate find/hash commands (#519)
nitrobass24 Jun 2, 2026
61b3301
Lock-guard AutoQueuePersist patterns and listener set (#509)
nitrobass24 Jun 2, 2026
7ccae90
Roll back auto-queue add/remove on persist failure (#518)
nitrobass24 Jun 2, 2026
ea0cc8c
Surface staging->final move failures via result queue with retry (#510)
nitrobass24 Jun 2, 2026
35bf824
Isolate extract/validate worker faults from controller loop (#511)
nitrobass24 Jun 2, 2026
24712b3
Make Controller.exit() best-effort so hung lftp doesn't leak processe…
nitrobass24 Jun 2, 2026
7ff125c
Bound controller action wait with 504 timeout (#526)
nitrobass24 Jun 2, 2026
779a898
Make all Controller.exit() teardown calls best-effort (#508 review)
nitrobass24 Jun 2, 2026
6bdf327
Surface a permanently-dead extract/validate worker at ERROR (#511 rev…
nitrobass24 Jun 2, 2026
66d19da
Strengthen auto-queue lock tests into deterministic guards (#509 review)
nitrobass24 Jun 2, 2026
4761406
Extract and test shutdown classifier + final persist (#512 review)
nitrobass24 Jun 2, 2026
01ae955
Document _remote_key nested-same-name limitation (#519 review)
nitrobass24 Jun 2, 2026
6e00cad
Apply ruff format to #510/#519 test files
nitrobass24 Jun 2, 2026
44d053d
Add type annotations to teardown/worker-check helpers
nitrobass24 Jun 2, 2026
360d753
Drain new auto-queue patterns atomically per cycle (#537 review)
nitrobass24 Jun 2, 2026
1aae69a
Harden auto-queue handler: atomic check + broad rollback (#537 review)
nitrobass24 Jun 2, 2026
97379ad
Reap finished moves and rescan only the owning pair (#537 review)
nitrobass24 Jun 2, 2026
10dea66
Set __started before launching children so partial start cleans up (#…
nitrobass24 Jun 2, 2026
f2eb08c
Start move process before publishing bookkeeping (#537 review)
nitrobass24 Jun 2, 2026
0043d5e
Bound worker joins in Controller.exit() (#537 review)
nitrobass24 Jun 2, 2026
bd61dd2
Assert join in partial-start teardown test (#537 review)
nitrobass24 Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 88 additions & 29 deletions src/python/controller/auto_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import fnmatch
import json
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable

Expand Down Expand Up @@ -66,31 +67,47 @@ class AutoQueuePersist(Persist):
__KEY_PATTERNS = "patterns"

def __init__(self):
    """Create an empty pattern store with no listeners registered."""
    self.__patterns: list[AutoQueuePattern] = []
    self.__listeners: list[IAutoQueuePersistListener] = []
    # One reentrant lock covers both lists above. Patterns are added and
    # removed on the web thread while the controller thread reads them
    # (patterns / to_str), so access has to be serialized; this mirrors
    # common/path_pairs_config.py. It is an RLock rather than a plain Lock
    # so that from_str -> add_pattern can re-acquire it without
    # deadlocking on itself.
    self.__lock = threading.RLock()

@property
def patterns(self) -> set[AutoQueuePattern]:
return set(self.__patterns)
with self.__lock:
return set(self.__patterns)

def add_pattern(self, pattern: AutoQueuePattern):
# Check values
if not pattern.pattern.strip():
raise ValueError("Cannot add blank pattern")

if pattern not in self.__patterns:
with self.__lock:
if pattern in self.__patterns:
return
self.__patterns.append(pattern)
for listener in self.__listeners:
listener.pattern_added(pattern)
listeners = list(self.__listeners)
# Notify outside the lock so the persist lock and any listener lock are
# never held simultaneously.
for listener in listeners:
listener.pattern_added(pattern)

def remove_pattern(self, pattern: AutoQueuePattern):
if pattern in self.__patterns:
with self.__lock:
if pattern not in self.__patterns:
return
self.__patterns.remove(pattern)
for listener in self.__listeners:
listener.pattern_removed(pattern)
listeners = list(self.__listeners)
# Notify outside the lock (see add_pattern).
for listener in listeners:
listener.pattern_removed(pattern)

def add_listener(self, listener: IAutoQueuePersistListener):
self.__listeners.append(listener)
with self.__lock:
self.__listeners.append(listener)

@classmethod
@overrides(Persist)
Expand All @@ -108,7 +125,8 @@ def from_str(cls: type["AutoQueuePersist"], content: str) -> "AutoQueuePersist":
@overrides(Persist)
def to_str(self) -> str:
dct: dict[str, list[str]] = {}
dct[AutoQueuePersist.__KEY_PATTERNS] = [p.to_str() for p in self.__patterns]
with self.__lock:
dct[AutoQueuePersist.__KEY_PATTERNS] = [p.to_str() for p in self.__patterns]
return json.dumps(dct, indent=Constants.JSON_PRETTY_PRINT_INDENT)


Expand All @@ -133,27 +151,51 @@ def file_removed(self, file: ModelFile):


class AutoQueuePersistListener(IAutoQueuePersistListener):
"""Keeps track of newly added patterns"""
"""Keeps track of newly added patterns.

pattern_added/pattern_removed run on the web thread while the controller
thread drains (drain_new_patterns) the set. The lock keeps the
controller-thread read from racing a web-thread mutation, which would
otherwise raise 'RuntimeError: Set changed size during iteration'.
"""

def __init__(self):
    """Start with an empty set of pending patterns and the lock guarding it."""
    self.new_patterns: set[AutoQueuePattern] = set()
    # Reentrant lock taken around reads and mutations of new_patterns.
    self.__lock = threading.RLock()

@overrides(IAutoQueuePersistListener)
def pattern_added(self, pattern: AutoQueuePattern):
self.new_patterns.add(pattern)
with self.__lock:
self.new_patterns.add(pattern)

@overrides(IAutoQueuePersistListener)
def pattern_removed(self, pattern: AutoQueuePattern):
if pattern in self.new_patterns:
self.new_patterns.remove(pattern)
with self.__lock:
self.new_patterns.discard(pattern)

def drain_new_patterns(self) -> set[AutoQueuePattern]:
    """Hand over every pending pattern and empty the set, under one lock hold.

    The copy and the clear happen inside the same critical section. If they
    were two separate steps, a pattern_added() call landing between them
    would have its pattern cleared without it ever being returned to the
    caller. Done atomically, a pattern added after the drain stays in
    new_patterns and is picked up by the next drain.

    :return: a new set holding the patterns that were pending; the
             underlying new_patterns set object is kept and left empty
    """
    with self.__lock:
        pending = set(self.new_patterns)
        # Clear in place (rather than rebinding) so the same set object
        # keeps being used by pattern_added/pattern_removed.
        self.new_patterns.clear()
    return pending


class AutoQueue:
"""
Implements auto-queue functionality by sending commands to controller
as matching files are discovered
AutoQueue is in the same thread as Controller, so no synchronization is
needed for now
as matching files are discovered.
AutoQueue runs on the Controller thread, but pattern add/remove happens on
the web handler thread; shared state in AutoQueuePersist and
AutoQueuePersistListener is therefore lock-guarded.
"""

def __init__(self, context: Context, persist: AutoQueuePersist, controller: Controller):
Expand Down Expand Up @@ -205,9 +247,17 @@ def process(self):
if not any_auto_feature:
return

files_to_queue = self.__gather_queue_candidates() if self.__enabled else []
files_to_extract = self.__gather_extract_candidates() if self.__auto_extract_enabled else []
files_to_delete_remote = self.__gather_delete_remote_candidates() if self.__auto_delete_remote_enabled else []
# Drain the newly-added patterns once for this whole cycle. Draining
# (snapshot + clear atomically) instead of snapshotting per-gather and
# clearing separately at the end ensures a pattern added concurrently
# mid-cycle is retained for the next cycle rather than cleared unprocessed.
new_patterns = self.__persist_listener.drain_new_patterns()

files_to_queue = self.__gather_queue_candidates(new_patterns) if self.__enabled else []
files_to_extract = self.__gather_extract_candidates(new_patterns) if self.__auto_extract_enabled else []
files_to_delete_remote = (
self.__gather_delete_remote_candidates(new_patterns) if self.__auto_delete_remote_enabled else []
)

# Send commands (order matters: queue, extract, then delete remote)
self.__send_commands(files_to_queue, Controller.Command.Action.QUEUE, "Auto queueing")
Expand All @@ -218,13 +268,11 @@ def process(self):
if self.__auto_delete_remote_enabled:
self.__retry_delete_remote(files_to_delete_remote)

# Clear the processed files
# Clear the processed files (new patterns were already drained above)
self.__model_listener.new_files.clear()
self.__model_listener.modified_files.clear()
# Clear the new patterns
self.__persist_listener.new_patterns.clear()

def __gather_queue_candidates(self) -> list[_Candidate]:
def __gather_queue_candidates(self, new_patterns: set[AutoQueuePattern]) -> list[_Candidate]:
candidates: list[ModelFile] = list(self.__model_listener.new_files)
for old_file, new_file in self.__model_listener.modified_files:
if old_file.remote_size != new_file.remote_size:
Expand All @@ -236,9 +284,10 @@ def __gather_queue_candidates(self) -> list[_Candidate]:
and f.state == ModelFile.State.DEFAULT
and self._is_auto_queue_enabled_for_file(f)
),
new_patterns=new_patterns,
)

def __gather_extract_candidates(self) -> list[_Candidate]:
def __gather_extract_candidates(self, new_patterns: set[AutoQueuePattern]) -> list[_Candidate]:
candidates: list[ModelFile] = list(self.__model_listener.new_files)
# Candidate modified files that just became DOWNLOADED
# But not files that went EXTRACTING -> DOWNLOADED (failed extraction)
Expand All @@ -257,9 +306,10 @@ def __gather_extract_candidates(self) -> list[_Candidate]:
and f.local_size > 0
and f.is_extractable
),
new_patterns=new_patterns,
)

def __gather_delete_remote_candidates(self) -> list[_Candidate]:
def __gather_delete_remote_candidates(self, new_patterns: set[AutoQueuePattern]) -> list[_Candidate]:
candidates: list[ModelFile] = list(self.__model_listener.new_files)
for old_file, new_file in self.__model_listener.modified_files:
if old_file.state != new_file.state and new_file.state in (
Expand All @@ -279,6 +329,7 @@ def __gather_delete_remote_candidates(self) -> list[_Candidate]:
)
)
),
new_patterns=new_patterns,
)

def __send_commands(
Expand Down Expand Up @@ -328,14 +379,21 @@ def _is_auto_queue_enabled_for_file(self, file: ModelFile) -> bool:
return self.__pair_auto_queue.get(file.pair_id, False)
return True

def __filter_candidates(self, candidates: list[ModelFile], accept: Callable[[ModelFile], bool]) -> list[_Candidate]:
def __filter_candidates(
self,
candidates: list[ModelFile],
accept: Callable[[ModelFile], bool],
new_patterns: set[AutoQueuePattern],
) -> list[_Candidate]:
"""
Given a list of candidate files, filter out those that match the accept criteria
Also takes into consideration new patterns that were added
The accept criteria is applied to candidates AND all existing files in case of
new patterns
:param candidates:
:param accept:
:param new_patterns: patterns added since the last cycle (drained once by
the caller), matched retroactively against all existing model files
:return: list of (filename, pair_id, pattern) tuples
"""
# Files accepted and matched, (name, pair_id) -> pattern map
Expand All @@ -353,10 +411,11 @@ def __filter_candidates(self, candidates: list[ModelFile], accept: Callable[[Mod
elif accept(file):
files_matched[(file.name, file.pair_id)] = None

# Step 2: run new pattern through all the files
if self.__persist_listener.new_patterns:
# Step 2: run new patterns (drained once by process() for this cycle)
# through all the files
if new_patterns:
model_files = self.__controller.get_model_files()
for new_pattern in self.__persist_listener.new_patterns:
for new_pattern in new_patterns:
for file in model_files:
if accept(file) and self.__match(new_pattern, file):
files_matched[(file.name, file.pair_id)] = new_pattern
Expand Down
99 changes: 86 additions & 13 deletions src/python/controller/command_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
if TYPE_CHECKING:
from .controller import Controller

from common import AppError, Context, MultiprocessingLogger
from common import AppError, AppProcess, Context, MultiprocessingLogger
from lftp import LftpError
from model import ModelError, ModelFile

Expand Down Expand Up @@ -78,6 +78,10 @@ def __init__(
# Track files with pending validation so extraction-completion doesn't race the move
self.pending_validation_keys: set[str] = set()

# Worker ids already reported as dead, so a permanently-dead extract/
# validate worker is surfaced once at ERROR rather than every cycle.
self.__reported_dead_workers: set[int] = set()

def queue(self, command: Controller.Command) -> None:
"""Put a command on the queue for processing."""
self.command_queue.put(command)
Expand Down Expand Up @@ -391,16 +395,44 @@ def cleanup(self):
if move_process.is_alive():
still_active_moves.append(move_process)
else:
try:
move_process.propagate_exception()
except Exception:
self._logger.warning("Move process failed: %s", move_process.name, exc_info=True)
move_key = persist_key(move_process.pair_id, move_process.file_name)
self.moved_file_keys.discard(move_key)
for pc in self._pair_contexts:
pc.local_scan_process.force_scan()
self._finalize_move_process(move_process)
# Reap the finished process now (join + close its queues) so its
# FDs are released immediately. It is dropped from
# active_move_processes below, so Controller.exit() never sees it
# and would otherwise leak its queue FDs until shutdown.
move_process.join()
move_process.close_queues()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
self.active_move_processes = still_active_moves
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def _finalize_move_process(self, move_process: MoveProcess) -> None:
    """Wrap up a move process that is no longer alive.

    Failures are detected through two channels:

    * the process raised, which surfaces via propagate_exception() and is
      logged at WARNING with the traceback;
    * the process returned quietly but queued one or more failure results
      (e.g. a vanished source or a size mismatch), which surface via
      pop_failed() and are logged at ERROR.

    On either kind of failure the move's key is dropped from
    moved_file_keys so that the next forced scan can spawn the move again.
    A local rescan is requested whether or not the move failed.

    :param move_process: the finished move process to finalize
    """
    move_failed = False

    # Channel 1: an exception raised inside the child process.
    try:
        move_process.propagate_exception()
    except Exception:
        self._logger.warning("Move process failed: %s", move_process.name, exc_info=True)
        move_failed = True

    # Channel 2: failures the child reported without raising.
    for failure in move_process.pop_failed():
        self._logger.error(f"Move failed for '{failure.name}': {failure.error_message}")
        move_failed = True

    if move_failed:
        # Forget the key so a retry is not blocked by stale bookkeeping.
        self.moved_file_keys.discard(persist_key(move_process.pair_id, move_process.file_name))

    # Only the owning pair's local_path was touched by the move, so rescan
    # just that pair. If no pair context matches the move's pair_id, fall
    # back to rescanning every pair.
    pairs_to_rescan = self._pair_contexts
    for pair_context in self._pair_contexts:
        if pair_context.pair_id == move_process.pair_id:
            pairs_to_rescan = [pair_context]
            break
    for pair_context in pairs_to_rescan:
        pair_context.local_scan_process.force_scan()

def propagate_exceptions(self):
"""
Propagate any exceptions from child processes/threads to this thread
Expand All @@ -419,8 +451,45 @@ def propagate_exceptions(self):
pc.local_scan_process.propagate_exception()
pc.remote_scan_process.propagate_exception()
self._mp_logger.propagate_exception()
self._extract_process.propagate_exception()
self._validate_process.propagate_exception()
# A fault in an extract/validate worker that surfaces outside its
# per-task guard (OOM, queue corruption, run_init/run_cleanup bug)
# must degrade that feature only, not kill the whole controller.
# Isolate each worker so one dead worker doesn't stop the other or
# halt all downloads.
# NOTE: this only ISOLATES the fault — the dead worker is not yet
# recreated, so extract/validate stays disabled until the service is
# restarted. Automatic worker recreation is tracked in #511's follow-up.
try:
self._extract_process.propagate_exception()
except Exception:
self._logger.warning("Extract worker process failed: %s", self._extract_process.name, exc_info=True)
try:
self._validate_process.propagate_exception()
except Exception:
self._logger.warning("Validate worker process failed: %s", self._validate_process.name, exc_info=True)
# propagate_exception() consumes the queued fault once, so without this a
# permanently-dead worker would go silent after the first cycle. Surface
# the dead state once at ERROR so the degradation stays visible.
self.__report_if_worker_dead(self._extract_process, "Extract")
self.__report_if_worker_dead(self._validate_process, "Validate")

def __report_if_worker_dead(self, worker: AppProcess, feature: str) -> None:
    """Log once, at ERROR, that a worker process is no longer running.

    Workers are tracked by id() in __reported_dead_workers, so a worker
    that has already been reported is skipped on later calls instead of
    being logged again.

    :param worker: the worker process to check
    :param feature: display name of the feature the worker provides (used
                    as-is and lower-cased in the log message)
    """
    key = id(worker)
    if key in self.__reported_dead_workers:
        # Already surfaced; stay quiet.
        return

    try:
        if worker.is_alive():
            return
    except (ValueError, AssertionError):
        # Never started or already closed: counts as not running, so fall
        # through to the report below.
        pass

    self.__reported_dead_workers.add(key)
    self._logger.error(
        "%s worker process %s has died; %s is disabled until the service is restarted.",
        feature,
        worker.name,
        feature.lower(),
    )

def spawn_deferred_move(self, pair_id: str | None, file_name: str):
"""Spawn the staging->final move for a file whose validation just finished.
Expand Down Expand Up @@ -459,11 +528,15 @@ def spawn_move_process(self, file_name: str, pc: PairContext):
self.moved_file_keys.add(move_key)
return

self.moved_file_keys.add(move_key)
process = MoveProcess(source_path=staging_source, dest_path=dest_path, file_name=file_name, pair_id=pair_id)
process.set_mp_log_queue(self._mp_logger.queue, self._mp_logger.log_level)
self.active_move_processes.append(process)
# Start before publishing bookkeeping: if start() raises we must not leave
# a stale move_key (which would block every future retry of this move) or
# a never-started process in active_move_processes (cleanup() would then
# join()/close_queues() it and raise AssertionError).
process.start()
self.moved_file_keys.add(move_key)
self.active_move_processes.append(process)
self._logger.info(f"Spawned move process for {file_name} (staging -> local)")

def _get_pair_context_for_command(self, command: Controller.Command) -> PairContext | None:
Expand Down
Loading