Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d3bec3e
fix: use process_spatial_point for all spatial coordinate processing
fcollman Mar 4, 2026
06dfb2f
fix: use None instead of empty string for missing schema columns
fcollman Mar 4, 2026
9bbd4d4
fix: preserve missing values instead of silently filling with 0/False…
fcollman Mar 4, 2026
22359a1
fix: add id_counter_start parameter to SchemaProcessor to support ret…
fcollman Mar 4, 2026
3f89b07
fix: wire up id_counter_start through process_csv task
fcollman Mar 4, 2026
8228ca5
fix: prevent pg_dump/psql deadlock on subprocess failure
fcollman Mar 4, 2026
e1c8301
fix: chain exception in GCSCsvProcessor to preserve original traceback
fcollman Mar 4, 2026
5f77975
fix: wrap entire generate_presigned_url body in try/except
fcollman Mar 4, 2026
07d6234
fix: surface server error message in upload URL failure UI
fcollman Mar 4, 2026
edf031f
improve error messaging
fcollman Mar 5, 2026
4cd25af
fix bug in data chunks strategy
fcollman Mar 5, 2026
0b9d0aa
fixing status update and spatial config saving
fcollman Mar 5, 2026
f4dfb17
fixing retry exception handling
fcollman Mar 5, 2026
2e0b33f
improving robustness of transfer
fcollman Mar 5, 2026
b973a03
fixing table check bug
fcollman Mar 5, 2026
650c81c
fixing retries
fcollman Mar 5, 2026
6ac8900
add postgres-sqlclient to dockerfile
fcollman Mar 5, 2026
9d31e72
fix: recover chunks stuck in PROCESSING_SUBTASKS after worker restarts
fcollman Mar 6, 2026
7463f78
improve error handling
fcollman Mar 6, 2026
1ded706
fixing dockerfile for postgres-18
fcollman Mar 6, 2026
fb9a076
fixing segmentation metadata update call
fcollman Mar 6, 2026
3d76fe8
fixing bug in segmentation table creation
fcollman Mar 6, 2026
7cbfec0
adding UI updates and fixing metadata creation
fcollman Mar 6, 2026
27d9bfd
adding validation to prevent duplicate table names
fcollman Mar 6, 2026
6dfe45b
adding post-transfer UI updates
fcollman Mar 6, 2026
6d37f64
fixing last updated metadata updates
fcollman Mar 6, 2026
76bede3
improve error reporting in initial UI and changing defaults
fcollman Mar 6, 2026
616c10b
improving recovery from partially done chunks
fcollman Mar 6, 2026
1d8fac2
adding staging overwrite flow
fcollman Mar 6, 2026
f574edc
fixing step4
fcollman Mar 6, 2026
4425b90
fixing overwrite flow
fcollman Mar 6, 2026
e1580a6
more staging overwrite fixes
fcollman Mar 6, 2026
39b28cd
improving polling status message robustness.
fcollman Mar 7, 2026
07df43c
improve robustness of sql connection errors
fcollman Mar 7, 2026
173a290
adding connection pooling to help throughput for supervoxel lookup
fcollman Mar 7, 2026
df49b55
attempting to improve workflow robustness
fcollman Mar 8, 2026
44f2bd1
fix missing import
fcollman Mar 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
FROM tiangolo/uwsgi-nginx-flask:python3.12 AS builder
RUN apt-get update && apt-get install -y gcc
RUN apt-get update && apt-get install -y gcc curl ca-certificates gnupg lsb-release \
&& install -d /usr/share/postgresql-common/pgdg \
&& curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc \
-o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc \
&& sh -c 'echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] \
https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" \
> /etc/apt/sources.list.d/pgdg.list' \
&& apt-get update \
&& apt-get install -y postgresql-client-18
RUN pip install uv
# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1
Expand Down
126 changes: 102 additions & 24 deletions materializationengine/blueprints/upload/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
get_job_status,
process_and_upload,
)
from materializationengine.database import db_manager
from materializationengine.database import db_manager, dynamic_annotation_cache
from materializationengine.info_client import get_datastack_info, get_datastacks
from materializationengine.utils import get_config_param
from materializationengine import __version__
Expand Down Expand Up @@ -297,25 +297,29 @@ def create_storage_service():
@upload_bp.route("/generate-presigned-url/<string:datastack_name>", methods=["POST"])
@auth_requires_permission("edit", table_arg="datastack_name")
def generate_presigned_url(datastack_name: str):
data = request.json
filename = data["filename"]
content_type = data["contentType"]
bucket_name = current_app.config.get("MATERIALIZATION_UPLOAD_BUCKET_PATH")
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(filename)
origin = request.headers.get("Origin") or current_app.config.get(
"LOCAL_SERVER_URL", "http://localhost:5000"
)

