Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/pr-551.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[ATS] persistent budget UI streaming
82 changes: 47 additions & 35 deletions pkgs/python-packages/flasks/budget_ui/budget_ui.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
import argparse
import json
import os
import subprocess
from pathlib import Path

from flask import (
Flask, Blueprint, request, render_template, redirect,
url_for, session, Response, stream_with_context
url_for, session, Response, jsonify
)
from werkzeug.utils import secure_filename

from run_store import RunStore

parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=5000, help="Port to run the server on")
parser.add_argument("--subdomain", type=str, default="", help="URL prefix (e.g., '/budget')")
parser.add_argument("--config-file", type=str, default="~/configs/budget-tool.json",
help="Path to default budget config JSON file")
parser.add_argument("--state-dir", type=str, default="~/.local/state/budget-ui",
help="Directory for run log and state persistence")
args = parser.parse_args()

# Expand paths
config_path = Path(args.config_file).expanduser()
DATA_DIR = Path.home() / 'data' / 'budgets'
DATA_DIR.mkdir(parents=True, exist_ok=True)

state_dir = os.path.expanduser(args.state_dir)
upload_store = RunStore(os.path.join(state_dir, "upload"), "UPLOAD")
process_store = RunStore(os.path.join(state_dir, "process"), "PROCESS")

# Create blueprint with proper url_prefix
app = Flask(__name__)
bp = Blueprint('budget', __name__, url_prefix=args.subdomain)
Expand Down Expand Up @@ -82,41 +89,46 @@ def upload_csv(account):
file.save(DATA_DIR / filename)
return redirect(url_for('budget.index'))

def stream_command(command):
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env={**os.environ, 'PYTHONUNBUFFERED': '1'},
)
for line in process.stdout:
yield line.rstrip('\n')
process.wait()
if process.returncode != 0:
yield f"[Command failed with exit code {process.returncode}]"
@bp.route('/status')
def status():
return jsonify({
"upload": upload_store.read_state().get("status", "idle"),
"process": process_store.read_state().get("status", "idle"),
})

@bp.route('/trigger_upload')
@bp.route('/trigger_upload', methods=['POST'])
def trigger_upload():
def generate():
upload_script = DATA_DIR / 'upload.sh'
if not upload_script.exists():
yield "data: Error: upload.sh not found at ~/data/budgets/upload.sh\n\n"
return
for line in stream_command(['bash', str(upload_script)]):
yield f"data: {line}\n\n"
yield "data: [Upload script finished]\n\n"
return Response(stream_with_context(generate()), mimetype='text/event-stream')

@bp.route('/trigger_process')
upload_script = DATA_DIR / 'upload.sh'
if not upload_script.exists():
return jsonify({"error": "upload.sh not found at ~/data/budgets/upload.sh"}), 400
if not upload_store.start(['bash', str(upload_script)]):
return jsonify({"error": "Upload already in progress"}), 409
return jsonify({"started": True}), 202

@bp.route('/stream/upload')
def stream_upload():
return Response(
upload_store.stream(),
mimetype='text/event-stream',
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)

@bp.route('/trigger_process', methods=['POST'])
def trigger_process():
def generate():
for line in stream_command(['budget_report', 'transactions-process']):
yield f"data: {line}\n\n"
yield "data: [Processing complete]\n\n"
return Response(stream_with_context(generate()), mimetype='text/event-stream')
if not process_store.start(
['budget_report', 'transactions-process'],
env={**os.environ, 'PYTHONUNBUFFERED': '1'},
):
return jsonify({"error": "Processing already in progress"}), 409
return jsonify({"started": True}), 202

@bp.route('/stream/process')
def stream_process():
return Response(
process_store.stream(),
mimetype='text/event-stream',
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)

