-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
433 lines (362 loc) · 15 KB
/
server.py
File metadata and controls
433 lines (362 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
import base64
import json
import os
import random
import secrets
import string
import sys
import threading
from pathlib import Path

from flask import Flask, request, jsonify
from flask_cors import CORS

import thread_context
from upload import upload_jlc, upload_euro, upload_aisler
from server_packets import (
    PCBArtifactRequest,
    PCBArtifactResponse,
    RoutingProgressRequest,
    RoutingProgressResponse,
    RoutingStartRequest,
    RoutingStartResponse
)
from server_packets_panelize import (
    PanelizeStartRequest,
    PanelizeStartResponse,
    PanelizeProgressRequest,
    PanelizeProgressResponse
)
from run import run
from panelize import panelize
# Make this file's own directory importable so sibling modules (run, panelize,
# upload, thread_context, ...) resolve regardless of the current working directory.
sys.path.append(str(Path(__file__).parent))
app = Flask(__name__)
CORS(app) # Enable CORS for all routes (any origin may call these endpoints)
# NOTE: /app/storage is the persistent storage folder in the docker container;
# every per-job working folder is created underneath this base path.
job_folder_base: Path = Path("./storage/jobs")
# These errors are for the logs, not the user: an endpoint mismatch means a
# client bug, so raising (and returning a 500) is sufficient.
def validate_endpoint(data, endpoint: str):
    """Raise ValueError when the payload's "endpoint" field is not *endpoint*."""
    received = data.get("endpoint")
    if received == endpoint:
        return
    raise ValueError(f"Invalid endpoint: {received}. Expected: {endpoint}")
def generate_id(length=8): # Was using import uuid, but this is simpler
    """Return a cryptographically random alphanumeric job ID.

    Job IDs name on-disk job folders and effectively act as capability
    tokens (anyone who knows one can poll progress and download artifacts),
    so use `secrets` instead of `random` to keep them unguessable.

    Args:
        length: number of characters to generate (default 8).

    Returns:
        A string of `length` characters drawn from A-Z, a-z, 0-9.
    """
    chars = string.ascii_letters + string.digits # A-Z, a-z, 0-9
    return ''.join(secrets.choice(chars) for _ in range(length))
def read_job_issues(job_id: str) -> list:
    """Load the list of issue strings recorded for a job.

    Reads ``issues.json`` from the job's folder. Returns an empty list when
    the file is missing, unreadable, or not shaped as ``{"issues": [...]}``.
    """
    issues_file = job_folder_base / job_id / "issues.json"
    if not issues_file.exists():
        return []
    try:
        with open(issues_file, 'r') as file:
            data = json.load(file)
    except Exception as e:
        print(f"🔴 Error reading issues file {issues_file}: {e}")
        return []
    if not isinstance(data, dict):
        return []
    loaded = data.get("issues", [])
    return loaded if isinstance(loaded, list) else []
# TODO: Implement these new endpoints properly
@app.route('/routingStart', methods=['POST'])
def routing_start():
    """Create a job folder, persist the project data, and kick off routing.

    The backend generates the job ID (so folder names never collide) and
    returns it to the client for use with /routingProgress polling.
    """
    data: RoutingStartRequest = request.get_json(force=True)
    validate_endpoint(data, "routingStart")

    # Backend-generated ID guarantees a unique working folder per job.
    job_id = generate_id()
    work_dir = job_folder_base / Path(f"./{job_id}")
    out_dir = work_dir / "output"
    out_dir.mkdir(parents=True, exist_ok=True)
    print(f"🔵 Created job folder: {work_dir}")
    print(f"🔵 Created output folder: {out_dir}")

    # Persist the uploaded project so the routing worker can read it.
    project_path = out_dir / "project.MakeDevice"
    project_path.write_text(data["project"])
    print(f"🔵 Project data saved to: {project_path}")

    # routingProgress rewrites this file periodically to keep the job alive.
    keepalive_path = work_dir / "keepalive_time"
    keepalive_path.write_text("!")  # Content is irrelevant; only mtime matters

    print(f"🔵 Starting routing with job ID: {job_id}")
    # Run the router on a worker thread so this request can return immediately.
    threading.Thread(target=run, args=(job_id, work_dir)).start()

    response: RoutingStartResponse = {
        "endpoint": "routingStart",
        "result": {
            "jobId": job_id
        }
    }
    return jsonify(response), 200
# TODO: Implement these new endpoints properly
#
# A nice to have would be an error if the server crashed since starting the routing,
# and so the job isn't running anymore. Could just add the jobId to a list when first
# calling run(), and if the jobId isn't in the list then the server has crashed/restarted
# since starting the job.
@app.route('/routingProgress', methods=['POST'])
def routing_progress():
    """Report routing progress for a job: fraction done, preview images,
    accumulated issues, and any routing error.

    Also touches the job's keepalive file so the job isn't reaped as abandoned.
    Application-level errors are still returned with HTTP 200 (the client reads
    the "error" field).
    """
    data: RoutingProgressRequest = request.get_json(force=True)
    validate_endpoint(data, "routingProgress")
    job_id = data["jobId"]

    # Unknown job -> application-level error.
    if not os.path.exists(job_folder_base / job_id):
        response: RoutingProgressResponse = {
            "endpoint": "routingProgress",
            "error": {
                "message": f"Job ID {job_id} does not exist.",
                "failedModuleIds": [], #TODO: Make this field optional in front and back ends
            }
        }
        return jsonify(response), 200

    # A successful finished job must have both the zip and its ready marker
    # (the marker is written last, so the zip is complete when both exist).
    zip_path = job_folder_base / job_id / "output.zip"
    zip_ready = job_folder_base / job_id / "zip_ready.txt"
    finished_success = os.path.exists(zip_path) and os.path.exists(zip_ready)

    # TODO: Store some progress image(s) too
    progress_file = job_folder_base / job_id / "progress.txt"
    progress = 0.0
    if os.path.exists(progress_file):
        with open(progress_file, 'r') as file:
            content = file.read().strip()
        if content:  # An empty file means the worker is mid-write; keep 0.0
            try:
                progress = float(content)
            except ValueError:
                print(f"🔴 Warning: Invalid progress value in {progress_file}: '{content}'")
                progress = 0.0

    # Touch the keepalive file so the job stays alive while the client polls.
    keepalive_file = job_folder_base / job_id / "keepalive_time"
    with open(keepalive_file, 'w') as file:
        file.write("!") # Anything

    # Gather issues and check for routing failure
    issue_list = read_job_issues(job_id)
    error_file = job_folder_base / job_id / "error.txt"
    routing_failed = os.path.exists(error_file) or len(issue_list) > 0
    error_message = ""
    if routing_failed:
        # FIXED: was a bare `except:` used as control flow for a possibly
        # missing file; now check existence and catch only OSError so other
        # failures aren't silently swallowed.
        if os.path.exists(error_file):
            try:
                with open(error_file, 'r') as file:
                    error_message = file.read().strip()
            except OSError as e:
                print(f"🔴 Error reading error file {error_file}: {e}")
        if not error_message:
            # FIXED: an empty/unreadable error.txt previously produced an empty
            # message; fall back to the first issue or a generic message.
            error_message = issue_list[0] if issue_list else "Unknown routing error occurred"

    # Get latest routing images for this job
    routing_imgs_folder = job_folder_base / job_id / "routing_imgs"
    routing_image_front_b64 = None
    routing_image_back_b64 = None
    routing_image_b64 = None

    def _read_b64(path: Path):
        # Return the file's contents base64-encoded, or None on read failure.
        try:
            with open(path, 'rb') as f:
                return base64.b64encode(f.read()).decode('utf-8')
        except OSError as e:
            print(f"🔴 Error reading routing image {path}: {e}")
            return None

    latest_image_path = None
    if os.path.exists(routing_imgs_folder):
        front_svg = routing_imgs_folder / "front.svg"
        back_svg = routing_imgs_folder / "back.svg"
        if front_svg.exists():
            routing_image_front_b64 = _read_b64(front_svg)
        if back_svg.exists():
            routing_image_back_b64 = _read_b64(back_svg)
        # Choose the newest available image for the generic routingImage field
        candidates = [p for p in (front_svg, back_svg) if p.exists()]
        if candidates:
            latest_image_path = max(candidates, key=lambda p: p.stat().st_mtime)
            routing_image_b64 = _read_b64(latest_image_path)
        if not routing_image_front_b64 and not routing_image_back_b64:
            # Fall back to the most recently written SVG of any name.
            latest_svgs = sorted(
                routing_imgs_folder.glob("*.svg"),
                key=lambda p: p.stat().st_mtime,
                reverse=True,
            )
            if latest_svgs:
                latest_image_path = latest_svgs[0]
                routing_image_b64 = _read_b64(latest_image_path)

    # Avoid stale UI state by always returning explicit image values
    routing_image_b64 = routing_image_b64 or ""
    routing_image_front_b64 = routing_image_front_b64 or ""
    routing_image_back_b64 = routing_image_back_b64 or ""

    if routing_failed:
        # Surface the error message alongside the other issues (deduplicated).
        if error_message and error_message not in issue_list:
            issue_list.append(error_message)
        response: RoutingProgressResponse = {
            "endpoint": "routingProgress",
            "issues": issue_list,
            "result": {
                "progress": progress,
                "completed": False,
                "routingImage": routing_image_b64,
                "routingImageFront": routing_image_front_b64,
                "routingImageBack": routing_image_back_b64,
            },
            "error": {
                "message": error_message,
                "failedModuleIds": [], # TODO: Implement
                "succeededModuleIds": [], # TODO: Implement
            }
        }
        return jsonify(response), 200
    else:
        response: RoutingProgressResponse = {
            "endpoint": "routingProgress",
            "result": {
                "progress": progress,
                "completed": finished_success,
                "routingImage": routing_image_b64,
                "routingImageFront": routing_image_front_b64,
                "routingImageBack": routing_image_back_b64,
                # TODO: Implement bus width left and right
            }
        }
        if finished_success:
            response["issues"] = issue_list
        return jsonify(response), 200
# TODO: Split the artifact download and the fabrication house upload into two endpoints
@app.route('/pcbArtifact', methods=['POST'])
def pcb_artifact():
    """Return the finished job's output.zip (base64-encoded) and optionally
    upload it to the fabrication house configured in the project file.

    Application-level errors (unknown job, artifact not ready) are returned
    with HTTP 200 so the client gets the custom error message.
    """
    data: PCBArtifactRequest = request.get_json(force=True)
    validate_endpoint(data, "pcbArtifact")
    job_id = data.get("jobId")

    # FIXED: a missing/None jobId previously crashed the path join below with
    # a TypeError; treat it the same as an unknown job.
    if not job_id or not os.path.exists(job_folder_base / job_id):
        response: PCBArtifactResponse = {
            "endpoint": "pcbArtifact",
            "error": {
                "message": f"Job ID {job_id} does not exist.",
            }
        }
        return jsonify(response), 200

    job_folder = job_folder_base / job_id
    zip_path = job_folder / "output.zip"
    # Both the zip and its ready marker must exist (marker is written last)
    zip_ready = os.path.exists(zip_path) and os.path.exists(job_folder / "zip_ready.txt")
    if not zip_ready:
        response: PCBArtifactResponse = {
            "endpoint": "pcbArtifact",
            "error": {
                "message": "Artifact zip file is not yet ready.",
            }
        }
        return jsonify(response), 200

    # Zip present in job folder...
    url = None
    should_upload = data.get("uploadToFabHouse", False)
    if should_upload:
        # Best-effort: an upload failure must not prevent the client from
        # receiving the artifact itself, so log and continue.
        try:
            # Get the fabrication house used when routing the job
            with open(job_folder / "output/project.MakeDevice", 'r') as data_file:
                data_json = json.load(data_file)
            fabrication_house = data_json.get("pcbOptions", {}).get("fabricationHouse", "")
            print("🟢 Uploading files to fabrication service")
            if fabrication_house == "JLCPCB":
                url = upload_jlc(zip_path)
            elif fabrication_house == "Eurocircuits":
                url = upload_euro(zip_path)
            elif fabrication_house == "Aisler":
                url = upload_aisler(zip_path)
            else:
                print(f"🔴 Unsupported fabrication house for upload: {fabrication_house}")
        except Exception as e:
            print(f"🔴 Exception during fabrication house upload: {e}")

    # Base64 encode zip (reuse zip_path instead of rebuilding the path)
    with open(zip_path, 'rb') as zip_file:
        encoded_zip = base64.b64encode(zip_file.read()).decode('utf-8')

    # Form the response; fabricationUrl is only present when an upload succeeded
    response: PCBArtifactResponse = {
        "endpoint": "pcbArtifact",
    }
    result = {"zipFile": encoded_zip}
    if url:
        result["fabricationUrl"] = url
    response["result"] = result
    # NOTE: Still use status code 200 for application-level errors, since we want the custom error message
    return jsonify(response), 200
########################
# SmartPanelizer Endpoints
########################
@app.route('/panelizeStart', methods=['POST'])
def panelize_start():
    """Create a working folder for a panelize job and start it on a worker thread.

    The backend generates the job ID (so folder names never collide) and
    returns it to the client for use with /panelizeProgress polling.
    """
    data: PanelizeStartRequest = request.get_json(force=True)
    validate_endpoint(data, "panelizeStart")

    # Backend-generated ID guarantees a unique working folder per job.
    job_id = generate_id()
    work_dir = job_folder_base / Path(f"./{job_id}")
    out_dir = work_dir / "output"
    out_dir.mkdir(parents=True, exist_ok=True)
    print(f"🔵 Created job folder: {work_dir}")
    print(f"🔵 Created output folder: {out_dir}")
    print(f"🔵 Starting panelize with job ID: {job_id}")

    # Run panelize on a worker thread so this request can return immediately.
    threading.Thread(target=panelize, args=(job_id, work_dir, data)).start()

    response: PanelizeStartResponse = {
        "endpoint": "panelizeStart",
        "result": {
            "jobId": job_id
        }
    }
    return jsonify(response), 200
@app.route('/panelizeProgress', methods=['POST'])
def panelize_progress():
    """Report progress for a panelize job: fraction done and a completed flag.

    Application-level errors are returned with HTTP 200 in the "error" field.
    """
    data: PanelizeProgressRequest = request.get_json(force=True)
    validate_endpoint(data, "panelizeProgress")
    job_id = data["jobId"]

    # Unknown job -> application-level error.
    if not os.path.exists(job_folder_base / job_id):
        response: PanelizeProgressResponse = {
            "endpoint": "panelizeProgress",
            "error": {
                "message": f"Job ID {job_id} does not exist.",
            }
        }
        return jsonify(response), 200

    # The presence of output.zip marks a finished job.
    finished_success = os.path.exists(job_folder_base / job_id / "output.zip")

    progress = 0.0
    progress_file = job_folder_base / job_id / "progress.txt"
    if os.path.exists(progress_file):
        with open(progress_file, 'r') as fh:
            content = fh.read().strip()
        if content:  # An empty file means the worker is mid-write; keep 0.0
            try:
                progress = float(content)
            except ValueError:
                print(f"🔴 Warning: Invalid progress value in {progress_file}: '{content}'")
                progress = 0.0

    # NOTE(review): thread_context.error_message looks like module-level state
    # rather than per-job state, so concurrent panelize jobs may observe each
    # other's errors — confirm against thread_context's implementation.
    error_message = thread_context.error_message
    if error_message:
        response: PanelizeProgressResponse = {
            "endpoint": "panelizeProgress",
            "error": {
                "message": error_message,
            }
        }
    else:
        response: PanelizeProgressResponse = {
            "endpoint": "panelizeProgress",
            "result": {
                "progress": progress,
                "completed": finished_success,
            }
        }
    return jsonify(response), 200
# Setup server
if __name__ == '__main__':
    # Default to port 3333 unless the PORT environment variable overrides it
    port = int(os.environ.get("PORT", 3333))
    # Run the Flask app with debug mode enabled for development.
    # NOTE(review): debug=True combined with host 0.0.0.0 exposes the Werkzeug
    # debugger to the network — confirm this is disabled in production deployments.
    app.run(host='0.0.0.0', port=port, debug=True)