Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Standard Python project ignore rules (based on the common GitHub Python template).
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log

# Sphinx documentation
docs/_build/
doc/_build/

# Jupyter Notebook
.ipynb_checkpoints

# IDE specific
.vscode

# Development
# NOTE(review): ignoring poetry.lock means dependency versions float between
# installs; for applications the lock file is usually committed — confirm
# this is intentional for this project.
poetry.lock
9 changes: 6 additions & 3 deletions cluster_files/placefield_job.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ REWARD_ID=$6
TRIAL_STATUS=$7
BIN_SIZE=$8
FORCE_RECALC=$9
CONDA_ENV=${10:-suite2p} # Default to 'suite2p' if argument is not provided

echo "input path: $INPUT_PATH"
echo "output path: $OUTPUT_PATH"
Expand All @@ -17,12 +18,14 @@ echo "reward id: $REWARD_ID"
echo "trial status: $TRIAL_STATUS"
echo "bin size: $BIN_SIZE"
echo "force recalc: $FORCE_RECALC"
source ~/anaconda3/etc/profile.d/conda.sh
conda activate vr2p

conda init bash
conda activate $CONDA_ENV
echo $CONDA_DEFAULT_ENV

RESULT=$(python <<EOF
from linear2ac.cluster.placefield import process_placefield_data
process_placefield_data("$INPUT_PATH", "$OUTPUT_PATH", "$SETTINGS_FILE", $SESSION_ID, "$CUESET", $REWARD_ID, "$TRIAL_STATUS",bin_size=$BIN_SIZE,force_recalc=$FORCE_RECALC)
EOF
)
echo $RESULT
echo $RESULT
62 changes: 50 additions & 12 deletions linear2ac/cluster/placefield.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def process_placefield_data(input_path, output_path, settings_file, session_id,
# open input zarr folder.
if not input_path.is_dir():
raise NameError(f'Could not find folder {input_path}')
data = vr2p.ExperimentData(input_path)
data = vr2p.ExperimentData(str(input_path))
print('Loaded data')
# open output zarr folder.
if not output_path.parent.is_dir():
Expand Down Expand Up @@ -104,7 +104,7 @@ def store_data(dataset_name, field_props,pf):
all_field_props = pf_detect.putative_field_props
store_data('putative', all_field_props, pf_detect.pf_putative)
store_data('passed', [prop for prop in all_field_props if prop['passed']], pf_detect.pf_passed)
store_data('significant', [prop for prop in all_field_props if prop['passed'] & pf_detect.is_significant[prop['cell']]], pf_detect.pf_significant)
store_data('significant', [prop for prop in all_field_props if prop['passed'] & pf_detect.is_significant[prop['cell']]], pf_detect.pf_significant)
# store object.
pf_detect.clean_up()
zarr_dataset.create_dataset('pf', data= pf_detect, dtype=object, object_codec = numcodecs.Pickle())
Expand All @@ -118,18 +118,19 @@ def store_array(data_set_name, array, data_type):
print('Done!')
return pf_detect

def send_placefield_job(info, server):
def send_placefield_job(info, server, moreargs='', pf_script_path='~/.linear2ac/placefield_job.sh'):
job_id = f"{info['session_id']}-{info['reward_id']}-{uuid.uuid4().hex[:3].upper()}"
# connect to ssh
ssh = ssh_connect(server['host'], server['username'], server['password'],verbose=False)
# run command
run_command = f"bsub -n {server['n_cores']} -J {job_id} "
run_command +=f'-R"select[avx512]" -o logs/out-{job_id}.txt "~/placefield_job.sh'
run_command +=f'{moreargs} -o logs/out-{job_id}.txt "source {pf_script_path}'
# arguments.
for key in ['input_path','output_path','settings_file','session_id','cueset','reward_id','trial_status','bin_size','force_recalc']:
run_command+= f' \'{info[key]}\''
run_command+= f'> logs/pf-log-{job_id}.txt"'
stdin, stdout, stderr = ssh.exec_command(run_command)
print(run_command)
# find job id.
stdout = stdout.read().decode('utf-8')
stdout = re.search('Job <(\d*)>',stdout)
Expand All @@ -140,6 +141,37 @@ def send_placefield_job(info, server):
raise NameError("Could not find job id (was job submit succesfull?)")
pass

def send_placefield_job(info, server, moreargs='', pf_script_path='~/.linear2ac/placefield_job.sh'):
    """Submit a place-field processing job to the cluster via Slurm ``sbatch``.

    Builds an ``sbatch --wrap`` command that sources ``pf_script_path`` with the
    positional arguments expected by ``placefield_job.sh``, runs it through the
    active IPython shell, and parses the Slurm job id from the output.

    Parameters
    ----------
    info : dict
        Job parameters; must contain the keys 'input_path', 'output_path',
        'settings_file', 'session_id', 'cueset', 'reward_id', 'trial_status',
        'bin_size' and 'force_recalc'.
    server : dict
        Cluster settings; 'n_cores' is read (defaults to 1 if absent).
    moreargs : str, optional
        Extra flags spliced verbatim into the ``sbatch`` command line.
    pf_script_path : str, optional
        Path (on the cluster) of the job script to source.

    Returns
    -------
    dict
        ``{'info': info, 'job_id': <int Slurm job id>}``.

    Raises
    ------
    RuntimeError
        If called outside an IPython session (no shell to run the command).
    NameError
        If no job id can be parsed from the sbatch output (kept for
        backward compatibility with existing callers).
    """
    import re
    import uuid
    from pathlib import Path
    from IPython import get_ipython

    # Short random suffix keeps the job name unique across resubmissions
    # of the same session/reward combination.
    job_id = f"{info['session_id']}-{info['reward_id']}-{uuid.uuid4().hex[:3].upper()}"
    n_cores = server.get('n_cores', 1)

    # sbatch will not create the -o output directory itself; exist_ok makes
    # the redundant exists() pre-check unnecessary.
    Path('logs').mkdir(exist_ok=True)

    run_command = (
        f"sbatch -n {n_cores} -J {job_id} {moreargs} "
        f"-o logs/out-{job_id}.txt "
        f'--wrap="source {pf_script_path}'
    )
    # Positional arguments, in the exact order placefield_job.sh consumes them.
    for key in ['input_path', 'output_path', 'settings_file', 'session_id', 'cueset',
                'reward_id', 'trial_status', 'bin_size', 'force_recalc']:
        run_command += f" '{info[key]}'"
    run_command += f' > logs/pf-log-{job_id}.txt"'

    # get_ipython() returns None outside IPython; fail with a clear message
    # instead of an opaque AttributeError on .getoutput.
    shell = get_ipython()
    if shell is None:
        raise RuntimeError("send_placefield_job must be run from an IPython session")
    output_str = "\n".join(shell.getoutput(run_command))
    match = re.search(r"Submitted batch job (\d+)", output_str)
    if match:
        return {'info': info, 'job_id': int(match.group(1))}
    else:
        raise NameError("Could not find job id (was job submit successful?)")

def check_placefield_job_status(info,local_output_path, server, verbose=True):
"""Checks on status of job generated by send_placefield_job.

Expand Down Expand Up @@ -180,23 +212,29 @@ def check_placefield_results(data_path,local_output_path):
with zarr.open(local_output_path.as_posix(), mode="r") as zarr_output:
return (f"{data_path}/significant" in zarr_output)

def run_placefield_cluster(local_input_path, server_input_path, local_output_path, server_output_path, server_settings_file, settings, force_recalc):
def run_placefield_cluster(local_input_path, server_input_path, local_output_path, server_output_path, server_settings_file, settings, force_recalc, jobargs=''):# -> list:
data = vr2p.ExperimentData(local_input_path)
jobs = []
for session_id, vr in enumerate(data.vr):
for cueset in vr.trial.set.unique():
for reward_id in [1,2]:
for trial_status in ['correct','incorrect','excl_no_response','all']:
# gather info.
process_info = {'input_path':server_input_path.as_posix(),'output_path':server_output_path.as_posix(),
'settings_file': server_settings_file.as_posix(),
'session_id':session_id,'cueset':cueset,'reward_id':reward_id,
'trial_status':trial_status,'bin_size':settings['placefield']['bin_size'],
'force_recalc':force_recalc}
process_info = {
'input_path':Path(server_input_path).as_posix(),
'output_path':Path(server_output_path).as_posix(),
'settings_file': Path(server_settings_file).as_posix(),
'session_id':session_id,
'cueset':cueset,
'reward_id':reward_id,
'trial_status':trial_status,
'bin_size':settings['placefield']['bin_size'],
'force_recalc':force_recalc
}
# check if results are already present.
data_path = f"{cueset}/{reward_id}/{trial_status}/{session_id}"
if (not check_placefield_results(data_path,local_output_path)) | (force_recalc):
jobs.append(send_placefield_job(process_info, settings['server']))
if (not check_placefield_results(data_path,Path(local_output_path))) | (force_recalc):
jobs.append(send_placefield_job(process_info, settings['server'], jobargs))
job_str = "Submitted"
else:
job_str = "Skipped"
Expand Down