# Serve static files correctly under the subpath
@app.route(f'{args.subdomain}/static/<path:filename>')
Expand All @@ -128,7 +140,7 @@ def custom_static(filename):
def run():
global args, app
app.secret_key = os.urandom(24)
app.run(host='0.0.0.0', port=args.port, debug=False)
app.run(host='0.0.0.0', port=args.port, debug=False, threaded=True)

if __name__ == '__main__':
run()
88 changes: 74 additions & 14 deletions pkgs/python-packages/flasks/budget_ui/main.html
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
font-size: 16px;
margin-top: 10px;
}

button:disabled {
opacity: 0.6;
cursor: not-allowed;
}
</style>
</head>

Expand Down Expand Up @@ -71,14 +76,20 @@ <h3>{{ source.Account }}</h3>
</div>

<div class="section">
<button onclick='runCommand({{ url_for("budget.trigger_upload") | tojson }}, "upload-log")'>
<button id="upload-btn" onclick='runCommand(
{{ url_for("budget.trigger_upload") | tojson }},
{{ url_for("budget.stream_upload") | tojson }},
"upload-log", "upload-btn", "Upload transactions")'>
Upload transactions
</button>
<div id="upload-log" class="log-box"></div>
</div>

<div class="section">
<button onclick='runCommand({{ url_for("budget.trigger_process") | tojson }}, "process-log")'>
<button id="process-btn" onclick='runCommand(
{{ url_for("budget.trigger_process") | tojson }},
{{ url_for("budget.stream_process") | tojson }},
"process-log", "process-btn", "Process transactions")'>
Process transactions
</button>
<div id="process-log" class="log-box"></div>
Expand All @@ -99,23 +110,72 @@ <h3>{{ source.Account }}</h3>
log.scrollTop = log.scrollHeight;
}

function runCommand(endpoint, logId) {
const log = document.getElementById(logId);
log.innerHTML = '';
appendLog(log, 'Starting...');

const evtSource = new EventSource(endpoint);

evtSource.onmessage = function (event) {
function attachStream(streamEndpoint, log, btn, label) {
const es = new EventSource(streamEndpoint);
es.onmessage = function(event) {
if (event.data === '[DONE]') {
es.close();
btn.disabled = false;
btn.textContent = label;
return;
}
appendLog(log, event.data);
};

evtSource.onerror = function () {
es.onerror = function() {
appendLog(log, '[Connection failed or ended]');
evtSource.close();
es.close();
btn.disabled = false;
btn.textContent = label;
};
}

function runCommand(triggerEndpoint, streamEndpoint, logId, btnId, label) {
const log = document.getElementById(logId);
const btn = document.getElementById(btnId);
log.innerHTML = '';
btn.disabled = true;
btn.textContent = 'Running...';

fetch(triggerEndpoint, { method: 'POST' })
.then(r => {
if (r.status === 409) {
appendLog(log, '[Already in progress, attaching to existing run...]');
attachStream(streamEndpoint, log, btn, label);
return;
}
if (!r.ok) {
return r.json().then(d => { throw new Error(d.error || 'Failed to start'); });
}
attachStream(streamEndpoint, log, btn, label);
})
.catch(err => {
appendLog(log, 'Error: ' + err.message);
btn.disabled = false;
btn.textContent = label;
});
}

// On load, reattach streams for any jobs that are already running
fetch({{ url_for("budget.status") | tojson }})
.then(r => r.json())
.then(data => {
if (data.upload === 'running') {
const btn = document.getElementById('upload-btn');
btn.disabled = true;
btn.textContent = 'Running...';
attachStream({{ url_for("budget.stream_upload") | tojson }},
document.getElementById('upload-log'), btn, 'Upload transactions');
}
if (data.process === 'running') {
const btn = document.getElementById('process-btn');
btn.disabled = true;
btn.textContent = 'Running...';
attachStream({{ url_for("budget.stream_process") | tojson }},
document.getElementById('process-log'), btn, 'Process transactions');
}
})
.catch(() => {});
</script>
</body>

</html>
</html>
Loading
Loading