Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions claude-code/hooks/test_stop_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
"""
Regression tests for the Stop-event duplicate-count bug in the claude-code hook.

Background
----------
Claude Code fires multiple `Stop` events per user turn (one per agent turn).
`process_stop_event` rebuilds the exchange window from the persisted audit log:
it resets the window on each `UserPromptSubmit` and appends every event after.
Commit 16d1ffe removed the post-send prune of already-shipped events (to fix a
`get_recent_user_prompts_for_session` lookback regression). With the prune gone,
every Stop re-shipped ALL PostToolUse events accumulated since the last
UserPromptSubmit, so one file edit got counted 1 + 2 + ... + N times across N
Stops.

These tests pin both halves of the contract:
- each PostToolUse tool_use is shipped exactly once across multiple Stops
(idempotent), and
- the user-prompt lookback still returns prior prompts (what 16d1ffe protected).
"""

import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

import unbound


def _entry(session_id, hook_event_name, timestamp, **event_fields):
event = {'session_id': session_id, 'hook_event_name': hook_event_name}
event.update(event_fields)
return {'timestamp': timestamp, 'session_id': session_id, 'event': event}


class StopDedupTestBase(unittest.TestCase):
def setUp(self):
self.tmp = Path(tempfile.mkdtemp())
self.audit_log = self.tmp / "agent-audit.log"
self._audit_patcher = patch.object(unbound, "AUDIT_LOG", self.audit_log)
self._audit_patcher.start()
self.addCleanup(self._audit_patcher.stop)

# Capture every exchange handed to send_to_api; pretend the send
# succeeded so the shipped-marker path runs.
self.sent_exchanges = []

def _fake_send(exchange, api_key):
self.sent_exchanges.append(exchange)
return True

self._send_patcher = patch.object(unbound, "send_to_api", _fake_send)
self._send_patcher.start()
self.addCleanup(self._send_patcher.stop)

# Identity probe shells out / reads config; stub it.
self._identity_patcher = patch.object(
unbound, "build_account_identity", lambda probe=False: {}
)
self._identity_patcher.start()
self.addCleanup(self._identity_patcher.stop)

def _write_log(self, entries):
unbound.save_logs(entries)

def _all_shipped_tool_paths(self):
"""Flatten every tool_use file_path shipped across all sends."""
paths = []
for exchange in self.sent_exchanges:
for msg in exchange.get('messages', []):
for tu in msg.get('tool_use', []) or []:
paths.append(tu.get('tool_input', {}).get('file_path'))
return paths


class TestStopEventIdempotent(StopDedupTestBase):
"""Multiple Stops in one turn must ship each tool_use exactly once."""

def _turn_with_two_edits(self):
sid = "sess-1"
return [
_entry(sid, 'UserPromptSubmit', '2026-06-15T00:00:00Z',
prompt='please edit two files'),
_entry(sid, 'PostToolUse', '2026-06-15T00:00:01Z',
tool_name='Edit', tool_input={'file_path': '/a.py'},
tool_response={'ok': True}),
_entry(sid, 'PostToolUse', '2026-06-15T00:00:02Z',
tool_name='Edit', tool_input={'file_path': '/b.py'},
tool_response={'ok': True}),
]

def _stop_event(self, sid="sess-1", msg="done"):
return {
'session_id': sid,
'hook_event_name': 'Stop',
'transcript_path': 'undefined',
'last_assistant_message': msg,
}

def test_single_stop_ships_each_tool_once(self):
self._write_log(self._turn_with_two_edits())

unbound.process_stop_event(self._stop_event(), api_key='k')

paths = self._all_shipped_tool_paths()
self.assertEqual(sorted(paths), ['/a.py', '/b.py'])

def test_repeated_stops_do_not_reship(self):
# The bug: 2nd/3rd Stop in the same turn re-ship the same tool_uses.
self._write_log(self._turn_with_two_edits())

for _ in range(3):
unbound.process_stop_event(self._stop_event(), api_key='k')

