-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
73 lines (62 loc) · 2.33 KB
/
server.py
File metadata and controls
73 lines (62 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from fastapi import FastAPI, UploadFile, Response
from PIL import Image
import coremltools as ct
import numpy as np
import io
app = FastAPI()
model = ct.models.MLModel("./coreml-depth-anything-v2-small/DepthAnythingV2SmallF32P8.mlpackage")
def depth_array_to_png_bytes(depth_array: np.ndarray) -> bytes:
depth_norm = (depth_array - depth_array.min()) / np.ptp(depth_array)
depth_uint8 = (depth_norm * 255).astype(np.uint8)
depth_image = Image.fromarray(depth_uint8, mode='L')
buf = io.BytesIO()
depth_image.save(buf, format='PNG')
return buf.getvalue()
@app.post("/predict/")
async def predict(file: UploadFile):
# spec = model.get_spec()
# for inp in spec.description.input:
# if inp.type.WhichOneof("Type") == "imageType":
# print(f"Input name: {inp.name}, size: {inp.type.imageType.width} x {inp.type.imageType.height}")
image = Image.open(file.file).convert("RGB").resize((518, 392))
input_data = {"image": image}
output = model.predict(input_data)
depth_image = output['depth']
depth_array = np.array(depth_image)
png_bytes = depth_array_to_png_bytes(depth_array)
return Response(content=png_bytes, media_type="image/png")
def depth_array_to_ply(depth, fx, fy, cx, cy):
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h))
X = (i - cx) * depth / fx
Y = (j - cy) * depth / fy
Z = depth
points = np.stack([X, Y, Z], axis=-1).reshape(-1, 3)
# Create PLY header
ply_header = f"""ply
format ascii 1.0
element vertex {points.shape[0]}
property float x
property float y
property float z
end_header
"""
ply_body = "\n".join(f"{x} {y} {z}" for x, y, z in points)
return (ply_header + ply_body).encode("utf-8")
@app.post("/pointcloud/")
async def pointcloud(file: UploadFile):
image = Image.open(file.file).convert("RGB").resize((518, 392))
input_data = {"image": image}
output = model.predict(input_data)
depth_image = output['depth']
depth_array = np.array(depth_image)
# Rough camera intrinsics guess
h, w = depth_array.shape
fx = fy = max(h, w)
cx, cy = w / 2, h / 2
ply_bytes = depth_array_to_ply(depth_array, fx, fy, cx, cy)
return Response(
content=ply_bytes,
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=pointcloud.ply"}
)