Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 80 additions & 7 deletions tests/test_abandoned_meeting_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,15 @@ class TestCaseBDispatch:
"""Case B in `_poll_once` must snapshot + dispatch when the abandoned
accumulator looks real, and fall back to delete-only when it doesn't."""

def test_case_b_kicks_off_abandoned_generation_when_threshold_met(
def test_case_b_defers_then_fires_when_new_meeting_proves_real(
self, fake_origin, synthetic_wal, isolated_cache, isolated_vault
):
"""The 2026-06-30 single-meeting-flap fix changed Case B from
generate-immediately to defer-until-proven. When meeting B is detected,
meeting A's note must NOT fire yet — it's parked in `_pending_abandoned`.
Only once meeting B accumulates real content of its own (proving a
genuine back-to-back boundary, not an ID flap) does A's note fire.
"""
engine = ZoomEngine()
cfg = engine._get_cfg()

Expand All @@ -299,13 +305,30 @@ def test_case_b_kicks_off_abandoned_generation_when_threshold_met(
pre_count = len(engine._accumulated)
assert pre_count >= 5, "fixture sanity: accumulator must have meeting A's entries"

# Tick 3: Case B — meeting B detected. Patch the abandoned-generation
# entry point so we can assert it was called with meeting A's data.
b_entries = list(_make_realistic_accumulator("MEETING-B==", count=2).values())
# Tick 3: Case B — meeting B detected with only a couple of entries.
# Generation must NOT fire (could be a flap); A is parked pending.
b_entries_small = list(_make_realistic_accumulator("MEETING-B==", count=2).values())
with patch.object(ZoomEngine, "_trigger_abandoned_generation") as mock_dispatch:
_drive_tick(
engine, fake_origin, cfg, wal=synthetic_wal,
mtime=1010.0, size=12288, entries=b_entries,
mtime=1010.0, size=12288, entries=b_entries_small,
detected_meeting_id="MEETING-B==",
)
mock_dispatch.assert_not_called()

assert engine._pending_abandoned is not None, "meeting A must be parked"
assert engine._pending_abandoned["meeting_id"] == "MEETING-A=="
parked = engine._pending_abandoned["snapshot"]
assert len(parked) >= 5
assert all(e["meeting_id"] == "MEETING-A==" for e in parked.values())

# Tick 4: meeting B accumulates real content of its own — a genuine
# back-to-back boundary. Now A's deferred note fires, exactly once.
b_entries_full = list(_make_realistic_accumulator("MEETING-B==", count=6).values())
with patch.object(ZoomEngine, "_trigger_abandoned_generation") as mock_dispatch:
_drive_tick(
engine, fake_origin, cfg, wal=synthetic_wal,
mtime=1015.0, size=16384, entries=b_entries_full,
detected_meeting_id="MEETING-B==",
)
mock_dispatch.assert_called_once()
Expand All @@ -317,14 +340,64 @@ def test_case_b_kicks_off_abandoned_generation_when_threshold_met(
assert len(snapshot) >= 5
assert all(e["meeting_id"] == "MEETING-A==" for e in snapshot.values())

# And the new meeting's accumulator should start fresh (the existing
# Case B clear-and-switch behavior must not regress).
assert engine._pending_abandoned is None, "pending must clear after firing"

# The new meeting's accumulator holds only meeting B's data.
with engine._accumulated_lock:
ids_after = {e.get("meeting_id") for e in engine._accumulated.values()}
assert ids_after <= {"MEETING-B=="}, (
f"new meeting's accumulator polluted with abandoned data: {ids_after}"
)

def test_case_b_flap_back_restores_without_generating(
self, fake_origin, synthetic_wal, isolated_cache, isolated_vault
):
"""The core 2026-06-30 fix: a single meeting whose ID oscillates
between two values must NOT generate a premature note. When the score
swings A -> B (B never proves real) -> A, the flap-back guard restores
meeting A and discards the phantom B, generating nothing.
"""
engine = ZoomEngine()
cfg = engine._get_cfg()

# Anchor + populate meeting A.
_drive_tick(
engine, fake_origin, cfg, wal=synthetic_wal,
mtime=1000.0, size=4096, entries=[],
detected_meeting_id="MEETING-A==",
)
a_entries = list(_make_realistic_accumulator("MEETING-A==", count=6).values())
_drive_tick(
engine, fake_origin, cfg, wal=synthetic_wal,
mtime=1005.0, size=8192, entries=a_entries,
detected_meeting_id="MEETING-A==",
)

with patch.object(ZoomEngine, "_trigger_abandoned_generation") as mock_dispatch:
# Flip to phantom B (1 entry) — A parked, nothing generated.
b_entries = list(_make_realistic_accumulator("MEETING-B==", count=1).values())
_drive_tick(
engine, fake_origin, cfg, wal=synthetic_wal,
mtime=1010.0, size=12288, entries=b_entries,
detected_meeting_id="MEETING-B==",
)
assert engine._pending_abandoned is not None

# Score swings back to A before B ever became real → flap-back.
_drive_tick(
engine, fake_origin, cfg, wal=synthetic_wal,
mtime=1015.0, size=16384, entries=a_entries,
detected_meeting_id="MEETING-A==",
)
mock_dispatch.assert_not_called()

assert engine._pending_abandoned is None, "flap-back must clear pending"
_, _, _, tracked = engine._read_tracking()
assert tracked == "MEETING-A==", "flap-back must restore meeting A as tracked"
with engine._accumulated_lock:
restored_ids = {e.get("meeting_id") for e in engine._accumulated.values()}
assert "MEETING-A==" in restored_ids, "meeting A's transcript must be restored"

def test_case_b_carries_forward_new_meetings_opening_lines(
self, fake_origin, synthetic_wal, isolated_cache, isolated_vault
):
Expand Down
10 changes: 7 additions & 3 deletions tests/test_back_to_back_meetings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,12 @@ def test_boundary_persists_until_tracking_is_set(
import time as _time
base_mtime = _time.time()

# Pre-stamp a boundary as if a previous meeting just finished.
engine._last_completed_boundary = ("just_ended_id_AA", 12 * 3600)
# Pre-stamp a boundary as if a previous meeting just finished — a
# FRESH set_at, since the just-ended meeting whose data still lingers
# is the exact case the boundary protects against. (A stale boundary
# is expired by `_active_boundary`; see TestBoundaryExpiry.)
boundary_set_at = _time.time()
engine._last_completed_boundary = ("just_ended_id_AA", 12 * 3600, boundary_set_at)

# Tick 1: anchor.
_drive_tick(engine, fake_origin, cfg, wal=synthetic_wal,
Expand All @@ -1048,7 +1052,7 @@ def test_boundary_persists_until_tracking_is_set(
"the next tick has no protection against detecting the "
"just-ended meeting"
)
assert engine._last_completed_boundary == ("just_ended_id_AA", 12 * 3600)
assert engine._last_completed_boundary == ("just_ended_id_AA", 12 * 3600, boundary_set_at)


# ── Engine: collect-entries self-heal when active_meeting_id is empty ────
Expand Down
82 changes: 82 additions & 0 deletions tests/test_engine_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
import os
import shutil
import time
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -440,3 +441,84 @@ def test_non_blocked_id_still_activates(self, fake_origin, tmp_path, isolated_ca
assert engine._get_state() == EngineState.ACTIVE, (
"a non-blocked meeting_id must still trigger IDLE→ACTIVE"
)


class TestBoundaryExpiry:
"""Regression guard for the 2026-07-02 back-to-back deadlock.

`_last_completed_boundary` carries a date-less seconds-since-midnight
freshness floor. Because the app runs 24/7 as a menu-bar process, a
boundary set by an earlier/previous-day meeting could outlive its purpose
and permanently suppress detection of a later meeting whose timestamps are
numerically below the stale floor — detection returned None every tick,
the empty tracked id never upgraded, and back-to-back meetings merged into
one note. The fix expires the boundary after `_BOUNDARY_EXPIRY_SECS`.
"""

def test_active_boundary_expires_stale_entry(self):
engine = ZoomEngine()
# Boundary older than the expiry window → dropped and cleared.
engine._last_completed_boundary = (
"OLD_MEETING_ID_xxxxxxxx==", 60000,
time.time() - (zoom_engine._BOUNDARY_EXPIRY_SECS + 60),
)
assert engine._active_boundary() is None
assert engine._last_completed_boundary is None

def test_active_boundary_keeps_fresh_entry(self):
engine = ZoomEngine()
fresh = ("RECENT_MEETING_xxxxxxxx==", 34200, time.time())
engine._last_completed_boundary = fresh
assert engine._active_boundary() == fresh
assert engine._last_completed_boundary == fresh

def _make_cfg(self):
engine = ZoomEngine()
return engine._get_cfg()

def test_stale_boundary_not_passed_to_detection(self, fake_origin, tmp_path, isolated_cache):
"""A stale boundary must NOT be handed to detect_active_meeting_id —
otherwise its floor filters out the genuinely-new meeting and tracking
stays empty forever."""
wal = tmp_path / "stale-boundary.sqlite3-wal"
wal.write_bytes(b"x" * 1024)
engine = ZoomEngine()
cfg = self._make_cfg()
new_id = "NEW9amMeeting_xxxxxxxxxx=="

# A boundary left over from a meeting hours ago, with a floor (16:40 =
# 60000s) numerically ABOVE this morning meeting's timestamps.
engine._last_completed_boundary = (
"PRIOR_MEETING_xxxxxxxxxx==", 60000,
time.time() - (zoom_engine._BOUNDARY_EXPIRY_SECS + 60),
)

seen = []

def rec_detect(wal_path, *, exclude_meeting_id=None, freshness_floor_secs=None):
seen.append((exclude_meeting_id, freshness_floor_secs))
return new_id

size = wal.stat().st_size
# Tick 1: anchor (IDLE).
os.utime(wal, (1000.0, 1000.0))
with patch.object(zoom_engine, "find_wal", return_value=wal), \
patch.object(zoom_engine, "detect_active_meeting_id", side_effect=rec_detect):
engine._poll_once(fake_origin, cfg, idle_threshold=cfg.idle_threshold_secs)

# Tick 2: size changed → detect runs, boundary should already be expired.
wal.write_bytes(b"x" * (size + 1))
os.utime(wal, (1005.0, 1005.0))
with patch.object(zoom_engine, "find_wal", return_value=wal), \
patch.object(zoom_engine, "detect_active_meeting_id", side_effect=rec_detect):
engine._poll_once(fake_origin, cfg, idle_threshold=cfg.idle_threshold_secs)

assert seen, "detect_active_meeting_id was never called"
# The last detection call must have been made with NO stale floor.
assert seen[-1] == (None, None), (
f"stale boundary leaked into detection: {seen[-1]}"
)
assert engine._get_state() == EngineState.ACTIVE
assert engine._read_tracking()[3] == new_id, (
"engine failed to upgrade to the concrete meeting id"
)
Loading