From c252ba621280130787d177076d2f01638e345900 Mon Sep 17 00:00:00 2001 From: yohanchatelain Date: Fri, 24 Sep 2021 12:58:49 -0400 Subject: [PATCH 1/7] Format 80 README.md --- README.md | 218 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 165 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index e38b543..97a3a7a 100644 --- a/README.md +++ b/README.md @@ -7,16 +7,35 @@ ### Use Python version 3.7 or later ## Overview -**Goal:** To build a generic CI framework for the evaluation of neuroimaging results, which iteratively evaluates and updates results as new data, pipelines, or processing conditions become available. It allows users to systematically evaluate the variability and robustness of results within their neuroscience domain, and facilitate reproducibility. -It currently relies on the CBRAIN distributed computation framework. -It is work in progress :) +**Goal:** To build a generic CI framework for the evaluation of neuroimaging +results, which iteratively evaluates and updates results as new data, pipelines, +or processing conditions become available. It allows users to systematically +evaluate the variability and robustness of results within their neuroscience +domain, and facilitate reproducibility. It currently relies on the CBRAIN +distributed computation framework. It is work in progress :) ## Description and Diagram -* An ‘experiment definition’ yaml file allows the user to specify the pipelines, parameters, and datasets needed to describe and run their computational experiment. It will perform a multiverse-style analysis, where all of the specified pipelines and configurations are run on all the specified datasets. We can therefore determine if the results obtained are robust across pipelines and replicable across datasets. -* Each time the experiment definition is changed, a run in the CircleCI CI platform will be triggered, dynamically launching the required data processing, collecting the results, and transparently committing them along with statistical summaries and visualizations to the experiment CI workflow. -* It makes use of existing open-source tools: the Datalad framework will reference datasets indexed in the CONP (Canadian Open Neuroscience Platform) repositories, which are transferred to Compute Canada. Here, the data can be accessed by CBRAIN, a neuroimaging distributed cloud computing platform able to handle computationally intensive research and that features an API, allowing computations to be executed from within CircleCI. -* Pipelines will point to the Boutiques tool descriptor, which allows them to be containerized and run on CBRAIN/NeuroHub. -* The initial pipelines and datasets that are being integrated can be seen in the diagram below. They have been chosen as they are pertinent to my research question on the associations of hearing loss and brain structure. +* An ‘experiment definition’ yaml file allows the user to specify the pipelines, + parameters, and datasets needed to describe and run their computational + experiment. It will perform a multiverse-style analysis, where all of the + specified pipelines and configurations are run on all the specified datasets. + We can therefore determine if the results obtained are robust across pipelines + and replicable across datasets. 
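+
+  For illustration, a minimal experiment definition could look like the
+  sketch below. The top-level keys (`Datasets`, `Blocklist`, `Pipelines`,
+  `Components`, `Parameter_dictionary`, `Resubmit_tasks`) mirror the fields
+  the scripts read; the dataset name, component names, and paths are
+  placeholders rather than a tested configuration.
+
+  ```yaml
+  Datasets:
+    Prevent-AD:
+      Blocklist: []              # filenames to exclude from processing
+  Pipelines:
+    FSL:
+      Components:                # listed in execution order
+        FSL_First:
+          Parameter_dictionary: ./Task_Parameters/FSL_First.json
+        FSL_Stats:
+          Parameter_dictionary: ./Task_Parameters/FSL_Stats.json
+  Resubmit_tasks:
+    taskIDs: []                  # CBRAIN task IDs to repost on the next run
+  ```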
+* Each time the experiment definition is changed, a run in the CircleCI CI + platform will be triggered, dynamically launching the required data + processing, collecting the results, and transparently committing them along + with statistical summaries and visualizations to the experiment CI workflow. +* It makes use of existing open-source tools: the Datalad framework will + reference datasets indexed in the CONP (Canadian Open Neuroscience Platform) + repositories, which are transferred to Compute Canada. Here, the data can be + accessed by CBRAIN, a neuroimaging distributed cloud computing platform able + to handle computationally intensive research and that features an API, + allowing computations to be executed from within CircleCI. +* Pipelines will point to the Boutiques tool descriptor, which allows them to be + containerized and run on CBRAIN/NeuroHub. +* The initial pipelines and datasets that are being integrated can be seen in + the diagram below. They have been chosen as they are pertinent to my research + question on the associations of hearing loss and brain structure. ![](./Related_Files/Other/Diagram.jpg) @@ -27,83 +46,176 @@ It is work in progress :) * [CircleCI account](https://circleci.com/) ## How to run this platform -* **Note that the structure of many of the following files I mention can easily be learnt by looking at the files and how I currently have them setup. These instructions include adding your own pipelines and datasets.** +* **Note that the structure of many of the following files I mention can easily + be learnt by looking at the files and how I currently have them setup. These + instructions include adding your own pipelines and datasets.** * (1) Create the required accounts to run this platform. * (2) Fork this repository. -* (3) Edit the Experiment Definition yaml file to add the pipelines and datasets, make sure the pipeline components are specified in the correct order. - -* (4) Add the relevant CBRAIN IDs for tools and data providers in the *Config_Files/CBRAIN_IDs.yaml* file. +* (3) Edit the Experiment Definition yaml file to add the pipelines and + datasets, make sure the pipeline components are specified in the correct + order. -* (5) Create and add, or modify the pipeline parameter json files to the *Task_Parameters* directory. These are the parameters which the tool will run on in CBRAIN. Currently I find that the easiest way to create a new parameter json file is to run a single task with the pipeline through CBRAIN, then querying the task through the *cbrain_get_task_info* function in *cbrainAPI.py*, and then copying and pasting the 'params' field with desired modifications into a json file. +* (4) Add the relevant CBRAIN IDs for tools and data providers in the + *Config_Files/CBRAIN_IDs.yaml* file. -* (6) Provide the path to the parameters for each component in the Experiment Definition. Note that the names by which you refer to the pipelines and datasets have to be written identically in the config file and the Experiment Definition. +* (5) Create and add, or modify the pipeline parameter json files to the + *Task_Parameters* directory. These are the parameters which the tool will run + on in CBRAIN. Currently I find that the easiest way to create a new parameter + json file is to run a single task with the pipeline through CBRAIN, then + querying the task through the *cbrain_get_task_info* function in + *cbrainAPI.py*, and then copying and pasting the 'params' field with desired + modifications into a json file. 
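+
+  As a sketch, assuming `cbrain_login` returns an API token and
+  `cbrain_get_task_info` returns the task as a dictionary (the exact
+  signatures live in *cbrainAPI.py*; the task ID and output filename below
+  are placeholders), that workflow can be scripted as:
+
+  ```python
+  import json
+
+  from cbrainAPI import cbrain_login, cbrain_get_task_info
+
+  token = cbrain_login('my_cbrain_user', 'my_cbrain_password')
+  # ID of the single task already run by hand through the CBRAIN portal
+  task_info = cbrain_get_task_info(token, 1234567)
+
+  # Save the 'params' field; edit the copy as needed before committing it
+  with open('Task_Parameters/my_component.json', 'w') as file:
+      json.dump(task_info['params'], file, indent=2)
+  ```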
-* (7) Edit the analysesVisualizations.py module to process your results from the cache files and produce the plots as you see fit. +* (6) Provide the path to the parameters for each component in the Experiment + Definition. Note that the names by which you refer to the pipelines and + datasets have to be written identically in the config file and the Experiment + Definition. -* (8) Depending on the output format of the pipeline you will likely have to add some code to the *populate_results* function in the *cacheOps.py* module to ensure the results are extracted and placed in the cache in the specific way. +* (7) Edit the analysesVisualizations.py module to process your results from the + cache files and produce the plots as you see fit. -* (9) Similarly to the previous step, the code for the *update_statuses* function in *cacheOps.py* may need to be updated depending on the specific pipeline's CBRAIN ID output key, which in turn depends on the Boutiques descriptor for the pipeline. +* (8) Depending on the output format of the pipeline you will likely have to add + some code to the *populate_results* function in the *cacheOps.py* module to + ensure the results are extracted and placed in the cache in the specific way. -* (10) In CircleCI, and create a new project for your forked repo. Use the *.circleci/config.yml* from the repo as your circleCI config file. +* (9) Similarly to the previous step, the code for the *update_statuses* + function in *cacheOps.py* may need to be updated depending on the specific + pipeline's CBRAIN ID output key, which in turn depends on the Boutiques + descriptor for the pipeline. -* (11) Go to the CircleCI project settings. Modify the environment variables in CircleCI. Add your CBRAIN credentials in environment variables called 'cbrain_user' and 'cbrain_password' +* (10) In CircleCI, and create a new project for your forked repo. Use the + *.circleci/config.yml* from the repo as your circleCI config file. -* (12) In the CircleCI project settings, generate a CircleCI token and add it as 'CCI_token' to the CircleCI environment variables (to be able to download the latest cache file from the artifacts using their API). +* (11) Go to the CircleCI project settings. Modify the environment variables in + CircleCI. Add your CBRAIN credentials in environment variables called + 'cbrain_user' and 'cbrain_password' -* (13) Note that this step is probably already automatically done by CircleCI (but worth checking): In the forked GitHub repo settings, go to the ‘Deploy keys’ tab in the project’s settings, generate a key, and paste this into the CircleCI Project Settings ‘SSH Keys’ tab. +* (12) In the CircleCI project settings, generate a CircleCI token and add it as + 'CCI_token' to the CircleCI environment variables (to be able to download the + latest cache file from the artifacts using their API). -* (14) Modify the circleCI config file in *.circleci/config.yml*. Make sure the cron-job is running at the desired frequency, and make sure all of your outputs (By default I have this setup as all json cache files and the plots) are deposited in the 'artifacts' directory which is created in this file. +* (13) Note that this step is probably already automatically done by CircleCI + (but worth checking): In the forked GitHub repo settings, go to the ‘Deploy + keys’ tab in the project’s settings, generate a key, and paste this into the + CircleCI Project Settings ‘SSH Keys’ tab. + +* (14) Modify the circleCI config file in *.circleci/config.yml*. 
Make sure the + cron-job is running at the desired frequency, and make sure all of your + outputs (By default I have this setup as all json cache files and the plots) + are deposited in the 'artifacts' directory which is created in this file. ## File and data layout and descriptions The main directories in the repository are: -* **Config_Files:** Contains configuration files. Currently only contains the CBRAIN_IDs.yaml file (see below). +* **Config_Files:** Contains configuration files. Currently only contains the + CBRAIN_IDs.yaml file (see below). -* **Related_Files:** Miscellaneous bits and bobs I have made over the course of the project which make life easier, such as a script to register files in CBRAIN, a bash script to help creating a flat directory of .nii.gz files from datasets, etc. +* **Related_Files:** Miscellaneous bits and bobs I have made over the course of + the project which make life easier, such as a script to register files in + CBRAIN, a bash script to help creating a flat directory of .nii.gz files from + datasets, etc. -* **Task_Parameters:** Has a json file specifying the parameters for each pipeline component as a dictionary. I recommend exploring the tools on CBRAIN, copying their parameter dictionary for posting a task, and pasting it into a json file in this directory to ensure the API calls are made properly. +* **Task_Parameters:** Has a json file specifying the parameters for each + pipeline component as a dictionary. I recommend exploring the tools on CBRAIN, + copying their parameter dictionary for posting a task, and pasting it into a + json file in this directory to ensure the API calls are made properly. -* **Tests:** Contains the unit tests for the code. Still very much a work in progress. +* **Tests:** Contains the unit tests for the code. Still very much a work in + progress. Some important files to start with are: -* **Experiment_Definition.yaml:** The user specifies the names of the datasets, pipeline. They specify pipeline components in the order they will execute in, as well as the paths for the parameters of each component. Note that the names provided in this file must match the names in Config_Files. -This is the main file the user is meant to modify to define the computational experiments being run. - -* **.circleci/config.yml**: Downloads and installs the necessary packages for Python. Launches the NeuroCI.py script using CircleCI environment variables to pass it arguments. It creates an artifacts directory which will contain all of the CircleCI artifacts available (our desired output). It copies the json cache files to this directory. -It also has a section to specify a cron-job to routinely update your caches and computations. - -* **Config_Files/CBRAIN_IDs.yaml:** Allows the user to specify the CBRAIN data provider ID for the datasets (points CBRAIN to your dataset on Compute Canada) and the pipeline component IDs ("Tool_config_IDs" on CBRAIN). These are the IDs the script will use to access the data and pipelines in CBRAIN. Note that the names provided in this file must match the names in the Experiment Definition. - -* A json cache file with the name of each dataset will automatically be produced, and keeps track of all the CBRAIN IDs of files and tasks, computation statuses, and the results of the current CI run. It is then exported as an artifact to be downloaded at the beginning of the next CI run. 
The first level of a json cache file are all the filenames in the dataset's CBRAIN data provider, and then under the file the separate pipelines (and then pipeline components and results) are present with their relevant information. +* **Experiment_Definition.yaml:** The user specifies the names of the datasets, + pipeline. They specify pipeline components in the order they will execute in, + as well as the paths for the parameters of each component. Note that the names + provided in this file must match the names in Config_Files. This is the main + file the user is meant to modify to define the computational experiments being + run. + +* **.circleci/config.yml**: Downloads and installs the necessary packages for + Python. Launches the NeuroCI.py script using CircleCI environment variables to + pass it arguments. It creates an artifacts directory which will contain all of + the CircleCI artifacts available (our desired output). It copies the json + cache files to this directory. It also has a section to specify a cron-job to + routinely update your caches and computations. + +* **Config_Files/CBRAIN_IDs.yaml:** Allows the user to specify the CBRAIN data + provider ID for the datasets (points CBRAIN to your dataset on Compute Canada) + and the pipeline component IDs ("Tool_config_IDs" on CBRAIN). These are the + IDs the script will use to access the data and pipelines in CBRAIN. Note that + the names provided in this file must match the names in the Experiment + Definition. + +* A json cache file with the name of each dataset will automatically be + produced, and keeps track of all the CBRAIN IDs of files and tasks, + computation statuses, and the results of the current CI run. It is then + exported as an artifact to be downloaded at the beginning of the next CI run. + The first level of a json cache file are all the filenames in the dataset's + CBRAIN data provider, and then under the file the separate pipelines (and then + pipeline components and results) are present with their relevant information. The main Python modules are: -* **NeuroCI.py:** The "main" central module which orchestrates the workflow. It logs the user on to CBRAIN, obtains the circleCI token, and the url to download the newest artifacts from the last run (all stored in the CircleCI environment variables, and passed to the script in the CircleCI config file). It then reads the Experiment Definition and the config file, and passes all relevant info to the "main" function. -The main function is a simple nested loop. The outer loop iterates over the datasets in the experiment definition, downloads the latest cache file (json file which keeps track of intermediate computations and completed computation results from CBRAIN for a single dataset) from the previous circleCI run, and queries CBRAIN in order to update all of the task statuses. -The inner loop iterates over the pipelines in the experiment definition file. It adds any new files available in the data provider to the cache, and calls the pipeline manager to deal with the posting of tasks for each component of the current pipeline and dataset. - -* **cacheOps.py:** Contains functions that perform operations on the dataset cache files. These range from downloading the latest cache file, to generating a new cache file from scratch, generating new subjects, to updating task and result statuses in a cache file, etc. 
-One function that is important to highlight is the pipeline manager, which iterates over each component in a pipeline, organizes, and feeds the necessary data to the functions which post tasks on CBRAIN and update the caches with the newest progress in computations. - -* **cbrainAPI.py:** Contains all of the API calls used to communicate with CBRAIN. Each API call has it's own function. - -* **analysesVisualizations.py:** This is run as a separate script from NeuroCI.py in the CircleCI config file. It iterates through the results from the cache files, processing them however the user desires, and plots them. It downloads further files from CBRAIN (such as sensitive data we don't wish to make public in the repo) loops through all the pipelines in the Experiment Definition, and launches a function to process a dataset cache file (this function will vary depending on the users experiment). -For example, for Prevent-AD, this function iterates through the cache, matches the results from finished computations to the corresponding measures from the file we just downloaded earlier in the script, and then plots a correlation which we can output along with the cache file as artifacts in CircleCI (the plot is saved directly to the artifacts directory - see the **.circleci/config.yml** heading above). Thus the user has access to the most up-to-date results and graphics. Currently in very early stages. +* **NeuroCI.py:** The "main" central module which orchestrates the workflow. It + logs the user on to CBRAIN, obtains the circleCI token, and the url to + download the newest artifacts from the last run (all stored in the CircleCI + environment variables, and passed to the script in the CircleCI config file). + It then reads the Experiment Definition and the config file, and passes all + relevant info to the "main" function. The main function is a simple nested + loop. The outer loop iterates over the datasets in the experiment definition, + downloads the latest cache file (json file which keeps track of intermediate + computations and completed computation results from CBRAIN for a single + dataset) from the previous circleCI run, and queries CBRAIN in order to update + all of the task statuses. The inner loop iterates over the pipelines in the + experiment definition file. It adds any new files available in the data + provider to the cache, and calls the pipeline manager to deal with the posting + of tasks for each component of the current pipeline and dataset. + +* **cacheOps.py:** Contains functions that perform operations on the dataset + cache files. These range from downloading the latest cache file, to generating + a new cache file from scratch, generating new subjects, to updating task and + result statuses in a cache file, etc. One function that is important to + highlight is the pipeline manager, which iterates over each component in a + pipeline, organizes, and feeds the necessary data to the functions which post + tasks on CBRAIN and update the caches with the newest progress in + computations. + +* **cbrainAPI.py:** Contains all of the API calls used to communicate with + CBRAIN. Each API call has it's own function. + +* **analysesVisualizations.py:** This is run as a separate script from + NeuroCI.py in the CircleCI config file. It iterates through the results from + the cache files, processing them however the user desires, and plots them. 
It + downloads further files from CBRAIN (such as sensitive data we don't wish to + make public in the repo) loops through all the pipelines in the Experiment + Definition, and launches a function to process a dataset cache file (this + function will vary depending on the users experiment). For example, for + Prevent-AD, this function iterates through the cache, matches the results from + finished computations to the corresponding measures from the file we just + downloaded earlier in the script, and then plots a correlation which we can + output along with the cache file as artifacts in CircleCI (the plot is saved + directly to the artifacts directory - see the **.circleci/config.yml** heading + above). Thus the user has access to the most up-to-date results and graphics. + Currently in very early stages. ## Function descriptions -While I may indeed someday make an entire written document for each individual function, for now I recommend parsing through the code and seeing the comments and docstrings in it to understand it. I've spent some time trying to make the code and commenting informative and modular. +While I may indeed someday make an entire written document for each individual +function, for now I recommend parsing through the code and seeing the comments +and docstrings in it to understand it. I've spent some time trying to make the +code and commenting informative and modular. ## Quick tips, fixes and other notes. -* Note that the first time the circleCI run of this softwrae attempts to populate the results into a json, it will likely say there is an error populating it in the terminal output, giving something along the lines of: -*Streaming text for fileID: 3700641 -Synchronized userfiles 3700641 -Download failure -401* -If this is the case, don't worry! The file synchronization doesn't occur instantaneously, so it doesn't find the file on CBRAIN to download until a minute or two have passed. It should download with no issues the next time the CI runs a few hours later. \ No newline at end of file +* Note that the first time the circleCI run of this softwrae attempts to + populate the results into a json, it will likely say there is an error + populating it in the terminal output, giving something along the lines of: + *Streaming text for fileID: 3700641 Synchronized userfiles 3700641 Download + failure 401* If this is the case, don't worry! The file synchronization + doesn't occur instantaneously, so it doesn't find the file on CBRAIN to + download until a minute or two have passed. It should download with no issues + the next time the CI runs a few hours later. \ No newline at end of file From 317721de38ab61b49543a59063128b8fef1f23c6 Mon Sep 17 00:00:00 2001 From: yohanchatelain Date: Fri, 24 Sep 2021 13:14:46 -0400 Subject: [PATCH 2/7] Update README.md --- README.md | 158 +++++++++++++++++++++++++++--------------------------- 1 file changed, 78 insertions(+), 80 deletions(-) diff --git a/README.md b/README.md index 97a3a7a..afc789c 100644 --- a/README.md +++ b/README.md @@ -12,21 +12,21 @@ results, which iteratively evaluates and updates results as new data, pipelines, or processing conditions become available. It allows users to systematically evaluate the variability and robustness of results within their neuroscience domain, and facilitate reproducibility. It currently relies on the CBRAIN -distributed computation framework. It is work in progress :) +distributed computation framework. 
It is a work in progress :) ## Description and Diagram -* An ‘experiment definition’ yaml file allows the user to specify the pipelines, +* An ‘experiment definition’ YAML file allows the user to specify the pipelines, parameters, and datasets needed to describe and run their computational experiment. It will perform a multiverse-style analysis, where all of the - specified pipelines and configurations are run on all the specified datasets. + selected pipelines and configurations are run on all the specified datasets. We can therefore determine if the results obtained are robust across pipelines and replicable across datasets. * Each time the experiment definition is changed, a run in the CircleCI CI platform will be triggered, dynamically launching the required data processing, collecting the results, and transparently committing them along with statistical summaries and visualizations to the experiment CI workflow. -* It makes use of existing open-source tools: the Datalad framework will - reference datasets indexed in the CONP (Canadian Open Neuroscience Platform) +* It uses existing open-source tools: the Datalad framework will reference + datasets indexed in the CONP (Canadian Open Neuroscience Platform) repositories, which are transferred to Compute Canada. Here, the data can be accessed by CBRAIN, a neuroimaging distributed cloud computing platform able to handle computationally intensive research and that features an API, @@ -35,7 +35,7 @@ distributed computation framework. It is work in progress :) containerized and run on CBRAIN/NeuroHub. * The initial pipelines and datasets that are being integrated can be seen in the diagram below. They have been chosen as they are pertinent to my research - question on the associations of hearing loss and brain structure. + question on the associations between hearing loss and brain structure. ![](./Related_Files/Other/Diagram.jpg) @@ -47,39 +47,38 @@ distributed computation framework. It is work in progress :) ## How to run this platform * **Note that the structure of many of the following files I mention can easily - be learnt by looking at the files and how I currently have them setup. These - instructions include adding your own pipelines and datasets.** + be learned by looking at the files and how I currently have them set up. These + instructions include adding your pipelines and datasets.** * (1) Create the required accounts to run this platform. * (2) Fork this repository. -* (3) Edit the Experiment Definition yaml file to add the pipelines and +* (3) Edit the Experiment Definition YAML file to add the pipelines and datasets, make sure the pipeline components are specified in the correct order. * (4) Add the relevant CBRAIN IDs for tools and data providers in the *Config_Files/CBRAIN_IDs.yaml* file. -* (5) Create and add, or modify the pipeline parameter json files to the - *Task_Parameters* directory. These are the parameters which the tool will run - on in CBRAIN. Currently I find that the easiest way to create a new parameter - json file is to run a single task with the pipeline through CBRAIN, then - querying the task through the *cbrain_get_task_info* function in +* (5) Create, add, or modify the pipeline parameter JSON files to the + *Task_Parameters* directory. These are the parameters on which the tool will + run in CBRAIN. 
Currently, I find that the easiest way to create a new + parameter JSON file is to run a single task with the pipeline through CBRAIN, + then querying the task through the *cbrain_get_task_info* function in *cbrainAPI.py*, and then copying and pasting the 'params' field with desired - modifications into a json file. + modifications into a JSON file. * (6) Provide the path to the parameters for each component in the Experiment - Definition. Note that the names by which you refer to the pipelines and - datasets have to be written identically in the config file and the Experiment - Definition. + Definition. Note that the names you refer to the pipelines and datasets have + to be written identically in the config file and the Experiment Definition. * (7) Edit the analysesVisualizations.py module to process your results from the cache files and produce the plots as you see fit. -* (8) Depending on the output format of the pipeline you will likely have to add +* (8) Depending on the pipeline's output format, you will likely have to add some code to the *populate_results* function in the *cacheOps.py* module to - ensure the results are extracted and placed in the cache in the specific way. + ensure the results are extracted and placed in the cache in a specific way. * (9) Similarly to the previous step, the code for the *update_statuses* function in *cacheOps.py* may need to be updated depending on the specific @@ -102,10 +101,10 @@ distributed computation framework. It is work in progress :) keys’ tab in the project’s settings, generate a key, and paste this into the CircleCI Project Settings ‘SSH Keys’ tab. -* (14) Modify the circleCI config file in *.circleci/config.yml*. Make sure the +* (14) Modify the circleCI config file in *.circleci/config.yml*. Ensure the cron-job is running at the desired frequency, and make sure all of your - outputs (By default I have this setup as all json cache files and the plots) - are deposited in the 'artifacts' directory which is created in this file. + outputs (By default, I have this setup as all JSON cache files and the plots) + are deposited in the 'artifacts' directory created in this file. ## File and data layout and descriptions @@ -115,14 +114,14 @@ The main directories in the repository are: CBRAIN_IDs.yaml file (see below). * **Related_Files:** Miscellaneous bits and bobs I have made over the course of - the project which make life easier, such as a script to register files in - CBRAIN, a bash script to help creating a flat directory of .nii.gz files from - datasets, etc. + the project make life easier, such as a script to register files in CBRAIN and + a bash script to help create a flat directory of .nii.gz files from datasets, + etc. -* **Task_Parameters:** Has a json file specifying the parameters for each +* **Task_Parameters:** It has a JSON file specifying the parameters for each pipeline component as a dictionary. I recommend exploring the tools on CBRAIN, copying their parameter dictionary for posting a task, and pasting it into a - json file in this directory to ensure the API calls are made properly. + JSON file in this directory to ensure the API calls are made properly. * **Tests:** Contains the unit tests for the code. Still very much a work in progress. @@ -138,10 +137,10 @@ Some important files to start with are: * **.circleci/config.yml**: Downloads and installs the necessary packages for Python. Launches the NeuroCI.py script using CircleCI environment variables to - pass it arguments. 
It creates an artifacts directory which will contain all of - the CircleCI artifacts available (our desired output). It copies the json - cache files to this directory. It also has a section to specify a cron-job to - routinely update your caches and computations. + pass its arguments. It creates an artifacts directory that will contain all of + the CircleCI artifacts available (our desired output). Finally, it copies the + JSON cache files to this directory. It also has a section to specify a + cron-job to update your caches and computations routinely. * **Config_Files/CBRAIN_IDs.yaml:** Allows the user to specify the CBRAIN data provider ID for the datasets (points CBRAIN to your dataset on Compute Canada) @@ -150,72 +149,71 @@ Some important files to start with are: the names provided in this file must match the names in the Experiment Definition. -* A json cache file with the name of each dataset will automatically be - produced, and keeps track of all the CBRAIN IDs of files and tasks, - computation statuses, and the results of the current CI run. It is then - exported as an artifact to be downloaded at the beginning of the next CI run. - The first level of a json cache file are all the filenames in the dataset's - CBRAIN data provider, and then under the file the separate pipelines (and then - pipeline components and results) are present with their relevant information. +* A JSON cache file with the name of each dataset will automatically be produced + and keep track of all the CBRAIN IDs of files and tasks, computation statuses, + and the results of the current CI run. It is then exported as an artifact to + be downloaded at the beginning of the next CI run. The first level of a JSON + cache file is all the filenames in the dataset's CBRAIN data provider. Then + under the file, the separate pipelines (and then pipeline components and + results) are present with their relevant information. The main Python modules are: * **NeuroCI.py:** The "main" central module which orchestrates the workflow. It - logs the user on to CBRAIN, obtains the circleCI token, and the url to - download the newest artifacts from the last run (all stored in the CircleCI - environment variables, and passed to the script in the CircleCI config file). - It then reads the Experiment Definition and the config file, and passes all - relevant info to the "main" function. The main function is a simple nested - loop. The outer loop iterates over the datasets in the experiment definition, - downloads the latest cache file (json file which keeps track of intermediate - computations and completed computation results from CBRAIN for a single - dataset) from the previous circleCI run, and queries CBRAIN in order to update - all of the task statuses. The inner loop iterates over the pipelines in the - experiment definition file. It adds any new files available in the data - provider to the cache, and calls the pipeline manager to deal with the posting - of tasks for each component of the current pipeline and dataset. + logs the user onto CBRAIN, obtains the circleCI token and the URL to download + the newest artifacts from the last run (all stored in the CircleCI environment + variables and passed to the script in the CircleCI config file). It then reads + the Experiment Definition and the config file and gives all relevant info to + the "main" function. The main function is a simple nested loop. 
The outer loop + iterates over the datasets in the experiment definition downloads the latest + cache file (JSON file which keeps track of intermediate computations and + completed computation results from CBRAIN for a single dataset) from the + previous circleCI run queries CBRAIN to update all of the task statuses. The + inner loop iterates over the pipelines in the experiment definition file. It + adds any new files available in the data provider to the cache. It calls the + pipeline manager to deal with posting tasks for each component of the current + pipeline and dataset. * **cacheOps.py:** Contains functions that perform operations on the dataset - cache files. These range from downloading the latest cache file, to generating - a new cache file from scratch, generating new subjects, to updating task and - result statuses in a cache file, etc. One function that is important to - highlight is the pipeline manager, which iterates over each component in a - pipeline, organizes, and feeds the necessary data to the functions which post - tasks on CBRAIN and update the caches with the newest progress in - computations. + cache files. These range from downloading the latest cache file, generating a + new cache file from scratch, generating new subjects, updating task and result + statuses in a cache file, etc. One function that is important to highlight is + the pipeline manager, which iterates over each component in a pipeline, + organizes, and feeds the necessary data to the functions that post tasks on + CBRAIN, and updates the caches with the newest progress computations. * **cbrainAPI.py:** Contains all of the API calls used to communicate with CBRAIN. Each API call has it's own function. -* **analysesVisualizations.py:** This is run as a separate script from - NeuroCI.py in the CircleCI config file. It iterates through the results from - the cache files, processing them however the user desires, and plots them. It - downloads further files from CBRAIN (such as sensitive data we don't wish to - make public in the repo) loops through all the pipelines in the Experiment - Definition, and launches a function to process a dataset cache file (this - function will vary depending on the users experiment). For example, for - Prevent-AD, this function iterates through the cache, matches the results from - finished computations to the corresponding measures from the file we just - downloaded earlier in the script, and then plots a correlation which we can - output along with the cache file as artifacts in CircleCI (the plot is saved - directly to the artifacts directory - see the **.circleci/config.yml** heading - above). Thus the user has access to the most up-to-date results and graphics. +* **analysesVisualizations.py:** This is run separately from NeuroCI.py in the + CircleCI config file. It iterates through the results from the cache files, + processing them however the user desires and plots them. It downloads further + files from CBRAIN (such as sensitive data we don't wish to make public in the + repo) loops through all the pipelines in the Experiment Definition, and + launches a function to process a dataset cache file (this function will vary + depending on the user's experiment). 
For example, for Prevent-AD, this + function iterates through the cache, matches the results from finished + computations to the corresponding measures from the file we just downloaded + earlier in the script, and then plots a correlation which we can output along + with the cache file as artifacts in CircleCI (the plot is saved directly to + the artifacts directory - see the **.circleci/config.yml** heading above). + Thus the user has access to the most up-to-date results and graphics. Currently in very early stages. ## Function descriptions -While I may indeed someday make an entire written document for each individual -function, for now I recommend parsing through the code and seeing the comments -and docstrings in it to understand it. I've spent some time trying to make the -code and commenting informative and modular. +While I may indeed someday make an entire written document for each function, +for now, I recommend parsing through the code and seeing the comments and +docstrings in it to understand it. I've spent some time trying to make the code +and commenting informative and modular. ## Quick tips, fixes and other notes. -* Note that the first time the circleCI run of this softwrae attempts to - populate the results into a json, it will likely say there is an error +* Note that the first time the circleCI run of this software attempts to + populate the results into a JSON, it will likely say there is an error populating it in the terminal output, giving something along the lines of: *Streaming text for fileID: 3700641 Synchronized userfiles 3700641 Download failure 401* If this is the case, don't worry! The file synchronization doesn't occur instantaneously, so it doesn't find the file on CBRAIN to - download until a minute or two have passed. It should download with no issues - the next time the CI runs a few hours later. \ No newline at end of file + download until a minute or two has passed. It should download with no issues + the next time the CI runs a few hours later. From 464efb763b5b6f735a562db5026ed7a5329032e0 Mon Sep 17 00:00:00 2001 From: yohanchatelain Date: Fri, 24 Sep 2021 15:52:46 -0400 Subject: [PATCH 3/7] Refactoring --- .circleci/config.yml | 2 +- Config_Files/CBRAIN_IDs.yaml | 2 +- NeuroCI.py | 150 ++++---- cacheOps.py | 667 ++++++++++++++++++----------------- cbrainAPI.py | 533 ++++++++++++++-------------- neuroCIdata.py | 4 + utils.py | 19 + 7 files changed, 735 insertions(+), 642 deletions(-) create mode 100644 neuroCIdata.py create mode 100644 utils.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 7ab85f3..461efb4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -17,7 +17,7 @@ jobs: # A basic unit of work in a run - run: no_output_timeout: 10m #change this to set the limit for how long it can run...useful for when Beluga crashes. 
          command: |
-            python NeuroCI.py $cbrain_user $cbrain_password $CCI_token https://circleci.com/api/v1.1/project/github/$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME/latest/artifacts
+            python NeuroCI.py --cbrain-user="${cbrain_user}" --cbrain-password="${cbrain_password}" --CCI-token="${CCI_token}" --artifacts-url="https://circleci.com/api/v1.1/project/github/$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME/latest/artifacts"
             mkdir artifacts
             cp *.json artifacts
             python analysesVisualizations.py $cbrain_user $cbrain_password
diff --git a/Config_Files/CBRAIN_IDs.yaml b/Config_Files/CBRAIN_IDs.yaml
index 182d184..892f948 100644
--- a/Config_Files/CBRAIN_IDs.yaml
+++ b/Config_Files/CBRAIN_IDs.yaml
@@ -1,5 +1,5 @@
 Data_Provider_IDs:
-  Prevent-AD: 318
+  Prevent-AD: 504
 
 Tool_Config_IDs:
   FSL_First: 721
diff --git a/NeuroCI.py b/NeuroCI.py
index f8973dc..1b06f8a 100644
--- a/NeuroCI.py
+++ b/NeuroCI.py
@@ -1,89 +1,101 @@
-import requests
-import yaml
-import json
-import sys
+import argparse
 import os
-#from github import Github
-from ast import literal_eval
-import time
-import datetime
-from cbrainAPI import *
-from cacheOps import *
+
+import cacheOps
+import cbrainAPI
+import neuroCIdata
+import utils
 
 ##################################################################################
 
-def main(cbrain_token, CCI_token, experiment_definition, cbrain_ids, latest_artifacts_url):
-
-    for dataset in experiment_definition['Datasets']:
-
-        download_cache(dataset + '.json', CCI_token, latest_artifacts_url) #Downloads newest cache to json file
-        print('Downloaded newest cache for: ' + dataset + '.json')
-
-        task_list = cbrain_get_all_tasks(cbrain_token) #Gets the complete list of tasks for the user on CBRAIN
-        print('Fetched the list of tasks for the CBRAIN user')
-
-        start = time.time()
-        update_statuses(dataset + '.json', task_list) #Updates the contents of a cache to reflect CBRAIN task statuses
-        end = time.time()
-        print('Updated statuses in cache for: ' + dataset + '.json in' + str(datetime.timedelta(seconds=(end - start))))
-
-        for pipeline in experiment_definition['Pipelines']:
-
-            start = time.time()
-            populate_cache_filenames(dataset + '.json', cbrain_token, experiment_definition['Datasets'][dataset]['Blocklist'], pipeline, cbrain_ids['Data_Provider_IDs'][dataset], experiment_definition) #Populates a cache with any new files found
-            end = time.time()
-            print('Populated cache filenames for: ' + dataset + '.json' + ', ' + pipeline + " in" + str(datetime.timedelta(seconds=(end - start))))
-
-            pipeline_manager(cbrain_token, experiment_definition, cbrain_ids, pipeline, dataset)
-            print('Posted tasks for: ' + dataset + '.json' + ', ' + pipeline)
-
-        populate_results(dataset + '.json', cbrain_token)
-        print('Populated results for ' + dataset + '.json')
-        #extract_results()
-        #analysis(expdef[script])
-
-        #start = time.time()
-        #update_statuses(dataset + '.json', cbrain_token)
-        #end = time.time()
-        #print('Updated statuses in cache for: ' + dataset + '.json in' + str(datetime.timedelta(seconds=(end - start))))
+_json_extension = "json"
+
+
+def run(args, cbrain_api, cbrain_ids, experiment_definition):
+
+    for dataset in experiment_definition['Datasets']:
+
+        # Downloads newest cache to json file
+        dataset_filename = dataset + os.path.extsep + _json_extension
+        cacheOps.download_cache(
+            dataset_filename, args.CCI_token, args.artifacts_url)
+        print(f'Downloaded newest cache for: {dataset_filename}')
+
+        # Gets the complete list of tasks for the user on CBRAIN
+        task_list = cbrain_api.get_all_tasks()
+        print('Fetched the list of tasks for the CBRAIN user')
+
+        # Updates the contents of a cache to reflect CBRAIN task statuses
+        with utils.measure_time() as execution_time:
+            cacheOps.update_statuses(cbrain_api, dataset_filename, task_list)
+
+        print((f"Updated statuses in cache for: "
+               f"{dataset_filename} in {execution_time()}"))
+
+        for pipeline in experiment_definition['Pipelines']:
+
+            with utils.measure_time() as execution_time:
+                # Populates a cache with any new files found
+                blocklist = experiment_definition['Datasets'][dataset]['Blocklist']
+                cbrain_dataset = cbrain_ids['Data_Provider_IDs'][dataset]
+                cacheOps.populate_cache_filenames(cbrain_api,
+                                                  dataset_filename,
+                                                  blocklist,
+                                                  pipeline,
+                                                  cbrain_dataset,
+                                                  experiment_definition)
+
+            print((f"Populated cache filenames for: "
+                   f"{dataset_filename}, {pipeline} in {execution_time()}"))
+
+            cacheOps.pipeline_manager(cbrain_api, experiment_definition,
+                                      cbrain_ids, pipeline, dataset)
+            print(f'Posted tasks for: {dataset_filename}, {pipeline}')
+
+        cacheOps.populate_results(cbrain_api, dataset_filename)
+        print(f'Populated results for {dataset_filename}')
+        # extract_results()
+        # analysis(expdef[script])
+
+# start = time.time()
+# update_statuses(dataset + '.json', cbrain_token)
+# end = time.time()
+# print('Updated statuses in cache for: ' + dataset + '.json in' + str(datetime.timedelta(seconds=(end - start))))
 
 ##################################################################################
-#Obtain login credentials from args, stored in CI environment variables.
+def run(args, cbrain_api, cbrain_ids, experiment_definition): -cbrain_user = sys.argv[1] -cbrain_password = sys.argv[2] -CCI_token = sys.argv[3] -latest_artifacts_url = sys.argv[4] -cbrain_token = cbrain_login(cbrain_user, cbrain_password) + for dataset in experiment_definition['Datasets']: -################################################################################## + # Downloads newest cache to json file + dataset_filename = os.path.extsep.join(dataset, _json_extension) + cacheOps.download_cache( + dataset_filename, args.CCI_token, args.artifacts_url) + print(f'Downloaded newest cache for: {dataset_filename}') + # Gets the complete list of tasks for the user on CBRAIN + task_list = cbrain_api.get_all_tasks(args.cbrain_token) + print('Fetched the list of tasks for the CBRAIN user') -#Main code execution section + # Updates the contents of a cache to reflect CBRAIN task statuses + with utils.measure_time() as execution_time: + cacheOps.update_statuses(dataset_filename, task_list) -with open('Experiment_Definition.yaml') as file: #Load experiment definition - try: - experiment_definition = yaml.safe_load(file) - except yaml.YAMLError as exception: #yaml file not valid - print('The Experiment Definition file is not valid') - print(exception) + print((f"Updated statuses in cache for: " + f"{dataset_filename} in {execution_time()}")) + for pipeline in experiment_definition['Pipelines']: -with open('./Config_Files/CBRAIN_IDs.yaml') as file: #Load mappings for all CBRAIN DP_IDs and toolconfig IDs - try: - cbrain_ids = yaml.safe_load(file) - except yaml.YAMLError as exception: #yaml file not valid - print('The configuration file is not valid') - print(exception) + with utils.measure_time() as execution_time: + # Populates a cache with any new files found + blocklist = experiment_definition['Datasets'][dataset]['Blocklist'] + cbrain_dataset = cbrain_ids['Data_Provider_IDs'][dataset] + cacheOps.populate_cache_filenames(dataset_filename, + args.cbrain_token, + blocklist, + pipeline, + cbrain_dataset, + experiment_definition) -print("Using artifacts from : " + latest_artifacts_url) + print((f"Populated cache filenames for: " + f"{dataset_filename}, {pipeline} in {execution_time()}")) + cacheOps.pipeline_manager(args.cbrain_token, experiment_definition, + cbrain_ids, pipeline, dataset) + print(f'Posted tasks for: {dataset_filename}, {pipeline}') -main(cbrain_token, CCI_token, experiment_definition, cbrain_ids, latest_artifacts_url) + cacheOps.populate_results(cbrain_api, dataset_filename) + print(f'Populated results for {dataset_filename}') + # extract_results() +# analysis(expdef[script]) -print("Finished the scheduled computations") +# start = time.time() +# update_statuses(dataset + '.json', cbrain_token) +# end = time.time() +# print('Updated statuses in cache for: ' + dataset + '.json in' + str(datetime.timedelta(seconds=(end - start)))) -cbrain_logout(cbrain_token) ################################################################################## +# Obtain login credentials from args, stored in CI environment variables. 
+ + +def parse_args(): + + parser = argparse.ArgumentParser('NeuroCI', help='to add...') + parser.add_argument('--cbrain-user', help="CBRAIN user", required=True) + parser.add_argument('--cbrain-password', + help="CBRAIN password", required=True) + parser.add_argument('--CCI-token', help="CCI_token", required=True) + parser.add_argument('--artifacts-url', help="artifacts-url", required=True) + args = parser.parse_args() + return args + + +def main(): + args = parse_args() + cbrain_api = cbrainAPI.CbrainAPI(args.cbrain_user, args.cbrain_password) + + ################################################################################## + + # Main code execution section + + experiment_definition = utils.get_yaml_file( + neuroCIdata.experiment_definition_path, "Experiment Definition") + + # Load mappings for all CBRAIN DP_IDs and toolconfig IDs + cbrain_ids = utils.get_yaml_file( + neuroCIdata.cbrain_ids_path, "Configuration") + + print(f"Using artifacts from : {args.artifacts_url}") + + run(args, cbrain_api, cbrain_ids, experiment_definition) + + print("Finished the scheduled computations") + ################################################################################## diff --git a/cacheOps.py b/cacheOps.py index 92774f8..320ea2c 100644 --- a/cacheOps.py +++ b/cacheOps.py @@ -1,328 +1,365 @@ -import requests -import yaml -import json -import sys -import os import csv -from github import Github +import json from ast import literal_eval -from cbrainAPI import * +import requests +from github import Github + ############################################# -'''Downloads newest cache file to json, or if it's not found in the circleCI artifacts, creates a new cache file''' + def download_cache(cache_file, CCI_token, latest_artifacts_url): + '''Downloads newest cache file to json, or if it's not found in the circleCI artifacts, creates a new cache file''' + + headers = {'Circle-Token': CCI_token} + # finds the link to the cache file amongst all the artifacts + response = requests.get(str(latest_artifacts_url), headers=headers) + # example URL for this repo: https://circleci.com/api/v1.1/project/github/jacobsanz97/NDR-CI/latest/artifacts + + link_to_cache = "http://" + if response.status_code == requests.status_codes.codes['OK']: + # convert text to dictionary so we can browse it + literal_list = literal_eval(response.text) + for file in literal_list: + if cache_file in file['url']: + link_to_cache = file['url'] + else: + print("Error loading CircleCI artifacts") + print(response.text) + + try: + # download the cache file to json + response = requests.get(link_to_cache, headers=headers) + except Exception: + # Cache file couldn't be loaded, so we create an empty json + json_cache = json.loads("{}") + print("Cache file not found...Creating a new one.") + else: + json_cache = json.loads(response.text) + + with open(cache_file, 'w') as outfile: # create cache file for CI + json.dump(json_cache, outfile) + print('written cache to temp file') + - headers = {'Circle-Token': CCI_token} - response = requests.get(str(latest_artifacts_url), headers=headers) #finds the link to the cache file amongst all the artifacts - #example URL for this repo: https://circleci.com/api/v1.1/project/github/jacobsanz97/NDR-CI/latest/artifacts - - link_to_cache = "http://" - if response.status_code == 200: - literal_list = literal_eval(response.text) #convert text to dictionary so we can browse it - for file in literal_list: - if cache_file in file['url']: - link_to_cache = file['url'] - else: - print("Error loading CircleCI 
artifacts") - print(response.text) - - try: - response = requests.get(link_to_cache, headers=headers) #download the cache file to json - except: - json_cache = json.loads("{}") #Cache file couldn't be loaded, so we create an empty json - print("Cache file not found...Creating a new one.") - else: - json_cache = json.loads(response.text) - - with open(cache_file, 'w') as outfile: #create cache file for CI - json.dump(json_cache, outfile) - print('written cache to temp file') - - -'''Creates a template for a cache entry (cbrain data provider file), for a specific pipeline. Provides a userfile ID as a starting point for task computations''' def generate_cache_subject(nifti_file, cbrain_userfile_ID, pipeline, experiment_definition): + '''Creates a template for a cache entry (cbrain data provider file), for a specific pipeline. Provides a userfile ID as a starting point for task computations''' + + data = {nifti_file: { + pipeline: {}}} + + result = {"result": None, "isUsed": None} + + # Keeps track of the order of the component (we need to flag the first one) + component_number = 0 + for pipeline_component in experiment_definition['Pipelines'][pipeline]['Components']: + + if component_number == 0: + component_record = { + "inputID": cbrain_userfile_ID, # only do this for first component + "toolConfigID": None, + "taskID": None, + "status": None, + "outputID": None, + "isUsed": None + } + else: + component_record = { + "inputID": None, + "toolConfigID": None, + "taskID": None, + "status": None, + "outputID": None, + "isUsed": None + } + + # add this component to the cache + data[nifti_file][pipeline][pipeline_component] = component_record + component_number += 1 + + # add the results section after all the component sections + data[nifti_file][pipeline]['Result'] = result + return data + + +def populate_cache_filenames(cbrain_api, cache_file, blocklist, pipeline, data_provider_id, experiment_definition): + '''Generates the template for every file in a cache, for a specific pipeline''' + + filelist = [] + # Query CBRAIN to list all files in data provider. + data_provider_browse = cbrain_api.list_data_provider(str(data_provider_id)) + + try: + for entry in data_provider_browse: + if 'userfile_id' in entry: # if it's a registered file, add to filelist. + filelist.append([entry['name'], entry['userfile_id']]) + except Exception: + print("Error in browsing data provider, will continue using the filelist from the previous CI run") + return # skips the function without crashing + + with open(cache_file, "r+") as file: + data = json.load(file) + for entry in filelist: + + # if entry[name] is not in cache AND is not in the blocklist...add to cache + if entry[0] not in data and entry[0] not in blocklist: + leaf = generate_cache_subject( + entry[0], entry[1], pipeline, experiment_definition) + data.update(leaf) + + # if already in cache, just add entry for new pipeline. 
+            if entry[0] not in blocklist and pipeline not in data[entry[0]]:
+                leaf = generate_cache_subject(
+                    entry[0], entry[1], pipeline, experiment_definition)
+                data[entry[0]][pipeline] = leaf[entry[0]][pipeline]
+
+        file.seek(0)  # rewind
+        json.dump(data, file, indent=2)
+        file.truncate()
+    return data
+
+
+def update_statuses(cbrain_api, cache_filename, task_list):
+    '''Updates a cache file with the newest task statuses from CBRAIN'''
+
+    with open(cache_filename, "r+") as cache_file:
+        data = json.load(cache_file)
+        for (file, pipeline) in data.items():  # Parse the json
+            for (pipeline_name, task_name) in pipeline.items():
+                for (task_name_str, params) in task_name.items():
+
+                    # If this is a task (not a result) with an existent ID on CBRAIN, and hasn't yet run to completion
+                    if task_name_str != "Result" and params["taskID"] is not None and params["status"] != "Completed":
+
+                        try:
+
+                            jayson = cbrain_api.get_task_info_from_list(
+                                task_list, params["taskID"])
+
+                            if jayson['status'] == "Completed":
+                                # Task completed, update status and get output file ID
+                                data[file][pipeline_name][task_name_str]["status"] = jayson["status"]
+                                # differentiate between one and many outputs
+                                if '_cbrain_output_outputs' in jayson['params']:
+                                    data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_outputs'][0]
+                                if '_cbrain_output_output' in jayson['params']:
+                                    data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_output'][0]
+                                if '_cbrain_output_outfile' in jayson['params']:
+                                    data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_outfile'][0]
+                                if 'outfile_id' in jayson['params']:
+                                    data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['outfile_id']
+
+                            else:
+                                # Task not completed, just update status
+                                data[file][pipeline_name][task_name_str]["status"] = jayson["status"]
+
+                        except Exception:
+                            pass
+
+        cache_file.seek(0)
+        json.dump(data, cache_file, indent=2)
+        cache_file.truncate()
+
+
+def pipeline_manager(cbrain_api, experiment_definition, cbrain_ids, pipeline, dataset):
+    '''Iterates over each component in a pipeline, organizes, and feeds the necessary data to the functions which post tasks on CBRAIN and update the caches'''
+
+    # Keeps track of the order of the component (we need to flag the first one)
+    component_number = 0
+
+    for pipeline_component in experiment_definition['Pipelines'][pipeline]['Components']:
+
+        # Load parameters for current pipeline component
+        with open(experiment_definition['Pipelines'][pipeline]['Components'][pipeline_component]['Parameter_dictionary'], "r+") as param_file:
+            parameter_dictionary = json.load(param_file)
+
+        if component_number == 0:
+            first_task_handler(cbrain_api, parameter_dictionary,
+                               cbrain_ids['Tool_Config_IDs'][pipeline_component],
+                               dataset + '.json', pipeline_component, pipeline)
+        else:
+            nth_task_handler(cbrain_api, parameter_dictionary,
+                             cbrain_ids['Tool_Config_IDs'][pipeline_component],
+                             dataset + '.json', pipeline_component,
+                             previous_pipeline_component, pipeline)
+
+        # if there are any tasks to resubmit ('Resubmit_tasks: taskIDs' in the Experiment Definition)...
+        if len(experiment_definition['Resubmit_tasks']['taskIDs']) > 0:
+            task_resubmission_handler(cbrain_api, parameter_dictionary, cbrain_ids['Tool_Config_IDs'][pipeline_component],
+                                      dataset + '.json', pipeline_component, pipeline, experiment_definition['Resubmit_tasks']['taskIDs'])
+
+        previous_pipeline_component = pipeline_component
+        component_number = component_number + 1
+
+
+def first_task_handler(cbrain_api, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name):
+    '''Handles the cache writing for the first task in a pipeline, and calls to post the task to CBRAIN'''
+
+    with open(cache_file, "r+") as file:
+        data = json.load(file)
+        for filename in data:
+            if data[filename][pipeline_name][pipeline_component]['isUsed'] is None:
+
+                try:
+
+                    userfile_id = data[filename][pipeline_name][pipeline_component]['inputID']
+                    jayson = cbrain_api.post_task(
+                        userfile_id, tool_config_id, parameter_dictionary)
+                    data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id']
+                    data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"]
+                    data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"]
+                    data[filename][pipeline_name][pipeline_component]['isUsed'] = True
+
+                except Exception:
+                    pass
+
+        file.seek(0)  # rewind
+        json.dump(data, file, indent=2)
+        file.truncate()
+
+
+def nth_task_handler(cbrain_api, parameter_dictionary, tool_config_id, cache_file, pipeline_component, previous_pipeline_component, pipeline_name):
+    '''Handles the cache writing and task posting for any pipeline component except the first task'''
+
+    with open(cache_file, "r+") as file:
+        data = json.load(file)
+        for filename in data:
+            if data[filename][pipeline_name][pipeline_component]['isUsed'] is None and data[filename][pipeline_name][previous_pipeline_component]['status'] == "Completed":
+
+                try:
+
+                    # output of last task
+                    userfile_id = data[filename][pipeline_name][previous_pipeline_component]['outputID']
+                    jayson = cbrain_api.post_task(
+                        userfile_id, tool_config_id, parameter_dictionary)
+                    data[filename][pipeline_name][pipeline_component]['inputID'] = userfile_id
+                    data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id']
+                    data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"]
+                    data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"]
+                    data[filename][pipeline_name][pipeline_component]['isUsed'] = True
+
+                except Exception:
+                    pass
+
+        file.seek(0)  # rewind
+        json.dump(data, file, indent=2)
+        file.truncate()
+
+
+def task_resubmission_handler(cbrain_api, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name, rerun_ID_list):
+    '''Resubmits a task, and sets all subsequent pipeline component dependencies to null in the cache'''
+
+    with open(cache_file, "r+") as file:
+        data = json.load(file)
+        for filename in data:
+
+            if 'taskID' in data[filename][pipeline_name][pipeline_component]:
+
+                if data[filename][pipeline_name][pipeline_component]['taskID'] in rerun_ID_list:
+
+                    try:
+                        userfile_id = data[filename][pipeline_name][pipeline_component]['inputID']
+                        jayson = cbrain_api.post_task(
+                            userfile_id, tool_config_id, parameter_dictionary)
+                        data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id']
+                        data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"]
+                        data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"]
+                        data[filename][pipeline_name][pipeline_component]['isUsed'] = True
+                        print(
+                            "Reposting " + str(data[filename][pipeline_name][pipeline_component]['taskID']))
+
+                    except Exception:
+                        pass
+
+                    # This section resets every pipeline component that follows the reposted task back to null
+                    # total number of components in pipeline
+                    pipeline_length = len(data[filename][pipeline_name])
+                    curr_index = list(data[filename][pipeline_name].keys()).index(
+                        pipeline_component)  # index of current (reposted) component
+
+                    index_counter = 0
+                    for component in data[filename][pipeline_name].items():
+                        # component is a (name, record) pair; clearing fields
+                        # on the record clears them in the cache in place
+                        record = component[1]
+
+                        # if we are on a component after the one being submitted, and not the result
+                        if index_counter > curr_index and index_counter < pipeline_length-1:
+                            record['inputID'] = None
+                            record['toolConfigID'] = None
+                            record['taskID'] = None
+                            record['status'] = None
+                            record['outputID'] = None
+                            record['isUsed'] = None
+
+                        # if we are on the result component of the pipeline
+                        if index_counter > curr_index and index_counter == pipeline_length-1:
+                            record['result'] = None
+                            record['isUsed'] = None
+
+                        index_counter += 1
+
+        file.seek(0)  # rewind
+        json.dump(data, file, indent=2)
+        file.truncate()
+
+
+def populate_results(cbrain_api, cache_filename):
+    '''Fetches the text from a file on CBRAIN and writes it to the cache. Originally this was designed for extracting a hippocampal volume from an FSL Stats text output'''
+
+    with open(cache_filename, "r+") as cache_file:
+        data = json.load(cache_file)
+        for (file, pipeline) in data.items():
+            for (pipeline_name, pipeline_component) in pipeline.items():
+                previous_string = None
+                for (pipeline_component_str, params) in pipeline_component.items():
+
+                    if pipeline_component_str == "Result":  # Find the task before the result in the json
+
+                        if data[file][pipeline_name]['Result']['isUsed'] is None and data[file][pipeline_name][previous_string]['status'] == "Completed":
+
+                            fileID = data[file][pipeline_name][previous_string]['outputID']
+                            print("Streaming text for fileID: " + str(fileID))
+                            cbrain_api.sync_file(str(fileID))
+
+                            try:
+
+                                # Note that result population is hardcoded, as the pipelines all produce different outputs that need different parsing procedures.
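+                                # Only FSL and FreeSurfer are parsed below; a
+                                # new pipeline needs its own branch here,
+                                # otherwise vol stays unset and the except
+                                # clause silently skips the entry.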
+ if pipeline_name == "FSL": + vol_string = cbrain_api.download_text( + fileID) + # get first word + vol = vol_string.split()[0] + + if pipeline_name == "FreeSurfer": + asegstats_string = cbrain_api.download_text( + fileID) + vol = retrieve_FreeSurfer_volume( + asegstats_string, "Left-Hippocampus") + + data[file][pipeline_name]['Result']['result'] = vol + data[file][pipeline_name]['Result']['isUsed'] = True - data = { nifti_file: { - pipeline: {}}} - - result = {"result": None, "isUsed": None} - - component_number = 0 #Keeps track of the order of the component (we need to flag the first one) - for pipeline_component in experiment_definition['Pipelines'][pipeline]['Components']: - - if component_number == 0: - component_record = { - "inputID": cbrain_userfile_ID, #only do this for first component - "toolConfigID": None, - "taskID": None, - "status": None, - "outputID": None, - "isUsed": None - } - else: - component_record = { - "inputID": None, - "toolConfigID": None, - "taskID": None, - "status": None, - "outputID": None, - "isUsed": None - } - - data[nifti_file][pipeline][pipeline_component] = component_record #add this component to the cache - component_number = component_number + 1 - - data[nifti_file][pipeline]['Result'] = result #add the results section after all the component sections - return data - - -'''Generates the template for every file in a cache, for a specific pipeline''' -def populate_cache_filenames(cache_file, cbrain_token, blocklist, pipeline, data_provider_id, experiment_definition): - - filelist = [] - data_provider_browse = cbrain_list_data_provider(str(data_provider_id), cbrain_token) #Query CBRAIN to list all files in data provider. - - try: - for entry in data_provider_browse: - if 'userfile_id' in entry: #if it's a registered file, add to filelist. - filelist.append([entry['name'], entry['userfile_id']]) - except Exception as e: - print("Error in browsing data provider, will continue using the filelist from the previous CI run") - return #skips the function without crashing - - with open(cache_file, "r+") as file: - data = json.load(file) - for entry in filelist: - - if entry[0] not in data and entry[0] not in blocklist: #if entry[name] is not in cache AND is not in the blocklist...add to cache - leaf = generate_cache_subject(entry[0], entry[1], pipeline, experiment_definition) - data.update(leaf) - - if entry[0] not in blocklist and pipeline not in data[entry[0]]: #if already in cache, just add entry for new pipeline. 
- leaf = generate_cache_subject(entry[0], entry[1], pipeline, experiment_definition) - data[entry[0]][pipeline] = leaf[entry[0]][pipeline] - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() - return data - - - -'''Updates a cache file with the newest task statuses from CBRAIN''' -def update_statuses(cache_filename, task_list): - - with open(cache_filename, "r+") as cache_file: - data = json.load(cache_file) - for (file, pipeline) in data.items(): #Parse the json - for (pipeline_name, task_name) in pipeline.items(): - for (task_name_str, params) in task_name.items(): - - if task_name_str != "Result" and params["taskID"] != None and params["status"] != "Completed": #If this is a task (not a result) with an existent ID on CBRAIN, and hasn't yet run to completion - - try: - - jayson = cbrain_get_task_info_from_list(task_list, params["taskID"]) - - if jayson['status'] == "Completed": - #Task completed, update status and get output file ID - data[file][pipeline_name][task_name_str]["status"] = jayson["status"] - #differentiate between one and many outputs - if '_cbrain_output_outputs' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_outputs'][0] - if '_cbrain_output_output' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_output'][0] - if '_cbrain_output_outfile' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_outfile'][0] - if 'outfile_id' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['outfile_id'] - - else: - #Task not completed, just update status - data[file][pipeline_name][task_name_str]["status"] = jayson["status"] - - except Exception as e: - pass - - - cache_file.seek(0) - json.dump(data, cache_file, indent=2) - cache_file.truncate() - - -'''Iterates over each component in a pipeline, organizes, and feeds the necessary data to the functions which post tasks on CBRAIN and update the caches''' -def pipeline_manager(cbrain_token, experiment_definition, cbrain_ids, pipeline, dataset): - - component_number = 0 #Keeps track of the order of the component (we need to flag the first one) - - for pipeline_component in experiment_definition['Pipelines'][pipeline]['Components']: - - with open(experiment_definition['Pipelines'][pipeline]['Components'][pipeline_component]['Parameter_dictionary'], "r+") as param_file: #Load parameters for current pipeline component - parameter_dictionary = json.load(param_file) - - if component_number == 0: - first_task_handler(cbrain_token, parameter_dictionary, cbrain_ids['Tool_Config_IDs'][pipeline_component], dataset + '.json', pipeline_component, pipeline) - else: - nth_task_handler(cbrain_token, parameter_dictionary, cbrain_ids['Tool_Config_IDs'][pipeline_component], dataset + '.json', pipeline_component, previous_pipeline_component, pipeline) - - - if len(experiment_definition['Resubmit_tasks']['taskIDs']) > 0: #if there are any tasks to resubmit... 
- task_resubmission_handler(cbrain_token, parameter_dictionary, cbrain_ids['Tool_Config_IDs'][pipeline_component], dataset + '.json', pipeline_component, pipeline, experiment_definition['Resubmit_tasks']['taskIDs']) - - previous_pipeline_component = pipeline_component - component_number = component_number + 1 - - -'''Handles the cache writing for the first task in a pipeline, and calls to post the task to CBRAIN''' -def first_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name): - - with open(cache_file, "r+") as file: - data = json.load(file) - for filename in data: - if data[filename][pipeline_name][pipeline_component]['isUsed'] == None: - - try: - - userfile_id = data[filename][pipeline_name][pipeline_component]['inputID'] - jayson = cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary) - data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id'] - data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"] - data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"] - data[filename][pipeline_name][pipeline_component]['isUsed'] = True - - except Exception as e: - pass - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() - - -'''Handles the cache writing and task posting for any pipeline component except the first task''' -def nth_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, previous_pipeline_component, pipeline_name): - - with open(cache_file, "r+") as file: - data = json.load(file) - for filename in data: - if data[filename][pipeline_name][pipeline_component]['isUsed'] == None and data[filename][pipeline_name][previous_pipeline_component]['status'] == "Completed": - - try: - - userfile_id = data[filename][pipeline_name][previous_pipeline_component]['outputID'] #output of last task - jayson = cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary) - data[filename][pipeline_name][pipeline_component]['inputID'] = userfile_id - data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id'] - data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"] - data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"] - data[filename][pipeline_name][pipeline_component]['isUsed'] = True - - except Exception as e: - pass - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() - -'''Resubmits a task, and sets all subsequent pipeline component dependencies to null in the cache''' -def task_resubmission_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name, rerun_ID_list): - - with open(cache_file, "r+") as file: - data = json.load(file) - for filename in data: - - if 'taskID' in data[filename][pipeline_name][pipeline_component]: - - if data[filename][pipeline_name][pipeline_component]['taskID'] in rerun_ID_list: - - try: - userfile_id = data[filename][pipeline_name][pipeline_component]['inputID'] - jayson = cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary) - data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id'] - data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"] - data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"] - 
data[filename][pipeline_name][pipeline_component]['isUsed'] = True - print("Reposting " + str(data[filename][pipeline_name][pipeline_component]['taskID'])) - - except Exception as e: - pass - - #The code section sets all the subsequent pipeline components following the reposted task to null - pipeline_length = len(data[filename][pipeline_name].items()) #total number of components in pipeline - curr_index = list(data[filename][pipeline_name].keys()).index(pipeline_component) #index of current (reposted) component - - index_counter = 0 - for component in data[filename][pipeline_name].items(): - - #if we are on a component after the one being submitted, and not the result - if index_counter > curr_index and index_counter < pipeline_length-1: - data[filename][pipeline_name][component[0]]['inputID'] = None - data[filename][pipeline_name][component[0]]['toolConfigID'] = None - data[filename][pipeline_name][component[0]]['taskID'] = None - data[filename][pipeline_name][component[0]]['status'] = None - data[filename][pipeline_name][component[0]]['outputID'] = None - data[filename][pipeline_name][component[0]]['isUsed'] = None - - #if we are on the result component of the pipeline - if index_counter > curr_index and index_counter == pipeline_length-1: - data[filename][pipeline_name][component[0]]['result'] = None - data[filename][pipeline_name][component[0]]['isUsed'] = None - - index_counter += 1 - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() - - -'''Fetches the text from a file on CBRAIN and writes it to the cache. Originally this designed for extracting a hippocampal volume from an FSL Stats text output''' -def populate_results(cache_filename, cbrain_token): - - with open(cache_filename, "r+") as cache_file: - data = json.load(cache_file) - for (file, pipeline) in data.items(): - for (pipeline_name, pipeline_component) in pipeline.items(): - previous_string = None - for (pipeline_component_str, params) in pipeline_component.items(): - - if pipeline_component_str == "Result": #Find the task before the result in the json - - if data[file][pipeline_name]['Result']['isUsed'] == None and data[file][pipeline_name][previous_string]['status'] == "Completed": - - fileID = data[file][pipeline_name][previous_string]['outputID'] - print("Streaming text for fileID: " + str(fileID)) - cbrain_sync_file(str(fileID), cbrain_token) - - try: - - #Note that result population is hardcoded, as the pipelines all produce different outputs that need different parsing procedures. - if pipeline_name == "FSL": - vol_string = cbrain_download_text(fileID, cbrain_token) - vol = vol_string.split()[0] #get first word - - if pipeline_name == "FreeSurfer": - asegstats_string = cbrain_download_text(fileID, cbrain_token) - vol = retrieve_FreeSurfer_volume(asegstats_string, "Left-Hippocampus") - - data[file][pipeline_name]['Result']['result'] = vol - data[file][pipeline_name]['Result']['isUsed'] = True - - except Exception as e: - pass - - previous_string = pipeline_component_str - - cache_file.seek(0) # rewind - json.dump(data, cache_file, indent=2) - cache_file.truncate() + except Exception: + pass + + previous_string = pipeline_component_str + + cache_file.seek(0) # rewind + json.dump(data, cache_file, indent=2) + cache_file.truncate() def retrieve_FreeSurfer_volume(asegstats_string, structName): -#Take as input the aseg.stats file from the freesurfer output as a string, and the StructName field. 
- lines = asegstats_string.splitlines() - reader = csv.reader(lines, delimiter=" ") - for row in reader: - if structName in row: - index = row.index(structName) - return row[index-2] #Returns the word which is two before the name of the structure. + # Take as input the aseg.stats file from the freesurfer output as a string, and the StructName field. + lines = asegstats_string.splitlines() + reader = csv.reader(lines, delimiter=" ") + for row in reader: + if structName in row: + index = row.index(structName) + # Returns the word which is two before the name of the structure. + return row[index-2] diff --git a/cbrainAPI.py b/cbrainAPI.py index 5f4d5e7..d5ea6cc 100644 --- a/cbrainAPI.py +++ b/cbrainAPI.py @@ -1,263 +1,284 @@ import requests import json +import atexit import sys import os -import csv - ################################################################################## '''Posts API call to CBRAIN to obtain a authentication token given a username and password''' -def cbrain_login(username, password): - - headers = { - 'Content-Type': 'application/x-www-form-urlencoded', - 'Accept': 'application/json', - } - data = { - 'login': username, - 'password': password - } - - response = requests.post('https://portal.cbrain.mcgill.ca/session', headers=headers, data=data) - - if response.status_code == 200: - print("Login success") - print(response.content) - jsonResponse = response.json() - return jsonResponse["cbrain_api_token"] - else: - print("Login failure") - return 1 - - -'''End a CBRAIN session''' -def cbrain_logout(cbrain_token): - - headers = { - 'Accept': 'application/json', - } - params = ( - ('cbrain_api_token', cbrain_token), - ) - - response = requests.delete('https://portal.cbrain.mcgill.ca/session', headers=headers, params=params) - - if response.status_code == 200: - print("Logout success") - return 0 - else: - print("Logout failure") - return 1 - - -'''Lists all files in a CBRAIN data provider''' -def cbrain_list_data_provider(data_provider_ID, cbrain_token): - - data_provider_ID = str(data_provider_ID) - headers = { - 'Accept': 'application/json', - } - params = ( - ('id', data_provider_ID), - ('cbrain_api_token', cbrain_token), - ) - url = 'https://portal.cbrain.mcgill.ca/data_providers/' + data_provider_ID + '/browse' - - response = requests.get(url, headers=headers, params=params, allow_redirects=True) - - if response.status_code == 200: - return response.json() - else: - print('DP browse failure') - return 1 - - -'''Posts a task in CBRAIN''' -def cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary): - - userfile_id = str(userfile_id) - - #Parse the parameter dictionary json, and insert the userfile IDs. 
- parameter_dictionary['interface_userfile_ids'] = [userfile_id] - parameter_dictionary['input_file'] = userfile_id - - headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - } - params = ( - ('cbrain_api_token', cbrain_token), - ) - data = { - "cbrain_task": { - 'tool_config_id': tool_config_id, - 'params': parameter_dictionary, - 'run_number': None, - 'results_data_provider_id': 179, #Using Beluga - 'cluster_workdir_size': None, - 'workdir_archived': True, - 'description': ''} - } - - y = json.dumps(data) #convert data field to JSON: - response = requests.post('https://portal.cbrain.mcgill.ca/tasks', headers=headers, params=params, data=y) - - if response.status_code == 200: - print(response.text) - jsonResponse = response.json() - return jsonResponse - else: - print("Task posting failed.") - print(response.content) - return 1 - -'''Gets the list of all the tasks of the user on CBRAIN''' -def cbrain_get_all_tasks(cbrain_token): - - headers = { - 'Accept': 'application/json', - } - params = { - 'cbrain_api_token': cbrain_token, - 'page': 1, - 'per_page': 1000 - } - url = 'https://portal.cbrain.mcgill.ca/tasks' - task_list = [] - - while True: - - response = requests.get(url, headers=headers, params=params) - - if response.status_code == 200: - jsonResponse = response.json() - task_list += jsonResponse - params['page'] += 1 - else: - print("Task list retrieval failed.") - return 1 - - if len(jsonResponse) < params['per_page']: - break - - return task_list - -'''Obtains info on the progress of a single task, given the list of all tasks for the user''' -def cbrain_get_task_info_from_list(task_list, task_ID): - - for task in task_list: - if task_ID == task['id'] or int(task_ID) == task['id']: - return task - - -'''Obtains information on the progress of a single task by querying for a single task''' -def cbrain_get_task_info(cbrain_token, task_ID): - - task_ID = str(task_ID) - headers = { - 'Accept': 'application/json', - } - params = ( - ('id', task_ID), - ('cbrain_api_token', cbrain_token) - ) - url = 'https://portal.cbrain.mcgill.ca/tasks/' + task_ID - - response = requests.get(url, headers=headers, params=params) - - if response.status_code == 200: - jsonResponse = response.json() - return jsonResponse - else: - print("Task Info retrieval failed.") - return 1 - - -'''Downloads the text from a file on CBRAIN''' -def cbrain_download_text(userfile_ID, cbrain_token): - - userfile_ID = str(userfile_ID) - headers = { - 'Accept': 'text', - } - params = ( - ('cbrain_api_token', cbrain_token), - ) - url = 'https://portal.cbrain.mcgill.ca/userfiles/' + userfile_ID + '/content' - - response = requests.get(url, headers=headers, params=params, allow_redirects=True) - - if response.status_code == 200: - return response.text - else: - print('Download failure') - print(response.status_code) - return 1 - - -'''Downloads a file from CBRAIN and saves it, given a userfile ID''' -def cbrain_download_file(userfile_ID, filename, cbrain_token): - - fileID = str(userfile_ID) - headers = { - 'Accept': 'application/json', - } - params = ( - ('id', fileID), - ('cbrain_api_token', cbrain_token), - ) - url = 'https://portal.cbrain.mcgill.ca/userfiles/' + fileID + '/content' - - response = requests.get(url, headers=headers, params=params, allow_redirects=True) - if response.status_code == 200: - open(filename, 'wb').write(response.content) - print("Downloaded file " + filename) - return 0 - else: - print('File download failure: ' + filename) - return 1 - - -'''Given a filename and data 
provider, download the file from the data provider''' -def cbrain_download_DP_file(filename, data_provider_id, cbrain_token): - - data_provider_browse = cbrain_list_data_provider(str(data_provider_id), cbrain_token) #Query CBRAIN to list all files in data provider. - print(data_provider_browse) - - try: - for entry in data_provider_browse: - if 'userfile_id' in entry and entry['name'] == filename: #if it's a registered file, and filename matches - print("Found registered file: " + filename + " in Data Provider with ID " + str(data_provider_id)) - cbrain_download_file(entry['userfile_id'], filename, cbrain_token) - return 0 - else: - print("File " + filename + " not found in Data Provider " + str(data_provider_id)) - return 1 - - except Exception as e: - print("Error in browsing data provider or file download") - return - - -'''Makes sure a file in a data provider is synchronized with CBRAIN''' -def cbrain_sync_file(userfile_id_list, cbrain_token): - #userfile_id_list can either be a string eg. '3663657', or a list eg. ['3663729', '3663714'] - headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - } - - params = ( - ('file_ids[]', userfile_id_list), - ('cbrain_api_token', cbrain_token), - ) - - response = requests.post('https://portal.cbrain.mcgill.ca/userfiles/sync_multiple', headers=headers, params=params) - - if response.status_code == 200: - print("Synchronized userfiles " + str(userfile_id_list)) - return - else: - print("Userfile sync failed for IDs: " + str(userfile_id_list)) - print(response.status_code) - return + + +class CbrainAPI: + + _url_cbrain_portal = 'https://portal.cbrain.mcgill.ca' + _url_cbrain_session = os.path.join(_url_cbrain_portal, 'session') + _url_cbrain_data_providers = os.path.join( + _url_cbrain_portal, 'data_providers') + _url_cbrain_tasks = os.path.join(_url_cbrain_portal, 'tasks') + _url_cbrain_userfiles = os.path.join(_url_cbrain_portal, 'userfiles') + _url_cbrain_sync_multiple = os.path.join( + _url_cbrain_userfiles, 'sync_multiple') + + def __init__(self, username, password): + self._token = self.login(username, password) + atexit.register(self.logout) + + def failure(self, msg): + print(msg, file=sys.stderr) + sys.exit(1) + + def get_token(self): + return self._token + + def login(self, username, password): + + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Accept': 'application/json', + } + + data = { + 'login': username, + 'password': password + } + + response = requests.post( + self._url_cbrain_session, headers=headers, data=data) + + if response.status_code == requests.status_codes.codes['OK']: + print("Login success") + print(response.content) + jsonResponse = response.json() + return jsonResponse["cbrain_api_token"] + else: + self.failure("Login failure") + + def logout(self): + '''End a CBRAIN session''' + + headers = { + 'Accept': 'application/json', + } + params = ( + ('cbrain_api_token', self.get_token()), + ) + + response = requests.delete( + self._url_cbrain_session, headers=headers, params=params) + + if response.status_code == requests.status_codes.codes['OK']: + print("Logout success") + return 0 + else: + self.failure("Logout failure") + + def list_data_provider(self, data_provider_ID): + '''Lists all files in a CBRAIN data provider''' + + data_provider_ID = str(data_provider_ID) + headers = { + 'Accept': 'application/json', + } + params = ( + ('id', data_provider_ID), + ('cbrain_api_token', self.get_token()), + ) + url = os.path.join(self._url_cbrain_data_providers, + data_provider_ID, 
'browse') + + response = requests.get(url, headers=headers, + params=params, allow_redirects=True) + + if response.status_code == requests.status_codes.codes['OK']: + return response.json() + else: + self.failure('DP browse failure') + + def post_task(self, userfile_id, tool_config_id, parameter_dictionary): + '''Posts a task in CBRAIN''' + + userfile_id = str(userfile_id) + + # Parse the parameter dictionary json, and insert the userfile IDs. + parameter_dictionary['interface_userfile_ids'] = [userfile_id] + parameter_dictionary['input_file'] = userfile_id + + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + params = ( + ('cbrain_api_token', self.get_token()), + ) + data = { + "cbrain_task": { + 'tool_config_id': tool_config_id, + 'params': parameter_dictionary, + 'run_number': None, + 'results_data_provider_id': 179, # Using Beluga + 'cluster_workdir_size': None, + 'workdir_archived': True, + 'description': ''} + } + + y = json.dumps(data) # convert data field to JSON: + response = requests.post( + self._url_cbrain_tasks, headers=headers, params=params, data=y) + + if response.status_code == requests.status_codes.codes['OK']: + print(response.text) + jsonResponse = response.json() + return jsonResponse + else: + self.failure( + f"Task posting failed.{os.path.sep}{response.content}") + + def get_all_tasks(self): + '''Gets the list of all the tasks of the user on CBRAIN''' + + headers = { + 'Accept': 'application/json', + } + params = { + 'cbrain_api_token': self.get_token(), + 'page': 1, + 'per_page': 1000 + } + task_list = [] + + while True: + + response = requests.get( + self._url_cbrain_tasks, headers=headers, params=params) + + if response.status_code == requests.status_codes.codes['OK']: + jsonResponse = response.json() + task_list += jsonResponse + params['page'] += 1 + else: + self.failure("Task list retrieval failed.") + + if len(jsonResponse) < params['per_page']: + break + + return task_list + + def get_task_info_from_list(self, task_list, task_ID): + '''Obtains info on the progress of a single task, given the list of all tasks for the user''' + + for task in task_list: + if task_ID == task['id'] or int(task_ID) == task['id']: + return task + + def get_task_info(self, task_ID): + '''Obtains information on the progress of a single task by querying for a single task''' + + task_ID = str(task_ID) + headers = { + 'Accept': 'application/json', + } + params = ( + ('id', task_ID), + ('cbrain_api_token', self.get_token()) + ) + + url = os.path.join(self._url_cbrain_tasks, task_ID) + + response = requests.get(url, headers=headers, params=params) + + if response.status_code == requests.status_codes.codes['OK']: + jsonResponse = response.json() + return jsonResponse + else: + self.failure("Task Info retrieval failed.") + + def download_text(self, userfile_ID): + '''Downloads the text from a file on CBRAIN''' + + userfile_ID = str(userfile_ID) + headers = { + 'Accept': 'text', + } + params = ( + ('cbrain_api_token', self.get_token()), + ) + url = os.path.join(self._url_cbrain_userfiles, userfile_ID, 'content') + + response = requests.get(url, headers=headers, + params=params, allow_redirects=True) + + if response.status_code == requests.status_codes.codes['OK']: + return response.text + else: + msg = f'Download failure{os.path.sep}{response.status_code}' + self.failure(msg) + + def download_file(self, userfile_ID, filename): + '''Downloads a file from CBRAIN and saves it, given a userfile ID''' + + fileID = str(userfile_ID) + headers = { + 'Accept': 
'application/json', + } + params = ( + ('id', fileID), + ('cbrain_api_token', self.get_token()), + ) + + url = os.path.join(self._url_cbrain_userfiles, fileID, 'content') + + response = requests.get(url, headers=headers, + params=params, allow_redirects=True) + if response.status_code == requests.status_codes.codes['OK']: + open(filename, 'wb').write(response.content) + print(f"Downloaded file {filename}") + return 0 + else: + self.failure(f'File download failure: {filename}') + + '''Given a filename and data provider, download the file from the data provider''' + + def download_DP_file(self, filename, data_provider_id): + + # Query CBRAIN to list all files in data provider. + data_provider_browse = self.list_data_provider(str(data_provider_id)) + print(data_provider_browse) + + try: + for entry in data_provider_browse: + # if it's a registered file, and filename matches + if 'userfile_id' in entry and entry['name'] == filename: + print( + f"Found registered file: {filename} in Data Provider with ID {data_provider_id}") + self.download_file(entry['userfile_id'], filename) + return 0 + else: + msg = f"File {filename} not found in Data Provider {data_provider_id}" + self.failure(msg) + + except Exception as e: + self.failure("Error in browsing data provider or file download") + + '''Makes sure a file in a data provider is synchronized with CBRAIN''' + + def sync_file(self, userfile_id_list): + # userfile_id_list can either be a string eg. '3663657', or a list eg. ['3663729', '3663714'] + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + + params = ( + ('file_ids[]', userfile_id_list), + ('cbrain_api_token', self.get_token()), + ) + + response = requests.post( + self._url_cbrain_sync_multiple, headers=headers, params=params) + + if response.status_code == requests.status_codes.codes['OK']: + print(f"Synchronized userfiles {userfile_id_list}")) + return + else: + self.failure( + f"Userfile sync failed for IDs: {userfile_id_list}{os.path.sep}{response.status_code}") diff --git a/neuroCIdata.py b/neuroCIdata.py new file mode 100644 index 0000000..9695387 --- /dev/null +++ b/neuroCIdata.py @@ -0,0 +1,4 @@ +import os + +experiment_definition_path = os.path.abspath("Experiment_Definition.yaml") +cbrain_ids_path = os.path.abspath('Config_Files/CBRAIN_IDs.yaml') diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..42803d6 --- /dev/null +++ b/utils.py @@ -0,0 +1,19 @@ +from contextlib import contextmanager +import datetime +import time +import yaml + + +@contextmanager +def measure_time(): + start = time.time() + yield lambda: datetime.timedelta(seconds=(time.time() - start)) + + +def get_yaml_file(file, name): + with open(file) as file: + try: + return yaml.safe_load(file) + except yaml.YAMLError as exception: # yaml file not valid + print(f'The {name} file ({file.name}) is not valid') + print(exception) From 1a4a8f8ee50d71b433b4701a3dbe4167591495a1 Mon Sep 17 00:00:00 2001 From: yohanchatelain Date: Fri, 24 Sep 2021 15:53:58 -0400 Subject: [PATCH 4/7] Fix issue --- cbrainAPI.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cbrainAPI.py b/cbrainAPI.py index d5ea6cc..f884a78 100644 --- a/cbrainAPI.py +++ b/cbrainAPI.py @@ -277,7 +277,7 @@ def sync_file(self, userfile_id_list): self._url_cbrain_sync_multiple, headers=headers, params=params) if response.status_code == requests.status_codes.codes['OK']: - print(f"Synchronized userfiles {userfile_id_list}")) + print(f"Synchronized userfiles {userfile_id_list}") return else: 
self.failure( From d89d834835611af6b29123795347def22f65b897 Mon Sep 17 00:00:00 2001 From: yohanchatelain Date: Fri, 24 Sep 2021 15:56:51 -0400 Subject: [PATCH 5/7] Update analysesVisualizations.py --- analysesVisualizations.py | 156 ++++++++++++++++++++------------------ 1 file changed, 84 insertions(+), 72 deletions(-) diff --git a/analysesVisualizations.py b/analysesVisualizations.py index 63402b7..cdecd15 100644 --- a/analysesVisualizations.py +++ b/analysesVisualizations.py @@ -6,125 +6,137 @@ import matplotlib.pyplot as plt from numpy.polynomial.polynomial import polyfit import sys -import os -from cbrainAPI import * -from cacheOps import * +import cbrainAPI +import cacheOps ########################################################################################################################### -#General functions +# General functions '''Generates a simple boxplot, not used for now.''' + + def boxplot(volume_list, pipeline_name, dataset_name): data = np.array(volume_list).astype(np.float) fig1, ax1 = plt.subplots() ax1.set_title('Left Hippocampal Volumes (mm3)') ax1.boxplot(data) plt.xticks([1], [dataset_name + '/' + pipeline_name]) - plt.savefig('./artifacts/' + dataset_name + '_' + pipeline_name + '_box' + '.png') # Saves in artifact directory - #plt.show() - + plt.savefig('./artifacts/' + dataset_name + '_' + + pipeline_name + '_box' + '.png') # Saves in artifact directory + # plt.show() + + '''Scatter plot and line of best fit''' + + def corrplot(volume_list, hearing_loss_list, pipeline_name, dataset_name): - + new_hl_list = [] new_vol_list = [] index = 0 for elem in hearing_loss_list: - if elem != 'NA': #Append to new list if value is not NA + if elem != 'NA': # Append to new list if value is not NA new_hl_list.append(hearing_loss_list[index]) new_vol_list.append(volume_list[index]) index = index + 1 - + x = np.array(new_hl_list).astype(np.float) y = np.array(new_vol_list).astype(np.float) b, m = polyfit(x, y, 1) plt.plot(x, y, '.') plt.plot(x, b + m * x, '-') plt.ylim(ymin=0) - plt.title('Left Hippocampal Volumes vs Worse_ear_dsi' + '\n' + dataset_name + ' with ' + pipeline_name) + plt.title('Left Hippocampal Volumes vs Worse_ear_dsi' + + '\n' + dataset_name + ' with ' + pipeline_name) plt.xlabel('Worse_ear_dsi') plt.ylabel('Hippocampal Volume (mm3)') - plt.savefig('./artifacts/' + dataset_name + '_' + pipeline_name + '_corr' + '.png') # Saves in artifact directory - plt.close() #so we have separate figures and not overlaid. - #plt.show() - + plt.savefig('./artifacts/' + dataset_name + '_' + pipeline_name + + '_corr' + '.png') # Saves in artifact directory + plt.close() # so we have separate figures and not overlaid. 
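+    # (note: savefig does not create missing directories, so the ./artifacts
+    # directory must already exist when these plot functions run)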
+ # plt.show() + #################################################################################################################### -#Prevent-AD and hearing loss +# Prevent-AD and hearing loss + def preventAD_get_labels_from_filename(filename): - subject = filename[4:11] - visit = filename[16:23] - return (subject, visit) + subject = filename[4:11] + visit = filename[16:23] + return (subject, visit) def preventAD_get_measure_from_csv(subject, visit, data_file): - with open(data_file, 'r') as read_obj: - csv_reader = reader(read_obj) - for row in csv_reader: - if row[1] == subject and row[2]==visit: - return row[19] #change this to get a different column in CSV + with open(data_file, 'r') as read_obj: + csv_reader = reader(read_obj) + for row in csv_reader: + if row[1] == subject and row[2] == visit: + return row[19] # change this to get a different column in CSV -#Process the cache results for a single pipeline +# Process the cache results for a single pipeline def preventAD_process(data_file, cache_file, pipeline_name): - - hearing_loss_list = [] - volume_list = [] - with open(cache_file, "r") as file: - cache = json.load(file) - for entry in cache: - - if cache[entry][pipeline_name]['Result']['result'] != None: - - volume = cache[entry][pipeline_name]['Result']['result'] - - if volume != 1: #If there is more than one word in the result string - necessary for FSL, but maybe not for other pipelines in future. - volume = volume.partition(' ')[0] #Get the first word - subject, visit = preventAD_get_labels_from_filename(entry) - - try: - hearing_loss = preventAD_get_measure_from_csv(subject, visit, data_file) - except Exception as e: - print("Error getting CSV file measures for Prevent-AD.") - return #skips the plotting - - if hearing_loss != None: #only visualize if we have a hearing loss measure for subject/visit - hearing_loss_list.append(hearing_loss) - volume_list.append(volume) - - if len(volume_list) >= 1 and len(hearing_loss_list)>=1: #If there is at least one data point. - corrplot(volume_list, hearing_loss_list, pipeline_name, 'Prevent-AD') - #boxplot(volume_list, pipeline_name, 'Prevent-AD') - print('Generated plots for ' + cache_file + '/' + pipeline_name) + + hearing_loss_list = [] + volume_list = [] + with open(cache_file, "r") as file: + cache = json.load(file) + for entry in cache: + + if cache[entry][pipeline_name]['Result']['result'] != None: + + volume = cache[entry][pipeline_name]['Result']['result'] + + # If there is more than one word in the result string - necessary for FSL, but maybe not for other pipelines in future. + if volume != 1: + volume = volume.partition(' ')[0] # Get the first word + subject, visit = preventAD_get_labels_from_filename(entry) + + try: + hearing_loss = preventAD_get_measure_from_csv( + subject, visit, data_file) + except Exception as e: + print("Error getting CSV file measures for Prevent-AD.") + return # skips the plotting + + if hearing_loss != None: # only visualize if we have a hearing loss measure for subject/visit + hearing_loss_list.append(hearing_loss) + volume_list.append(volume) + + # If there is at least one data point. 
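+    # (volume_list and hearing_loss_list are appended in lockstep above, so
+    # their lengths always match)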
+    if len(volume_list) >= 1 and len(hearing_loss_list) >= 1:
+        corrplot(volume_list, hearing_loss_list, pipeline_name, 'Prevent-AD')
+        #boxplot(volume_list, pipeline_name, 'Prevent-AD')
+        print('Generated plots for ' + cache_file + '/' + pipeline_name)
+
 
 preventAD_data_file = 'Auditory_processing_Registered_PREVENTAD.csv'
 preventAD_cache_file = 'Prevent-AD.json'
 
 #########################################################################################################
-#Compass-ND
+# Compass-ND
 
 #########################################################################################################
-#UK-BioBank
+# UK-BioBank
 
 #########################################################################################################
 # Main section of Analyses
 
 cbrain_user = sys.argv[1]
 cbrain_password = sys.argv[2]
-cbrain_token = cbrain_login(cbrain_user, cbrain_password)
-
-#cbrain_download_DP_file('Auditory_processing_Registered_PREVENTAD.csv', 318, cbrain_token) #use this if you know the file name but not the ID. Takes a long time though.
-cbrain_download_file(3497558, preventAD_data_file, cbrain_token) #use this (quicker) if you know the CBRAIN userfileID
-
-with open('Experiment_Definition.yaml') as file: #Load experiment definition
-    try:
-        experiment_definition = yaml.safe_load(file)
-    except yaml.YAMLError as exception: #yaml file not valid
-        print('The Experiment Definition file is not valid')
-        print(exception)
-
-    for pipeline in experiment_definition['Pipelines']:
-        preventAD_process(preventAD_data_file, preventAD_cache_file, pipeline)
-
-cbrain_logout(cbrain_token)
+
+cbrain_api = cbrainAPI.CbrainAPI(cbrain_user, cbrain_password)
+
+# cbrain_api.download_DP_file('Auditory_processing_Registered_PREVENTAD.csv', 318)  # use this if you know the file name but not the ID. Takes a long time though.
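+# (318 is the CBRAIN data provider ID that holds the CSV; 3497558 below is the
+# userfile ID of the same file)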
+# use this (quicker) if you know the CBRAIN userfileID +cbrain_api.download_file(3497558, preventAD_data_file) + +with open('Experiment_Definition.yaml') as file: # Load experiment definition + try: + experiment_definition = yaml.safe_load(file) + except yaml.YAMLError as exception: # yaml file not valid + print('The Experiment Definition file is not valid') + print(exception) + + for pipeline in experiment_definition['Pipelines']: + preventAD_process(preventAD_data_file, preventAD_cache_file, pipeline) From 20e41f992b1ea37067c596964279846c38ea18fe Mon Sep 17 00:00:00 2001 From: yohanchatelain Date: Fri, 24 Sep 2021 16:57:27 -0400 Subject: [PATCH 6/7] Change failure to warnings for silent fails in CbrainAPI --- cbrainAPI.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/cbrainAPI.py b/cbrainAPI.py index f884a78..a88bcff 100644 --- a/cbrainAPI.py +++ b/cbrainAPI.py @@ -23,6 +23,9 @@ def __init__(self, username, password): self._token = self.login(username, password) atexit.register(self.logout) + def warning(self, msg): + print(msg, file=sys.stderr) + def failure(self, msg): print(msg, file=sys.stderr) sys.exit(1) @@ -92,7 +95,7 @@ def list_data_provider(self, data_provider_ID): if response.status_code == requests.status_codes.codes['OK']: return response.json() else: - self.failure('DP browse failure') + self.warning('DP browse failure') def post_task(self, userfile_id, tool_config_id, parameter_dictionary): '''Posts a task in CBRAIN''' @@ -130,7 +133,7 @@ def post_task(self, userfile_id, tool_config_id, parameter_dictionary): jsonResponse = response.json() return jsonResponse else: - self.failure( + self.warning( f"Task posting failed.{os.path.sep}{response.content}") def get_all_tasks(self): @@ -156,7 +159,7 @@ def get_all_tasks(self): task_list += jsonResponse params['page'] += 1 else: - self.failure("Task list retrieval failed.") + self.warning("Task list retrieval failed.") if len(jsonResponse) < params['per_page']: break @@ -190,7 +193,7 @@ def get_task_info(self, task_ID): jsonResponse = response.json() return jsonResponse else: - self.failure("Task Info retrieval failed.") + self.warning("Task Info retrieval failed.") def download_text(self, userfile_ID): '''Downloads the text from a file on CBRAIN''' @@ -211,7 +214,7 @@ def download_text(self, userfile_ID): return response.text else: msg = f'Download failure{os.path.sep}{response.status_code}' - self.failure(msg) + self.warning(msg) def download_file(self, userfile_ID, filename): '''Downloads a file from CBRAIN and saves it, given a userfile ID''' @@ -234,7 +237,7 @@ def download_file(self, userfile_ID, filename): print(f"Downloaded file {filename}") return 0 else: - self.failure(f'File download failure: {filename}') + self.warning(f'File download failure: {filename}') '''Given a filename and data provider, download the file from the data provider''' @@ -254,10 +257,10 @@ def download_DP_file(self, filename, data_provider_id): return 0 else: msg = f"File {filename} not found in Data Provider {data_provider_id}" - self.failure(msg) + self.warning(msg) except Exception as e: - self.failure("Error in browsing data provider or file download") + self.warning("Error in browsing data provider or file download") '''Makes sure a file in a data provider is synchronized with CBRAIN''' @@ -280,5 +283,5 @@ def sync_file(self, userfile_id_list): print(f"Synchronized userfiles {userfile_id_list}") return else: - self.failure( + self.warning( f"Userfile sync failed for IDs: 
{userfile_id_list}{os.path.sep}{response.status_code}")

From 37a9c3d5c00bee31058477866558c484b047fb82 Mon Sep 17 00:00:00 2001
From: Yohan Chatelain
Date: Tue, 18 Jan 2022 14:43:57 -0500
Subject: [PATCH 7/7] Run once a year instead of every few hours

---
 .circleci/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 461efb4..a569e18 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -35,7 +35,7 @@ workflows:
   schedule-workflow:
     triggers:
       - schedule:
-          cron: "0 0,8,16 * * *" #0 0 * * 0 for weekly, 0 * * * * for hourly, 0 0,6,12,18 * * * for every 6hrs, 0 0 * * * for daily.
+          cron: "0 0 1 1 *" #0 0 1 1 * runs once a year (midnight, Jan 1); 0 0 * * 0 for weekly, 0 * * * * for hourly, 0 0,6,12,18 * * * for every 6hrs, 0 0 * * * for daily.
           filters:
             branches:
               only: