From 94019ebd2603d729a00ebb832d620d20212c1920 Mon Sep 17 00:00:00 2001 From: Kushaan Gupta Date: Sat, 22 Feb 2025 19:07:21 -0500 Subject: [PATCH 1/2] fix rigid job submission --- cluster_files/placefield_job.sh | 8 +++++--- linear2ac/cluster/placefield.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cluster_files/placefield_job.sh b/cluster_files/placefield_job.sh index 2017f84..ffb8931 100644 --- a/cluster_files/placefield_job.sh +++ b/cluster_files/placefield_job.sh @@ -8,6 +8,7 @@ REWARD_ID=$6 TRIAL_STATUS=$7 BIN_SIZE=$8 FORCE_RECALC=$9 +CONDA_ENV=${10:-vr2p} # Default to 'vr2p' if 10th argument is not provided echo "input path: $INPUT_PATH" echo "output path: $OUTPUT_PATH" @@ -17,12 +18,13 @@ echo "reward id: $REWARD_ID" echo "trial status: $TRIAL_STATUS" echo "bin size: $BIN_SIZE" echo "force recalc: $FORCE_RECALC" -source ~/anaconda3/etc/profile.d/conda.sh -conda activate vr2p + +conda activate $CONDA_ENV echo $CONDA_DEFAULT_ENV + RESULT=$(python < Date: Fri, 7 Mar 2025 02:34:40 -0500 Subject: [PATCH 2/2] customize job initiation --- .gitignore | 59 ++++++++++++++++++++++++++++++++ cluster_files/placefield_job.sh | 3 +- linear2ac/cluster/placefield.py | 60 +++++++++++++++++++++++++++------ 3 files changed, 110 insertions(+), 12 deletions(-) create mode 100755 .gitignore mode change 100644 => 100755 cluster_files/placefield_job.sh mode change 100644 => 100755 linear2ac/cluster/placefield.py diff --git a/.gitignore b/.gitignore new file mode 100755 index 0000000..80a8b74 --- /dev/null +++ b/.gitignore @@ -0,0 +1,59 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports 
+htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +pytestdebug.log + +# Sphinx documentation +docs/_build/ +doc/_build/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IDE specific +.vscode + +# Development +poetry.lock diff --git a/cluster_files/placefield_job.sh b/cluster_files/placefield_job.sh old mode 100644 new mode 100755 index ffb8931..b7e4a0d --- a/cluster_files/placefield_job.sh +++ b/cluster_files/placefield_job.sh @@ -8,7 +8,7 @@ REWARD_ID=$6 TRIAL_STATUS=$7 BIN_SIZE=$8 FORCE_RECALC=$9 -CONDA_ENV=${10:-vr2p} # Default to 'vr2p' if 10th argument is not provided +CONDA_ENV=${10:-suite2p} # Default to 'suite2p' if argument is not provided echo "input path: $INPUT_PATH" echo "output path: $OUTPUT_PATH" @@ -19,6 +19,7 @@ echo "trial status: $TRIAL_STATUS" echo "bin size: $BIN_SIZE" echo "force recalc: $FORCE_RECALC" +conda init bash conda activate $CONDA_ENV echo $CONDA_DEFAULT_ENV diff --git a/linear2ac/cluster/placefield.py b/linear2ac/cluster/placefield.py old mode 100644 new mode 100755 index ef1a5f4..a3d656b --- a/linear2ac/cluster/placefield.py +++ b/linear2ac/cluster/placefield.py @@ -44,7 +44,7 @@ def process_placefield_data(input_path, output_path, settings_file, session_id, # open input zarr folder. if not input_path.is_dir(): raise NameError(f'Could not find folder {input_path}') - data = vr2p.ExperimentData(input_path) + data = vr2p.ExperimentData(str(input_path)) print('Loaded data') # open output zarr folder. 
if not output_path.parent.is_dir(): @@ -104,7 +104,7 @@ def store_data(dataset_name, field_props,pf): all_field_props = pf_detect.putative_field_props store_data('putative', all_field_props, pf_detect.pf_putative) store_data('passed', [prop for prop in all_field_props if prop['passed']], pf_detect.pf_passed) - store_data('significant', [prop for prop in all_field_props if prop['passed'] & pf_detect.is_significant[prop['cell']]], pf_detect.pf_significant) + store_data('significant', [prop for prop in all_field_props if prop['passed'] & pf_detect.is_significant[prop['cell']]], pf_detect.pf_significant) # store object. pf_detect.clean_up() zarr_dataset.create_dataset('pf', data= pf_detect, dtype=object, object_codec = numcodecs.Pickle()) @@ -124,12 +124,13 @@ def send_placefield_job(info, server, moreargs='', pf_script_path='~/.linear2ac/ ssh = ssh_connect(server['host'], server['username'], server['password'],verbose=False) # run command run_command = f"bsub -n {server['n_cores']} -J {job_id} " - run_command +=f'{moreargs} -o logs/out-{job_id}.txt "{pf_script_path}' + run_command +=f'{moreargs} -o logs/out-{job_id}.txt "source {pf_script_path}' # arguments. for key in ['input_path','output_path','settings_file','session_id','cueset','reward_id','trial_status','bin_size','force_recalc']: run_command+= f' \'{info[key]}\'' run_command+= f'> logs/pf-log-{job_id}.txt"' stdin, stdout, stderr = ssh.exec_command(run_command) + print(run_command) # find job id. 
stdout = stdout.read().decode('utf-8') stdout = re.search('Job <(\d*)>',stdout)
@@ -140,6 +141,60 @@ def send_placefield_job(info, server, moreargs='', pf_script_path='~/.linear2ac/
         raise NameError("Could not find job id (was job submit succesfull?)")
     pass
 
+def send_placefield_job(info, server, moreargs='', pf_script_path='~/.linear2ac/placefield_job.sh'):
+    """Submit one place-field job to Slurm via ``sbatch`` and return its job id.
+
+    NOTE(review): this reuses the name of the SSH/``bsub`` based
+    ``send_placefield_job`` defined earlier in this module, so it replaces
+    that function when the module is imported.
+
+    Args:
+        info (dict): Job description. Must contain 'input_path', 'output_path',
+            'settings_file', 'session_id', 'cueset', 'reward_id', 'trial_status',
+            'bin_size' and 'force_recalc'; the values are handed to the job
+            script as positional arguments in that order.
+        server (dict): Server settings; only 'n_cores' is read (default 1).
+        moreargs (str): Extra ``sbatch`` options, split with shell-like rules.
+        pf_script_path (str): Job script that the submitted command sources.
+
+    Returns:
+        dict: ``{'info': info, 'job_id': job_id}`` with the integer id printed by sbatch.
+
+    Raises:
+        NameError: If ``sbatch`` cannot be started or prints no job id.
+    """
+    import re
+    import shlex
+    import subprocess
+    import uuid
+    from pathlib import Path
+
+    job_name = f"{info['session_id']}-{info['reward_id']}-{uuid.uuid4().hex[:3].upper()}"
+    n_cores = server.get('n_cores', 1)
+
+    # Both log files of the job are written below ./logs, so make sure it exists.
+    Path('logs').mkdir(exist_ok=True)
+
+    # Quote every value so spaces or quotes inside it cannot break the wrapped command.
+    keys = ['input_path', 'output_path', 'settings_file', 'session_id', 'cueset',
+            'reward_id', 'trial_status', 'bin_size', 'force_recalc']
+    script_args = ' '.join(shlex.quote(str(info[key])) for key in keys)
+    wrapped = f"source {pf_script_path} {script_args} > logs/pf-log-{job_name}.txt"
+
+    # Use an argument list instead of an IPython shell escape: get_ipython() is None
+    # outside IPython, and no local shell gets to re-interpret the arguments.
+    command = ['sbatch', '-n', str(n_cores), '-J', job_name, *shlex.split(moreargs or ''),
+               '-o', f'logs/out-{job_name}.txt', f'--wrap={wrapped}']
+    try:
+        result = subprocess.run(command, capture_output=True, text=True, check=False)
+    except OSError as err:
+        raise NameError(f"Could not run sbatch: {err}") from err
+
+    match = re.search(r"Submitted batch job (\d+)", result.stdout)
+    if match is None:
+        raise NameError(f"Could not find job id (was job submit successful?)\n{result.stderr}".rstrip())
+    return {'info': info, 'job_id': int(match.group(1))}
+
 def check_placefield_job_status(info,local_output_path, server, verbose=True):
     """Checks on status of job generated by send_placefield_job.
 
@@ -180,7 +212,7 @@ def check_placefield_results(data_path,local_output_path): with zarr.open(local_output_path.as_posix(), mode="r") as zarr_output: return (f"{data_path}/significant" in zarr_output) -def run_placefield_cluster(local_input_path, server_input_path, local_output_path, server_output_path, server_settings_file, settings, force_recalc): +def run_placefield_cluster(local_input_path, server_input_path, local_output_path, server_output_path, server_settings_file, settings, force_recalc, jobargs=''):# -> list: data = vr2p.ExperimentData(local_input_path) jobs = [] for session_id, vr in enumerate(data.vr): @@ -188,15 +220,21 @@ def run_placefield_cluster(local_input_path, server_input_path, local_output_pat for reward_id in [1,2]: for trial_status in ['correct','incorrect','excl_no_response','all']: # gather info. - process_info = {'input_path':server_input_path.as_posix(),'output_path':server_output_path.as_posix(), - 'settings_file': server_settings_file.as_posix(), - 'session_id':session_id,'cueset':cueset,'reward_id':reward_id, - 'trial_status':trial_status,'bin_size':settings['placefield']['bin_size'], - 'force_recalc':force_recalc} + process_info = { + 'input_path':Path(server_input_path).as_posix(), + 'output_path':Path(server_output_path).as_posix(), + 'settings_file': Path(server_settings_file).as_posix(), + 'session_id':session_id, + 'cueset':cueset, + 'reward_id':reward_id, + 'trial_status':trial_status, + 'bin_size':settings['placefield']['bin_size'], + 'force_recalc':force_recalc + } # check if results are already present. data_path = f"{cueset}/{reward_id}/{trial_status}/{session_id}" - if (not check_placefield_results(data_path,local_output_path)) | (force_recalc): - jobs.append(send_placefield_job(process_info, settings['server'])) + if (not check_placefield_results(data_path,Path(local_output_path))) | (force_recalc): + jobs.append(send_placefield_job(process_info, settings['server'], jobargs)) job_str = "Submitted" else: job_str = "Skipped"