-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtranscriber.py
More file actions
157 lines (133 loc) · 5.49 KB
/
transcriber.py
File metadata and controls
157 lines (133 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Auto-transcription: record key down/up with timestamps and produce combo syntax.
Used when transcribe mode is on: start with f, stop with Esc; output goes to Inputs field.
"""
from __future__ import annotations
from typing import Callable
def _parse_valid_keys(keys_str: str) -> set[str]:
    """Split a comma-separated key list into a set of lowercase key names.

    Each entry has surrounding whitespace stripped and is lower-cased; entries
    that end up empty are dropped. A falsy ``keys_str`` (``""`` or ``None``)
    yields an empty set.
    """
    pieces = (keys_str or "").split(",")
    normalized = (piece.strip().lower() for piece in pieces)
    return {name for name in normalized if name}
class Transcriber:
    """Record key_down/key_up events with timestamps and build a combo-syntax string.

    The instance is either ``"idle"`` or ``"recording"``. Key events are only
    processed while recording, and only for keys in the valid-key set.

    Tokens produced (joined with ``", "`` in the final transcript):

    * ``<key>`` -- a press released before ``hold_threshold_s`` elapsed, or a
      run that was flushed without seeing its key_up.
    * ``hold(<key>, <sec>s)`` -- a press released after at least
      ``hold_threshold_s``.
    * ``wait:<sec>s`` -- a gap of at least ``min_wait_s``, measured from the
      first key_down of the previous run to the next key_down. It is never
      emitted directly after a ``hold(...)`` token.

    Durations are rounded to two decimals before formatting.
    """

    def __init__(
        self,
        *,
        start_key: str = "f",
        merge_threshold_s: float = 0.2,
        hold_threshold_s: float = 0.2,
        min_wait_s: float = 0.1,
        on_stop: Callable[[str], None] | None = None,
    ) -> None:
        """Create an idle transcriber.

        Args:
            start_key: Stored lower-cased on ``self.start_key``; this class
                does not read it itself.
            merge_threshold_s: A repeated key_down of the current run's key
                within this time of the run's start is merged into that run.
            hold_threshold_s: Minimum press duration reported as a hold.
            min_wait_s: Gaps shorter than this produce no wait token.
            on_stop: Optional callback invoked by :meth:`stop` with the
                transcript.
        """
        self.start_key = start_key.lower()
        self.merge_threshold_s = merge_threshold_s
        self.hold_threshold_s = hold_threshold_s
        self.min_wait_s = min_wait_s
        self.on_stop = on_stop
        self._valid_keys: set[str] = set()
        self._state: str = "idle"  # idle | recording
        self._tokens: list[str] = []
        # Current run: the key that is down and when its first key_down arrived.
        self._run_key: str | None = None
        self._run_start: float = 0.0
        self._run_key_down_count: int = 0
        # Start time of the most recent run, kept after key_up so the next
        # key_down can emit a wait. None means "no run yet"; a numeric sentinel
        # would misfire for recordings whose first timestamp is 0.0 or below.
        self._last_run_start: float | None = None

    @staticmethod
    def _normalize_key(key: str) -> str:
        """Return ``key`` stripped and lower-cased (``""`` for falsy input)."""
        return (key or "").strip().lower()

    def set_valid_keys(self, keys_str: str) -> None:
        """Replace the valid-key set from a comma-separated string."""
        self._valid_keys = _parse_valid_keys(keys_str or "")

    def is_valid_key(self, key: str) -> bool:
        """Return True if the normalized ``key`` is in the valid-key set."""
        return self._normalize_key(key) in self._valid_keys

    def is_recording(self) -> bool:
        """Return True while the transcriber is in the recording state."""
        return self._state == "recording"

    def start(self) -> None:
        """Enter the recording state and discard any previous tokens/run."""
        self._state = "recording"
        self._tokens = []
        self._run_key = None
        self._run_start = 0.0
        self._run_key_down_count = 0
        self._last_run_start = None

    def stop(self) -> str:
        """Return to idle and return the transcript built so far.

        A run that is still open (key_down without key_up) is included as a
        plain key token. If ``on_stop`` is set it is called with the
        transcript before returning.
        """
        transcript = self._build_transcript()
        self._state = "idle"
        self._tokens = []
        self._run_key = None
        if self.on_stop:
            self.on_stop(transcript)
        return transcript

    def _emit_wait(self, duration_s: float) -> None:
        """Append a wait token unless the gap is too short or follows a hold."""
        if duration_s < self.min_wait_s:
            return
        # Don't emit a wait immediately after a hold.
        if self._tokens and self._tokens[-1].startswith("hold("):
            return
        sec = round(duration_s, 2)
        self._tokens.append(f"wait:{sec}s")

    def _emit_key(self, key: str) -> None:
        """Append a plain key token."""
        self._tokens.append(key)

    def _emit_hold(self, key: str, duration_s: float) -> None:
        """Append a hold token for ``key`` with the rounded duration."""
        sec = round(duration_s, 2)
        self._tokens.append(f"hold({key}, {sec}s)")

    def _build_transcript(self) -> str:
        """Return the tokens joined by ``", "``, plus any still-open run.

        Works on a copy so calling it repeatedly never duplicates the pending
        key in ``self._tokens``.
        """
        tokens = list(self._tokens)
        if self._run_key and self._run_key_down_count >= 1:
            tokens.append(self._run_key)
        return ", ".join(tokens)

    def key_down(self, key: str, t: float) -> None:
        """Handle a key press at timestamp ``t``.

        Ignored unless recording and ``key`` is valid. ``t`` is expected in
        the same unit as the thresholds (seconds, per their ``_s`` suffix).
        """
        if self._state != "recording":
            return
        key = self._normalize_key(key)
        if not key or key not in self._valid_keys:
            return

        if self._run_key is not None:
            # Time from the first key_down of the open run to this key_down.
            elapsed = t - self._run_start
            if key == self._run_key and elapsed <= self.merge_threshold_s:
                # Same key again within the merge window: part of the same run.
                self._run_key_down_count += 1
                return
            # Same key past the window, or a different key: the open run never
            # saw its key_up, so flush it as a plain key (not a hold), then
            # the wait. _emit_wait applies the min_wait_s filter itself.
            self._emit_key(self._run_key)
            self._emit_wait(elapsed)
        elif self._last_run_start is not None:
            # No open run, but an earlier one was closed by key_up.
            self._emit_wait(t - self._last_run_start)

        # Begin a new run with this key.
        self._run_key = key
        self._run_start = t
        self._last_run_start = t
        self._run_key_down_count = 1

    def key_up(self, key: str, t: float) -> None:
        """Handle a key release at timestamp ``t``.

        Ignored unless recording, ``key`` is valid, and ``key`` is the key of
        the open run. Emits a hold token when the press lasted at least
        ``hold_threshold_s``, otherwise a plain key token, then closes the run.
        """
        if self._state != "recording":
            return
        key = self._normalize_key(key)
        if not key or key not in self._valid_keys:
            return
        if self._run_key != key:
            return
        duration_s = t - self._run_start
        if duration_s >= self.hold_threshold_s:
            self._emit_hold(key, duration_s)
        else:
            self._emit_key(key)
        # Remember when this run began so the next key_down can emit a wait.
        self._last_run_start = self._run_start
        self._run_key = None