diff --git a/changes/pr-551.md b/changes/pr-551.md new file mode 100644 index 000000000..8f206418b --- /dev/null +++ b/changes/pr-551.md @@ -0,0 +1 @@ +[ATS] persistent budget UI streaming diff --git a/pkgs/python-packages/flasks/budget_ui/budget_ui.py b/pkgs/python-packages/flasks/budget_ui/budget_ui.py index 93221d27c..a91fb836e 100644 --- a/pkgs/python-packages/flasks/budget_ui/budget_ui.py +++ b/pkgs/python-packages/flasks/budget_ui/budget_ui.py @@ -1,20 +1,23 @@ import argparse import json import os -import subprocess from pathlib import Path from flask import ( Flask, Blueprint, request, render_template, redirect, - url_for, session, Response, stream_with_context + url_for, session, Response, jsonify ) from werkzeug.utils import secure_filename +from run_store import RunStore + parser = argparse.ArgumentParser() parser.add_argument("--port", type=int, default=5000, help="Port to run the server on") parser.add_argument("--subdomain", type=str, default="", help="URL prefix (e.g., '/budget')") parser.add_argument("--config-file", type=str, default="~/configs/budget-tool.json", help="Path to default budget config JSON file") +parser.add_argument("--state-dir", type=str, default="~/.local/state/budget-ui", + help="Directory for run log and state persistence") args = parser.parse_args() # Expand paths @@ -22,6 +25,10 @@ DATA_DIR = Path.home() / 'data' / 'budgets' DATA_DIR.mkdir(parents=True, exist_ok=True) +state_dir = os.path.expanduser(args.state_dir) +upload_store = RunStore(os.path.join(state_dir, "upload"), "UPLOAD") +process_store = RunStore(os.path.join(state_dir, "process"), "PROCESS") + # Create blueprint with proper url_prefix app = Flask(__name__) bp = Blueprint('budget', __name__, url_prefix=args.subdomain) @@ -82,41 +89,46 @@ def upload_csv(account): file.save(DATA_DIR / filename) return redirect(url_for('budget.index')) -def stream_command(command): - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - universal_newlines=True, - env={**os.environ, 'PYTHONUNBUFFERED': '1'}, - ) - for line in process.stdout: - yield line.rstrip('\n') - process.wait() - if process.returncode != 0: - yield f"[Command failed with exit code {process.returncode}]" +@bp.route('/status') +def status(): + return jsonify({ + "upload": upload_store.read_state().get("status", "idle"), + "process": process_store.read_state().get("status", "idle"), + }) -@bp.route('/trigger_upload') +@bp.route('/trigger_upload', methods=['POST']) def trigger_upload(): - def generate(): - upload_script = DATA_DIR / 'upload.sh' - if not upload_script.exists(): - yield "data: Error: upload.sh not found at ~/data/budgets/upload.sh\n\n" - return - for line in stream_command(['bash', str(upload_script)]): - yield f"data: {line}\n\n" - yield "data: [Upload script finished]\n\n" - return Response(stream_with_context(generate()), mimetype='text/event-stream') - -@bp.route('/trigger_process') + upload_script = DATA_DIR / 'upload.sh' + if not upload_script.exists(): + return jsonify({"error": "upload.sh not found at ~/data/budgets/upload.sh"}), 400 + if not upload_store.start(['bash', str(upload_script)]): + return jsonify({"error": "Upload already in progress"}), 409 + return jsonify({"started": True}), 202 + +@bp.route('/stream/upload') +def stream_upload(): + return Response( + upload_store.stream(), + mimetype='text/event-stream', + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + +@bp.route('/trigger_process', methods=['POST']) def trigger_process(): - def generate(): - for line in stream_command(['budget_report', 'transactions-process']): - yield f"data: {line}\n\n" - yield "data: [Processing complete]\n\n" - return Response(stream_with_context(generate()), mimetype='text/event-stream') + if not process_store.start( + ['budget_report', 'transactions-process'], + env={**os.environ, 'PYTHONUNBUFFERED': '1'}, + ): + return jsonify({"error": "Processing already in progress"}), 409 + return jsonify({"started": True}), 202 + +@bp.route('/stream/process') +def stream_process(): + return Response( + process_store.stream(), + mimetype='text/event-stream', + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) # Serve static files correctly under the subpath @app.route(f'{args.subdomain}/static/') @@ -128,7 +140,7 @@ def custom_static(filename): def run(): global args, app app.secret_key = os.urandom(24) - app.run(host='0.0.0.0', port=args.port, debug=False) + app.run(host='0.0.0.0', port=args.port, debug=False, threaded=True) if __name__ == '__main__': run() diff --git a/pkgs/python-packages/flasks/budget_ui/main.html b/pkgs/python-packages/flasks/budget_ui/main.html index f3a4ecdbb..402d1b98e 100644 --- a/pkgs/python-packages/flasks/budget_ui/main.html +++ b/pkgs/python-packages/flasks/budget_ui/main.html @@ -43,6 +43,11 @@ font-size: 16px; margin-top: 10px; } + + button:disabled { + opacity: 0.6; + cursor: not-allowed; + } @@ -71,14 +76,20 @@

