-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
253 lines (225 loc) · 11 KB
/
parser.py
File metadata and controls
253 lines (225 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""
parser.py
─────────
Protobuf wire-format parser for Warp agent task blobs.
Warp stores task data as Protocol Buffers binary blobs. This module
parses the wire format without a .proto schema, extracting conversation
steps in execution order.
Observed field layout:
Task message — field 1: task_id, field 2: title, field 5 (repeated): steps
Step message — field 1: step_id, field 2: user prompt, field 3: AI response,
field 4: tool call, field 5: tool result, field 15: thinking,
field 25: model selection
"""
# ── Wire-format primitives ────────────────────────────────────────────────────
def read_varint(data, pos):
    """Decode one base-128 varint from ``data`` starting at index ``pos``.

    Each byte contributes its low seven bits, least-significant group first;
    a clear high bit marks the final byte.

    Returns a ``(value, new_pos)`` tuple where ``new_pos`` is the index just
    past the last byte consumed. If ``data`` runs out before a terminating
    byte is seen, the bits gathered so far are returned with
    ``new_pos == len(data)``; no exception is raised for truncation.
    """
    value = 0
    bit_offset = 0
    end = len(data)
    while pos < end:
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << bit_offset
        if (byte & 0x80) == 0:
            # High bit clear: this was the last byte of the varint.
            return value, pos
        bit_offset += 7
    # Input exhausted (or pos already past the end): return what we have.
    return value, pos
def _is_complete_varint(data, start, end):
    """Return True if the varint occupying ``data[start:end]`` is terminated.

    ``read_varint`` does not signal truncation, so the caller checks that at
    least one byte was consumed and that the last consumed byte has its
    continuation bit (0x80) clear.
    """
    return end > start and not (data[end - 1] & 0x80)


def parse_fields(data):
    """Parse a protobuf message into ``(field_number, wire_type, value)`` tuples.

    Parsing is schema-less and best-effort: it stops at the first thing it
    cannot interpret and returns the fields decoded up to that point.

    Supported wire types:
      0 - varint; ``value`` is an int
      1 - 64-bit fixed; ``value`` is the raw 8-byte slice
      2 - length-delimited; ``value`` is the raw payload slice
      5 - 32-bit fixed; ``value`` is the raw 4-byte slice
    Any other wire type ends parsing.

    A field whose tag, varint value, length prefix or fixed-width payload is
    cut off by the end of ``data`` is dropped rather than emitted with a
    partial value.
    """
    fields = []
    pos = 0
    size = len(data)
    while pos < size:
        try:
            start = pos
            tag, pos = read_varint(data, pos)
            if not _is_complete_varint(data, start, pos):
                break  # tag itself is truncated
            field_number, wire_type = tag >> 3, tag & 0x7

            if wire_type == 0:
                start = pos
                value, pos = read_varint(data, pos)
                if not _is_complete_varint(data, start, pos):
                    break  # value missing or truncated
                fields.append((field_number, 0, value))
            elif wire_type == 2:
                start = pos
                length, pos = read_varint(data, pos)
                if not _is_complete_varint(data, start, pos):
                    break  # length prefix missing or truncated
                if pos + length > size:
                    break  # declared payload runs past the buffer
                fields.append((field_number, 2, data[pos:pos + length]))
                pos += length
            elif wire_type in (1, 5):
                width = 8 if wire_type == 1 else 4
                if pos + width > size:
                    break  # fixed-width payload is truncated
                fields.append((field_number, wire_type, data[pos:pos + width]))
                pos += width
            else:
                # Groups (3/4) and undefined wire types are not supported.
                break
        except Exception:
            # Deliberately broad: this parser is also pointed at blobs that
            # may not be protobuf at all, and must never raise mid-parse.
            break
    return fields
def decode_utf8(data):
    """Strictly decode ``data`` as UTF-8.

    Returns the decoded string, or ``None`` if the call fails for any
    reason (invalid UTF-8, or ``data`` has no usable ``decode`` method).
    """
    try:
        text = data.decode("utf-8")
    except Exception:
        text = None
    return text
def decode_utf8_lossy(data):
    """Decode ``data`` as UTF-8, substituting U+FFFD for invalid sequences.

    Returns the decoded string, or ``None`` if the ``decode`` call itself
    fails (for example when ``data`` is not a bytes-like object with a
    ``decode`` method).
    """
    try:
        text = data.decode("utf-8", errors="replace")
    except Exception:
        text = None
    return text
# ── Step extraction ───────────────────────────────────────────────────────────
# Thinking chunks of this many characters or fewer are discarded
# (presumably short non-text fragments that happen to decode as UTF-8).
_THINKING_NOISE_MAX_LEN = 10

# Maximum number of characters of a tool call's raw input that is kept.
_RAW_INPUT_LIMIT = 2000

# Prompt-context layout: context field number -> {sub-field number -> key}.
_CONTEXT_KEYS = {
    1: {1: "pwd"},
    2: {1: "os"},
    3: {1: "shell", 2: "shell_version"},
}


def _submessages(fields, number):
    """Yield the parsed contents of each length-delimited field ``number``."""
    for fn, wt, val in fields:
        if fn == number and wt == 2:
            yield parse_fields(val)


def _merge_prompt_context(context, blob):
    """Decode a prompt-context message and store its values into ``context``."""
    for cfn, cwt, cv in parse_fields(blob):
        keys = _CONTEXT_KEYS.get(cfn)
        if cwt != 2 or keys is None:
            continue
        for dfn, dwt, dv in parse_fields(cv):
            if dwt == 2 and dfn in keys:
                context[keys[dfn]] = decode_utf8(dv)


def _user_prompt_step(step_fields):
    """Build the ``user_prompt`` step from step field 2, or return None."""
    text, context = None, {}
    for inner in _submessages(step_fields, 2):
        for ifn, iwt, iv in inner:
            if iwt != 2:
                continue
            if ifn == 1:
                # A later occurrence overrides an earlier one.
                text = decode_utf8(iv)
            elif ifn == 2:
                _merge_prompt_context(context, iv)
    if not text:
        return None
    return {"type": "user_prompt", "text": text.strip(), "context": context}


def _thinking_step(step_fields):
    """Build the ``thinking`` step from step field 15, or return None."""
    chunks = []
    for inner in _submessages(step_fields, 15):
        for _, iwt, iv in inner:
            if iwt != 2:
                continue
            text = decode_utf8(iv)
            if text and len(text) > _THINKING_NOISE_MAX_LEN:
                chunks.append(text)
    if not chunks:
        return None
    return {"type": "thinking", "text": "\n".join(chunks)}


def _response_step(step_fields):
    """Build the ``response`` step from step field 3, or return None."""
    texts = []
    for inner in _submessages(step_fields, 3):
        for ifn, iwt, iv in inner:
            if ifn != 1 or iwt != 2:
                continue
            text = decode_utf8(iv)
            if text and text.strip():
                texts.append(text)
    if not texts:
        return None
    return {"type": "response", "text": "\n".join(texts)}


def _tool_call_step(step_fields):
    """Build the ``tool_call`` step from step field 4, or return None."""
    call_id = command = raw_input = None
    files = []
    for inner in _submessages(step_fields, 4):
        for ifn, iwt, iv in inner:
            if iwt != 2:
                continue
            if ifn == 1:
                call_id = decode_utf8(iv)
            elif ifn == 2:
                for cfn, cwt, cv in parse_fields(iv):
                    if cfn == 1 and cwt == 2:
                        command = decode_utf8(cv)
            elif ifn == 5:
                text = decode_utf8(iv)
                if text:
                    # Keep only lines that start with "/" (presumably
                    # absolute file paths).
                    for line in text.split("\n"):
                        line = line.strip()
                        if line.startswith("/"):
                            files.append(line)
            elif ifn == 4:
                text = decode_utf8(iv)
                if text:
                    raw_input = text[:_RAW_INPUT_LIMIT]
    if not call_id:
        return None
    return {"type": "tool_call", "call_id": call_id,
            "command": command, "files": files, "raw_input": raw_input}


def _tool_result_step(step_fields):
    """Build the ``tool_result`` step from step field 5, or return None."""
    call_id = command = output = None
    for inner in _submessages(step_fields, 5):
        for ifn, iwt, iv in inner:
            if iwt != 2:
                continue
            if ifn == 1:
                call_id = decode_utf8(iv)
            elif ifn == 2:
                for cfn, cwt, cv in parse_fields(iv):
                    if cwt != 2:
                        continue
                    if cfn == 3:
                        command = decode_utf8(cv)
                    elif cfn == 5:
                        for ofn, owt, ov in parse_fields(cv):
                            if ofn == 1 and owt == 2:
                                output = decode_utf8_lossy(ov)
            elif ifn == 5:
                file_output = _extract_file_result(iv)
                if file_output:
                    output = file_output
    if not call_id:
        return None
    return {"type": "tool_result", "call_id": call_id,
            "command": command, "output": output}


def _model_step(step_fields):
    """Build the ``model`` step from step field 25, or return None."""
    model_id = model_name = None
    for inner in _submessages(step_fields, 25):
        for ifn, iwt, iv in inner:
            if iwt != 2:
                continue
            if ifn == 1:
                model_id = decode_utf8(iv)
            elif ifn == 2:
                model_name = decode_utf8(iv)
    if not (model_id or model_name):
        return None
    return {"type": "model", "id": model_id or "",
            "name": model_name or model_id or ""}


def extract_steps(blob):
    """
    Parse a task protobuf blob and return an ordered list of step dicts.

    Every length-delimited occurrence of task field 5 is treated as one step
    message. A single step message can contribute several entries, always
    emitted in this order: user prompt, thinking, response, tool call,
    tool result, model.

    Each step is one of:
      {"type": "user_prompt", "text": str, "context": dict}
      {"type": "response",    "text": str}
      {"type": "tool_call",   "call_id": str, "command": str|None, "files": list, "raw_input": str|None}
      {"type": "tool_result", "call_id": str, "command": str|None, "output": str|None}
      {"type": "thinking",    "text": str}
      {"type": "model",       "id": str, "name": str}

    ``blob`` is converted with ``bytes(blob)`` before parsing.
    """
    steps = []
    for fn, wt, val in parse_fields(bytes(blob)):
        if fn != 5 or wt != 2:
            continue
        step_fields = parse_fields(val)

        # A tool result is only read from messages that carry no field 4
        # at all (any wire type), matching the tool-call check.
        has_tool_call_field = any(sfn == 4 for sfn, _, _ in step_fields)

        candidates = (
            _user_prompt_step(step_fields),
            _thinking_step(step_fields),
            _response_step(step_fields),
            _tool_call_step(step_fields),
            None if has_tool_call_field else _tool_result_step(step_fields),
            _model_step(step_fields),
        )
        steps.extend(step for step in candidates if step is not None)
    return steps
def _extract_file_result(data):
    """Collect readable text from a file-read result blob.

    Walks three levels of nesting: every length-delimited top-level field,
    then its length-delimited sub-field 1, then every length-delimited field
    inside that. Each innermost payload is decoded lossily as UTF-8 and kept
    when it is longer than five characters.

    Returns the kept strings joined by newlines, or ``None`` if none were
    found.
    """
    pieces = []
    for _, outer_wt, outer_val in parse_fields(data):
        if outer_wt != 2:
            continue
        for mid_fn, mid_wt, mid_val in parse_fields(outer_val):
            if mid_fn != 1 or mid_wt != 2:
                continue
            for _, leaf_wt, leaf_val in parse_fields(mid_val):
                if leaf_wt != 2:
                    continue
                decoded = decode_utf8_lossy(leaf_val)
                if decoded and len(decoded) > 5:
                    pieces.append(decoded)
    if not pieces:
        return None
    return "\n".join(pieces)
def extract_task_meta(blob):
    """Return ``(task_id, title)`` read from a task blob's top-level fields.

    ``task_id`` comes from length-delimited field 1 and is ``None`` when the
    field is absent or its last occurrence is not valid UTF-8. ``title``
    comes from length-delimited field 2 and stays ``"(untitled)"`` unless a
    non-empty title decodes successfully.
    """
    identifier = None
    heading = "(untitled)"
    for number, wire_type, payload in parse_fields(bytes(blob)):
        if wire_type != 2:
            continue
        if number == 1:
            identifier = decode_utf8(payload)
        elif number == 2:
            decoded = decode_utf8(payload)
            if decoded:
                heading = decoded
    return identifier, heading