paths = self._all_shipped_tool_paths()
# Exactly one tool_use per edited file across ALL Stops — no inflation.
self.assertEqual(sorted(paths), ['/a.py', '/b.py'])

def test_new_tool_after_first_stop_is_shipped_once(self):
# First Stop ships /a.py. Then a new edit lands and a second Stop fires:
# /b.py must ship, /a.py must NOT ship again.
sid = "sess-1"
self._write_log([
_entry(sid, 'UserPromptSubmit', '2026-06-15T00:00:00Z', prompt='go'),
_entry(sid, 'PostToolUse', '2026-06-15T00:00:01Z',
tool_name='Edit', tool_input={'file_path': '/a.py'},
tool_response={'ok': True}),
])
unbound.process_stop_event(self._stop_event(), api_key='k')

# A new PostToolUse arrives (as main() would append it), then Stop again.
unbound.append_to_audit_log(
_entry(sid, 'PostToolUse', '2026-06-15T00:00:03Z',
tool_name='Edit', tool_input={'file_path': '/b.py'},
tool_response={'ok': True})
)
unbound.process_stop_event(self._stop_event(), api_key='k')

paths = self._all_shipped_tool_paths()
self.assertEqual(sorted(paths), ['/a.py', '/b.py'])

def test_stop_with_nothing_new_does_not_send(self):
# A turn with only a prompt and an already-shipped edit: a redundant
# Stop must not produce any send at all.
self._write_log(self._turn_with_two_edits())
unbound.process_stop_event(self._stop_event(), api_key='k')
sends_after_first = len(self.sent_exchanges)

unbound.process_stop_event(self._stop_event(), api_key='k')
self.assertEqual(len(self.sent_exchanges), sends_after_first)

def test_tool_free_turn_ships_exactly_once(self):
# A pure-conversation turn (no PostToolUse) must still ship once — and
# only once — across multiple Stops. Guards against over-narrowing the
# send to tool-bearing turns only.
sid = "sess-1"
self._write_log([
_entry(sid, 'UserPromptSubmit', '2026-06-15T00:00:00Z',
prompt='just answer, no tools'),
])

for _ in range(3):
unbound.process_stop_event(self._stop_event(msg='here is the answer'),
api_key='k')

self.assertEqual(len(self.sent_exchanges), 1)


class TestLookbackStillWorks(StopDedupTestBase):
"""The thing 16d1ffe protected: prior user prompts remain in the log and are
returned by get_recent_user_prompts_for_session after Stops are processed."""

def test_prior_prompts_survive_stop_processing(self):
sid = "sess-1"
self._write_log([
_entry(sid, 'UserPromptSubmit', '2026-06-15T00:00:00Z', prompt='first prompt'),
_entry(sid, 'PostToolUse', '2026-06-15T00:00:01Z',
tool_name='Edit', tool_input={'file_path': '/a.py'},
tool_response={'ok': True}),
_entry(sid, 'UserPromptSubmit', '2026-06-15T00:01:00Z', prompt='second prompt'),
_entry(sid, 'PostToolUse', '2026-06-15T00:01:01Z',
tool_name='Edit', tool_input={'file_path': '/b.py'},
tool_response={'ok': True}),
])

stop = {
'session_id': sid,
'hook_event_name': 'Stop',
'transcript_path': 'undefined',
'last_assistant_message': 'done',
}
# Process a couple of Stops, which is when the old code wiped the log.
unbound.process_stop_event(stop, api_key='k')
unbound.process_stop_event(stop, api_key='k')

prompts = unbound.get_recent_user_prompts_for_session(sid, n=5)
self.assertEqual(prompts, ['first prompt', 'second prompt'])

def test_shipped_marker_does_not_drop_postooluse_entries(self):
# Marking entries shipped must keep them in the log (size unchanged),
# only annotate them — otherwise rotation/lookback assumptions break.
sid = "sess-1"
self._write_log([
_entry(sid, 'UserPromptSubmit', '2026-06-15T00:00:00Z', prompt='go'),
_entry(sid, 'PostToolUse', '2026-06-15T00:00:01Z',
tool_name='Edit', tool_input={'file_path': '/a.py'},
tool_response={'ok': True}),
])
before = len(unbound.load_existing_logs())

