From 7196c60690299bb3809877e30b13d9f996ac185e Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Wed, 11 Mar 2026 07:46:58 -0700 Subject: [PATCH 1/8] adding kvdb_client --- materializationengine/kvdb_gateway.py | 21 +++++++++++++++++++ .../workflows/spatial_lookup.py | 15 +++++++------ pyproject.toml | 1 + 3 files changed, 31 insertions(+), 6 deletions(-) create mode 100644 materializationengine/kvdb_gateway.py diff --git a/materializationengine/kvdb_gateway.py b/materializationengine/kvdb_gateway.py new file mode 100644 index 00000000..d8b292a8 --- /dev/null +++ b/materializationengine/kvdb_gateway.py @@ -0,0 +1,21 @@ +import os + +from kvdbclient.bigtable import BigTableConfig +from kvdbclient.bigtable.client import Client + + +class KVDBGateway: + def __init__(self, project: str, instance: str): + self._clients = {} + self._config = BigTableConfig(PROJECT=project, INSTANCE=instance, ADMIN=False) + + def get_client(self, table_id: str) -> Client: + if table_id not in self._clients: + self._clients[table_id] = Client(table_id=table_id, config=self._config) + return self._clients[table_id] + + +kvdb_cache = KVDBGateway( + project=os.environ.get("BIGTABLE_PROJECT", ""), + instance=os.environ.get("BIGTABLE_INSTANCE", "pychunkedgraph"), +) diff --git a/materializationengine/workflows/spatial_lookup.py b/materializationengine/workflows/spatial_lookup.py index 10fd8c80..21602401 100644 --- a/materializationengine/workflows/spatial_lookup.py +++ b/materializationengine/workflows/spatial_lookup.py @@ -32,7 +32,7 @@ RedisCheckpointManager, ) from materializationengine.celery_init import celery -from materializationengine.chunkedgraph_gateway import chunkedgraph_cache +from materializationengine.kvdb_gateway import kvdb_cache from materializationengine.cloudvolume_gateway import cloudvolume_cache from materializationengine.database import db_manager, dynamic_annotation_cache from materializationengine.index_manager import index_cache @@ -55,7 +55,6 @@ ) from materializationengine.workflows.ingest_new_annotations import ( create_missing_segmentation_table, - get_root_ids, ) Base = declarative_base() @@ -1125,7 +1124,7 @@ def get_root_ids_from_supervoxels( """ start_time = time.time() - pcg_table_name = mat_metadata.get("pcg_table_name") + pcg_table_name: str = mat_metadata["pcg_table_name"] database = mat_metadata.get("database") try: @@ -1159,6 +1158,7 @@ def get_root_ids_from_supervoxels( col for col in supervoxel_df.columns if col.endswith("supervoxel_id") ] + root_id_col_names = [ col.replace("supervoxel_id", "root_id") for col in supervoxel_col_names ] @@ -1193,7 +1193,7 @@ def get_root_ids_from_supervoxels( if existing_value and existing_value > 0: root_ids_df.at[idx, root_col] = existing_value - cg_client = chunkedgraph_cache.init_pcg(pcg_table_name) + cg_client = kvdb_cache.get_client(pcg_table_name) for sv_col in supervoxel_col_names: root_col = sv_col.replace("supervoxel_id", "root_id") @@ -1212,8 +1212,11 @@ def get_root_ids_from_supervoxels( if not supervoxels_to_lookup.empty: try: - root_ids = get_root_ids( - cg_client, supervoxels_to_lookup, materialization_time_stamp + root_ids = np.squeeze( + cg_client.root_ext.get_roots( + supervoxels_to_lookup.to_numpy(), + time_stamp=materialization_time_stamp, + ) ) root_ids_df.loc[sv_mask, root_col] = root_ids diff --git a/pyproject.toml b/pyproject.toml index c3c52cd3..ee5f10fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ dependencies = [ "Flask-Limiter[redis]", "cryptography>=44.0.2", "uwsgi>=2.0.30", + "kvdbclient[extensions]", ] authors = [ { name = "Forrest Collman", email = "forrestc@alleninstitute.org" }, From 2770b594f45731cdaa11e7090def16e1c0b450df Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Wed, 11 Mar 2026 12:09:45 -0700 Subject: [PATCH 2/8] reroute lookups to process queue --- materializationengine/workflows/spatial_lookup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/materializationengine/workflows/spatial_lookup.py b/materializationengine/workflows/spatial_lookup.py index 21602401..e3c1e032 100644 --- a/materializationengine/workflows/spatial_lookup.py +++ b/materializationengine/workflows/spatial_lookup.py @@ -1414,7 +1414,7 @@ def insert_segmentation_data( @celery.task( - name="workflow:process_and_insert_sub_batch", + name="process:process_and_insert_sub_batch", bind=True, acks_late=True, autoretry_for=(OperationalError, DisconnectionError, ChunkDataValidationError), From d86cb882d1b59f9c8d36afe7918baa1a66251797 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Wed, 11 Mar 2026 12:30:06 -0700 Subject: [PATCH 3/8] bumping uv.lock dependancies --- uv.lock | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/uv.lock b/uv.lock index ce808cbe..873d33a7 100644 --- a/uv.lock +++ b/uv.lock @@ -1055,6 +1055,12 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/14/4b/ead00905132820b623732b175d66354e9d3e69fcf2a5dcdab780664e7896/google_api_core-2.25.1-py3-none-any.whl", hash = "sha256:8a2a56c1fef82987a524371f99f3bd0143702fecc670c72e600c1cda6bf8dbb7", size = 160807, upload-time = "2025-06-12T20:52:19.334Z" }, ] +[package.optional-dependencies] +grpc = [ + { name = "grpcio" }, + { name = "grpcio-status" }, +] + [[package]] name = "google-apitools" version = "0.5.32" @@ -1097,6 +1103,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ac/84/40ee070be95771acd2f4418981edb834979424565c3eec3cd88b6aa09d24/google_auth_oauthlib-1.2.2-py3-none-any.whl", hash = "sha256:fd619506f4b3908b5df17b65f39ca8d66ea56986e5472eb5978fd8f3786f00a2", size = 19072, upload-time = "2025-04-22T16:40:28.174Z" }, ] +[[package]] +name = "google-cloud-bigtable" +version = "2.35.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core", extra = ["grpc"] }, + { name = "google-auth" }, + { name = "google-cloud-core" }, + { name = "google-crc32c" }, + { name = "grpc-google-iam-v1" }, + { name = "proto-plus" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/57/c9/aceae21411b1a77fb4d3cde6e6f461321ee33c65fb8dc53480d4e47e1a55/google_cloud_bigtable-2.35.0.tar.gz", hash = "sha256:f5699012c5fea4bd4bdf7e80e5e3a812a847eb8f41bf8dc2f43095d6d876b83b", size = 775613, upload-time = "2025-12-17T15:18:14.303Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/62/69/03eed134d71f6117ffd9efac2d1033bb2fa2522e9e82545a0828061d32f4/google_cloud_bigtable-2.35.0-py3-none-any.whl", hash = "sha256:f355bfce1f239453ec2bb3839b0f4f9937cf34ef06ef29e1ca63d58fd38d0c50", size = 540341, upload-time = "2025-12-17T15:18:12.176Z" }, +] + [[package]] name = "google-cloud-core" version = "2.4.3" @@ -1164,6 +1188,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/86/f1/62a193f0227cf15a920390abe675f386dec35f7ae3ffe6da582d3ade42c7/googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8", size = 294530, upload-time = "2025-04-14T10:17:01.271Z" }, ] +[package.optional-dependencies] +grpc = [ + { name = "grpcio" }, +] + [[package]] name = "greenlet" version = "3.2.3" @@ -1193,6 +1222,55 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/58/c6/5c20af38c2a57c15d87f7f38bee77d63c1d2a3689f74fefaf35915dd12b2/griffe-1.7.3-py3-none-any.whl", hash = "sha256:c6b3ee30c2f0f17f30bcdef5068d6ab7a2a4f1b8bf1a3e74b56fffd21e1c5f75", size = 129303, upload-time = "2025-04-23T11:29:07.145Z" }, ] +[[package]] +name = "grpc-google-iam-v1" +version = "0.14.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "googleapis-common-protos", extra = ["grpc"] }, + { name = "grpcio" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/76/1e/1011451679a983f2f5c6771a1682542ecb027776762ad031fd0d7129164b/grpc_google_iam_v1-0.14.3.tar.gz", hash = "sha256:879ac4ef33136c5491a6300e27575a9ec760f6cdf9a2518798c1b8977a5dc389", size = 23745, upload-time = "2025-10-15T21:14:53.318Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4a/bd/330a1bbdb1afe0b96311249e699b6dc9cfc17916394fd4503ac5aca2514b/grpc_google_iam_v1-0.14.3-py3-none-any.whl", hash = "sha256:7a7f697e017a067206a3dfef44e4c634a34d3dee135fe7d7a4613fe3e59217e6", size = 32690, upload-time = "2025-10-15T21:14:51.72Z" }, +] + +[[package]] +name = "grpcio" +version = "1.78.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/8a/3d098f35c143a89520e568e6539cc098fcd294495910e359889ce8741c84/grpcio-1.78.0.tar.gz", hash = "sha256:7382b95189546f375c174f53a5fa873cef91c4b8005faa05cc5b3beea9c4f1c5", size = 12852416, upload-time = "2026-02-06T09:57:18.093Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4e/f4/7384ed0178203d6074446b3c4f46c90a22ddf7ae0b3aee521627f54cfc2a/grpcio-1.78.0-cp312-cp312-linux_armv7l.whl", hash = "sha256:f9ab915a267fc47c7e88c387a3a28325b58c898e23d4995f765728f4e3dedb97", size = 5913985, upload-time = "2026-02-06T09:55:26.832Z" }, + { url = "https://files.pythonhosted.org/packages/81/ed/be1caa25f06594463f685b3790b320f18aea49b33166f4141bfdc2bfb236/grpcio-1.78.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:3f8904a8165ab21e07e58bf3e30a73f4dffc7a1e0dbc32d51c61b5360d26f43e", size = 11811853, upload-time = "2026-02-06T09:55:29.224Z" }, + { url = "https://files.pythonhosted.org/packages/24/a7/f06d151afc4e64b7e3cc3e872d331d011c279aaab02831e40a81c691fb65/grpcio-1.78.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:859b13906ce098c0b493af92142ad051bf64c7870fa58a123911c88606714996", size = 6475766, upload-time = "2026-02-06T09:55:31.825Z" }, + { url = "https://files.pythonhosted.org/packages/8a/a8/4482922da832ec0082d0f2cc3a10976d84a7424707f25780b82814aafc0a/grpcio-1.78.0-cp312-cp312-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:b2342d87af32790f934a79c3112641e7b27d63c261b8b4395350dad43eff1dc7", size = 7170027, upload-time = "2026-02-06T09:55:34.7Z" }, + { url = "https://files.pythonhosted.org/packages/54/bf/f4a3b9693e35d25b24b0b39fa46d7d8a3c439e0a3036c3451764678fec20/grpcio-1.78.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:12a771591ae40bc65ba67048fa52ef4f0e6db8279e595fd349f9dfddeef571f9", size = 6690766, upload-time = "2026-02-06T09:55:36.902Z" }, + { url = "https://files.pythonhosted.org/packages/c7/b9/521875265cc99fe5ad4c5a17010018085cae2810a928bf15ebe7d8bcd9cc/grpcio-1.78.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:185dea0d5260cbb2d224c507bf2a5444d5abbb1fa3594c1ed7e4c709d5eb8383", size = 7266161, upload-time = "2026-02-06T09:55:39.824Z" }, + { url = "https://files.pythonhosted.org/packages/05/86/296a82844fd40a4ad4a95f100b55044b4f817dece732bf686aea1a284147/grpcio-1.78.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:51b13f9aed9d59ee389ad666b8c2214cc87b5de258fa712f9ab05f922e3896c6", size = 8253303, upload-time = "2026-02-06T09:55:42.353Z" }, + { url = "https://files.pythonhosted.org/packages/f3/e4/ea3c0caf5468537f27ad5aab92b681ed7cc0ef5f8c9196d3fd42c8c2286b/grpcio-1.78.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:fd5f135b1bd58ab088930b3c613455796dfa0393626a6972663ccdda5b4ac6ce", size = 7698222, upload-time = "2026-02-06T09:55:44.629Z" }, + { url = "https://files.pythonhosted.org/packages/d7/47/7f05f81e4bb6b831e93271fb12fd52ba7b319b5402cbc101d588f435df00/grpcio-1.78.0-cp312-cp312-win32.whl", hash = "sha256:94309f498bcc07e5a7d16089ab984d42ad96af1d94b5a4eb966a266d9fcabf68", size = 4066123, upload-time = "2026-02-06T09:55:47.644Z" }, + { url = "https://files.pythonhosted.org/packages/ad/e7/d6914822c88aa2974dbbd10903d801a28a19ce9cd8bad7e694cbbcf61528/grpcio-1.78.0-cp312-cp312-win_amd64.whl", hash = "sha256:9566fe4ababbb2610c39190791e5b829869351d14369603702e890ef3ad2d06e", size = 4797657, upload-time = "2026-02-06T09:55:49.86Z" }, +] + +[[package]] +name = "grpcio-status" +version = "1.78.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "googleapis-common-protos" }, + { name = "grpcio" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8a/cd/89ce482a931b543b92cdd9b2888805518c4620e0094409acb8c81dd4610a/grpcio_status-1.78.0.tar.gz", hash = "sha256:a34cfd28101bfea84b5aa0f936b4b423019e9213882907166af6b3bddc59e189", size = 13808, upload-time = "2026-02-06T10:01:48.034Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/83/8a/1241ec22c41028bddd4a052ae9369267b4475265ad0ce7140974548dc3fa/grpcio_status-1.78.0-py3-none-any.whl", hash = "sha256:b492b693d4bf27b47a6c32590701724f1d3b9444b36491878fb71f6208857f34", size = 14523, upload-time = "2026-02-06T10:01:32.584Z" }, +] + [[package]] name = "h11" version = "0.16.0" @@ -1446,6 +1524,28 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ef/70/a07dcf4f62598c8ad579df241af55ced65bed76e42e45d3c368a6d82dbc1/kombu-5.5.4-py3-none-any.whl", hash = "sha256:a12ed0557c238897d8e518f1d1fdf84bd1516c5e305af2dacd85c2015115feb8", size = 210034, upload-time = "2025-06-01T10:19:20.436Z" }, ] +[[package]] +name = "kvdbclient" +version = "0.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-cloud-bigtable" }, + { name = "numpy" }, + { name = "pytz" }, + { name = "requests" }, + { name = "tenacity" }, + { name = "zstandard" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7d/9d/937d171c14f17174d5b83a0b19cbb81855180b909c708424f2e978facc9f/kvdbclient-0.5.0.tar.gz", hash = "sha256:d4de51d6a3302f4d1d9d47382374e643a2f6cf01de998cbe9debebd17057933d", size = 36709, upload-time = "2026-03-10T03:30:10.815Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f1/44/9f92a144660555bb4800335922f064d36c65eac3da563bbfed5241bff009/kvdbclient-0.5.0-py3-none-any.whl", hash = "sha256:b75f2d55df591cf48ab1e7d12b23297b53f38bfe36d81f73a5a52ae1b2be151a", size = 29772, upload-time = "2026-03-10T03:30:09.592Z" }, +] + +[package.optional-dependencies] +extensions = [ + { name = "cloud-volume" }, +] + [[package]] name = "limits" version = "5.4.0" @@ -1574,6 +1674,7 @@ dependencies = [ { name = "geoalchemy2" }, { name = "gevent" }, { name = "jsonschema" }, + { name = "kvdbclient", extra = ["extensions"] }, { name = "marshmallow-sqlalchemy" }, { name = "middle-auth-client" }, { name = "multiwrapper" }, @@ -1634,6 +1735,7 @@ requires-dist = [ { name = "geoalchemy2", specifier = ">=0.9.2" }, { name = "gevent" }, { name = "jsonschema" }, + { name = "kvdbclient", extras = ["extensions"] }, { name = "marshmallow-sqlalchemy" }, { name = "middle-auth-client", specifier = ">=3.19.0" }, { name = "multiwrapper" }, From 0f5c8c3c7d252d08a8345dcaa353828688e87a8b Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Wed, 11 Mar 2026 12:30:15 -0700 Subject: [PATCH 4/8] improve chunk recovery latency --- .../workflows/spatial_lookup.py | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/materializationengine/workflows/spatial_lookup.py b/materializationengine/workflows/spatial_lookup.py index e3c1e032..b6376dd3 100644 --- a/materializationengine/workflows/spatial_lookup.py +++ b/materializationengine/workflows/spatial_lookup.py @@ -494,6 +494,22 @@ def process_table_in_chunks( full_chain.apply_async() return f"No chunks to process for {annotation_table_name}. Finalizing." + # Recover stale chunks on EVERY conductor invocation so that preempted + # sub-batch workers are detected within 600s regardless of whether there + # are still pending chunks in the queue to dispatch. + recovered_subtasks = checkpoint_manager.recover_stale_processing_subtasks( + workflow_name, stale_threshold_seconds=600 + ) + recovered_processing = checkpoint_manager.recover_stale_processing_chunks( + workflow_name, stale_threshold_seconds=600 + ) + recovered = recovered_subtasks + recovered_processing + if recovered: + celery_logger.info( + f"Recovered {recovered} stale chunk(s) for {workflow_name} " + f"({recovered_subtasks} PROCESSING_SUBTASKS, {recovered_processing} PROCESSING)." + ) + chunk_indices_to_process, new_failed_cursor, new_pending_cursor = ( checkpoint_manager.get_chunks_to_process( table_name=workflow_name, @@ -561,27 +577,10 @@ def process_table_in_chunks( full_chain.apply_async() return f"All chunks processed for {annotation_table_name}. Finalizing." else: - # Before sleeping, check whether any chunks stuck in PROCESSING or - # PROCESSING_SUBTASKS state have been there long enough to be treated - # as lost (pod killed, broker blip, etc.) and should be retried. - recovered_subtasks = checkpoint_manager.recover_stale_processing_subtasks( - workflow_name, stale_threshold_seconds=600 - ) - recovered_processing = checkpoint_manager.recover_stale_processing_chunks( - workflow_name, stale_threshold_seconds=600 - ) - recovered = recovered_subtasks + recovered_processing - if recovered: - celery_logger.info( - f"Recovered {recovered} stale chunk(s) for {workflow_name} " - f"({recovered_subtasks} PROCESSING_SUBTASKS, {recovered_processing} PROCESSING). " - f"Retrying dispatcher immediately to re-dispatch them." - ) - raise self.retry(countdown=0) celery_logger.info( f"No chunks returned by get_chunks_to_process for {workflow_name}, but scan may not be exhausted or non-terminal chunks exist. Retrying dispatcher." ) - raise self.retry(countdown=30) + raise self.retry(countdown=30 if not recovered else 0) processing_tasks = [] for chunk_idx_to_process in chunk_indices_to_process: From 5a090781f5df5ce0b2cb28117d36ccc2f06b691d Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Thu, 12 Mar 2026 06:33:46 -0700 Subject: [PATCH 5/8] reducing parallelism and trying gs interface for cloudvolume --- materializationengine/cloudvolume_gateway.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/materializationengine/cloudvolume_gateway.py b/materializationengine/cloudvolume_gateway.py index 025a0d2f..cf683e73 100644 --- a/materializationengine/cloudvolume_gateway.py +++ b/materializationengine/cloudvolume_gateway.py @@ -2,9 +2,9 @@ import os # Number of parallel threads CloudVolume uses internally for fetching data. -# Matches the GCS connection pool size so all threads can make concurrent requests -# without exhausting the pool. Tune with CLOUDVOLUME_PARALLEL env var. -_CV_PARALLEL = int(os.environ.get("CLOUDVOLUME_PARALLEL", "32")) +# Default of 10 matches the urllib3 pool_maxsize (10) used by cloud-files/google-cloud-storage, +# avoiding connection pool overflow. Override with CLOUDVOLUME_PARALLEL env var. +_CV_PARALLEL = int(os.environ.get("CLOUDVOLUME_PARALLEL", "10")) class CloudVolumeGateway: @@ -57,7 +57,6 @@ def _get_cv_client( cv_client = cloudvolume.CloudVolume( seg_source, mip=mip_level, - use_https=True, bounded=False, fill_missing=True, green_threads=use_green_threads, From a0c91f8f26aa25e604c8c98232523e9055030603 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Thu, 12 Mar 2026 06:34:09 -0700 Subject: [PATCH 6/8] trying different celery worker initialization for pools --- materializationengine/celery_worker.py | 60 +++++++++++++++++++------- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/materializationengine/celery_worker.py b/materializationengine/celery_worker.py index b0fcd2f6..bc097cff 100644 --- a/materializationengine/celery_worker.py +++ b/materializationengine/celery_worker.py @@ -116,20 +116,31 @@ def celery_loggers(logger, *args, **kwargs): @worker_process_init.connect def configure_http_connection_pools(sender=None, **kwargs): """ - Increase urllib3/requests HTTP connection pool sizes for each forked worker process. - - The default pool_maxsize=10 per host is exhausted during parallel CloudVolume - supervoxel lookups (scattered_points makes many concurrent requests to - storage.googleapis.com). Each discarded connection requires a new TCP+TLS - handshake on the next request, adding latency to every supervoxel lookup. - - We patch HTTPAdapter.__init__ so every Session created in this process - (including sessions created internally by cloud-files/cloudvolume) uses the - larger pool. Explicit callers that pass their own pool_maxsize are unaffected. - - Tune with the GCS_CONNECTION_POOL_SIZE environment variable (default: 128). + Increase GCS urllib3 connection pool sizes for each forked worker process. + + CloudVolume uses cloud-files with use_https=True, which converts gs:// paths to + https://storage.googleapis.com/... and routes them through HttpInterface. + HttpInterface holds a class-level HTTPAdapter (created at import time with the + default pool_maxsize=10) that is shared across ALL instances and sessions. + With CLOUDVOLUME_PARALLEL concurrent threads all funnelling through that one + adapter, the pool fills immediately → discarded connections → TCP+TLS handshake + on every request. + + Three-part fix (all run once per forked worker process): + 1. Patch HTTPAdapter.__init__ so any new Session/AuthorizedSession created after + this hook uses pool_maxsize=GCS_CONNECTION_POOL_SIZE (default: 128). + 2. Replace HttpInterface.adaptor (the shared class-level adapter) with a fresh + HTTPAdapter that has the larger pool. This is the critical fix for use_https + paths because the old adapter was created before the patch could apply. + 3. Reset cloud-files' GC_POOL and invalidate cloudvolume_cache so any gs:// + connections inherited from the parent process are discarded; fresh ones pick + up the patched HTTPAdapter. + + Tune with GCS_CONNECTION_POOL_SIZE environment variable (default: 128). """ from requests.adapters import HTTPAdapter + import cloudfiles.interfaces as cf_interfaces + from materializationengine.cloudvolume_gateway import cloudvolume_cache pool_size = int(os.environ.get("GCS_CONNECTION_POOL_SIZE", "128")) _orig_init = HTTPAdapter.__init__ @@ -138,10 +149,29 @@ def _patched_init(self, pool_connections=pool_size, pool_maxsize=pool_size, **kw _orig_init(self, pool_connections=pool_connections, pool_maxsize=pool_maxsize, **kw) HTTPAdapter.__init__ = _patched_init + + # Replace the class-level adapter shared by all HttpInterface instances. + # This is the primary fix for use_https=True (https://storage.googleapis.com) + # paths: the old class-level adapter has pool_maxsize=10 and cannot be patched + # retroactively via HTTPAdapter.__init__. + cf_interfaces.HttpInterface.adaptor = HTTPAdapter( + pool_connections=pool_size, pool_maxsize=pool_size + ) + + # For gs:// paths (non-use_https): discard GCS bucket connections inherited + # from the parent process. reset_connection_pools() replaces the global + # GC_POOL with fresh empty queues; the next gs:// request creates a new + # google.cloud.storage.Client → AuthorizedSession → patched HTTPAdapter. + cf_interfaces.reset_connection_pools() + + # Clear any CloudVolume client objects that hold references to old connections. + # They are re-populated lazily on first use in this worker process. + cloudvolume_cache.invalidate_cache() + celery_logger.info( - f"[worker_process_init] HTTP connection pool defaults set to " - f"pool_connections={pool_size}, pool_maxsize={pool_size} " - f"(GCS_CONNECTION_POOL_SIZE={pool_size})." + f"[worker_process_init] GCS connection pool reset: " + f"HTTPAdapter defaults patched to pool_maxsize={pool_size}, " + f"HttpInterface.adaptor replaced, GC_POOL reset, cloudvolume_cache invalidated." ) From c6646b7b89bfbf88cea521263d0a4895bc88fb43 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Thu, 12 Mar 2026 09:10:59 -0700 Subject: [PATCH 7/8] trying a different connection failure --- materializationengine/database.py | 56 ++++++++++++------------------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/materializationengine/database.py b/materializationengine/database.py index 3c38ea60..a6ad154a 100644 --- a/materializationengine/database.py +++ b/materializationengine/database.py @@ -3,7 +3,7 @@ from dynamicannotationdb import DynamicAnnotationInterface from flask import current_app -from sqlalchemy import MetaData, create_engine, text +from sqlalchemy import MetaData, create_engine from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import QueuePool @@ -59,41 +59,29 @@ def get_engine(self, database_name: str): SQL_URI_CONFIG = current_app.config["SQLALCHEMY_DATABASE_URI"] sql_base_uri = SQL_URI_CONFIG.rpartition("/")[0] sql_uri = f"{sql_base_uri}/{database_name}" - + pool_size = current_app.config.get("DB_CONNECTION_POOL_SIZE", 20) max_overflow = current_app.config.get("DB_CONNECTION_MAX_OVERFLOW", 30) - - try: - engine = create_engine( - sql_uri, - poolclass=QueuePool, - pool_size=pool_size, - max_overflow=max_overflow, - pool_timeout=30, - pool_recycle=1800, # Recycle connections after 30 minutes - pool_pre_ping=True, # Ensure connections are still valid - ) - - # Test the connection to make sure the database exists and is accessible - with engine.connect() as conn: - conn.execute(text("SELECT 1")) - - # Only store engine if connection test passes - self._engines[database_name] = engine - celery_logger.info(f"Created new connection pool for {database_name} " - f"(size={pool_size}, max_overflow={max_overflow})") - - except Exception as e: - # Clean up engine if it was created but connection failed - if 'engine' in locals(): - engine.dispose() - - celery_logger.error(f"Failed to create/connect to database {database_name}: {e}") - raise ConnectionError(f"Cannot connect to database '{database_name}'. " - f"Please check if the database exists and is accessible. " - f"Connection URI: {sql_uri}. " - f"Error: {e}") - + + engine = create_engine( + sql_uri, + poolclass=QueuePool, + pool_size=pool_size, + max_overflow=max_overflow, + pool_timeout=30, + pool_recycle=1800, # Recycle connections after 30 minutes + pool_pre_ping=True, # Test connections before use; reconnect if stale + ) + # Cache immediately — pool_pre_ping handles connectivity on first checkout. + # Previously we ran a SELECT 1 test here and didn't cache on failure, which + # caused a transient error to permanently break this worker's engine cache, + # leading to repeated ConnectionError → _on_connection_error → consumer isolation. + self._engines[database_name] = engine + celery_logger.info( + f"Created engine for {database_name} " + f"(pool_size={pool_size}, max_overflow={max_overflow})" + ) + return self._engines[database_name] def get_session_factory(self, database_name: str): From 28ce5228a0470f85742bc0a3d2bf7391269eebc4 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Thu, 12 Mar 2026 10:10:34 -0700 Subject: [PATCH 8/8] trying to improve connection error handling --- .../workflows/spatial_lookup.py | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/materializationengine/workflows/spatial_lookup.py b/materializationengine/workflows/spatial_lookup.py index b6376dd3..49f00c60 100644 --- a/materializationengine/workflows/spatial_lookup.py +++ b/materializationengine/workflows/spatial_lookup.py @@ -336,7 +336,7 @@ def process_table_in_chunks( database_name: str, chunk_scale_factor: int, supervoxel_batch_size: int, - batch_size_for_dispatch: int = 10, + batch_size_for_dispatch: int = 50, prioritize_failed_chunks: bool = True, initial_run: bool = False, ): @@ -904,25 +904,37 @@ def process_chunk( except ConnectionError as e: # SQL proxy / infra failure — NOT a data error. - # Reset chunk to PENDING so the attempt budget is NOT consumed; a healthy - # pod will re-pick it up after the countdown. - celery_logger.warning( - f"{log_prefix} DB connection error (SQL proxy down?): {e}. " - f"Resetting chunk to PENDING and retrying in 5 minutes." - ) + # Use a finite retry limit (5 × 300 s = 25 min max) so the outer chord + # eventually completes rather than blocking the conductor indefinitely. + # On exhaustion, mark FAILED_RETRYABLE and return normally so the chord + # body (next conductor invocation) can fire and stale detection re-queues + # the chunk. + _on_connection_error(database_name, self) try: - checkpoint_manager.set_chunk_status( - workflow_name, - chunk_idx, - CHUNK_STATUS_PENDING, - {"message": f"Connection error — will retry: {str(e)[:300]}"}, - ) - except Exception as status_err: celery_logger.warning( - f"{log_prefix} Could not reset chunk status to PENDING: {status_err}" + f"{log_prefix} DB connection error (SQL proxy down?): {e}. " + f"Retrying in 5 minutes (attempt {self.request.retries + 1}/5)." ) - _on_connection_error(database_name, self) - raise self.retry(exc=e, countdown=300, max_retries=None) + raise self.retry(exc=e, countdown=300, max_retries=5) + except MaxRetriesExceededError: + celery_logger.error( + f"{log_prefix} Max connection-error retries exceeded on process_chunk. " + f"Marking chunk FAILED_RETRYABLE so conductor can re-dispatch." + ) + error_payload = { + "error_message": f"Connection error retries exhausted in process_chunk: {str(e)}", + "error_type": type(e).__name__, + "attempt_count": workflow_attempt_count + 1, + "celery_task_id": self.request.id, + } + checkpoint_manager.set_chunk_status( + workflow_name, chunk_idx, CHUNK_STATUS_FAILED_RETRYABLE, error_payload + ) + return { + "status": "failed_connection_error_retries_exhausted", + "chunk_idx": chunk_idx, + "marked_retryable_in_checkpoint": True, + } except (OperationalError, DisconnectionError) as e: celery_logger.warning(