-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmanager.py
More file actions
1271 lines (1094 loc) · 52.4 KB
/
manager.py
File metadata and controls
1271 lines (1094 loc) · 52.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import time
import queue
import threading
import sqlite3
import subprocess
import json
import re
import shutil
import traceback
import uuid
import secrets
import platform
import gzip
import hashlib
import hmac
from functools import wraps
from datetime import datetime, timedelta
from urllib.parse import quote, urljoin, unquote, urlparse
# [ADDED] Requests for remote scanning
try:
import requests
except ImportError:
print("[!] Error: 'requests' module not found. Please run: pip install requests")
exit(1)
# Flask & Extensions
from flask import Flask, request, jsonify, render_template, send_from_directory, send_file, Response, abort
from werkzeug.exceptions import HTTPException
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# ==============================================================================
# CONFIGURATION
# ==============================================================================
# [SECURITY] Minimum Worker Version to accept.
# Workers older than this will be denied jobs until they auto-update.
MIN_CLIENT_VERSION = "2.5.0"
try:
from config import (
SERVER_HOST, SERVER_PORT, SERVER_URL_DISPLAY,
SOURCE_DIRECTORY, COMPLETED_DIRECTORY, WORKER_TEMPLATE_FILE,
DB_FILE, VIDEO_EXTENSIONS,
ADMIN_USER, ADMIN_PASS
)
try:
from config import WORKER_SECRET
except ImportError:
print("[!] WARNING: WORKER_SECRET not found. Using unsafe default.")
WORKER_SECRET = "DefaultInsecureSecret"
try:
from config import SECRET_KEY
except ImportError:
SECRET_KEY = secrets.token_hex(32)
try:
from config import USE_WAL_MODE
except ImportError:
USE_WAL_MODE = True
try:
from config import REMOTE_SOURCE_URL
except ImportError:
REMOTE_SOURCE_URL = None
try:
from config import DB_MODE
except ImportError:
DB_MODE = 'disk'
except ImportError:
print("[!] Critical Error: config.py not found.")
exit(1)
app = Flask(__name__)
app.secret_key = SECRET_KEY
# Security Config
app.config['SESSION_COOKIE_SECURE'] = True      # cookies only over HTTPS
app.config['SESSION_COOKIE_HTTPONLY'] = True    # cookies not readable from JS
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# 16 GiB request cap — finished encodes can be multi-GB uploads
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 * 1024
# Rate limiter keyed on client IP; in-memory storage (resets on restart)
limiter = Limiter(
    get_remote_address,
    app=app,
    storage_uri="memory://"
)
# In-process job dispatch queue plus a set to avoid double-queuing ids
job_queue = queue.Queue()
queued_job_ids = set()
db_lock = threading.RLock()
# Cache for outdated worker logs to prevent spamming DB
OUTDATED_LOG_CACHE = {}
# In-memory cache for banned_workers.txt (refreshed every 60s)
_BANNED_CACHE: set = set()
_BANNED_CACHE_TIME: float = 0.0
_BANNED_CACHE_LOCK = threading.Lock()
# ==============================================================================
# ADVANCED DATABASE HANDLER (RAM/DISK)
# ==============================================================================
class DatabaseHandler:
    """Chooses where the SQLite file lives: plain on-disk, or a /dev/shm (RAM)
    copy that a daemon thread syncs back to disk every 60 seconds.

    RAM mode is only honoured on Linux with /dev/shm available; otherwise it
    falls back to disk mode. All connections come from get_connection().
    """
    def __init__(self, disk_path, mode):
        self.disk_path = os.path.abspath(disk_path)
        self.mode = mode
        # Default to the on-disk file; RAM mode may override below.
        self.active_db_path = self.disk_path
        # Determine RAM path based on OS
        if self.mode == 'ram':
            if platform.system() == 'Linux' and os.path.exists('/dev/shm'):
                self.ram_path = os.path.join('/dev/shm', os.path.basename(self.disk_path))
                print(f"[*] DB Mode: RAM (Shared Memory at {self.ram_path})")
                self._load_to_ram()
                self.active_db_path = self.ram_path
                # Start Sync Thread (daemon: dies with the process)
                threading.Thread(target=self._background_sync_loop, daemon=True).start()
            else:
                print("[!] DB Mode: RAM requested but not supported on this OS/Config. Falling back to DISK.")
                self.mode = 'disk'
    def _load_to_ram(self):
        """Copies Disk DB to RAM at startup."""
        with db_lock:
            # Ensure disk file exists to copy
            if not os.path.exists(self.disk_path):
                open(self.disk_path, 'a').close()
            # Copy to RAM
            shutil.copy2(self.disk_path, self.ram_path)
            # Set permissions (best-effort; ignore failure on odd mounts)
            try:
                os.chmod(self.ram_path, 0o666)
            except:
                pass
    def sync_to_disk(self):
        """Safely backups RAM DB to Disk using SQLite Backup API.

        No-op in disk mode. Uses Connection.backup (source -> dest), which
        takes a consistent snapshot without blocking other readers."""
        if self.mode != 'ram': return
        # Don't grab the main lock; backup API handles concurrency well enough
        try:
            source_conn = sqlite3.connect(self.active_db_path)
            dest_conn = sqlite3.connect(self.disk_path)
            with source_conn:
                source_conn.backup(dest_conn)
            dest_conn.close()
            source_conn.close()
        except Exception as e:
            print(f"[!] DB Sync Failed: {e}")
    def _background_sync_loop(self):
        # Daemon-thread body: periodically flush the RAM copy to disk.
        while True:
            time.sleep(60)  # Sync every 60 seconds
            self.sync_to_disk()
    def get_connection(self):
        """Returns a connection to the currently active DB (RAM or Disk)."""
        conn = sqlite3.connect(self.active_db_path, timeout=60)
        # Enable WAL for concurrency (DELETE journal when WAL is disabled)
        if USE_WAL_MODE:
            try:
                conn.execute("PRAGMA journal_mode=WAL;")
            except:
                pass
        else:
            try:
                conn.execute("PRAGMA journal_mode=DELETE;")
            except:
                pass
        return conn