stop = {
'session_id': sid,
'hook_event_name': 'Stop',
'transcript_path': 'undefined',
'last_assistant_message': 'done',
}
unbound.process_stop_event(stop, api_key='k')

after_logs = unbound.load_existing_logs()
# Nothing is dropped — entries are annotated in place, not pruned.
self.assertEqual(len(after_logs), before)
# The PostToolUse entry is now flagged shipped.
flagged_tools = [
l for l in after_logs
if l.get(unbound.SHIPPED_FLAG)
and l.get('event', {}).get('hook_event_name') == 'PostToolUse'
]
self.assertEqual(len(flagged_tools), 1)


if __name__ == '__main__':
unittest.main()
92 changes: 85 additions & 7 deletions claude-code/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
PRETOOL_USER_MESSAGES_LIMIT = 5
AUDIT_LOG_TOTAL_LIMIT = 100

# Per-entry flag set on PostToolUse audit-log entries once their tool_use has
# been shipped to the proxy on a Stop event. Claude Code fires multiple Stop
# events per turn; this flag lets a later Stop skip already-shipped tool_uses
# instead of re-sending them, while the entries stay in the log so the
# user-prompt lookback (get_recent_user_prompts_for_session) keeps working.
SHIPPED_FLAG = 'unbound_shipped'

APPROVAL_TIMEOUT = 4 * 60 * 60

DISCOVERY_DEBOUNCE_SECONDS = 24 * 3600
Expand Down Expand Up @@ -1126,29 +1133,85 @@ def cleanup_old_logs():
save_logs(logs[-AUDIT_LOG_TOTAL_LIMIT:])


def _event_name_of(log: Dict) -> Optional[str]:
if 'event' in log:
return log.get('event', {}).get('hook_event_name')
return log.get('hook_event_name')


def _log_identity(log: Dict) -> tuple:
"""Stable identity for an audit-log entry, used to re-find the entries this
Stop shipped after a fresh re-read (a concurrent Stop may have appended new
lines since we loaded the log)."""
event = log.get('event', {}) if 'event' in log else log
return (
log.get('timestamp'),
log.get('session_id') or event.get('session_id'),
event.get('hook_event_name'),
event.get('tool_name'),
)


def _persist_shipped_flags(session_id: Optional[str], shipped_logs: List[Dict]) -> None:
"""Re-read the audit log and stamp SHIPPED_FLAG on the entries that were
just shipped, then rewrite it. Re-reading (instead of saving our stale
in-memory copy) avoids clobbering entries a concurrent hook appended after
we loaded the log."""
if not shipped_logs:
return
shipped_ids = {_log_identity(log) for log in shipped_logs}
Comment on lines +1155 to +1162

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The session_id parameter is accepted but never referenced inside the function body. _log_identity already embeds the session ID in the identity tuple, so the match is implicitly session-scoped. The dead parameter adds confusion about whether session-scoping is intended as an extra guard and makes callers think they need to supply it. Consider dropping it from the signature (and the call-site).

Suggested change
def _persist_shipped_flags(session_id: Optional[str], shipped_logs: List[Dict]) -> None:
"""Re-read the audit log and stamp SHIPPED_FLAG on the entries that were
just shipped, then rewrite it. Re-reading (instead of saving our stale
in-memory copy) avoids clobbering entries a concurrent hook appended after
we loaded the log."""
if not shipped_logs:
return
shipped_ids = {_log_identity(log) for log in shipped_logs}
def _persist_shipped_flags(shipped_logs: List[Dict]) -> None:
"""Re-read the audit log and stamp SHIPPED_FLAG on the entries that were
just shipped, then rewrite it. Re-reading (instead of saving our stale
in-memory copy) avoids clobbering entries a concurrent hook appended after
we loaded the log."""
if not shipped_logs:
return
shipped_ids = {_log_identity(log) for log in shipped_logs}

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