{{ source.Account }}

-
-
@@ -99,23 +110,72 @@

{{ source.Account }}

log.scrollTop = log.scrollHeight; } - function runCommand(endpoint, logId) { - const log = document.getElementById(logId); - log.innerHTML = ''; - appendLog(log, 'Starting...'); - - const evtSource = new EventSource(endpoint); - - evtSource.onmessage = function (event) { + function attachStream(streamEndpoint, log, btn, label) { + const es = new EventSource(streamEndpoint); + es.onmessage = function(event) { + if (event.data === '[DONE]') { + es.close(); + btn.disabled = false; + btn.textContent = label; + return; + } appendLog(log, event.data); }; - - evtSource.onerror = function () { + es.onerror = function() { appendLog(log, '[Connection failed or ended]'); - evtSource.close(); + es.close(); + btn.disabled = false; + btn.textContent = label; }; } + + function runCommand(triggerEndpoint, streamEndpoint, logId, btnId, label) { + const log = document.getElementById(logId); + const btn = document.getElementById(btnId); + log.innerHTML = ''; + btn.disabled = true; + btn.textContent = 'Running...'; + + fetch(triggerEndpoint, { method: 'POST' }) + .then(r => { + if (r.status === 409) { + appendLog(log, '[Already in progress, attaching to existing run...]'); + attachStream(streamEndpoint, log, btn, label); + return; + } + if (!r.ok) { + return r.json().then(d => { throw new Error(d.error || 'Failed to start'); }); + } + attachStream(streamEndpoint, log, btn, label); + }) + .catch(err => { + appendLog(log, 'Error: ' + err.message); + btn.disabled = false; + btn.textContent = label; + }); + } + + // On load, reattach streams for any jobs that are already running + fetch({{ url_for("budget.status") | tojson }}) + .then(r => r.json()) + .then(data => { + if (data.upload === 'running') { + const btn = document.getElementById('upload-btn'); + btn.disabled = true; + btn.textContent = 'Running...'; + attachStream({{ url_for("budget.stream_upload") | tojson }}, + document.getElementById('upload-log'), btn, 'Upload transactions'); + } + if (data.process === 'running') { + const btn = document.getElementById('process-btn'); + btn.disabled = true; + btn.textContent = 'Running...'; + attachStream({{ url_for("budget.stream_process") | tojson }}, + document.getElementById('process-log'), btn, 'Process transactions'); + } + }) + .catch(() => {}); - \ No newline at end of file + diff --git a/pkgs/python-packages/flasks/budget_ui/run_store.py b/pkgs/python-packages/flasks/budget_ui/run_store.py new file mode 100644 index 000000000..c140966f8 --- /dev/null +++ b/pkgs/python-packages/flasks/budget_ui/run_store.py @@ -0,0 +1,177 @@ +import json +import os +import subprocess +import threading +import time +from datetime import datetime, timezone + +REPLAY_CAP_BYTES = 512 * 1024 +POLL_INTERVAL_S = 0.5 + + +def _now(): + return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds") + + +def _sse(msg): + return f"data: {msg}\n\n" + + +class RunStore: + """Owns the on-disk state of a single background job: a log file and a state JSON. + + The subprocess writes directly to the log file descriptor, so no pipe exists + between the UI and the job — nothing can block if viewers disconnect or the + UI process restarts. + """ + + def __init__(self, state_dir, label="JOB"): + os.makedirs(state_dir, exist_ok=True) + self.log_path = os.path.join(state_dir, "current.log") + self.state_path = os.path.join(state_dir, "state.json") + self.label = label + self._lock = threading.RLock() + self._thread = None + + # -- state.json ---------------------------------------------------------- + + def _read_raw(self): + try: + with open(self.state_path) as f: + return json.load(f) + except (OSError, ValueError): + return {"status": "idle"} + + def _write_state(self, state): + tmp = self.state_path + ".tmp" + with open(tmp, "w") as f: + json.dump(state, f) + os.replace(tmp, self.state_path) + + @staticmethod + def _pid_alive(pid): + if not pid: + return False + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + return True + + def read_state(self): + """Current state, finalizing orphaned runs. + + A run is orphaned when state.json says running but no runner thread + exists (UI was restarted) and the recorded PID is dead. + """ + state = self._read_raw() + if state.get("status") != "running": + return state + if self._thread is not None and self._thread.is_alive(): + return state + if self._pid_alive(state.get("pid")): + return state + with self._lock: + state = self._read_raw() + if state.get("status") != "running": + return state + state.update(status="failed", returncode=None, finished_at=_now()) + self._write_state(state) + return state + + # -- running ------------------------------------------------------------- + + def start(self, cmd, env=None): + """Begin a run in a background thread. Returns False if one is already active.""" + with self._lock: + if self.read_state().get("status") == "running": + return False + state = { + "status": "running", + "pid": None, + "cmd": cmd, + "started_at": _now(), + "finished_at": None, + "returncode": None, + } + with open(self.log_path, "wb") as log_fd: + try: + proc = subprocess.Popen( + cmd, stdout=log_fd, stderr=subprocess.STDOUT, env=env + ) + except OSError as e: + log_fd.write(f"[ERROR: {e}]\n".encode()) + state.update(status="failed", finished_at=_now()) + self._write_state(state) + return True + state["pid"] = proc.pid + self._write_state(state) + self._thread = threading.Thread( + target=self._wait, args=(proc,), daemon=True + ) + self._thread.start() + return True + + def _wait(self, proc): + rc = proc.wait() + with self._lock: + state = self._read_raw() + state.update( + status="success" if rc == 0 else "failed", + returncode=rc, + finished_at=_now(), + ) + self._write_state(state) + + # -- streaming ----------------------------------------------------------- + + def stream(self): + """SSE generator: replay capped log tail, then follow until the run + leaves 'running'. Safe for any number of concurrent consumers.""" + pos = 0 + try: + size = os.path.getsize(self.log_path) + if size > REPLAY_CAP_BYTES: + pos = size - REPLAY_CAP_BYTES + except OSError: + pass + drop_partial = pos > 0 + buf = b"" + while True: + try: + size = os.path.getsize(self.log_path) + except OSError: + size = 0 + if size < pos: # log truncated by a new run starting + pos, buf, drop_partial = 0, b"", False + if size > pos: + with open(self.log_path, "rb") as f: + f.seek(pos) + data = f.read() + pos = f.tell() + buf += data + lines = buf.split(b"\n") + buf = lines.pop() + if drop_partial: + lines = lines[1:] + drop_partial = False + for line in lines: + yield _sse(line.decode("utf-8", errors="replace")) + continue + state = self.read_state() + if state.get("status") == "running": + time.sleep(POLL_INTERVAL_S) + continue + if buf: + yield _sse(buf.decode("utf-8", errors="replace")) + status = state.get("status", "idle") + if status == "success": + yield _sse(f"[{self.label} SUCCESSFUL]") + elif status == "failed": + yield _sse(f"[{self.label} FAILED (exit {state.get('returncode')})]") + else: + yield _sse(f"[no {self.label.lower()} output]") + yield _sse("[DONE]") + return diff --git a/pkgs/python-packages/flasks/budget_ui/setup.py b/pkgs/python-packages/flasks/budget_ui/setup.py index 76efb2421..b5d365957 100644 --- a/pkgs/python-packages/flasks/budget_ui/setup.py +++ b/pkgs/python-packages/flasks/budget_ui/setup.py @@ -2,7 +2,7 @@ setup( name='budget_ui', version='0.0.0', - py_modules=['budget_ui'], + py_modules=['budget_ui', 'run_store'], entry_points={ 'console_scripts': ['budget_ui = budget_ui:run'] },