# Initialize the Handler
# Module-level singleton used by every route; created at import time.
db_handler = DatabaseHandler(DB_FILE, DB_MODE)
# ==============================================================================
# LOGGING
# ==============================================================================
def log_event(level, message, related_id=None):
    """Best-effort logger: writes a row to system_logs and echoes to stdout.

    Never raises — any failure is swallowed and printed, so logging can be
    called from request handlers and background threads alike.
    """
    try:
        # [FIX] Escape angle brackets so messages render safely in the HTML
        # dashboard; the previous replace('<', '<') calls were no-ops.
        clean_msg = str(message).replace('<', '&lt;').replace('>', '&gt;')
        clean_id = str(related_id) if related_id else None
        if clean_id:
            # Keep ids to a safe character set (matches sanitize_input()).
            clean_id = re.sub(r'[^a-zA-Z0-9_.-]', '', clean_id)
        with db_lock:
            conn = db_handler.get_connection()
            try:
                # [FIX] close the connection even if execute/commit fails
                conn.execute(
                    "INSERT INTO system_logs (timestamp, level, message, related_id) VALUES (?, ?, ?, ?)",
                    (datetime.now(), level, clean_msg, clean_id))
                conn.commit()
            finally:
                conn.close()
        print(f"[{level}] {message}")
    except Exception as e:
        print(f"[!] Logging failed: {e}")
# ==============================================================================
# HELPERS
# ==============================================================================
def sanitize_input(val):
    """Strip every character outside [a-zA-Z0-9_.-]; falsy input maps to None."""
    if not val:
        return None
    disallowed = re.compile(r'[^a-zA-Z0-9_.-]')
    return disallowed.sub('', str(val))
def is_version_sufficient(client_ver, min_ver):
    """Return True when dotted version client_ver >= min_ver.

    [FIX] Pads the shorter version with zeros before comparing, so "2.5"
    is treated as "2.5.0". The old list comparison rejected it because
    [2, 5] < [2, 5, 0] under Python sequence ordering.
    Non-numeric segments are ignored; any parse failure returns False.
    """
    if not client_ver:
        return False
    try:
        c_parts = [int(x) for x in client_ver.split('.') if x.isdigit()]
        m_parts = [int(x) for x in min_ver.split('.') if x.isdigit()]
        width = max(len(c_parts), len(m_parts))
        c_parts += [0] * (width - len(c_parts))
        m_parts += [0] * (width - len(m_parts))
        return c_parts >= m_parts
    except (ValueError, AttributeError):
        return False
def is_worker_banned(worker_id):
    """Return True when worker_id appears in banned_workers.txt.

    The ban list is cached in memory and re-read from disk at most once per
    minute; a missing file means nobody is banned. Comparison is
    case-insensitive and whitespace-trimmed."""
    global _BANNED_CACHE, _BANNED_CACHE_TIME
    if not worker_id:
        return False
    with _BANNED_CACHE_LOCK:
        now = time.time()
        if now - _BANNED_CACHE_TIME > 60:
            try:
                with open("banned_workers.txt", "r") as fh:
                    stripped = (ln.strip() for ln in fh)
                    _BANNED_CACHE = {entry.lower() for entry in stripped if entry}
            except FileNotFoundError:
                _BANNED_CACHE = set()
            _BANNED_CACHE_TIME = now
        return worker_id.strip().lower() in _BANNED_CACHE
@app.after_request
def add_security_headers(response):
    """Attach browser-hardening headers to every outgoing response."""
    hardening = {
        'Cross-Origin-Opener-Policy': 'same-origin',
        'Cross-Origin-Embedder-Policy': 'require-corp',
        'X-Frame-Options': 'DENY',
        'X-Content-Type-Options': 'nosniff',
    }
    for header_name, header_value in hardening.items():
        response.headers[header_name] = header_value
    return response
@app.before_request
def csrf_protect():
    """CSRF guard for admin action POSTs.

    [FIX] The previous check (`request.host not in target`) was a substring
    test, bypassable with an attacker origin like
    "https://victim.example.evil.com". We now parse the Origin (or Referer)
    URL and require its netloc to match the request host exactly; requests
    with neither header are still rejected.
    """
    if request.method == "POST" and request.path.startswith('/api/admin_action'):
        origin = request.headers.get('Origin')
        referer = request.headers.get('Referer')
        target = origin or referer or ""
        source_host = urlparse(target).netloc.lower() if target else ""
        if source_host != request.host.lower():
            return jsonify({"status": "error", "message": "CSRF Blocked: Origin Mismatch"}), 403
def check_auth(u, p):
    """Constant-time credential check for HTTP Basic auth.

    [FIX] Uses hmac.compare_digest instead of == so that the comparison time
    does not leak how many leading characters of the credentials matched.
    Both values are checked (no short-circuit on username mismatch).
    """
    if u is None or p is None:
        return False
    user_ok = hmac.compare_digest(str(u).encode('utf-8'), str(ADMIN_USER).encode('utf-8'))
    pass_ok = hmac.compare_digest(str(p).encode('utf-8'), str(ADMIN_PASS).encode('utf-8'))
    return user_ok and pass_ok
