diff --git a/services/analysis-engine/tests/test_sections_utils.py b/services/analysis-engine/tests/test_sections_utils.py new file mode 100644 index 00000000..b33628ff --- /dev/null +++ b/services/analysis-engine/tests/test_sections_utils.py @@ -0,0 +1,40 @@ +"""Tests for section utility functions.""" + +import logging +from unittest.mock import Mock + +import pytest + +from bandscope_analysis.sections.utils import validate_section + + +@pytest.mark.parametrize( + ("section", "index", "expected_id"), + [ + ({"id": "custom-id", "label": "intro"}, 1, "custom-id"), + ({"label": "intro"}, 2, "section-2"), + ], +) +def test_validate_section_accepts_dict_sections( + section: dict[str, str], index: int, expected_id: str +) -> None: + """Verify valid section dictionaries return stable ids without warnings.""" + mock_logger = Mock(spec=logging.Logger) + + result = validate_section(section, index=index, logger=mock_logger) + + assert result == expected_id + mock_logger.warning.assert_not_called() + + +def test_validate_section_warns_for_invalid_section_type() -> None: + """Verify invalid sections log type context and return a generated id.""" + mock_logger = Mock(spec=logging.Logger) + + result = validate_section(["intro"], index=3, logger=mock_logger) + + assert result == "section-3" + mock_logger.warning.assert_called_once() + warning_context = " ".join(str(value) for value in mock_logger.warning.call_args.args) + assert "3" in warning_context + assert "list" in warning_context diff --git a/services/analysis-engine/tests/test_segmenter.py b/services/analysis-engine/tests/test_segmenter.py index 37fee9f9..f0284b30 100644 --- a/services/analysis-engine/tests/test_segmenter.py +++ b/services/analysis-engine/tests/test_segmenter.py @@ -324,3 +324,75 @@ def test_segment_with_boundaries_handles_empty_short_and_failed_inputs() -> None assert "bad combined boundary" in failed_sections[0]["confidence_notes"] assert failed_boundaries == [(0.0, 20.0)] + + +def test_detect_boundaries_ignores_peak_indexes_without_frame_times() -> None: + """Ensure peaks beyond frame_times length are skipped.""" + novelty = np.array([0.0, 0.1, 0.2, 0.9, 0.2, 0.1, 0.0], dtype=np.float64) + frame_times = np.array([0.0, 1.0], dtype=np.float64) + + boundaries = detect_boundaries(novelty, frame_times, 10.0) + + assert boundaries == [0.0] + + +def test_detect_boundaries_ignores_peaks_near_end_of_duration() -> None: + """Ensure boundaries are not created within one second of total duration.""" + novelty = np.array([0.0, 0.1, 0.9, 0.1, 0.9, 0.1, 0.0], dtype=np.float64) + frame_times = np.array([0.0, 2.5, 5.0, 7.5, 9.5, 10.0, 10.5], dtype=np.float64) + + boundaries = detect_boundaries(novelty, frame_times, 10.0) + + assert boundaries == [0.0, 5.0] + + +def test_detect_boundaries_threshold_floor_filters_small_peaks() -> None: + """Ensure the adaptive threshold floor suppresses tiny local maxima.""" + novelty = np.array([0.0, 0.01, 0.09, 0.01, 0.0], dtype=np.float64) + frame_times = np.array([0.0, 1.0, 2.0, 3.0, 4.0], dtype=np.float64) + + boundaries = detect_boundaries(novelty, frame_times, 10.0) + + assert boundaries == [0.0] + + +def test_detect_boundaries_flat_novelty_returns_start_only() -> None: + """Ensure flat novelty does not produce boundaries.""" + novelty = np.ones(10, dtype=np.float64) * 0.5 + frame_times = np.arange(10, dtype=np.float64) + + boundaries = detect_boundaries(novelty, frame_times, 10.0) + + assert boundaries == [0.0] + + +def test_detect_boundaries_skips_candidates_too_close_to_previous_boundary() -> None: + """Ensure candidate boundaries must satisfy the minimum segment length.""" + novelty = np.array([0.0, 0.9, 0.0, 0.9, 0.0], dtype=np.float64) + frame_times = np.array([0.0, 1.0, 3.0, 5.0, 7.0], dtype=np.float64) + + boundaries = detect_boundaries(novelty, frame_times, 10.0, min_segment_seconds=4.0) + + assert boundaries == [0.0, 5.0] + + +def test_detect_boundaries_truncates_to_unique_increasing_boundaries() -> None: + """Ensure truncation preserves ordered unique boundary times.""" + novelty = np.tile(np.array([0.0, 1.0, 0.0], dtype=np.float64), 60) + frame_times = np.arange(len(novelty), dtype=np.float64) + + boundaries = detect_boundaries(novelty, frame_times, 200.0, min_segment_seconds=1.0) + + assert len(boundaries) == 20 + assert boundaries == sorted(set(boundaries)) + assert all(boundaries[index] < boundaries[index + 1] for index in range(len(boundaries) - 1)) + + +def test_detect_boundaries_accepts_right_edge_peak_when_not_near_duration_end() -> None: + """Ensure the last novelty frame can be used as a boundary.""" + novelty = np.array([0.0, 0.1, 0.1, 0.1, 0.9], dtype=np.float64) + frame_times = np.array([0.0, 1.0, 2.0, 3.0, 4.0], dtype=np.float64) + + boundaries = detect_boundaries(novelty, frame_times, 10.0, min_segment_seconds=2.0) + + assert boundaries == [0.0, 4.0]