Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions services/analysis-engine/src/bandscope_analysis/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import hashlib
import json
import logging
import multiprocessing as mp
import queue
import time
Expand All @@ -24,9 +25,12 @@
# Integer version tag for the feature cache schema.
# NOTE(review): presumably bumped to invalidate older cache entries — confirm
# against the cache helpers, which are not visible in this hunk.
FEATURE_CACHE_SCHEMA_VERSION = 1
# Stem separation time limit; the name indicates the unit is seconds.
# NOTE(review): where and how this limit is enforced is not visible here.
STEM_SEPARATION_TIMEOUT_SECONDS = 20.0

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# Closed string vocabularies, expressed as ``Literal`` aliases for use in
# type annotations.
AnalysisJobState = Literal["queued", "running", "succeeded", "failed"]
AnalysisJobStage = Literal["queued", "decode", "separate", "analyze", "persist", "ready"]
AnalysisCacheStatus = Literal["disabled", "miss", "hit", "stored"]
# Failure tags: the first element of the tuple returned by
# ``_stem_separation_failure``.
StemSeparationFailureKind = Literal["file_not_found", "value_error", "runtime_error"]


class AnalysisJobRequest(TypedDict):
Expand Down Expand Up @@ -834,14 +838,39 @@ def _stem_separation_worker(
)
return
result_queue.put(("ok", separation_result))
except FileNotFoundError as error:
result_queue.put(("file_not_found", str(error)))
except ValueError as error:
result_queue.put(("value_error", str(error)))
except RuntimeError as error:
result_queue.put(("runtime_error", str(error)))
except Exception as error:
result_queue.put(("runtime_error", str(error)))
kind, safe_message, log_message = _stem_separation_failure(error)
logger.exception(log_message)
result_queue.put((kind, safe_message))


def _stem_separation_failure(
error: Exception,
) -> tuple[StemSeparationFailureKind, str, str]:
"""Map worker exceptions to safe parent payloads and stable log messages."""
if isinstance(error, FileNotFoundError):
return (
"file_not_found",
"Audio source file not found.",
"Stem separation failed because the source file was missing.",
)
if isinstance(error, ValueError):
return (
"value_error",
"Invalid audio source data.",
"Stem separation rejected invalid audio source data.",
)
if isinstance(error, RuntimeError):
return (
"runtime_error",
"Runtime error occurred during stem separation.",
"Stem separation failed with a runtime error.",
)
return (
"runtime_error",
"An unexpected error occurred during stem separation.",
"Stem separation failed unexpectedly.",
)


def _multiprocessing_context() -> mp.context.BaseContext:
Expand Down Expand Up @@ -1082,7 +1111,8 @@ def run_analysis_job_updates(
)
)
audio_features = None
except (FileNotFoundError, ValueError) as error:
except (FileNotFoundError, ValueError):
logger.error("Stem separation failed before analysis job completion.", exc_info=True)
updates.append(
_build_job_status(
job_id=job_id,
Expand All @@ -1094,7 +1124,7 @@ def run_analysis_job_updates(
cache_status=cache_status,
error={
"code": "engine_unavailable",
"message": f"Stem separation failed: {error}",
"message": "Stem separation failed",
},
)
)
Expand Down
60 changes: 44 additions & 16 deletions services/analysis-engine/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ def test_run_analysis_job_updates_report_progress_and_cache(tmp_path) -> None:

def test_run_analysis_job_updates_fail_safely_when_local_separation_fails() -> None:
"""Ensure unsafe or undecodable local audio returns a typed failure envelope."""
with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class:
with (
patch("bandscope_analysis.api.AudioStemSeparator") as separator_class,
patch("bandscope_analysis.api.logger") as logger,
):
separator_class.return_value.separate.side_effect = ValueError(
"Audio file is too large for stem separation: 16 bytes (max 8 bytes)"
)
Expand Down Expand Up @@ -495,11 +498,13 @@ def test_run_analysis_job_updates_fail_safely_when_local_separation_fails() -> N
assert updates[-1]["progressPercent"] == 45
assert updates[-1]["error"] == {
"code": "engine_unavailable",
"message": (
"Stem separation failed: Audio file is too large for stem separation: "
"16 bytes (max 8 bytes)"
),
"message": "Stem separation failed",
}
assert "/Users/test/Music" not in str(updates[-1]["error"])
logger.error.assert_called_once_with(
"Stem separation failed before analysis job completion.",
exc_info=True,
)


def test_cached_analysis_helpers_treat_invalid_cache_as_miss(tmp_path) -> None:
Expand Down Expand Up @@ -848,18 +853,43 @@ def put(self, item: tuple[str, object]) -> None:
self.items.append(item)

cases = [
(FileNotFoundError("missing"), "file_not_found"),
(ValueError("bad media"), "value_error"),
(RuntimeError("oom"), "runtime_error"),
(Exception("unexpected"), "runtime_error"),
(
FileNotFoundError("missing /secret/audio.wav"),
"file_not_found",
"Audio source file not found.",
"Stem separation failed because the source file was missing.",
),
(
ValueError("bad media /secret/audio.wav"),
"value_error",
"Invalid audio source data.",
"Stem separation rejected invalid audio source data.",
),
(
RuntimeError("oom /secret/audio.wav"),
"runtime_error",
"Runtime error occurred during stem separation.",
"Stem separation failed with a runtime error.",
),
(
Exception("unexpected /secret/audio.wav"),
"runtime_error",
"An unexpected error occurred during stem separation.",
"Stem separation failed unexpectedly.",
),
]

for error, expected_kind in cases:
for error, expected_kind, expected_message, expected_log_message in cases:
fake_queue = FakeQueue()
with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class:
with (
patch("bandscope_analysis.api.AudioStemSeparator") as separator_class,
patch("bandscope_analysis.api.logger") as logger,
):
separator_class.return_value.separate.side_effect = error
_stem_separation_worker("/tmp/audio.wav", fake_queue)
assert fake_queue.items == [(expected_kind, str(error))]
assert fake_queue.items == [(expected_kind, expected_message)]
assert "/secret" not in str(fake_queue.items)
logger.exception.assert_called_once_with(expected_log_message)

fake_queue = FakeQueue()
with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class:
Expand All @@ -871,7 +901,7 @@ def put(self, item: tuple[str, object]) -> None:
with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class:
separator_class.return_value.separate.return_value = {"stems": {}}
_stem_separation_worker("/tmp/audio.wav", fake_queue, "/tmp/stems.npz")
assert fake_queue.items == [("runtime_error", "Stem separation returned invalid stems.")]
assert fake_queue.items == [("runtime_error", "Runtime error occurred during stem separation.")]

fake_queue = FakeQueue()
with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class:
Expand All @@ -880,9 +910,7 @@ def put(self, item: tuple[str, object]) -> None:
"stem_role_types": {"bass": "percussion"},
}
_stem_separation_worker("/tmp/audio.wav", fake_queue, "/tmp/stems.npz")
assert fake_queue.items == [
("runtime_error", "Stem separation returned invalid stem role metadata.")
]
assert fake_queue.items == [("runtime_error", "Runtime error occurred during stem separation.")]


def test_stem_separation_worker_writes_large_stems_to_file_envelope(tmp_path) -> None:
Expand Down