def authenticate():
    """Build the 401 response that makes the browser prompt for Basic credentials."""
    body = ('Could not verify your access level for that URL.\n'
            'You have to login with proper credentials')
    challenge = {'WWW-Authenticate': 'Basic realm="Login Required"'}
    return Response(body, 401, challenge)
def requires_auth(f):
    """Decorator: gate a route behind HTTP Basic auth (see check_auth)."""
    @wraps(f)
    def decorated(*args, **kwargs):
        creds = request.authorization
        if creds and check_auth(creds.username, creds.password):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def requires_worker_auth(f):
    """Decorator: validate the shared worker secret when one is presented.

    NOTE(review): requests that send NO token at all are allowed through
    (fail-open) — this matches the original behavior, presumably for legacy
    workers. Confirm this is intentional; the fail-closed variant would
    return 401 when token is None.

    [FIX] The token comparison now uses hmac.compare_digest so response
    timing cannot be used to guess WORKER_SECRET byte by byte.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('X-Worker-Token') or request.args.get('token')
        if token is None:
            return f(*args, **kwargs)
        if not hmac.compare_digest(str(token).encode('utf-8'), str(WORKER_SECRET).encode('utf-8')):
            return jsonify({"status": "error", "message": "Unauthorized Worker"}), 401
        return f(*args, **kwargs)
    return decorated
def verify_upload(filepath):
    """Validate an uploaded encode with ffprobe.

    Accepts only files containing an AV1 video stream at 480 px height; any
    audio stream present must be Opus. Returns (ok: bool, reason: str).
    All failures (ffprobe missing, bad JSON, wrong codec) report ok=False.
    """
    try:
        cmd = ['ffprobe', '-v', 'error', '-print_format', 'json', '-show_streams', '-show_format', filepath]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            return False, "FFprobe Error"
        data = json.loads(result.stdout)
        has_video = False
        for stream in data.get('streams', []):
            # [FIX] use .get(): a stream entry without 'codec_type' previously
            # raised KeyError and surfaced as an unhelpful generic failure.
            ctype = stream.get('codec_type')
            if ctype == 'video':
                if stream.get('codec_name') != 'av1':
                    return False, "Invalid Codec (Not AV1)"
                if int(stream.get('height', 0)) != 480:
                    return False, "Invalid Height"
                has_video = True
            elif ctype == 'audio':
                if stream.get('codec_name') != 'opus':
                    return False, "Invalid Audio Codec"
        if not has_video:
            return False, "No Video Stream"
        return True, "Verified"
    except Exception as e:
        return False, str(e)
# ==============================================================================
# HYBRID SCANNER (STREAMING GENERATOR)
# ==============================================================================
def scan_remote_http(url, prefix="", depth=0):
    """
    Recursively scans an HTTP directory listing for video files.
    YIELDS results as they are found (Generator) to allow real-time queuing.

    Yields (clean_id, clean_name, size) where clean_id is the decoded
    path (prefix + filename) relative to the scan root.

    [IDIOM] unquote is now imported once at module level instead of being
    re-imported inside the loop on every directory/file hit.
    """
    if depth > 10: return  # Prevent infinite recursion
    headers = {'User-Agent': 'FractumManager/1.0'}
    # Retry Loop & Increased Timeout
    r = None
    for attempt in range(3):
        try:
            r = requests.get(url, headers=headers, timeout=30)
            if r.status_code == 200:
                break
            else:
                return  # Stop if 404/403
        except requests.exceptions.RequestException as e:
            if attempt == 2:
                print(f"[!] HTTP Scan Error on {url} after 3 attempts: {e}")
                return
            time.sleep(3)
    if not r or r.status_code != 200: return
    try:
        links = re.findall(r'href=["\']([^"\'<>]+)["\']', r.text)
        for link in links:
            if link.startswith('?') or link.startswith('/') or link in ['../', './']: continue
            if "parent directory" in link.lower(): continue
            full_url = urljoin(url, link)
            if link.endswith('/'):
                time.sleep(0.2)  # Tiny delay to prevent hammering
                # Recursively yield results from subdirectories.
                # Always unquote the link before appending to prefix so that
                # clean_id is consistently decoded text — quote() in the
                # download URL builder will then encode it exactly once.
                # Without this, a percent-encoded subdir name (e.g. %C2%B7 for ·)
                # gets double-encoded to %25C2%25B7, producing a 404.
                yield from scan_remote_http(full_url, prefix=f"{prefix}{unquote(link)}", depth=depth+1)
            elif any(link.lower().endswith(ext) for ext in VIDEO_EXTENSIONS):
                clean_name = unquote(link)
                clean_id = f"{prefix}{clean_name}"
                size = 0
                try:
                    # Best-effort size probe; stays 0 when HEAD fails.
                    h = requests.head(full_url, headers=headers, timeout=15)
                    size = int(h.headers.get('content-length', 0))
                except: pass
                # Yield the found file immediately
                yield (clean_id, clean_name, size)
    except Exception as e:
        print(f"[!] HTTP Parsing Error on {url}: {e}")
def scan_and_queue():
    """
    Scans local and remote sources and updates the database/queue in batches.

    Local files are collected in one pass; remote files stream in from the
    scan_remote_http generator and are committed every 10 finds so workers
    can start pulling jobs before the (slow) remote scan finishes.
    """
    # --- Helper: Process a batch of files ---
    def process_batch(file_batch):
        # Insert unseen files into the jobs table, then push all still-queued
        # jobs onto the in-memory dispatch queue.
        if not file_batch: return
        count_new = 0
        # 1. Update Database
        with db_lock:
            conn = db_handler.get_connection()
            try:
                cursor = conn.cursor()
                for item in file_batch:
                    # Unpack — remote tuples carry a 5th element (source URL).
                    if len(item) == 5:
                        job_id, fname, fsize, src_type, src_url = item
                    else:
                        job_id, fname, fsize, src_type = item
                        src_url = None
                    # Profile Logic: Remote is ALWAYS live_action
                    profile = 'standard'
                    if src_type == 'remote':
                        profile = 'live_action'
                    elif 'live_action' in str(job_id).lower():
                        profile = 'live_action'
                    cursor.execute("SELECT id FROM jobs WHERE id=?", (job_id,))
                    if not cursor.fetchone():
                        # Also check the legacy partially-encoded ID format produced
                        # before the double-encoding bug fix. The old scanner left
                        # subdir components percent-encoded (e.g. BURN%C2%B7E.../)
                        # while decoding only the filename. After the fix, all path
                        # components are decoded, so the IDs differ even for the same
                        # file. We normalise before inserting to avoid re-queuing
                        # files that are already done or in-progress.
                        _id_parts = job_id.split('/')
                        if len(_id_parts) > 1:
                            _legacy_id = (
                                '/'.join(quote(p, safe='') for p in _id_parts[:-1])
                                + '/' + _id_parts[-1]
                            )
                        else:
                            _legacy_id = job_id
                        if _legacy_id != job_id:
                            cursor.execute("SELECT id FROM jobs WHERE id=?", (_legacy_id,))
                            if cursor.fetchone():
                                continue  # already exists under the old encoded ID
                        cursor.execute(
                            "INSERT INTO jobs (id, filename, status, last_updated, file_size, source_type, source_url, content_profile, fail_count) VALUES (?, ?, 'queued', ?, ?, ?, ?, ?, 0)",
                            (job_id, fname, datetime.now(), fsize, src_type, src_url, profile)
                        )
                        count_new += 1
                conn.commit()
            finally:
                conn.close()
        if count_new > 0:
            print(f"[*] Added {count_new} new files to Database...")
        # 2. Update Active Queue (Push to Workers)
        with db_lock:
            conn = db_handler.get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT id, filename, file_size, source_type, source_url FROM jobs WHERE status = 'queued'")
                for row in cursor.fetchall():
                    jid, fname, fsize, stype, surl = row
                    if jid not in queued_job_ids:
                        dl_link = ""
                        if stype == 'remote':
                            # Prefer the per-job stored URL; fall back to config.
                            base_url = surl if surl else REMOTE_SOURCE_URL
                            if base_url:
                                dl_link = urljoin(base_url, quote(jid))
                        else:
                            dl_link = f"{SERVER_URL_DISPLAY.rstrip('/')}/download_source/{quote(jid, safe='/')}"
                        if dl_link:
                            job_queue.put({
                                "id": jid,
                                "filename": fname,
                                "file_size": fsize,
                                "download_url": dl_link
                            })
                            queued_job_ids.add(jid)
            finally:
                conn.close()
    # --- 1. Scan Local ---
    print(f"[*] Scanning LOCAL Source: {SOURCE_DIRECTORY} ...")
    local_batch = []
    if os.path.exists(SOURCE_DIRECTORY):
        try:
            # topdown + sort gives a deterministic scan order.
            for root, dirs, files in os.walk(SOURCE_DIRECTORY, topdown=True):
                dirs.sort(); files.sort()
                for file in files:
                    if file.lower().endswith(VIDEO_EXTENSIONS):
                        rel_path = os.path.relpath(os.path.join(root, file), SOURCE_DIRECTORY)
                        fsize = os.path.getsize(os.path.join(root, file))
                        local_batch.append((rel_path, file, fsize, 'local'))
        except Exception as e:
            print(f"[!] Local Scanner error: {e}")
    if local_batch:
        process_batch(local_batch)
    # --- 2. Scan Remote (Streaming) ---
    if REMOTE_SOURCE_URL:
        print(f"[*] Scanning REMOTE Source: {REMOTE_SOURCE_URL} ...")
        remote_batch = []
        # Iterate over the generator
        for r_id, r_name, r_size in scan_remote_http(REMOTE_SOURCE_URL):
            remote_batch.append((r_id, r_name, r_size, 'remote', REMOTE_SOURCE_URL))
            # Commit every 10 files so workers don't wait
            if len(remote_batch) >= 10:
                process_batch(remote_batch)
                remote_batch = []
        # Commit any remaining files
        if remote_batch:
            process_batch(remote_batch)
    print("[*] Scan complete.")
def get_series_list():
    """Return ([{id, folder, name}, ...], stale_keys) for SOURCE_DIRECTORY.

    'name' comes from series_names.json when a mapping exists (falls back to
    the folder name); stale_keys lists mapping entries whose folder no longer
    exists on disk. Any unexpected failure yields ([], []).
    """
    try:
        # [FIX] Allow series listing even in hybrid mode
        if not os.path.exists(SOURCE_DIRECTORY): return [], []
        folders = sorted([d for d in os.listdir(SOURCE_DIRECTORY) if os.path.isdir(os.path.join(SOURCE_DIRECTORY, d))])
        folder_set = set(folders)
        mapping = {}
        stale_keys = []
        if os.path.exists('series_names.json'):
            try:
                # [FIX] Use a context manager — the old json.load(open(...))
                # leaked the file handle — and catch only the errors this
                # load can realistically raise.
                with open('series_names.json', 'r') as fh:
                    mapping = json.load(fh)
                stale_keys = [k for k in mapping if k not in folder_set]
                if stale_keys:
                    print(f"[!] series_names.json has {len(stale_keys)} stale key(s): {', '.join(stale_keys[:5])}")
            except (OSError, ValueError):
                pass
        return [{"id": i+1, "folder": f, "name": mapping.get(f, f)} for i, f in enumerate(folders)], stale_keys
    except Exception:
        return [], []
# ==============================================================================
# DATABASE INIT
# ==============================================================================
def init_db():
    """Create core tables and apply additive schema migrations (idempotent).

    Each ALTER TABLE below adds a column introduced after the original
    schema; sqlite3.OperationalError means the column already exists and
    is silently skipped.
    """
    migrations = (
        "ALTER TABLE jobs ADD COLUMN file_size INTEGER DEFAULT 0",
        "ALTER TABLE jobs ADD COLUMN worker_version TEXT",
        "ALTER TABLE jobs ADD COLUMN source_type TEXT DEFAULT 'local'",
        "ALTER TABLE jobs ADD COLUMN source_url TEXT",
        "ALTER TABLE jobs ADD COLUMN warnings TEXT",
        # Live Action Profile Column
        "ALTER TABLE jobs ADD COLUMN content_profile TEXT DEFAULT 'standard'",
        # fail_count column for permanent failure tracking
        "ALTER TABLE jobs ADD COLUMN fail_count INTEGER DEFAULT 0",
        # source_hash: MD5 of first 4 MB of source file for integrity verification
        "ALTER TABLE jobs ADD COLUMN source_hash TEXT",
    )
    with db_lock:
        conn = db_handler.get_connection()
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY, filename TEXT, status TEXT, worker_id TEXT,
                progress INTEGER DEFAULT 0, duration INTEGER DEFAULT 0, last_updated TIMESTAMP,
                started_at TIMESTAMP, file_size INTEGER DEFAULT 0, source_type TEXT DEFAULT 'local'
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS system_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                level TEXT,
                message TEXT,
                related_id TEXT
            )
        ''')
        for ddl in migrations:
            try:
                cursor.execute(ddl)
            except sqlite3.OperationalError:
                pass  # column already present
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS error_reports (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                job_id TEXT,
                worker_id TEXT,
                error_type TEXT,
                message TEXT,
                details TEXT
            )
        ''')
        conn.commit()
        conn.close()
# ==============================================================================
# ROUTES
# ==============================================================================
# Number of bytes used for the fast source-file hash (4 MB)
_HASH_BYTES = 4 * 1024 * 1024
def _fast_hash_file(path, nbytes=_HASH_BYTES):
"""Return MD5 hex digest of the first `nbytes` of a local file."""
h = hashlib.md5()
try:
with open(path, 'rb') as f:
remaining = nbytes
while remaining > 0:
chunk = f.read(min(65536, remaining))
if not chunk:
break
h.update(chunk)
remaining -= len(chunk)
except Exception:
return None
return h.hexdigest()
# --- Simple UI / utility routes ---
@app.route('/')
def dashboard(): return render_template('dashboard.html')  # public status dashboard
@app.route('/admin')
@limiter.limit("5 per minute")  # brute-force throttle on top of Basic auth
@requires_auth
def admin_panel(): return render_template('admin.html')
@app.route('/api/ping')
def api_ping(): return jsonify({"status": "pong"})  # liveness probe
@app.route('/dl/worker')
def download_worker_script(): return send_file(WORKER_TEMPLATE_FILE, as_attachment=True, download_name='worker.py')
# [ADDED] Route to serve the watermark font to workers
@app.route('/dl/font')
def download_font():
    font_path = os.path.abspath('arial.ttf')
    if os.path.exists(font_path):
        return send_file(font_path, as_attachment=True, download_name='arial.ttf')
    return abort(404)
@app.route('/api/series')
def api_series_list():
    # Folder list plus display-name mapping; stale keys help admins clean up
    # series_names.json entries whose folders were removed.
    series, stale = get_series_list()
    return jsonify({"series": series, "stale_series_keys": stale})
@app.route('/install')
def install_script():
    """Generate a one-shot bash installer that fetches worker.py and runs it.

    NOTE(review): the generated script embeds WORKER_SECRET in plain text and
    this route has no authentication — anyone who can reach /install can read
    the shared worker secret. Confirm this is acceptable for the deployment.
    """
    # Query params are sanitised / validated before interpolation into the script.
    u = sanitize_input(request.args.get('username')) or 'Anonymous'
    w = sanitize_input(request.args.get('workername')) or 'LinuxNode'
    s_id = request.args.get('series_id', '')
    if s_id and not s_id.isdigit(): s_id = ''
    j = request.args.get('jobs', '1')
    if not j.isdigit(): j = '1'
    script = f"""#!/bin/bash
