diff --git a/.github/workflows/cleanup-firebase.yml b/.github/workflows/cleanup-firebase.yml deleted file mode 100644 index 89ccd1792..000000000 --- a/.github/workflows/cleanup-firebase.yml +++ /dev/null @@ -1,22 +0,0 @@ -name: Cleanup Firebase Metadata - -on: - schedule: - - cron: "24 18 * * 1" # Runs at 18:24 UTC every Monday - -jobs: - cleanup: - runs-on: ${{ matrix.os }} - strategy: - matrix: - python-version: [3.11] - os: [ubuntu-latest, windows-latest, macOS-latest] - steps: - - uses: actions/checkout@v6 - - uses: ./.github/actions/dependencies - - name: Cleanup Firebase Metadata - env: - FIREBASE_TOKEN: ${{ secrets.FIREBASE_TOKEN }} - FIREBASE_EMAIL: ${{ secrets.FIREBASE_EMAIL }} - run: | - uv run python cellpack/bin/cleanup_tasks.py diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py index 8b4e8578a..03558df47 100644 --- a/cellpack/autopack/DBRecipeHandler.py +++ b/cellpack/autopack/DBRecipeHandler.py @@ -1,19 +1,20 @@ import copy import logging import shutil -from datetime import datetime, timezone from enum import Enum from pathlib import Path from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS +from cellpack.autopack.AWSHandler import AWSHandler import hashlib import json -import requests from cellpack.autopack.utils import deep_merge +DB_SETUP_README_URL = "https://github.com/mesoscope/cellpack?tab=readme-ov-file#introduction-to-remote-databases" + class DataDoc(object): def __init__( @@ -321,36 +322,6 @@ def __init__(self, settings): self.settings = settings -class ResultDoc: - def __init__(self, db): - self.db = db - - def handle_expired_results(self): - """ - Check if the results in the database are expired and delete them if the linked object expired. - """ - current_utc = datetime.now(timezone.utc) - results = self.db.get_all_docs("results") - if results: - for result in results: - result_data = self.db.doc_to_dict(result) - result_age = current_utc - result_data["timestamp"] - if result_age.days > 180 and not self.validate_existence( - result_data["url"] - ): - self.db.delete_doc("results", self.db.doc_id(result)) - logging.info("Results cleanup complete.") - else: - logging.info("No results found in the database.") - - def validate_existence(self, url): - """ - Validate the existence of an S3 object by checking if the URL is accessible. - Returns True if the URL is accessible. - """ - return requests.head(url).status_code == requests.codes.ok - - class DBUploader(object): """ Handles the uploading of data to the database. @@ -529,42 +500,34 @@ def upload_config(self, config_data, source_path): self.db.update_doc("configs", id, config_data) return id - def upload_result_metadata(self, file_name, url, job_id=None): - """ - Upload the metadata of the result file to the database. - """ - if self.db: - username = self.db.get_username() - timestamp = self.db.create_timestamp() - self.db.update_or_create( - "results", - file_name, - { - "user": username, - "timestamp": timestamp, - "url": url, - "batch_job_id": job_id, - }, - ) - if job_id: - self.upload_job_status(job_id, "DONE", result_path=url) - - def upload_job_status(self, job_id, status, result_path=None, error_message=None): + def upload_job_status( + self, + dedup_hash, + status, + result_path=None, + error_message=None, + outputs_directory=None, + ): """ - Update status for a given job ID + Update status for a given dedup_hash """ if self.db: - timestamp = self.db.create_timestamp() - self.db.update_or_create( - "job_status", - job_id, - { - "timestamp": timestamp, - "status": str(status), - "result_path": result_path, - "error_message": error_message, - }, - ) + db_handler = self.db + # If db is AWSHandler, switch to firebase handler for job status updates + if isinstance(self.db, AWSHandler): + handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE) + db_handler = handler(default_db="staging") + timestamp = db_handler.create_timestamp() + data = { + "timestamp": timestamp, + "status": str(status), + "error_message": error_message, + } + if result_path: + data["result_path"] = result_path + if outputs_directory: + data["outputs_directory"] = outputs_directory + db_handler.update_or_create("job_status", dedup_hash, data) def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data): output_path = Path(output_folder) @@ -583,7 +546,7 @@ def upload_packing_results_workflow( self, source_folder, recipe_name, - job_id, + dedup_hash, config_data, recipe_data, ): @@ -591,7 +554,7 @@ def upload_packing_results_workflow( Complete packing results upload workflow including folder preparation and s3 upload """ try: - if job_id: + if dedup_hash: source_path = Path(source_folder) if not source_path.exists(): @@ -601,7 +564,7 @@ def upload_packing_results_workflow( # prepare unique S3 upload folder parent_folder = source_path.parent - unique_folder_name = f"{source_path.name}_run_{job_id}" + unique_folder_name = f"{source_path.name}_run_{dedup_hash}" s3_upload_folder = parent_folder / unique_folder_name logging.debug(f"outputs will be copied to: {s3_upload_folder}") @@ -618,7 +581,7 @@ def upload_packing_results_workflow( upload_result = self.upload_outputs_to_s3( output_folder=s3_upload_folder, recipe_name=recipe_name, - job_id=job_id, + dedup_hash=dedup_hash, ) # clean up temporary folder after upload @@ -628,9 +591,12 @@ def upload_packing_results_workflow( f"Cleaned up temporary upload folder: {s3_upload_folder}" ) - # update outputs directory in firebase - self.update_outputs_directory( - job_id, upload_result.get("outputs_directory") + # update outputs directory in job status + self.upload_job_status( + dedup_hash, + "DONE", + result_path=upload_result.get("simularium_url"), + outputs_directory=upload_result.get("outputs_directory"), ) return upload_result @@ -639,7 +605,7 @@ def upload_packing_results_workflow( logging.error(e) return {"success": False, "error": e} - def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): + def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash): """ Upload packing outputs to S3 bucket """ @@ -647,7 +613,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): bucket_name = self.db.bucket_name region_name = self.db.region_name sub_folder_name = self.db.sub_folder_name - s3_prefix = f"{sub_folder_name}/{recipe_name}/{job_id}" + s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}" try: upload_result = self.db.upload_directory( @@ -661,8 +627,11 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): f"{base_url}/{file_info['s3_key']}" for file_info in upload_result["uploaded_files"] ] + simularium_url = None + for url in public_urls: + if url.endswith(".simularium"): + simularium_url = url outputs_directory = f"https://us-west-2.console.aws.amazon.com/s3/buckets/{bucket_name}/{s3_prefix}/" - logging.info( f"Successfully uploaded {upload_result['total_files']} files to {outputs_directory}" ) @@ -671,7 +640,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): return { "success": True, - "run_id": job_id, + "dedup_hash": dedup_hash, "s3_bucket": bucket_name, "s3_prefix": s3_prefix, "public_url_base": f"{base_url}/{s3_prefix}/", @@ -680,30 +649,12 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): "total_size": upload_result["total_size"], "urls": public_urls, "outputs_directory": outputs_directory, + "simularium_url": simularium_url, } except Exception as e: logging.error(e) return {"success": False, "error": e} - def update_outputs_directory(self, job_id, outputs_directory): - if not self.db or self.db.s3_client: - # switch to firebase handler to update job status - handler = DATABASE_IDS.handlers().get("firebase") - initialized_db = handler(default_db="staging") - if job_id: - timestamp = initialized_db.create_timestamp() - initialized_db.update_or_create( - "job_status", - job_id, - { - "timestamp": timestamp, - "outputs_directory": outputs_directory, - }, - ) - logging.debug( - f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}" - ) - class DBRecipeLoader(object): """ @@ -888,25 +839,3 @@ def compile_db_recipe_data(db_recipe_data, obj_dict, grad_dict, comp_dict): {**v} for v in DBRecipeLoader.remove_dedup_hash(grad_dict).values() ] return recipe_data - - -class DBMaintenance(object): - """ - Handles the maintenance of the database. - """ - - def __init__(self, db_handler): - self.db = db_handler - self.result_doc = ResultDoc(self.db) - - def cleanup_results(self): - """ - Check if the results in the database are expired and delete them if the linked object expired. - """ - self.result_doc.handle_expired_results() - - def readme_url(self): - """ - Return the URL to the README file for the database setup section. - """ - return "https://github.com/mesoscope/cellpack?tab=readme-ov-file#introduction-to-remote-databases" diff --git a/cellpack/autopack/Environment.py b/cellpack/autopack/Environment.py index 101dce8f5..a9805dae1 100644 --- a/cellpack/autopack/Environment.py +++ b/cellpack/autopack/Environment.py @@ -124,6 +124,7 @@ def __init__(self, config=None, recipe=None): self.config_data = config self.recipe_data = recipe + self.dedup_hash = None name = recipe["name"] self.log = logging.getLogger("env") self.log.propagate = False diff --git a/cellpack/autopack/interface_objects/default_values.py b/cellpack/autopack/interface_objects/default_values.py index bbdbc452d..969eaa6a8 100644 --- a/cellpack/autopack/interface_objects/default_values.py +++ b/cellpack/autopack/interface_objects/default_values.py @@ -9,7 +9,6 @@ "objects", "gradients", "recipes", - "results", "configs", "recipes_edited", ] diff --git a/cellpack/autopack/loaders/recipe_loader.py b/cellpack/autopack/loaders/recipe_loader.py index 84cd78ac0..ce220c0f5 100644 --- a/cellpack/autopack/loaders/recipe_loader.py +++ b/cellpack/autopack/loaders/recipe_loader.py @@ -30,22 +30,29 @@ class RecipeLoader(object): # TODO: add all default values here default_values = default_recipe_values.copy() - def __init__(self, input_file_path, save_converted_recipe=False, use_docker=False): - _, file_extension = os.path.splitext(input_file_path) + def __init__( + self, + input_data, + save_converted_recipe=False, + use_docker=False, + ): self.current_version = CURRENT_VERSION - self.file_path = input_file_path - self.file_extension = file_extension self.ingredient_list = [] self.compartment_list = [] self.save_converted_recipe = save_converted_recipe - # set CURRENT_RECIPE_PATH appropriately for remote(firebase) vs local recipes - if autopack.is_remote_path(self.file_path): - autopack.CURRENT_RECIPE_PATH = os.path.join( - os.getcwd(), "examples", "recipes", "v2" - ) + if isinstance(input_data, dict): + self._json_recipe = input_data + self.file_path = None else: - autopack.CURRENT_RECIPE_PATH = os.path.dirname(self.file_path) + self.file_path = input_data + self._json_recipe = None + if autopack.is_remote_path(self.file_path): + autopack.CURRENT_RECIPE_PATH = os.path.join( + os.getcwd(), "examples", "recipes", "v2" + ) + else: + autopack.CURRENT_RECIPE_PATH = os.path.dirname(self.file_path) self.recipe_data = self._read(use_docker=use_docker) @@ -168,16 +175,25 @@ def _migrate_version(self, old_recipe): ) def _read(self, resolve_inheritance=True, use_docker=False): - new_values, database_name, is_unnested_firebase = autopack.load_file( - self.file_path, cache="recipes", use_docker=use_docker - ) + database_name = None + is_unnested_firebase = False + new_values = self._json_recipe + if new_values is None: + # Read recipe from filepath + new_values, database_name, is_unnested_firebase = autopack.load_file( + self.file_path, cache="recipes", use_docker=use_docker + ) + + if "composition" in new_values: + new_values["composition"] = DBRecipeLoader.remove_empty( + new_values["composition"] + ) + if database_name == "firebase": if is_unnested_firebase: objects = new_values.get("objects", {}) gradients = new_values.get("gradients", {}) - composition = DBRecipeLoader.remove_empty( - new_values.get("composition", {}) - ) + composition = new_values.get("composition", {}) else: objects, gradients, composition = DBRecipeLoader.collect_and_sort_data( new_values["composition"] diff --git a/cellpack/autopack/upy/simularium/simularium_helper.py b/cellpack/autopack/upy/simularium/simularium_helper.py index 86af08746..87c616e18 100644 --- a/cellpack/autopack/upy/simularium/simularium_helper.py +++ b/cellpack/autopack/upy/simularium/simularium_helper.py @@ -22,7 +22,7 @@ from simulariumio.cellpack import HAND_TYPE, CellpackConverter from simulariumio.constants import DISPLAY_TYPE, VIZ_TYPE -from cellpack.autopack.DBRecipeHandler import DBMaintenance, DBUploader +from cellpack.autopack.DBRecipeHandler import DB_SETUP_README_URL from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS from cellpack.autopack.upy import hostHelper from cellpack.autopack.upy.simularium.plots import PlotData @@ -1385,64 +1385,29 @@ def raycast(self, **kw): def raycast_test(self, obj, start, end, length, **kw): return - def post_and_open_file(self, file_name, open_results_in_browser): + def post_and_open_file(self, file_name, open_results_in_browser, dedup_hash=None): simularium_file = Path(f"{file_name}.simularium") - url = None - job_id = os.environ.get("AWS_BATCH_JOB_ID", None) - file_name, url = simulariumHelper.store_result_file( - simularium_file, storage="aws", batch_job_id=job_id - ) - if file_name and url: - simulariumHelper.store_metadata( - file_name, url, db="firebase", job_id=job_id - ) - if open_results_in_browser: + if dedup_hash is None: + url = simulariumHelper.store_result_file(simularium_file, storage="aws") + if url and open_results_in_browser: simulariumHelper.open_in_simularium(url) @staticmethod - def store_result_file( - file_path, storage=None, batch_job_id=None, sub_folder="simularium" - ): + def store_result_file(file_path, storage=None, sub_folder="simularium"): if storage == "aws": handler = DATABASE_IDS.handlers().get(storage) - # if batch_job_id is not None, then we are in a batch job and should use the temp bucket - # TODO: use cellpack-results bucket for batch jobs once we have the correct permissions - if batch_job_id: - initialized_handler = handler( - bucket_name="cellpack-demo", - sub_folder_name=sub_folder, - region_name="us-west-2", - ) - else: - initialized_handler = handler( - bucket_name="cellpack-results", - sub_folder_name=sub_folder, - region_name="us-west-2", - ) - file_name, url = initialized_handler.save_file_and_get_url(file_path) - if not file_name or not url: - db_maintainer = DBMaintenance(initialized_handler) - logging.warning( - f"Skipping browser opening, upload credentials not configured. For setup instructions see: {db_maintainer.readme_url()}" - ) - return file_name, url - - @staticmethod - def store_metadata(file_name, url, db=None, job_id=None): - if db == "firebase": - handler = DATABASE_IDS.handlers().get(db) - initialized_db = handler( - default_db="staging" - ) # default to staging for metadata uploads - if initialized_db._initialized: - db_uploader = DBUploader(initialized_db) - db_uploader.upload_result_metadata(file_name, url, job_id) - else: - db_maintainer = DBMaintenance(initialized_db) + initialized_handler = handler( + bucket_name="cellpack-results", + sub_folder_name=sub_folder, + region_name="us-west-2", + ) + _, url = initialized_handler.save_file_and_get_url(file_path) + if not url: logging.warning( - f"Firebase credentials not found. For setup instructions see: {db_maintainer.readme_url()}. Or try cellPACK web interface: https://cellpack.allencell.org (no setup required)" + f"Skipping browser opening, upload credentials not configured. For setup instructions see: {DB_SETUP_README_URL}" ) - return + return url + return None @staticmethod def open_in_simularium(aws_url): diff --git a/cellpack/autopack/writers/__init__.py b/cellpack/autopack/writers/__init__.py index de5f72161..434fd0086 100644 --- a/cellpack/autopack/writers/__init__.py +++ b/cellpack/autopack/writers/__init__.py @@ -196,8 +196,11 @@ def save_as_simularium(self, env, seed_to_results_map): number_of_packings = env.config_data.get("number_of_packings", 1) open_results_in_browser = env.config_data.get("open_results_in_browser", False) upload_results = env.config_data.get("upload_results", False) + dedup_hash = getattr(env, "dedup_hash") if (number_of_packings == 1 or is_aggregate) and upload_results: - autopack.helper.post_and_open_file(file_name, open_results_in_browser) + autopack.helper.post_and_open_file( + file_name, open_results_in_browser, dedup_hash + ) def save_Mixed_asJson( self, diff --git a/cellpack/bin/cleanup_tasks.py b/cellpack/bin/cleanup_tasks.py deleted file mode 100644 index 08217aa04..000000000 --- a/cellpack/bin/cleanup_tasks.py +++ /dev/null @@ -1,20 +0,0 @@ -from cellpack.autopack.DBRecipeHandler import DBMaintenance -from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS - - -def run_cleanup(db_id=DATABASE_IDS.FIREBASE): - """ - Performs cleanup operations on expired database entries. - This function is executed as part of a scheduled task defined in .github/workflows/cleanup-firebase.yml - - Args: - db_id(str): The database id to use - """ - handler = DATABASE_IDS.handlers().get(db_id) - initialized_db = handler(default_db="staging") - db_maintainer = DBMaintenance(initialized_db) - db_maintainer.cleanup_results() - - -if __name__ == "__main__": - run_cleanup() diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index 9db539372..75ee170b0 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -1,6 +1,5 @@ import logging import logging.config -import os import time from pathlib import Path @@ -25,15 +24,19 @@ def pack( - recipe, config_path=None, analysis_config_path=None, docker=False, validate=True + recipe, + config_path=None, + analysis_config_path=None, + docker=False, + hash=None, ): """ Initializes an autopack packing from the command line - :param recipe: string argument, path to recipe + :param recipe: string argument, path to recipe file, or a dictionary representing a recipe :param config_path: string argument, path to packing config file :param analysis_config_path: string argument, path to analysis config file :param docker: boolean argument, are we using docker - :param validate: boolean argument, validate recipe before packing + :param hash: string argument, dedup hash identifier for tracking/caching results :return: void """ @@ -52,6 +55,7 @@ def pack( autopack.helper = helper env = Environment(config=packing_config_data, recipe=recipe_data) env.helper = helper + env.dedup_hash = hash log.info("Packing recipe: %s", recipe_data["name"]) log.info("Outputs will be saved to %s", env.out_folder) @@ -78,24 +82,23 @@ def pack( env.buildGrid(rebuild=True) env.pack_grid(verbose=0, usePP=False) - if docker: - job_id = os.environ.get("AWS_BATCH_JOB_ID", None) - if job_id: - handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) - # temporarily using demo bucket before permissions are granted - initialized_handler = handler( - bucket_name="cellpack-demo", - sub_folder_name="runs", - region_name="us-west-2", - ) - uploader = DBUploader(db_handler=initialized_handler) - uploader.upload_packing_results_workflow( - source_folder=env.out_folder, - recipe_name=recipe_data["name"], - job_id=job_id, - config_data=packing_config_data, - recipe_data=recipe_loader.serializable_recipe_data, - ) + # Upload results to S3 for server-initiated packings (docker and hash are both provided) + if docker and hash: + handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) + # temporarily using demo bucket before permissions are granted + initialized_handler = handler( + bucket_name="cellpack-demo", + sub_folder_name="runs", + region_name="us-west-2", + ) + uploader = DBUploader(db_handler=initialized_handler) + uploader.upload_packing_results_workflow( + source_folder=env.out_folder, + recipe_name=recipe_data["name"], + dedup_hash=hash, + config_data=packing_config_data, + recipe_data=recipe_loader.serializable_recipe_data, + ) def main(): diff --git a/cellpack/bin/upload.py b/cellpack/bin/upload.py index b038b4e4e..59fc2104e 100644 --- a/cellpack/bin/upload.py +++ b/cellpack/bin/upload.py @@ -3,7 +3,7 @@ import json from cellpack.autopack.FirebaseHandler import FirebaseHandler -from cellpack.autopack.DBRecipeHandler import DBUploader, DBMaintenance +from cellpack.autopack.DBRecipeHandler import DBUploader, DB_SETUP_README_URL from cellpack.autopack.upy.simularium.simularium_helper import simulariumHelper from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS from cellpack.autopack.loaders.config_loader import ConfigLoader @@ -90,7 +90,7 @@ def upload( id, _ = db_handler.upload_data("editable_fields", field) editable_fields_ids.append(id) if output_file: - _, result_url = simulariumHelper.store_result_file( + result_url = simulariumHelper.store_result_file( output_file, storage="aws", sub_folder="client" ) if studio: @@ -105,9 +105,8 @@ def upload( db_handler.upload_data("example_packings", recipe_metadata) else: - db_maintainer = DBMaintenance(db_handler) sys.exit( - f"The selected database is not initialized. Please set up Firebase credentials to upload recipes. Refer to the instructions at {db_maintainer.readme_url()} " + f"The selected database is not initialized. Please set up Firebase credentials to upload recipes. Refer to the instructions at {DB_SETUP_README_URL} " ) diff --git a/cellpack/tests/test_db_uploader.py b/cellpack/tests/test_db_uploader.py index 0c91cbd52..5d3ee4d78 100644 --- a/cellpack/tests/test_db_uploader.py +++ b/cellpack/tests/test_db_uploader.py @@ -1,3 +1,4 @@ +from cellpack.autopack.AWSHandler import AWSHandler from cellpack.autopack.DBRecipeHandler import DBUploader from cellpack.tests.mocks.mock_db import MockDB from unittest.mock import MagicMock, patch @@ -175,3 +176,55 @@ def test_upload_recipe(): "A": "firebase:composition/test_id", } assert recipe_doc.objects_to_path_map == {"sphere_25": "firebase:objects/test_id"} + + +def test_upload_job_status_with_firebase_handler(): + mock_firebase_db = MagicMock() + mock_firebase_db.create_timestamp.return_value = "test_timestamp" + + uploader = DBUploader(mock_firebase_db) + uploader.upload_job_status("test_hash", "RUNNING") + + mock_firebase_db.create_timestamp.assert_called_once() + mock_firebase_db.update_or_create.assert_called_once_with( + "job_status", + "test_hash", + { + "timestamp": "test_timestamp", + "status": "RUNNING", + "error_message": None, + }, + ) + + +def test_upload_job_status_with_aws_handler(): + mock_aws_db = MagicMock(spec=AWSHandler, wraps=None) + # Allow create_timestamp to be checked even though it's not on AWSHandler + mock_aws_db.create_timestamp = MagicMock() + + mock_firebase_handler = MagicMock() + mock_firebase_handler.create_timestamp.return_value = "firebase_timestamp" + + with patch( + "cellpack.autopack.DBRecipeHandler.DATABASE_IDS.handlers" + ) as mock_handlers: + mock_handlers.return_value.get.return_value = ( + lambda default_db: mock_firebase_handler + ) + + uploader = DBUploader(mock_aws_db) + uploader.upload_job_status("test_hash", "DONE", result_path="test_path") + + mock_firebase_handler.create_timestamp.assert_called_once() + mock_firebase_handler.update_or_create.assert_called_once_with( + "job_status", + "test_hash", + { + "timestamp": "firebase_timestamp", + "status": "DONE", + "error_message": None, + "result_path": "test_path", + }, + ) + # AWS handler should not be called for timestamp + mock_aws_db.create_timestamp.assert_not_called() diff --git a/cellpack/tests/test_recipe_version_migration.py b/cellpack/tests/test_recipe_version_migration.py index a4a77044d..37801c1f9 100644 --- a/cellpack/tests/test_recipe_version_migration.py +++ b/cellpack/tests/test_recipe_version_migration.py @@ -317,7 +317,7 @@ def test_convert_v1_to_v2( [ ( RecipeLoader( - input_file_path="cellpack/tests/recipes/v1/test_single_spheres.json" + "cellpack/tests/recipes/v1/test_single_spheres.json" ).recipe_data, { "version": "1.0", @@ -386,9 +386,7 @@ def test_migrate_version_error(): "converted_compartment_data, expected_compartment_data", [ ( - RecipeLoader( - input_file_path="cellpack/tests/recipes/v1/test_compartment.json" - ).recipe_data, + RecipeLoader("cellpack/tests/recipes/v1/test_compartment.json").recipe_data, { "version": "1.0", "format_version": "2.0", diff --git a/docker/server.py b/docker/server.py index 581e0151d..00523eb5e 100644 --- a/docker/server.py +++ b/docker/server.py @@ -1,8 +1,7 @@ import asyncio -from aiohttp import web -import os import uuid -from cellpack.autopack.DBRecipeHandler import DBUploader +from aiohttp import web +from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS from cellpack.bin.pack import pack @@ -12,41 +11,73 @@ class CellpackServer: def __init__(self): self.packing_tasks = set() - async def run_packing(self, recipe, config, job_id): - os.environ["AWS_BATCH_JOB_ID"] = job_id + def _get_firebase_handler(self, database_name="firebase"): + handler = DATABASE_IDS.handlers().get(database_name) + initialized_db = handler(default_db="staging") + if initialized_db._initialized: + return initialized_db + return None + + def job_exists(self, dedup_hash): + db = self._get_firebase_handler() + if not db: + return False + + job_status, _ = db.get_doc_by_id("job_status", dedup_hash) + return job_status is not None + + async def run_packing(self, job_id, recipe=None, config=None, body=None): self.update_job_status(job_id, "RUNNING") try: - pack(recipe=recipe, config_path=config, docker=True) + # Pack JSON recipe in body if provided, otherwise use recipe path + pack(recipe=(body if body else recipe), config_path=config, docker=True, hash=job_id) except Exception as e: self.update_job_status(job_id, "FAILED", error_message=str(e)) def update_job_status(self, job_id, status, result_path=None, error_message=None): - handler = DATABASE_IDS.handlers().get("firebase") - initialized_db = handler( - default_db="staging" - ) - if initialized_db._initialized: - db_uploader = DBUploader(initialized_db) + db = self._get_firebase_handler() + if db: + db_uploader = DBUploader(db) db_uploader.upload_job_status(job_id, status, result_path, error_message) async def hello_world(self, request: web.Request) -> web.Response: return web.Response(text="Hello from the cellPACK server") async def health_check(self, request: web.Request) -> web.Response: - # healthcheck endpoint needed for AWS load balancer + # health check endpoint needed for AWS load balancer return web.Response() async def pack_handler(self, request: web.Request) -> web.Response: - recipe = request.rel_url.query.get("recipe") - if recipe is None: + recipe = request.rel_url.query.get("recipe") or "" + body = None + + # If request has a body, attempt to parse it as JSON and use as recipe + # otherwise rely on recipe query param + if (request.can_read_body and request.content_length + and request.content_length > 0): + try: + body = await request.json() + except Exception: + body = None + + if not recipe and not body: raise web.HTTPBadRequest( - "Pack requests must include recipe as a query param" + "Pack requests must include a recipe, either as a query param or in the request body" ) config = request.rel_url.query.get("config") - job_id = str(uuid.uuid4()) + + if body: + dedup_hash = DataDoc.generate_hash(body) + if self.job_exists(dedup_hash): + return web.json_response({"jobId": dedup_hash}) + job_id = dedup_hash + else: + job_id = str(uuid.uuid4()) # Initiate packing task to run in background - packing_task = asyncio.create_task(self.run_packing(recipe, config, job_id)) + packing_task = asyncio.create_task( + self.run_packing(job_id, recipe, config, body) + ) # Keep track of task references to prevent them from being garbage # collected, then discard after task completion @@ -70,4 +101,4 @@ async def init_app() -> web.Application: ) return app -web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT) \ No newline at end of file +web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT) diff --git a/docs/DOCKER.md b/docs/DOCKER.md index a1214a335..48e4b6ddd 100644 --- a/docs/DOCKER.md +++ b/docs/DOCKER.md @@ -13,6 +13,6 @@ ## AWS ECS Docker Image 1. Build image, running `docker build -f docker/Dockerfile.ecs -t [CONTAINER-NAME] .` 2. Run packings in the container, running: `docker run -v ~/.aws:/root/.aws -p 80:80 [CONTAINER-NAME]` -3. Try hitting the test endpoint on the server, by navigating to `http://0.0.0.0:8443/hello` in your browser. -4. Try running a packing on the server, by hitting the `http://0.0.0.0:80/pack?recipe=firebase:recipes/one_sphere_v_1.0.0` in your browser. +3. Try hitting the test endpoint on the server, by navigating to `http://0.0.0.0:80/hello` in your browser. +4. Try running a packing on the server, by hitting the `http://0.0.0.0:80/start-packing?recipe=firebase:recipes/one_sphere_v_1.0.0` in your browser. 5. Verify that the packing result path was uploaded to the firebase results table, with the job id specified in the response from the request in step 4.The result simularium file can be found at the s3 path specified there. \ No newline at end of file