try:
data = request.json
if not data:
return jsonify({"status": "error", "message": "Request body must be JSON"}), 400
filename = data["filename"]
content_type = data["contentType"]
bucket_name = current_app.config.get("MATERIALIZATION_UPLOAD_BUCKET_PATH")
if not bucket_name:
return jsonify({"status": "error", "message": "Upload bucket is not configured"}), 500
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(filename)
origin = request.headers.get("Origin") or current_app.config.get(
"LOCAL_SERVER_URL", "http://localhost:5000"
)
resumable_url = blob.create_resumable_upload_session(
content_type=content_type,
origin=origin, # Allow cross-origin requests for uploads
timeout=3600, # Set the session timeout to 1 hour
origin=origin,
timeout=3600,
)

return jsonify({"resumableUrl": resumable_url, "origin": origin})
except KeyError as e:
return jsonify({"status": "error", "message": f"Missing required field: {e}"}), 400
except google_exceptions.Forbidden as e:
current_app.logger.error(
f"GCS Forbidden error generating presigned URL: {str(e)}"
Expand Down Expand Up @@ -552,6 +556,73 @@ def save_metadata():
400,
)

# Check whether the table name already exists in production before
# allowing the user to proceed with the upload.
table_name = data["table_name"]
datastack_name = data["datastack_name"]
force_overwrite = data.get("force_overwrite", False)
try:
datastack_info = get_datastack_info(datastack_name)
production_db_name = datastack_info["aligned_volume"]["name"]
production_db_client = dynamic_annotation_cache.get_db(production_db_name)
existing_meta = production_db_client.database.get_table_metadata(table_name)
if existing_meta:
return (
jsonify(
{
"status": "error",
"message": f"Table '{table_name}' already exists in production. "
f"Choose a different name.",
}
),
409,
)
except Exception as check_err:
current_app.logger.warning(
f"Could not check production for existing table '{table_name}': {check_err}"
)

# Check whether the table already exists in staging (from a previous failed run).
# If so and the user has not explicitly requested an overwrite, surface the conflict
# so the frontend can ask for confirmation before wiping any existing staging data.
if not force_overwrite:
try:
staging_db_name = current_app.config.get("STAGING_DATABASE_NAME")
if staging_db_name:
staging_db_client = dynamic_annotation_cache.get_db(staging_db_name)
staging_meta = staging_db_client.database.get_table_metadata(table_name)
if staging_meta:
# Count existing rows so the UI can show a meaningful message.
staging_engine = db_manager.get_engine(staging_db_name)
try:
from sqlalchemy import text as _text
with staging_engine.connect() as conn:
row_count = conn.execute(
_text(f"SELECT COUNT(*) FROM {table_name}")
).scalar()
except Exception:
row_count = None
return (
jsonify(
{
"status": "staging_conflict",
"staging_exists": True,
"row_count": row_count,
"message": (
f"Table '{table_name}' already exists in staging"
+ (f" with {row_count:,} rows" if row_count is not None else "")
+ " from a previous run. "
+ "Do you want to clear it and restart the upload?"
),
}
),
409,
)
except Exception as check_err:
current_app.logger.warning(
f"Could not check staging for existing table '{table_name}': {check_err}"
)

success, result = storage.save_metadata(
filename=data["table_name"], metadata=data
)
Expand Down Expand Up @@ -671,19 +742,26 @@ def start_csv_processing():
@auth_required
def check_processing_status(job_id):
"""Get processing job status"""
status = get_job_status(job_id)
if not status:
return jsonify({"status": "error", "message": "Job not found"}), 404
try:
status = get_job_status(job_id)
if not status:
return jsonify({"status": "error", "message": "Job not found"}), 404

if _check_authorization(status):
return jsonify({"status": "error", "message": "Forbidden"}), 403
if _check_authorization(status):
return jsonify({"status": "error", "message": "Forbidden"}), 403

_set_item_type(status)
_set_item_type(status)

if status.get("active_workflow_part") == "spatial_lookup":
_handle_spatial_lookup(status, job_id)
if status.get("active_workflow_part") == "spatial_lookup":
_handle_spatial_lookup(status, job_id)

return jsonify(status)
return jsonify(status)
except Exception as e:
current_app.logger.error(
f"Unexpected error in check_processing_status for job {job_id}: {e}",
exc_info=True,
)
return jsonify({"status": "error", "message": "Internal error fetching job status"}), 500


def _check_authorization(status):
Expand Down
Loading