fresh_logs = load_existing_logs()
changed = False
for log in fresh_logs:
if log.get(SHIPPED_FLAG):
continue
if _log_identity(log) in shipped_ids:
log[SHIPPED_FLAG] = True
changed = True
if changed:
save_logs(fresh_logs)


def process_stop_event(event: Dict, api_key: str):
session_id = event.get('session_id')
transcript_path = event.get('transcript_path')
last_assistant_message = event.get('last_assistant_message')

logs = load_existing_logs()

session_events = []
# PostToolUse entries (the same dict objects held in `logs`) that this Stop
# is about to ship. Marked shipped only after send_to_api succeeds.
pending_tool_logs = []
# The current turn's UserPromptSubmit entry. Doubles as the per-turn shipped
# anchor so a no-tool conversation turn ships exactly once, not once per Stop.
prompt_log = None
current_conversation_started = False
user_prompt_timestamp = None

for log in logs:
log_session_id = log.get('session_id') or log.get('event', {}).get('session_id')

if log_session_id == session_id:
event_name = log.get('event', {}).get('hook_event_name') if 'event' in log else log.get('hook_event_name')
event_name = _event_name_of(log)

if event_name == 'UserPromptSubmit':
# Reset the window on each new prompt. The anchor entries
# (UserPromptSubmit, SessionStart) stay in the log untouched so
# the user-prompt lookback keeps finding prior prompts.
session_events = [log]
pending_tool_logs = []
prompt_log = log
current_conversation_started = True
user_prompt_timestamp = log.get('timestamp')
elif current_conversation_started:
# Skip PostToolUse entries already shipped by an earlier Stop in
# this same turn — re-shipping them is the duplicate-count bug.
if event_name == 'PostToolUse' and log.get(SHIPPED_FLAG):
continue
session_events.append(log)
if event_name == 'PostToolUse':
pending_tool_logs.append(log)

transcript_assistant_messages = []
transcript_usage = None
Expand All @@ -1166,6 +1229,15 @@ def process_stop_event(event: Dict, api_key: str):
# the cached session model is wrong). Fall back to the audit log otherwise.
session_model = transcript_model or _extract_session_model(logs, session_id) or 'auto'

# Idempotency guard: Claude Code fires a Stop per agent turn, so a single
# user turn produces several Stop events. Re-sending already-shipped
# tool_uses is the root cause of inflated policy-match counts. Send only
# when this Stop has something new: an unshipped PostToolUse, or — for a
# tool-free conversation turn — a turn whose prompt anchor hasn't shipped yet.
turn_already_shipped = prompt_log is not None and prompt_log.get(SHIPPED_FLAG)
if not pending_tool_logs and turn_already_shipped:
return

exchange = build_llm_exchange(
session_events,
stop_assistant_message=last_assistant_message,
Expand All @@ -1174,8 +1246,14 @@ def process_stop_event(event: Dict, api_key: str):
usage=transcript_usage,
)

if exchange:
send_to_api(exchange, api_key)
if exchange and send_to_api(exchange, api_key):
# Persist the shipped markers so later Stops in this turn skip what was
# already sent. The entries stay in the log for the user-prompt lookback.
for log in pending_tool_logs:
log[SHIPPED_FLAG] = True
if prompt_log is not None:
prompt_log[SHIPPED_FLAG] = True
_persist_shipped_flags(session_id, pending_tool_logs + ([prompt_log] if prompt_log else []))


Comment on lines 1246 to 1258

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 TOCTOU race between concurrent Stops

The guard read (prompt_log.get(SHIPPED_FLAG)) and the flag write (_persist_shipped_flags) are not atomic. If two Stop events fire within the same small window — both load the log before either has finished writing — both will see an unshipped state, both pass the guard, and both call send_to_api, shipping the same exchange twice. The re-read inside _persist_shipped_flags only protects against a third process appending new lines, not against two racing Stops both clearing the guard simultaneously. For the common sequential-Stop case this is fully correct; the risk is limited to genuinely concurrent invocations, but it's worth documenting (or protecting with a lock/tmp-rename swap) so the behavior is explicit.

def get_api_key():
Expand Down
Loading