if [ -x "$(command -v apt-get)" ]; then sudo apt-get update -qq && sudo apt-get install -y ffmpeg python3 python3-requests; fi
if [ -x "$(command -v dnf)" ]; then sudo dnf install -y ffmpeg python3 python3-requests; fi
curl -s "{SERVER_URL_DISPLAY.rstrip('/')}/dl/worker" -o worker.py
export WORKER_SECRET="{WORKER_SECRET}"
python3 worker.py --username "{u}" --workername "{w}" --jobs {j} --manager "{SERVER_URL_DISPLAY}" --series-id "{s_id}"
"""
    return Response(script, mimetype='text/x-shellscript')
@app.route('/download_source/<path:filename>')
def download_source(filename):
    # send_from_directory rejects paths that escape SOURCE_DIRECTORY,
    # so this is safe against ../ traversal despite the <path:> converter.
    return send_from_directory(SOURCE_DIRECTORY, filename, as_attachment=True)
@app.route('/get_job', methods=['GET'])
@requires_worker_auth
def get_job():
    """Hand the next queued job to a worker.

    Search order: remote jobs first (for sufficiently new clients), then
    local; within each phase an optional series filter is tried before an
    unfiltered search. The selected job is atomically flipped to
    'processing' under db_lock before being returned.
    """
    max_size_mb = request.args.get('max_size_mb')
    series_id = request.args.get('series_id')
    worker_id = sanitize_input(request.args.get('worker_id'))
    worker_version = sanitize_input(request.args.get('version'))
    if is_worker_banned(worker_id):
        # Throttled logging (once per 5 min per worker) to avoid DB spam.
        now = time.time()
        last_log = OUTDATED_LOG_CACHE.get(f"banned_{worker_id}", 0)
        if now - last_log > 300:
            log_event("WARN", f"Banned worker blocked from getting jobs: {worker_id}")
            OUTDATED_LOG_CACHE[f"banned_{worker_id}"] = now
        return jsonify({"status": "empty", "message": "Unauthorized"}), 403
    # [ENFORCEMENT] Check Minimum Client Version
    if not is_version_sufficient(worker_version, MIN_CLIENT_VERSION):
        # LOGGING ADDED: Log specific workers (throttled to 5 mins) to prove they are alive
        now = time.time()
        last_log = OUTDATED_LOG_CACHE.get(worker_id, 0)
        if now - last_log > 300:  # Log only once every 5 minutes per worker
            log_event("WARN", f"Outdated Worker Denied: {worker_id} (v{worker_version}). Waiting for auto-update.")
            OUTDATED_LOG_CACHE[worker_id] = now
        # 200 (not an error): the worker should idle and auto-update, not retry hard.
        return jsonify({"status": "empty", "message": "Update Required"}), 200
    search_phases = []
    # Only offer remote jobs to updated clients (Redundant now due to enforcement above, but kept safe)
    if REMOTE_SOURCE_URL and is_version_sufficient(worker_version, "1.9.0"):
        search_phases.append('remote')
    # Always offer local jobs as fallback
    search_phases.append('local')
    try:
        with db_lock:
            conn = db_handler.get_connection(); conn.row_factory = sqlite3.Row
            try:
                c = conn.cursor()
                job = None
                for source_type in search_phases:
                    if job: break
                    # Try the requested series first, then fall back to any series (None).
                    search_attempts = [series_id] if series_id and series_id.isdigit() else []
                    search_attempts.append(None)
                    for current_search_id in search_attempts:
                        folder_filter = None
                        if current_search_id:
                            for s in get_series_list()[0]:
                                if s['id'] == int(current_search_id):
                                    folder_filter = s['folder']; break
                        params = [source_type]
                        # Skip jobs that have already failed 5+ times.
                        query_parts = ["status='queued'", "source_type=?", "COALESCE(fail_count,0) < 5"]
                        if max_size_mb and max_size_mb.isdigit():
                            query_parts.append("file_size <= ?")
                            params.append(int(max_size_mb) * 1024 * 1024)
                        if folder_filter:
                            query_parts.append("id LIKE ?")
                            params.append(f"{folder_filter}%")
                        # [ADDED] content_profile to SELECT query
                        sql = f"SELECT id, filename, file_size, source_type, source_url, content_profile, source_hash FROM jobs WHERE {' AND '.join(query_parts)} ORDER BY id ASC LIMIT 1"
                        c.execute(sql, tuple(params)); row = c.fetchone()
                        if row: job = dict(row); break
                if job:
                    if job['source_type'] == 'remote':
                        # Use stored URL if available, else fallback
                        base_url = job.get('source_url') if job.get('source_url') else REMOTE_SOURCE_URL
                        if base_url:
                            job['download_url'] = urljoin(base_url, quote(job['id']))
                        else:
                            # Fallback for old remote jobs if config is gone (might fail, but best effort)
                            job['download_url'] = ""
                    else:
                        job['download_url'] = f"{SERVER_URL_DISPLAY.rstrip('/')}/download_source/{quote(job['id'], safe='/')}"
                        # Lazily compute source hash for local files on first pickup
                        if not job.get('source_hash'):
                            _src_path = os.path.join(SOURCE_DIRECTORY, job['id'].replace('/', os.sep))
                            _computed = _fast_hash_file(_src_path)
                            if _computed:
                                conn.execute("UPDATE jobs SET source_hash=? WHERE id=?",
                                             (_computed, job['id']))
                                job['source_hash'] = _computed
                    # Never reveal the hash to the worker — workers must submit their own
                    # computed hash blind so they cannot cheat by echoing the known value.
                    job.pop('source_hash', None)
                    conn.execute("UPDATE jobs SET status='processing', worker_id=?, worker_version=?, last_updated=?, started_at=? WHERE id=?",
                                 (worker_id, worker_version, datetime.now(), datetime.now(), job['id']))
                    conn.commit()
                    return jsonify({"status": "ok", "job": job})
            finally:
                conn.close()
        return jsonify({"status": "empty"})
    except Exception as e:
        log_event("ERROR", f"get_job failed: {e}")
        return jsonify({"status": "error"}), 500
@app.route('/upload_result', methods=['POST'])
@requires_worker_auth
def upload_result():
    """Receive a finished encode: quarantine it, verify with ffprobe, then
    publish to COMPLETED_DIRECTORY and mark the job completed."""
    job_id = request.form.get('job_id')
    worker_id = sanitize_input(request.form.get('worker_id'))
    try:
        duration = int(float(request.form.get('duration', 0)))
    except:
        duration = 0
    if 'file' in request.files and job_id:
        new_filename = os.path.splitext(job_id)[0] + ".mp4"
        # Save under a random name in quarantine until verification passes.
        quarantine_dir = os.path.join("temp_uploads", "quarantine")
        os.makedirs(quarantine_dir, exist_ok=True)
        temp_path = os.path.join(quarantine_dir, f"{uuid.uuid4().hex}.mp4")
        request.files['file'].save(temp_path)
        is_valid, reason = verify_upload(temp_path)
        if not is_valid:
            # Reject and mark the job failed so it can be re-queued/inspected.
            log_event("WARN", f"Security: Upload rejected ({reason})", job_id)
            os.remove(temp_path)
            with db_lock:
                conn = db_handler.get_connection()
                conn.execute("UPDATE jobs SET status='failed', last_updated=? WHERE id=?", (datetime.now(), job_id))
                conn.commit(); conn.close()
            return jsonify({"status": "error", "message": reason}), 400
        save_path = os.path.abspath(os.path.join(COMPLETED_DIRECTORY, new_filename))
        # Path traversal guard: resolved path must remain inside COMPLETED_DIRECTORY.
        if not save_path.startswith(os.path.abspath(COMPLETED_DIRECTORY)):
            os.remove(temp_path); return jsonify({"status": "error"}), 403
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        shutil.move(temp_path, save_path)
        with db_lock:
            conn = db_handler.get_connection()
            conn.execute("UPDATE jobs SET status='completed', progress=100, worker_id=?, last_updated=?, duration=? WHERE id=?",
                         (worker_id, datetime.now(), duration, job_id))
            conn.commit(); conn.close()
        # [CRITICAL] Immediate Sync to Disk
        db_handler.sync_to_disk()
        log_event("INFO", f"Job completed by {worker_id}", job_id)
        return jsonify({"status": "success"})
    return jsonify({"status": "error"}), 400
@app.route('/upload_log', methods=['POST'])
@requires_worker_auth
def receive_log():
    """Store a worker's gzipped encode log and scan it for cheating signals
    (GPU encoders in the stream mapping, modified SVT-AV1 preset, or a
    non-standard encoder). Detected warnings are recorded on the job row."""
    MAX_LOG_BYTES = 50 * 1024 * 1024  # 50 MB hard cap
    if request.content_length and request.content_length > MAX_LOG_BYTES:
        return jsonify({"status": "error", "message": "Log file exceeds 50 MB limit"}), 413
    job_id = request.form.get('job_id')
    worker_id = sanitize_input(request.form.get('worker_id'))
    if 'log_file' in request.files and job_id:
        log_dir = os.path.join(os.getcwd(), "encode_logs")
        os.makedirs(log_dir, exist_ok=True)
        # Save the compressed file (job id sanitised into a safe filename)
        safe_name = re.sub(r'[^a-zA-Z0-9_.-]', '_', job_id)
        log_path = os.path.join(log_dir, f"{safe_name}.log.gz")
        request.files['log_file'].save(log_path)
        warnings = []
        try:
            with gzip.open(log_path, 'rt', encoding='utf-8', errors='ignore') as f:
                log_content = f.read().lower()
                # 1. Check for GPU Encoders (Safely) — only inside the ffmpeg
                # "Stream mapping:" block, to avoid false hits elsewhere.
                mapping_block = re.search(r'stream mapping:(.*?)(?:press \[|output #)', log_content, re.DOTALL)
                if mapping_block:
                    mapping_text = mapping_block.group(1)
                    gpu_encoders = ['nvenc', 'qsv', 'amf', 'videotoolbox', 'hevc_amf']
                    for enc in gpu_encoders:
                        if enc in mapping_text:
                            warnings.append(f"GPU USED ({enc.upper()})")
                            break
                # 2. Check SVT-AV1 preset (expected preset 2)
                preset_match = re.search(r'svt\[info\]:\s*preset\s*:\s*(\d+)', log_content)
                if preset_match:
                    used_preset = preset_match.group(1)
                    if used_preset != "2":
                        warnings.append(f"PRESET MODIFIED (Used {used_preset}, Expected 2)")
                elif "libsvtav1" not in log_content:
                    if not warnings:
                        warnings.append("NON-STANDARD ENCODER USED")
        except Exception as e:
            log_event("WARN", f"Could not parse log for {job_id}: {e}")
        if warnings:
            warning_str = " | ".join(warnings)
            log_event("WARN", f"CHEATING DETECTED by {worker_id}: {warning_str}", job_id)
            with db_lock:
                conn = db_handler.get_connection()
                conn.execute("UPDATE jobs SET warnings=? WHERE id=?", (warning_str, job_id))
                conn.commit()
                conn.close()
        return jsonify({"status": "success"})
    return jsonify({"status": "error"}), 400
@app.route('/report_error', methods=['POST'])
@requires_worker_auth
def receive_error_report():
    """Persist a structured worker error report and mirror it to the system log."""
    payload = request.json
    if not payload:
        return jsonify({"status": "error"}), 400
    # Workers are semi-trusted: sanitise ids and cap every free-text field.
    job_id = str(payload.get('job_id', '') or 'unknown').strip()[:512]
    worker_id = sanitize_input(payload.get('worker_id', '')) or 'unknown'
    error_type = sanitize_input(payload.get('error_type', 'unknown')) or 'unknown'
    message = str(payload.get('message', ''))[:2048]
    details = str(payload.get('details', ''))[:32768]
    record = (datetime.now(), job_id, worker_id, error_type, message, details)
    with db_lock:
        conn = db_handler.get_connection()
        try:
            conn.execute(
                "INSERT INTO error_reports (timestamp, job_id, worker_id, error_type, message, details) VALUES (?, ?, ?, ?, ?, ?)",
                record
            )
            conn.commit()
        finally:
            conn.close()
    log_event("ERROR", f"Error report from {worker_id} [{error_type}]: {message}", job_id)
    return jsonify({"status": "received"})
@app.route('/api/error_reports')
@requires_auth
def api_error_reports():
    """Admin view: newest error reports first; limit clamped to 1000."""
    try:
        limit = min(int(request.args.get('limit', 200)), 1000)
    except (ValueError, TypeError):
        limit = 200
    with db_lock:
        conn = db_handler.get_connection()
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                "SELECT id, timestamp, job_id, worker_id, error_type, message, details FROM error_reports ORDER BY timestamp DESC LIMIT ?",
                (limit,)
            ).fetchall()
            reports = [dict(r) for r in rows]
        finally:
            conn.close()
    return jsonify({"reports": reports})
@app.route('/api/download_log')
@requires_auth
def download_encode_log():
    """Admin download of the stored (gzipped) encode log for a job."""
    job_id = request.args.get('job_id', '')
    if not job_id:
        return abort(400)
    # Same sanitisation used when the log was written, so names round-trip.
    safe_name = re.sub(r'[^a-zA-Z0-9_.-]', '_', job_id)
    log_dir = os.path.abspath("encode_logs")
    log_path = os.path.abspath(os.path.join(log_dir, f"{safe_name}.log.gz"))
    # Path traversal guard
    if not log_path.startswith(log_dir + os.sep):
        return abort(403)
    if os.path.exists(log_path):
        return send_file(log_path, as_attachment=True, download_name=f"{safe_name}.log.gz")
    return jsonify({"status": "error", "message": "Log file not found"}), 404
@app.route('/job_status', methods=['GET'])
@requires_worker_auth
def job_status():
    """Worker-facing endpoint: returns the current status and assigned worker for a job.
    Used by recovering workers to detect whether a checkpoint is still viable."""
    job_id = (request.args.get('job_id', '') or '').strip()
    if not job_id:
        return jsonify({"status": "error", "message": "job_id required"}), 400
    with db_lock:
        conn = db_handler.get_connection()
        conn.row_factory = sqlite3.Row
        try:
            row = conn.execute(
                "SELECT status, worker_id FROM jobs WHERE id=?", (job_id,)
            ).fetchone()
        finally:
            conn.close()
    if row is None:
        return jsonify({"status": "error", "message": "not_found"}), 404
    return jsonify({"job_status": row["status"], "worker_id": row["worker_id"]})
@app.route('/reclaim_job', methods=['POST'])
@requires_worker_auth
def reclaim_job():
    """Worker-facing endpoint: attempt to reclaim a queued (or timed-out) job by ID.
    Only succeeds if the job is currently in a reclaimable state (queued / failed
    with fail_count < 5). Returns {"status": "ok"} on success so the worker knows
    it is safe to upload the resumed encode."""
    d = request.json or {}
    job_id = str(d.get('job_id', '') or '').strip()
    worker_id = sanitize_input(d.get('worker_id', ''))
    worker_version = sanitize_input(d.get('version', ''))
    if not job_id or not worker_id:
        return jsonify({"status": "error", "message": "job_id and worker_id required"}), 400
    with db_lock:
        conn = db_handler.get_connection()
        conn.row_factory = sqlite3.Row
        try:
            c = conn.cursor()
            c.execute("SELECT status, COALESCE(fail_count, 0) as fail_count FROM jobs WHERE id=?", (job_id,))
            row = c.fetchone()
            if row is None:
                return jsonify({"status": "error", "message": "not_found"}), 404
            # Jobs already processing/completed, or permanently failed (5+), stay untouched.
            if row["status"] not in ('queued', 'failed') or row["fail_count"] >= 5:
                return jsonify({"status": "conflict",
                                "message": f"Job is not reclaimable (status={row['status']})"}), 409
            # COALESCE keeps the original started_at when this is a resume.
            conn.execute(
                "UPDATE jobs SET status='processing', worker_id=?, worker_version=?, "
                "last_updated=?, started_at=COALESCE(started_at, ?) WHERE id=?",
                (worker_id, worker_version, datetime.now(), datetime.now(), job_id))
            conn.commit()
        finally:
            conn.close()
    log_event("INFO", f"Job reclaimed by {worker_id} (resume path)", job_id)
    return jsonify({"status": "ok"})
@app.route('/verify_source_hash', methods=['POST'])
@requires_worker_auth
def verify_source_hash():
"""Worker submits the MD5 hash it computed from the source file before encoding.
Server compares against the stored hash to detect wrong-file or cheating scenarios.
Returns {"status": "ok"} on match, {"status": "mismatch"} on mismatch,
or {"status": "pending"} when the server has not yet computed its hash."""
d = request.json or {}
job_id = str(d.get('job_id', '') or '').strip()
worker_id = sanitize_input(d.get('worker_id', ''))
worker_hash = str(d.get('source_hash', '')).strip().lower()