-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
468 lines (406 loc) · 18.5 KB
/
cli.py
File metadata and controls
468 lines (406 loc) · 18.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
"""CLI implementation for Delta Tool."""
import argparse
import sys
import os
import subprocess
import threading
import glob as globlib
import time
import json
from pathlib import Path
import logging
import core
from core import process_request, open_diff_report, undo_last_changes, get_available_backups, get_file_stats, config, APP_DATA_DIR
from application_state import (
state, init_app_state, load_fileset, save_fileset, to_relative,
setup_logging, load_presets, save_presets
)
def run_cli():
    """Run in CLI mode with subcommands.

    Initialises application state and logging, parses the command line,
    loads the saved presets and file set, and dispatches to the handler for
    the selected subcommand. When no subcommand is given the help text is
    printed and the function returns. Handlers may terminate the process
    through ``sys.exit``.
    """
    init_app_state()
    # CLI uses standard logging to stderr, plus core logging setup
    setup_logging(enable_gui=False)
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logging.getLogger().addHandler(console)

    parser = _build_parser()
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    # Load saved file state
    load_presets()
    load_fileset()

    # Built at call time so the handlers may be defined below this function.
    handlers = {
        "run": _cmd_request,
        "ask": _cmd_request,
        "plan": _cmd_request,
        "add": _cmd_add,
        "remove": _cmd_remove,
        "clear": _cmd_clear,
        "state": _cmd_state,
        "undo": _cmd_undo,
        "backups": _cmd_backups,
        "review": _cmd_review,
        "config": _cmd_config,
        "askpass": _cmd_askpass,
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler(args)


def _build_parser():
    """Build and return the argparse parser with every CLI subcommand."""
    parser = argparse.ArgumentParser(
        description="Delta Tool - LLM-powered file modification",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Subcommands:
run Apply changes to files
ask Ask questions without modifying files
add Add files to saved state
remove Remove files from saved state
clear Clear all saved files
state Show current saved state
undo Undo last changes
backups List available backups
review Review changes from sessions
Examples:
delta run "Add error handling" main.py utils.py
delta ask "How does this work?" auth.py
delta add src/*.py
delta review --git
"""
    )
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # run command
    # NOTE(review): --no-backup, --ambiguous-mode and --verbosity are accepted
    # here but nothing in this module reads them afterwards - confirm whether
    # they should be forwarded to process_request/config.
    run_parser = subparsers.add_parser("run", help="Apply changes to files")
    run_parser.add_argument("prompt", help="The prompt to send to the LLM")
    run_parser.add_argument("files", nargs="*", help="Files to include in context")
    run_parser.add_argument("-m", "--model", help="Model to use")
    run_parser.add_argument("-t", "--tries", type=int, default=2, help="Max retry attempts")
    run_parser.add_argument("-r", "--recurse", type=int, default=0, help="Recursion iterations")
    run_parser.add_argument("-v", "--validate", help="Validation command")
    run_parser.add_argument("--timeout", type=float, default=10, help="Validation timeout (seconds)")
    run_parser.add_argument("--no-backup", action="store_true", help="Disable automatic backups")
    run_parser.add_argument("--ambiguous-mode", choices=["replace_all", "ignore", "fail"], help="How to handle ambiguous matches")
    run_parser.add_argument("--verify", action="store_true", help="Verify changes with LLM after application")
    run_parser.add_argument("--dry-run", action="store_true", help="Perform a dry run (simulate changes and exit)")
    run_parser.add_argument("--review", action="store_true", help="Open visual diff report after completion")
    run_parser.add_argument("-V", "--verbosity", choices=["verbose", "low", "diff", "silent"], default="low", help="Output level")
    run_parser.add_argument("-p", "--preset", help="Use files from a specific preset")

    # plan command
    plan_parser = subparsers.add_parser("plan", help="Create an implementation plan")
    plan_parser.add_argument("prompt", help="The prompt to send to the LLM")
    plan_parser.add_argument("files", nargs="*", help="Files to include in context")
    plan_parser.add_argument("-m", "--model", help="Model to use")
    plan_parser.add_argument("-p", "--preset", help="Use files from a specific preset")

    # ask command
    ask_parser = subparsers.add_parser("ask", help="Ask questions without modifying files")
    ask_parser.add_argument("prompt", help="The question to ask")
    ask_parser.add_argument("files", nargs="*", help="Files to include in context")
    ask_parser.add_argument("-m", "--model", help="Model to use")
    ask_parser.add_argument("-p", "--preset", help="Use files from a specific preset")

    # add command
    add_parser = subparsers.add_parser("add", help="Add files to saved state")
    add_parser.add_argument("patterns", nargs="+", help="File patterns to add")
    add_parser.add_argument("-p", "--preset", help="Target a specific preset instead of current context")

    # remove command
    remove_parser = subparsers.add_parser("remove", help="Remove files from saved state")
    remove_parser.add_argument("patterns", nargs="+", help="File patterns to remove")
    remove_parser.add_argument("-p", "--preset", help="Target a specific preset instead of current context")

    # clear command
    subparsers.add_parser("clear", help="Clear all saved files")

    # state command
    state_parser = subparsers.add_parser("state", help="Show current saved state")
    state_parser.add_argument("-p", "--preset", help="Show state of a specific preset")

    # undo command
    subparsers.add_parser("undo", help="Undo last changes")

    # backups command
    subparsers.add_parser("backups", help="List available backups")

    # review command
    review_parser = subparsers.add_parser("review", help="Review changes")
    review_parser.add_argument("range", nargs="?", help="Session range (e.g., 0..3)")
    review_parser.add_argument("--git", action="store_true", help="Review uncommitted git changes")

    # config command
    config_parser = subparsers.add_parser("config", help="Manage configuration")
    config_parser.add_argument("--system-prompt", help="Set the extra system prompt")
    config_parser.add_argument("--path", action="store_true", help="Print the AppData folder path")
    config_parser.add_argument("--open", action="store_true", help="Open the AppData folder")
    config_parser.add_argument("--settings", action="store_true", help="Open settings.json in default text editor")

    # askpass command (hidden helper)
    askpass_parser = subparsers.add_parser("askpass", help=argparse.SUPPRESS)
    askpass_parser.add_argument("prompt_text", nargs="?", default="Authentication Required")

    return parser


def _expand_pattern(pattern):
    """Expand *pattern* with recursive globbing.

    Returns ``(candidates, globbed)``. When the glob matches, ``candidates``
    holds the matches and ``globbed`` is True. Otherwise the pattern itself is
    the only candidate and ``globbed`` is False, so callers can still try it
    as a literal path.
    """
    matched = globlib.glob(pattern, recursive=True)
    if matched:
        return matched, True
    return [pattern], False


def _collect_request_files(args):
    """Return the list of paths to send with a run/ask/plan request.

    Files named on the command line come first, followed by files of the
    ``--preset`` (skipping entries that resolve to a path already collected).
    When neither source was given, the saved selection is used instead.
    Exits with status 1 when the requested preset does not exist.
    """
    files = []
    explicit_source = False

    if getattr(args, "files", None):
        explicit_source = True
        for pattern in args.files:
            candidates, globbed = _expand_pattern(pattern)
            for candidate in candidates:
                path = Path(candidate)
                if path.is_file():
                    files.append(path)
                elif not globbed:
                    # Only a pattern that matched nothing at all is reported;
                    # non-file glob matches (directories) are skipped quietly.
                    print(f"Warning: No files found for pattern: {pattern}", file=sys.stderr)

    if getattr(args, "preset", None):
        explicit_source = True
        load_presets()
        if args.preset not in state.presets:
            print(f"Error: Preset '{args.preset}' not found", file=sys.stderr)
            sys.exit(1)
        preset_files = state.presets[args.preset].get("files", [])
        # Track resolved paths in a set so de-duplication is one lookup per
        # preset entry instead of re-resolving every collected file each time.
        seen = {existing.resolve() for existing in files}
        for name in preset_files:
            path = Path(name)
            resolved = path.resolve()
            if resolved not in seen:
                seen.add(resolved)
                files.append(path)
        print(f"Loaded {len(preset_files)} files from preset '{args.preset}'", file=sys.stderr)

    if not explicit_source:
        # Use saved state
        files = list(state.selected_files)
        if files:
            print(f"Using {len(files)} files from saved state", file=sys.stderr)

    return files


def _dry_run_callback(counts, states):
    """Print the proposed per-file change counts and veto the run.

    ``counts`` maps a file name to its number of modifications; ``states`` is
    accepted because it is part of the callback signature but is not used.
    Always returns False so that no changes are applied.
    """
    print("\n[Dry Run] Analysis complete. Proposed changes:")
    changes_found = False
    for fname, cnt in counts.items():
        if cnt > 0:
            is_new = not Path(fname).exists()
            status = "Created" if is_new else f"{cnt} modification(s)"
            print(f" {fname}: {status}")
            changes_found = True
    if not changes_found:
        print(" No changes detected.")
    return False  # Abort execution


def _cmd_request(args):
    """Handle the ``run``, ``ask`` and ``plan`` subcommands.

    Collects the context files, forwards the prompt to ``process_request``
    and reports the outcome. Exits with status 1 on failure or on an
    unexpected exception, and with status 130 on Ctrl-C.
    """
    ask_mode = args.command == "ask"
    plan_mode = args.command == "plan"

    # Create callback functions for CLI output
    def output_func(msg: str, end: str = "\n", flush: bool = False):
        print(msg, end=end, file=sys.stderr, flush=flush)

    def stream_func(text: str, end: str = "", flush: bool = False):
        # ``flush`` is accepted for signature compatibility; streamed output
        # is always flushed.
        print(text, end=end, flush=True)

    files = _collect_request_files(args)
    if not files and not ask_mode and not plan_mode:
        print("Error: No files specified and no saved state", file=sys.stderr)
        sys.exit(1)

    # Set model if specified
    if getattr(args, "model", None):
        config.model = args.model

    file_strs = [str(f) for f in files]
    check_callback = _dry_run_callback if getattr(args, "dry_run", False) else None

    try:
        result = process_request(
            files=file_strs,
            prompt=args.prompt,
            history=[],
            output_func=output_func,
            stream_func=stream_func,
            cancel_event=threading.Event(),
            ask_mode=ask_mode,
            plan_mode=plan_mode,
            max_retries=getattr(args, 'tries', 2),
            recursion_limit=getattr(args, 'recurse', 0),
            validation_cmd=getattr(args, 'validate', "") or "",
            validation_timeout=getattr(args, 'timeout', 60.0),
            verify=getattr(args, 'verify', False),
            validate_at_start=config.validate_at_start,
            confirmation_callback=check_callback,
        )
        if result.get("success"):
            if not plan_mode:
                print("\n\nSuccess!")
            if getattr(args, "review", False) and result.get("backup_id"):
                open_diff_report(result.get("backup_id"))
        else:
            msg = result.get('message', 'Unknown error')
            if check_callback and msg == "Cancelled by confirmation callback":
                # The dry-run callback vetoes the run on purpose, so this is
                # not a failure.
                # NOTE(review): assumes the original exit(1) applied only to
                # the "Failed" branch - confirm against the upstream file.
                print("\nDry run completed (no changes applied).")
            else:
                print(f"\n\nFailed: {msg}", file=sys.stderr)
                sys.exit(1)
    except KeyboardInterrupt:
        print("\nCancelled", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        print(f"\nError: {e}", file=sys.stderr)
        sys.exit(1)


def _save_targets(args, target_set, is_preset):
    """Persist *target_set* to its preset or to the saved selection and print the total."""
    if is_preset:
        # Sorted so the stored preset does not depend on set iteration order.
        state.presets[args.preset]["files"] = sorted(str(f) for f in target_set)
        save_presets()
        print(f"\nTotal: {len(target_set)} files in preset '{args.preset}'")
    else:
        save_fileset()
        print(f"\nTotal: {len(state.selected_files)} files in saved state")


def _cmd_add(args):
    """Handle ``add``: add matching files to the saved selection or to a preset.

    A preset named with ``--preset`` is created when it does not exist yet.
    """
    target_set = state.selected_files
    is_preset = False
    if args.preset:
        load_presets()
        if args.preset not in state.presets:
            state.presets[args.preset] = {"files": []}
            print(f"Created new preset: {args.preset}")
        # Work on a temporary set; it is written back by _save_targets.
        target_set = {Path(f) for f in state.presets[args.preset].get("files", [])}
        is_preset = True

    for pattern in args.patterns:
        candidates, globbed = _expand_pattern(pattern)
        for candidate in candidates:
            path = to_relative(Path(candidate))
            if path.is_file():
                target_set.add(path)
                print(f"Added: {path}")
            elif not globbed:
                print(f"Warning: No files found for: {pattern}", file=sys.stderr)

    _save_targets(args, target_set, is_preset)


def _cmd_remove(args):
    """Handle ``remove``: drop matching files from the saved selection or a preset.

    Exits with status 1 when the preset named with ``--preset`` does not exist.
    """
    target_set = state.selected_files
    is_preset = False
    if args.preset:
        load_presets()
        if args.preset not in state.presets:
            print(f"Error: Preset '{args.preset}' not found", file=sys.stderr)
            sys.exit(1)
        # Work on a temporary set; it is written back by _save_targets.
        target_set = {Path(f) for f in state.presets[args.preset].get("files", [])}
        is_preset = True

    for pattern in args.patterns:
        # The literal pattern is tried when the glob matches nothing, so an
        # entry can still be removed after its file has left the disk.
        candidates, _ = _expand_pattern(pattern)
        for candidate in candidates:
            path = to_relative(Path(candidate))
            if path in target_set:
                target_set.discard(path)
                print(f"Removed: {path}")

    _save_targets(args, target_set, is_preset)


def _cmd_clear(args):
    """Handle ``clear``: empty the saved selection and persist it."""
    state.selected_files.clear()
    save_fileset()
    print("Cleared all saved files")


def _cmd_state(args):
    """Handle ``state``: list the saved selection or a preset with per-file stats."""
    target_files = state.selected_files
    title = "Saved state"
    if args.preset:
        load_presets()
        if args.preset not in state.presets:
            print(f"Preset '{args.preset}' not found")
            return
        target_files = {Path(f) for f in state.presets[args.preset].get("files", [])}
        title = f"Preset '{args.preset}'"

    if not target_files:
        print(f"No files in {title.lower()}")
        return

    print(f"{title} ({len(target_files)} files):")
    for f in sorted(target_files, key=lambda p: str(p).lower()):
        lines, tokens, _ = get_file_stats(f)
        print(f" {f} ({lines} lines, ~{tokens} tokens)")


def _cmd_undo(args):
    """Handle ``undo``: revert the most recent backup session, if any."""
    backups = get_available_backups()
    if not backups:
        print("No backups available")
        return

    latest = backups[0]["session_id"]
    try:
        undo_last_changes()
        print(f"Undone changes from: {latest}")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


def _cmd_backups(args):
    """Handle ``backups``: print up to the first 20 available backups."""
    backups = get_available_backups()
    if not backups:
        print("No backups available")
        return

    print(f"Available backups ({len(backups)}):")
    for i, b in enumerate(backups[:20]):
        print(f" {i}: {b}")
    if len(backups) > 20:
        print(f" ... and {len(backups) - 20} more")


def _cmd_review(args):
    """Handle ``review``: open a diff report.

    ``--git`` reviews git changes; a ``START..END`` range (either side may be
    empty) or a single index selects backup sessions by position; with no
    argument the first listed backup is reviewed. Any error, including a
    non-numeric index, is reported on stderr and exits with status 1.
    """
    try:
        if args.git:
            open_diff_report(use_git=True)
            return

        backups = get_available_backups()
        if not args.range:
            # Review latest
            if backups:
                open_diff_report(backups[0]["session_id"])
            else:
                print("No backups available to review")
        elif ".." in args.range:
            # Parse range like "0..3"
            start, end = args.range.split("..", 1)
            start_idx = int(start) if start else 0
            end_idx = int(end) if end else len(backups)
            session_ids = [b["session_id"] for b in backups[start_idx:end_idx]]
            if session_ids:
                open_diff_report(session_ids)
            else:
                print("No backups in that range")
        else:
            # Single session
            idx = int(args.range)
            if 0 <= idx < len(backups):
                open_diff_report(backups[idx]["session_id"])
            else:
                print(f"Invalid backup index: {idx}")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


def _cmd_config(args):
    """Handle ``config``: apply each requested configuration action in turn."""
    if args.system_prompt:
        config.set_extra_system_prompt(args.system_prompt)
        print("Updated system prompt.")
    if args.path:
        print(str(APP_DATA_DIR))
    if args.open:
        path = str(APP_DATA_DIR)
        core.open_path_in_os(path)
        print(f"Opened: {path}")
    if args.settings:
        path = str(APP_DATA_DIR / "settings.json")
        core.open_path_in_os(path)
        print(f"Opened: {path}")


def _cmd_askpass(args):
    """Handle the hidden ``askpass`` helper (authentication request from git/ssh).

    Writes ``<id>.req`` into the ``ipc`` folder under ``APP_DATA_DIR`` and
    polls for a matching ``<id>.res`` file for up to 60 seconds (presumably
    written by another process of this application - verify against the
    consumer). On success the ``password`` field of the response is printed
    to stdout and the process exits with status 0. It exits with status 1
    when the request cannot be written or no response arrives in time, so
    the caller does not mistake silence for an empty password.
    """
    ipc_dir = APP_DATA_DIR / "ipc"
    ipc_dir.mkdir(parents=True, exist_ok=True)
    req_id = f"auth_{int(time.time()*1000)}"
    req_file = ipc_dir / f"{req_id}.req"
    res_file = ipc_dir / f"{req_id}.res"

    def discard_request():
        # Best-effort cleanup: a leftover request file is harmless, so only
        # filesystem errors are ignored here.
        try:
            if req_file.exists():
                req_file.unlink()
        except OSError:
            pass

    # Write request
    try:
        with open(req_file, "w", encoding="utf-8") as f:
            json.dump({"id": req_id, "prompt": args.prompt_text}, f)
    except OSError as e:
        # Returning normally here would exit with status 0 and no output.
        print(f"Error: could not write authentication request: {e}", file=sys.stderr)
        discard_request()
        sys.exit(1)

    # Wait for response (timeout 60s); monotonic clock so that wall-clock
    # adjustments cannot shorten or extend the wait.
    deadline = time.monotonic() + 60
    while time.monotonic() < deadline:
        if res_file.exists():
            try:
                # Give writer a moment to close
                time.sleep(0.1)
                with open(res_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                res_file.unlink()  # Cleanup
                print(data.get("password", ""))
                sys.exit(0)
            except (OSError, ValueError, AttributeError):
                # Unreadable, partially written or malformed response: keep
                # polling until the deadline.
                pass
        time.sleep(0.5)

    # Timeout
    discard_request()
    sys.exit(1)