Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions services/analysis-engine/tests/test_sections_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Tests for section utility functions."""

import logging
from unittest.mock import Mock

import pytest

from bandscope_analysis.sections.utils import validate_section


@pytest.mark.parametrize(
("section", "index", "expected_id"),
[
({"id": "custom-id", "label": "intro"}, 1, "custom-id"),
({"label": "intro"}, 2, "section-2"),
],
)
def test_validate_section_accepts_dict_sections(
section: dict[str, str], index: int, expected_id: str
) -> None:
"""Verify valid section dictionaries return stable ids without warnings."""
mock_logger = Mock(spec=logging.Logger)

result = validate_section(section, index=index, logger=mock_logger)

assert result == expected_id
mock_logger.warning.assert_not_called()


def test_validate_section_warns_for_invalid_section_type() -> None:
"""Verify invalid sections log type context and return a generated id."""
mock_logger = Mock(spec=logging.Logger)

result = validate_section(["intro"], index=3, logger=mock_logger)

assert result == "section-3"
mock_logger.warning.assert_called_once()
warning_context = " ".join(str(value) for value in mock_logger.warning.call_args.args)
assert "3" in warning_context
assert "list" in warning_context
72 changes: 72 additions & 0 deletions services/analysis-engine/tests/test_segmenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,75 @@ def test_segment_with_boundaries_handles_empty_short_and_failed_inputs() -> None

assert "bad combined boundary" in failed_sections[0]["confidence_notes"]
assert failed_boundaries == [(0.0, 20.0)]


def test_detect_boundaries_ignores_peak_indexes_without_frame_times() -> None:
"""Ensure peaks beyond frame_times length are skipped."""
novelty = np.array([0.0, 0.1, 0.2, 0.9, 0.2, 0.1, 0.0], dtype=np.float64)
frame_times = np.array([0.0, 1.0], dtype=np.float64)

boundaries = detect_boundaries(novelty, frame_times, 10.0)

assert boundaries == [0.0]


def test_detect_boundaries_ignores_peaks_near_end_of_duration() -> None:
"""Ensure boundaries are not created within one second of total duration."""
novelty = np.array([0.0, 0.1, 0.9, 0.1, 0.9, 0.1, 0.0], dtype=np.float64)
frame_times = np.array([0.0, 2.5, 5.0, 7.5, 9.5, 10.0, 10.5], dtype=np.float64)

boundaries = detect_boundaries(novelty, frame_times, 10.0)

assert boundaries == [0.0, 5.0]


def test_detect_boundaries_threshold_floor_filters_small_peaks() -> None:
"""Ensure the adaptive threshold floor suppresses tiny local maxima."""
novelty = np.array([0.0, 0.01, 0.09, 0.01, 0.0], dtype=np.float64)
frame_times = np.array([0.0, 1.0, 2.0, 3.0, 4.0], dtype=np.float64)

boundaries = detect_boundaries(novelty, frame_times, 10.0)

assert boundaries == [0.0]


def test_detect_boundaries_flat_novelty_returns_start_only() -> None:
"""Ensure flat novelty does not produce boundaries."""
novelty = np.ones(10, dtype=np.float64) * 0.5
frame_times = np.arange(10, dtype=np.float64)

boundaries = detect_boundaries(novelty, frame_times, 10.0)

assert boundaries == [0.0]


def test_detect_boundaries_skips_candidates_too_close_to_previous_boundary() -> None:
"""Ensure candidate boundaries must satisfy the minimum segment length."""
novelty = np.array([0.0, 0.9, 0.0, 0.9, 0.0], dtype=np.float64)
frame_times = np.array([0.0, 1.0, 3.0, 5.0, 7.0], dtype=np.float64)

boundaries = detect_boundaries(novelty, frame_times, 10.0, min_segment_seconds=4.0)

assert boundaries == [0.0, 5.0]


def test_detect_boundaries_truncates_to_unique_increasing_boundaries() -> None:
"""Ensure truncation preserves ordered unique boundary times."""
novelty = np.tile(np.array([0.0, 1.0, 0.0], dtype=np.float64), 60)
frame_times = np.arange(len(novelty), dtype=np.float64)

boundaries = detect_boundaries(novelty, frame_times, 200.0, min_segment_seconds=1.0)

assert len(boundaries) == 20
assert boundaries == sorted(set(boundaries))
assert all(boundaries[index] < boundaries[index + 1] for index in range(len(boundaries) - 1))


def test_detect_boundaries_accepts_right_edge_peak_when_not_near_duration_end() -> None:
"""Ensure the last novelty frame can be used as a boundary."""
novelty = np.array([0.0, 0.1, 0.1, 0.1, 0.9], dtype=np.float64)
frame_times = np.array([0.0, 1.0, 2.0, 3.0, 4.0], dtype=np.float64)

boundaries = detect_boundaries(novelty, frame_times, 10.0, min_segment_seconds=2.0)

assert boundaries == [0.0, 4.0]