diff --git a/src/ndi/cloud/api/datasets/get_dataset.py b/src/ndi/cloud/api/datasets/get_dataset.py new file mode 100644 index 0000000..6d6ece9 --- /dev/null +++ b/src/ndi/cloud/api/datasets/get_dataset.py @@ -0,0 +1,14 @@ +from ..implementation.datasets.get_dataset import GetDataset as GetDatasetImpl + +def get_dataset(dataset_id): + """ + User-facing wrapper to get dataset details. + + Args: + dataset_id (str): The ID of the dataset. + + Returns: + tuple: (success, answer, response, url) + """ + api_call = GetDatasetImpl(dataset_id) + return api_call.execute() diff --git a/src/ndi/cloud/api/documents.py b/src/ndi/cloud/api/documents.py index be387b3..2ec456e 100644 --- a/src/ndi/cloud/api/documents.py +++ b/src/ndi/cloud/api/documents.py @@ -3,6 +3,8 @@ from .implementation.documents.update_document import UpdateDocument as UpdateDocumentImpl from .implementation.documents.delete_document import DeleteDocument as DeleteDocumentImpl from .implementation.documents.list_dataset_documents import ListDatasetDocuments as ListDatasetDocumentsImpl +from .implementation.documents.list_dataset_documents_all import ListDatasetDocumentsAll as ListDatasetDocumentsAllImpl +from .implementation.documents.get_bulk_download_url import GetBulkDownloadURL as GetBulkDownloadURLImpl def add_document(dataset_id, document_info): """ @@ -75,3 +77,31 @@ def list_dataset_documents(dataset_id, page=1, page_size=20): """ api_call = ListDatasetDocumentsImpl(dataset_id, page, page_size) return api_call.execute() + +def list_dataset_documents_all(dataset_id, page_size=20): + """ + Lists all documents in a dataset. + + Args: + dataset_id (str): The ID of the dataset. + page_size (int, optional): The number of documents per page. Defaults to 20. + + Returns: + tuple: (success, answer, response, url) + """ + api_call = ListDatasetDocumentsAllImpl(dataset_id, page_size) + return api_call.execute() + +def get_bulk_download_url(dataset_id, document_ids=None): + """ + Retrieves a pre-signed URL for bulk document download. + + Args: + dataset_id (str): The ID of the dataset. + document_ids (list of str, optional): List of cloud document IDs to download. + + Returns: + tuple: (success, answer, response, url) + """ + api_call = GetBulkDownloadURLImpl(dataset_id, document_ids) + return api_call.execute() diff --git a/src/ndi/cloud/api/files/__init__.py b/src/ndi/cloud/api/files/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndi/cloud/api/files/get_file_details.py b/src/ndi/cloud/api/files/get_file_details.py new file mode 100644 index 0000000..17aef2e --- /dev/null +++ b/src/ndi/cloud/api/files/get_file_details.py @@ -0,0 +1,15 @@ +from ..implementation.files.get_file_details import GetFileDetails as GetFileDetailsImpl + +def get_file_details(dataset_id, file_uid): + """ + User-facing wrapper to get file details. + + Args: + dataset_id (str): The ID of the dataset. + file_uid (str): The UID of the file. + + Returns: + tuple: (success, answer, response, url) + """ + api_call = GetFileDetailsImpl(dataset_id, file_uid) + return api_call.execute() diff --git a/src/ndi/cloud/api/implementation/datasets/get_dataset.py b/src/ndi/cloud/api/implementation/datasets/get_dataset.py new file mode 100644 index 0000000..38ec356 --- /dev/null +++ b/src/ndi/cloud/api/implementation/datasets/get_dataset.py @@ -0,0 +1,43 @@ +from ...call import Call +from ... 
import url +from ....authenticate import authenticate +import requests +import json + +class GetDataset(Call): + """ + Implementation class for getting dataset details. + """ + + def __init__(self, dataset_id): + """ + Creates a new GetDataset API call object. + + Args: + dataset_id (str): The ID of the dataset. + """ + self.dataset_id = dataset_id + self.endpoint_name = 'get_dataset' + + def execute(self): + """ + Performs the API call. + """ + token = authenticate() + api_url = url.get_url(self.endpoint_name, dataset_id=self.dataset_id) + + headers = { + 'Accept': 'application/json', + 'Authorization': f'Bearer {token}' + } + + response = requests.get(api_url, headers=headers) + + if response.status_code == 200: + return True, response.json(), response, api_url + else: + try: + answer = response.json() + except json.JSONDecodeError: + answer = response.text + return False, answer, response, api_url diff --git a/src/ndi/cloud/api/implementation/documents/get_bulk_download_url.py b/src/ndi/cloud/api/implementation/documents/get_bulk_download_url.py new file mode 100644 index 0000000..954c5f1 --- /dev/null +++ b/src/ndi/cloud/api/implementation/documents/get_bulk_download_url.py @@ -0,0 +1,54 @@ +from ...call import Call +from ... import url +from ....authenticate import authenticate +import requests +import json + +class GetBulkDownloadURL(Call): + """ + Implementation class for getting a bulk download URL. + """ + + def __init__(self, dataset_id, document_ids=None): + """ + Creates a new GetBulkDownloadURL API call object. + + Args: + dataset_id (str): The ID of the dataset. + document_ids (list of str, optional): List of cloud document IDs to download. + If None or empty, all documents are included. + """ + self.dataset_id = dataset_id + self.document_ids = document_ids if document_ids is not None else [] + self.endpoint_name = 'bulk_download_documents' + + def execute(self): + """ + Performs the API call. + """ + token = authenticate() + api_url = url.get_url(self.endpoint_name, dataset_id=self.dataset_id) + + headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {token}' + } + + # The body specifies which document IDs to include + data = {'documentIds': self.document_ids} + + response = requests.post(api_url, headers=headers, json=data) + + if response.status_code in [200, 201]: + try: + answer = response.json().get('url') + return True, answer, response, api_url + except json.JSONDecodeError: + return False, response.text, response, api_url + else: + try: + answer = response.json() + except json.JSONDecodeError: + answer = response.text + return False, answer, response, api_url diff --git a/src/ndi/cloud/api/implementation/documents/list_dataset_documents_all.py b/src/ndi/cloud/api/implementation/documents/list_dataset_documents_all.py new file mode 100644 index 0000000..69a119d --- /dev/null +++ b/src/ndi/cloud/api/implementation/documents/list_dataset_documents_all.py @@ -0,0 +1,50 @@ +from ...call import Call +from .list_dataset_documents import ListDatasetDocuments + +class ListDatasetDocumentsAll(Call): + """ + Implementation class for listing all documents in a dataset. + """ + + def __init__(self, dataset_id, page_size=20): + """ + Creates a new ListDatasetDocumentsAll API call object. + + Args: + dataset_id (str): The ID of the dataset. + page_size (int, optional): The number of documents per page. Defaults to 20. 
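+
+        Example (illustrative; 'ds-123' is a placeholder dataset ID):
+            all_call = ListDatasetDocumentsAll('ds-123', page_size=50)
+            success, docs, response, url = all_call.execute()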
+ """ + self.dataset_id = dataset_id + self.page_size = page_size + self.endpoint_name = 'list_dataset_documents' + + def execute(self): + """ + Performs the API call to list all documents. + """ + all_documents = [] + page = 1 + last_response = None + last_url = None + + while True: + api_call = ListDatasetDocuments(self.dataset_id, page=page, page_size=self.page_size) + success, answer, response, url = api_call.execute() + + last_response = response + last_url = url + + if not success: + return False, answer, response, url + + if not isinstance(answer, list): + return False, "Unexpected response format: expected a list", response, url + + all_documents.extend(answer) + + if len(answer) < self.page_size: + break + + page += 1 + + return True, all_documents, last_response, last_url diff --git a/src/ndi/cloud/api/implementation/files/get_file_details.py b/src/ndi/cloud/api/implementation/files/get_file_details.py new file mode 100644 index 0000000..b9c8d80 --- /dev/null +++ b/src/ndi/cloud/api/implementation/files/get_file_details.py @@ -0,0 +1,45 @@ +from ...call import Call +from ... import url +from ....authenticate import authenticate +import requests +import json + +class GetFileDetails(Call): + """ + Implementation class for getting file details. + """ + + def __init__(self, dataset_id, file_uid): + """ + Creates a new GetFileDetails API call object. + + Args: + dataset_id (str): The ID of the dataset. + file_uid (str): The UID of the file. + """ + self.dataset_id = dataset_id + self.file_uid = file_uid + self.endpoint_name = 'get_file_details' + + def execute(self): + """ + Performs the API call. + """ + token = authenticate() + api_url = url.get_url(self.endpoint_name, dataset_id=self.dataset_id, file_uid=self.file_uid) + + headers = { + 'Accept': 'application/json', + 'Authorization': f'Bearer {token}' + } + + response = requests.get(api_url, headers=headers) + + if response.status_code == 200: + return True, response.json(), response, api_url + else: + try: + answer = response.json() + except json.JSONDecodeError: + answer = response.text + return False, answer, response, api_url diff --git a/src/ndi/cloud/download/__init__.py b/src/ndi/cloud/download/__init__.py new file mode 100644 index 0000000..8977a61 --- /dev/null +++ b/src/ndi/cloud/download/__init__.py @@ -0,0 +1 @@ +from .download_utils import download_document_collection, download_dataset_files diff --git a/src/ndi/cloud/download/download_utils.py b/src/ndi/cloud/download/download_utils.py new file mode 100644 index 0000000..729635c --- /dev/null +++ b/src/ndi/cloud/download/download_utils.py @@ -0,0 +1,160 @@ +import os +import requests +import json +import tempfile +import zipfile +import time +from ..api.documents import get_bulk_download_url +from ..api.datasets import get_dataset +from ..api.files import get_file_details +from ...document import Document + +def download_document_collection(dataset_id, document_ids=None, timeout=20, chunk_size=2000): + """ + Download documents using bulk download with chunking. 
+ """ + if not document_ids: + # Avoid circular import by importing inside function + from ...sync.internal.document_utils import list_remote_document_ids + print('No document IDs provided; fetching all document IDs from the server...') + id_map = list_remote_document_ids(dataset_id) + document_ids = id_map['apiId'] + if not document_ids: + return [] + + num_docs = len(document_ids) + num_chunks = (num_docs + chunk_size - 1) // chunk_size + document_chunks = [document_ids[i:i + chunk_size] for i in range(0, num_docs, chunk_size)] + + all_document_structs = [] + print(f'Beginning download of {num_docs} documents in {num_chunks} chunk(s).') + + for c, chunk_doc_ids in enumerate(document_chunks, 1): + print(f' Processing chunk {c} of {num_chunks} ({len(chunk_doc_ids)} documents)...') + + success, download_url, _, _ = get_bulk_download_url(dataset_id, chunk_doc_ids) + if not success: + raise RuntimeError(f"Failed to get bulk download URL for chunk {c}") + + with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as temp_zip: + temp_zip_path = temp_zip.name + + try: + is_finished = False + start_time = time.time() + + while not is_finished and (time.time() - start_time) < timeout: + try: + response = requests.get(download_url, stream=True) + if response.status_code == 200: + with open(temp_zip_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + is_finished = True + else: + time.sleep(1) + except Exception: + time.sleep(1) + + if not is_finished: + raise RuntimeError(f"Download failed for chunk {c} after timeout") + + with tempfile.TemporaryDirectory() as extract_dir: + with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref: + zip_ref.extractall(extract_dir) + + # Assume one JSON file per chunk as per Matlab logic (unzippedFiles{1}) + # But zip might contain multiple files. Matlab code: jsonFile = unzippedFiles{1} + # We iterate over extracted files + for filename in os.listdir(extract_dir): + if filename.endswith('.json'): + with open(os.path.join(extract_dir, filename), 'r') as f: + # Handling potential NaN/Null is skipped for now, assuming standard JSON + document_structs = json.load(f) + # dropDuplicateDocsFromJsonDecode logic is skipped for now + if isinstance(document_structs, list): + all_document_structs.extend(document_structs) + else: + all_document_structs.append(document_structs) + finally: + if os.path.exists(temp_zip_path): + os.remove(temp_zip_path) + + print(f'Download complete. Converting {len(all_document_structs)} structs to NDI documents...') + documents = [Document(d) for d in all_document_structs] + print('Processing complete.') + return documents + +def download_dataset_files(cloud_dataset_id, target_folder, file_uuids=None, verbose=True, abort_on_error=True): + """ + Downloads dataset files from a cloud dataset. 
+ """ + success, dataset_info, _, _ = get_dataset(cloud_dataset_id) + if not success: + raise RuntimeError(f"Failed to get dataset: {dataset_info}") + + if 'files' not in dataset_info and file_uuids is not None: + raise RuntimeError('No files found in the dataset despite files requested.') + + if 'files' not in dataset_info: + return + + files = _filter_files_to_download(dataset_info['files'], file_uuids) + num_files = len(files) + + if verbose: + print(f'Will download {num_files} files...') + + for i, file_info in enumerate(files, 1): + if verbose: + _display_progress(i, num_files) + + file_uid = file_info['uid'] + exists_on_cloud = file_info.get('uploaded', False) + + if not exists_on_cloud: + print(f'Warning: File with uuid "{file_uid}" does not exist on the cloud, skipping...') + continue + + target_filepath = os.path.join(target_folder, file_uid) + if os.path.exists(target_filepath): + if verbose: + print(f'File {i} already exists locally, skipping...') + continue + + success, answer, _, _ = get_file_details(cloud_dataset_id, file_uid) + if not success: + print(f"Warning: Failed to get file details: {answer}") + continue + + download_url = answer.get('downloadUrl') + if not download_url: + print(f"Warning: No download URL for file {file_uid}") + continue + + try: + response = requests.get(download_url, stream=True) + response.raise_for_status() + with open(target_filepath, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + except Exception as e: + if abort_on_error: + raise e + else: + print(f"Warning: Download failed for file {i}: {e}") + + if verbose: + print('File download complete.') + +def _filter_files_to_download(files, file_uuids): + if file_uuids is not None: + # Assuming file_uuids is a list of strings + # Filter files where uid is in file_uuids + filtered_files = [f for f in files if f['uid'] in file_uuids] + return filtered_files + return files + +def _display_progress(current_file_number, total_file_number): + percent_finished = round((current_file_number / total_file_number) * 100) + print(f'Downloading file {current_file_number} of {total_file_number} ({percent_finished}% complete) ...') diff --git a/src/ndi/cloud/internal/get_cloud_dataset_id_for_local_dataset.py b/src/ndi/cloud/internal/get_cloud_dataset_id_for_local_dataset.py new file mode 100644 index 0000000..edee8a3 --- /dev/null +++ b/src/ndi/cloud/internal/get_cloud_dataset_id_for_local_dataset.py @@ -0,0 +1,18 @@ +from ...query import Query + +def get_cloud_dataset_id_for_local_dataset(ndi_dataset): + """ + Retrieves the cloud dataset ID for a local dataset. 
+ """ + cloud_dataset_id_query = Query('', 'isa', 'dataset_remote') + # Assuming database_search returns a list of documents + cloud_dataset_id_documents = ndi_dataset.database_search(cloud_dataset_id_query) + + if len(cloud_dataset_id_documents) > 1: + raise RuntimeError(f"Found more than one remote cloudDatasetId for the local dataset: {ndi_dataset.path}.") + elif cloud_dataset_id_documents: + # Assuming document structure + doc = cloud_dataset_id_documents[0] + return doc.document_properties['dataset_remote']['dataset_id'], cloud_dataset_id_documents + else: + return '', [] diff --git a/src/ndi/cloud/sync/__init__.py b/src/ndi/cloud/sync/__init__.py new file mode 100644 index 0000000..b083b0b --- /dev/null +++ b/src/ndi/cloud/sync/__init__.py @@ -0,0 +1,3 @@ +from .download_new_impl import download_new +from .sync_options import SyncOptions +from .enum.sync_mode import SyncMode diff --git a/src/ndi/cloud/sync/download_new_impl.py b/src/ndi/cloud/sync/download_new_impl.py new file mode 100644 index 0000000..8328dff --- /dev/null +++ b/src/ndi/cloud/sync/download_new_impl.py @@ -0,0 +1,108 @@ +from .sync_options import SyncOptions +from ..internal.get_cloud_dataset_id_for_local_dataset import get_cloud_dataset_id_for_local_dataset +from .internal.index.index_utils import read_sync_index, update_sync_index +from .internal.document_utils import list_remote_document_ids, list_local_documents +from .internal.download_ndi_documents import download_ndi_documents + +def download_new(ndi_dataset, sync_options=None): + """ + Download new documents (and associated data files) from remote to local. + """ + if sync_options is None: + sync_options = SyncOptions() + elif isinstance(sync_options, dict): + sync_options = SyncOptions(**sync_options) + + success = True + error_message = '' + report = {'downloaded_document_ids': []} + + try: + if sync_options.Verbose: + print(f'Syncing dataset "{ndi_dataset.path}". \nWill download new documents from remote.') + + cloud_dataset_id, _ = get_cloud_dataset_id_for_local_dataset(ndi_dataset) + if sync_options.Verbose: + print(f'Using Cloud Dataset ID: {cloud_dataset_id}') + + sync_index = read_sync_index(ndi_dataset) + remote_ids_last_sync = sync_index.get('remoteDocumentIdsLastSync', []) + if sync_options.Verbose: + print(f'Read sync index. Last sync recorded {len(remote_ids_last_sync)} remote documents.') + + remote_document_id_map = list_remote_document_ids(cloud_dataset_id, verbose=sync_options.Verbose) + current_remote_ndi_ids = remote_document_id_map['ndiId'] + + # Determine documents to download + # setdiff in Matlab: unique values in A that are not in B + # Python: list(set(A) - set(B)) but order might not be preserved. + # Use list comprehension to preserve order of A if important, but Matlab 'stable' does that. + # We need indices too. 
+ + # Mapping API IDs to NDI IDs for lookup + # remote_document_id_map['ndiId'] and ['apiId'] correspond by index + + ndi_ids_to_download = [] + indices_to_download = [] + + remote_ids_last_sync_set = set(remote_ids_last_sync) + for idx, ndi_id in enumerate(current_remote_ndi_ids): + if ndi_id not in remote_ids_last_sync_set: + ndi_ids_to_download.append(ndi_id) + indices_to_download.append(idx) + + if sync_options.Verbose: + print(f'Found {len(ndi_ids_to_download)} documents added on remote since last sync.') + + if ndi_ids_to_download: + cloud_api_ids_to_download = [remote_document_id_map['apiId'][i] for i in indices_to_download] + + if sync_options.DryRun: + print(f'[DryRun] Would download {len(ndi_ids_to_download)} documents from remote.') + if sync_options.Verbose: + for i in range(len(ndi_ids_to_download)): + print(f' [DryRun] - NDI ID: {ndi_ids_to_download[i]} (Cloud Specific ID: {cloud_api_ids_to_download[i]})') + else: + if sync_options.Verbose: + print(f'Downloading {len(ndi_ids_to_download)} documents...') + + downloaded_docs = download_ndi_documents( + cloud_dataset_id, + cloud_api_ids_to_download, + ndi_dataset, + sync_options + ) + + if downloaded_docs: + report['downloaded_document_ids'] = [d.document_properties['base']['id'] for d in downloaded_docs] + + if sync_options.Verbose: + print(f'Completed downloading {len(ndi_ids_to_download)} documents.') + else: + if sync_options.Verbose: + print('No new documents to download from remote.') + + if not sync_options.DryRun: + _, final_local_document_ids = list_local_documents(ndi_dataset) + + update_sync_index( + ndi_dataset, + cloud_dataset_id, + local_document_ids=final_local_document_ids, + remote_document_ids=remote_document_id_map['ndiId'] + ) + + if sync_options.Verbose: + print('Sync index updated.') + + if sync_options.Verbose: + print(f'Syncing complete for dataset: {ndi_dataset.path}') + + except Exception as e: + success = False + error_message = str(e) + if sync_options.Verbose: + print(f'Error in downloadNew: {error_message}') + # raise e # Optionally re-raise + + return success, error_message, report diff --git a/src/ndi/cloud/sync/enum/sync_mode.py b/src/ndi/cloud/sync/enum/sync_mode.py new file mode 100644 index 0000000..b9bf595 --- /dev/null +++ b/src/ndi/cloud/sync/enum/sync_mode.py @@ -0,0 +1,28 @@ +from enum import Enum +import importlib + +class SyncMode(Enum): + """ + Enumeration of supported dataset synchronization modes. + """ + DownloadNew = "download_new" + MirrorFromRemote = "mirror_from_remote" + UploadNew = "upload_new" + MirrorToRemote = "mirror_to_remote" + TwoWaySync = "two_way_sync" + + def execute(self, ndi_dataset, sync_options): + """ + Executes the synchronization function corresponding to the mode. + + Args: + ndi_dataset: The NDI dataset object. + sync_options: The SyncOptions object. + """ + module_name = f"ndi.cloud.sync.{self.value}" + try: + module = importlib.import_module(module_name) + func = getattr(module, self.value) + return func(ndi_dataset, sync_options) + except (ImportError, AttributeError) as e: + raise NotImplementedError(f"Synchronization function '{self.value}' is not implemented.") from e diff --git a/src/ndi/cloud/sync/internal/constants.py b/src/ndi/cloud/sync/internal/constants.py new file mode 100644 index 0000000..1f08255 --- /dev/null +++ b/src/ndi/cloud/sync/internal/constants.py @@ -0,0 +1,7 @@ +import os + +class Constants: + """ + Constants for NDI Cloud Sync. 
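+
+    FileSyncLocation is a relative path (e.g. 'download/files' on POSIX) that callers
+    join onto the dataset root to get the local file download directory.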
+ """ + FileSyncLocation = os.path.join('download', 'files') diff --git a/src/ndi/cloud/sync/internal/document_utils.py b/src/ndi/cloud/sync/internal/document_utils.py new file mode 100644 index 0000000..99f73ed --- /dev/null +++ b/src/ndi/cloud/sync/internal/document_utils.py @@ -0,0 +1,37 @@ +from ...api.documents import list_dataset_documents_all +from ....query import Query + +def list_remote_document_ids(cloud_dataset_id, verbose=False): + """ + List all NDI and API document IDs from a remote dataset. + """ + if verbose: + print(f'Fetching complete remote document list for dataset {cloud_dataset_id}...') + + success, all_documents, response, _ = list_dataset_documents_all(cloud_dataset_id) + + if not success: + error_msg = all_documents.get('message', 'Unknown error') if isinstance(all_documents, dict) else all_documents + raise RuntimeError(f"Failed to list remote documents: {error_msg}") + + if not all_documents: + if verbose: + print('No remote documents found.') + return {'ndiId': [], 'apiId': []} + + all_ndi_ids = [doc.get('ndiId') for doc in all_documents] + all_api_ids = [doc.get('id') for doc in all_documents] + + if verbose: + print(f'Total remote documents processed: {len(all_ndi_ids)}.') + + return {'ndiId': all_ndi_ids, 'apiId': all_api_ids} + +def list_local_documents(ndi_dataset): + """ + List documents in local dataset. + """ + # Assuming ndi_dataset has database_search method and Query class is available + documents = ndi_dataset.database_search(Query('', 'isa', 'base')) + document_ids = [doc.document_properties['base']['id'] for doc in documents] + return documents, document_ids diff --git a/src/ndi/cloud/sync/internal/download_ndi_documents.py b/src/ndi/cloud/sync/internal/download_ndi_documents.py new file mode 100644 index 0000000..6509c50 --- /dev/null +++ b/src/ndi/cloud/sync/internal/download_ndi_documents.py @@ -0,0 +1,88 @@ +import os +import tempfile +from ...download.download_utils import download_document_collection, download_dataset_files +from .file_utils import get_file_uids_from_documents, update_file_info_for_local_files, update_file_info_for_remote_files +from .constants import Constants + +def download_ndi_documents(cloud_dataset_id, cloud_document_ids, ndi_dataset=None, sync_options=None): + """ + Downloads a collection of NDI documents and their files. + """ + if sync_options is None: + from ..sync_options import SyncOptions + sync_options = SyncOptions() + + if not cloud_document_ids: + if sync_options.Verbose: + print('No document IDs provided to download.') + return [] + + if sync_options.Verbose: + print(f'Attempting to download {len(cloud_document_ids)} documents...') + + new_ndi_documents = download_document_collection(cloud_dataset_id, cloud_document_ids) + + if not new_ndi_documents: + print('Warning: No documents were retrieved from the cloud for the given IDs.') + return [] + + if sync_options.Verbose: + print(f'Successfully retrieved metadata for {len(new_ndi_documents)} documents.') + + if sync_options.SyncFiles: + if sync_options.Verbose: + print('SyncFiles is true. 
Processing associated data files...') + + if ndi_dataset is None: + root_files_folder = tempfile.gettempdir() + else: + root_files_folder = ndi_dataset.path + + files_target_folder = os.path.join(root_files_folder, Constants.FileSyncLocation) + + file_uids_to_download = get_file_uids_from_documents(new_ndi_documents) + + if file_uids_to_download: + if sync_options.Verbose: + print(f'Found {len(file_uids_to_download)} unique file UIDs to download for these documents.') + print(f'Ensuring download directory exists: {files_target_folder}') + + if not os.path.isdir(files_target_folder): + os.makedirs(files_target_folder) + + download_dataset_files( + cloud_dataset_id, + files_target_folder, + file_uids_to_download, + verbose=sync_options.Verbose + ) + + if sync_options.Verbose: + print('Completed downloading data files.') + print('Updating document file info to point to local files.') + else: + if sync_options.Verbose: + print('No associated files found for these documents, or files already local.') + + processed_documents = [] + for doc in new_ndi_documents: + processed_documents.append(update_file_info_for_local_files(doc, files_target_folder)) + new_ndi_documents = processed_documents + + else: + if sync_options.Verbose: + print('"SyncFiles" option is false. Updating document file info to reflect remote files.') + + processed_documents = [] + for doc in new_ndi_documents: + processed_documents.append(update_file_info_for_remote_files(doc, cloud_dataset_id)) + new_ndi_documents = processed_documents + + if ndi_dataset is not None: + if sync_options.Verbose: + print(f'Adding {len(new_ndi_documents)} processed documents to the local dataset...') + ndi_dataset.database_add(new_ndi_documents) + if sync_options.Verbose: + print('Documents added to the dataset.') + + return new_ndi_documents diff --git a/src/ndi/cloud/sync/internal/file_utils.py b/src/ndi/cloud/sync/internal/file_utils.py new file mode 100644 index 0000000..9be2008 --- /dev/null +++ b/src/ndi/cloud/sync/internal/file_utils.py @@ -0,0 +1,97 @@ +import os + +def get_file_uids_from_documents(ndi_documents): + """ + Extracts all unique file UIDs from a list of NDI documents. + """ + file_uids_list = [] + if not ndi_documents: + return [] + + for document in ndi_documents: + if hasattr(document, 'has_files') and document.has_files(): # Assuming document object structure + # document.document_properties is a dict + file_info = document.document_properties.get('files', {}).get('file_info', []) + for info in file_info: + locations = info.get('locations', []) + for location in locations: + uid = location.get('uid') + if uid: + file_uids_list.append(uid) + + return list(set(file_uids_list)) + +def update_file_info_for_local_files(document, file_directory): + """ + Update file info of document for local files. + """ + if hasattr(document, 'has_files') and document.has_files(): + # This part requires deeper knowledge of ndi.document methods like reset_file_info and add_file + # Assuming we can manipulate document_properties directly as per python port pattern + # The Matlab code uses document methods. Python Document class in this repo: + # has document_properties dict. + # But methods like `reset_file_info` and `add_file` might not be ported yet or available in `did.document`. + # I checked `src/ndi/document.py` and it inherits from `did.document.Document`. + # I should check `did.document.Document`. + # For now, I will implement logic manipulating `document_properties`. 
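+        # Assumed shape of each file_info entry (illustrative; only fields accessed below are shown):
+        #   {'locations': [{'uid': '<file-uid>', 'location': '<url-or-path>', 'location_type': '...'}]}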
+ + file_info = document.document_properties.get('files', {}).get('file_info', []) + # In Matlab: document = document.reset_file_info(); + # We need to replicate this logic. + + # Simulating reset: clear locations or rebuild? + # Matlab loop: + # for i = 1:numel(originalFileInfo) + # file_uid = originalFileInfo(i).locations(1).uid; + # file_location = fullfile(fileDirectory, file_uid); + # ... + + new_file_info = [] + for info in file_info: + if 'locations' in info and info['locations']: + file_uid = info['locations'][0].get('uid') + if file_uid: + file_location = os.path.join(file_directory, file_uid) + if os.path.isfile(file_location): + # Construct new file info entry compatible with add_file + # Matlab add_file(filename, file_location) + # We just update the existing entry to point to local file + # This might differ from "reset and add". + # Ideally we should use the Document methods if they exist. + # Assuming they don't, we update the dict. + + # Resetting locations to just this local file + info['locations'] = [{ + 'location': file_location, + 'location_type': 'file', # Assuming local file type + 'uid': file_uid + }] + else: + print(f'Warning: Local file does not exist for document "{document.document_properties["base"]["id"]}"') + new_file_info.append(info) + + document.document_properties['files']['file_info'] = new_file_info + return document + +def update_file_info_for_remote_files(document, cloud_dataset_id): + """ + Update file info of document for remote (cloud-only) files. + """ + if hasattr(document, 'has_files') and document.has_files(): + file_info = document.document_properties.get('files', {}).get('file_info', []) + + for info in file_info: + if 'locations' in info and info['locations']: + # Replace/override 1st file location + loc = info['locations'][0] + loc['delete_original'] = 0 + loc['ingest'] = 0 + + file_uid = loc.get('uid') + if file_uid: + file_location = f'ndic://{cloud_dataset_id}/{file_uid}' + loc['location'] = file_location + loc['location_type'] = 'ndicloud' + + document.document_properties['files']['file_info'] = file_info + return document diff --git a/src/ndi/cloud/sync/internal/index/index_utils.py b/src/ndi/cloud/sync/internal/index/index_utils.py new file mode 100644 index 0000000..30776d7 --- /dev/null +++ b/src/ndi/cloud/sync/internal/index/index_utils.py @@ -0,0 +1,61 @@ +import os +import json +import datetime + +def get_index_filepath(ndi_dataset_path, mode, verbose=True): + """ + Returns the path to the sync index file. + """ + sync_dir_path = os.path.join(ndi_dataset_path, '.ndi', 'sync') + if not os.path.isdir(sync_dir_path): + if mode == "write": + if verbose: + print(f'Creating sync directory: {sync_dir_path}') + os.makedirs(sync_dir_path) + return os.path.join(sync_dir_path, 'index.json') + +def create_sync_index_struct(local_ndi_ids, remote_ndi_ids): + """ + Creates the structure for the NDI sync index. + """ + index_struct = { + 'localDocumentIdsLastSync': local_ndi_ids, + 'remoteDocumentIdsLastSync': remote_ndi_ids, + 'lastSyncTimestamp': datetime.datetime.now(datetime.timezone.utc).isoformat() + } + return index_struct + +def read_sync_index(ndi_dataset, verbose=True): + """ + Reads the sync index from disk. + """ + index_path = get_index_filepath(ndi_dataset.path, "read", verbose=verbose) + if os.path.isfile(index_path): + with open(index_path, 'r') as f: + return json.load(f) + else: + return {} + +def write_sync_index(ndi_dataset, sync_index, verbose=False): + """ + Writes the sync index to disk. 
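+
+    The index is written as JSON to <dataset>/.ndi/sync/index.json, e.g.:
+        {
+            "localDocumentIdsLastSync": [...],
+            "remoteDocumentIdsLastSync": [...],
+            "lastSyncTimestamp": "2024-01-01T00:00:00+00:00"
+        }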
+ """ + index_path = get_index_filepath(ndi_dataset.path, "write", verbose=verbose) + with open(index_path, 'w') as f: + json.dump(sync_index, f, indent=4) + +def update_sync_index(ndi_dataset, cloud_dataset_id, local_document_ids=None, remote_document_ids=None): + """ + Updates synchronization index for the dataset. + """ + if local_document_ids is None: + from ..document_utils import list_local_documents + _, local_document_ids = list_local_documents(ndi_dataset) + + if remote_document_ids is None: + from ..document_utils import list_remote_document_ids + remote_docs_map = list_remote_document_ids(cloud_dataset_id) + remote_document_ids = remote_docs_map['ndiId'] + + sync_index = create_sync_index_struct(local_document_ids, remote_document_ids) + write_sync_index(ndi_dataset, sync_index) diff --git a/src/ndi/cloud/sync/sync_options.py b/src/ndi/cloud/sync/sync_options.py new file mode 100644 index 0000000..f39cc30 --- /dev/null +++ b/src/ndi/cloud/sync/sync_options.py @@ -0,0 +1,33 @@ +class SyncOptions: + """ + Options class for controlling sync behavior. + """ + + def __init__(self, **kwargs): + """ + Initializes the SyncOptions object. + + Args: + SyncFiles (bool): If true, files will be downloaded (default: False). + Verbose (bool): If true, verbose output is printed (default: True). + DryRun (bool): If true, actions are simulated but not performed (default: False). + FileUploadStrategy (str): "serial" or "batch" (default: "batch"). + """ + self.SyncFiles = kwargs.get('SyncFiles', False) + self.Verbose = kwargs.get('Verbose', True) + self.DryRun = kwargs.get('DryRun', False) + self.FileUploadStrategy = kwargs.get('FileUploadStrategy', 'batch') + + if self.FileUploadStrategy not in ['serial', 'batch']: + raise ValueError("FileUploadStrategy must be either 'serial' or 'batch'") + + def to_dict(self): + """ + Convert properties to a dictionary. 
+ """ + return { + 'SyncFiles': self.SyncFiles, + 'Verbose': self.Verbose, + 'DryRun': self.DryRun, + 'FileUploadStrategy': self.FileUploadStrategy + } diff --git a/tests/test_cloud_documents.py b/tests/test_cloud_documents.py index 96d8bdd..126faa2 100644 --- a/tests/test_cloud_documents.py +++ b/tests/test_cloud_documents.py @@ -1,6 +1,6 @@ import unittest from unittest.mock import patch, Mock -from ndi.cloud.api.documents import add_document, get_document, update_document, delete_document, list_dataset_documents +from ndi.cloud.api.documents import add_document, get_document, update_document, delete_document, list_dataset_documents, list_dataset_documents_all class TestCloudDocuments(unittest.TestCase): @@ -67,5 +67,31 @@ def test_list_dataset_documents(self, mock_get, mock_authenticate): self.assertTrue(success) self.assertEqual(len(answer), 1) + @patch('ndi.cloud.api.implementation.documents.list_dataset_documents.authenticate') + @patch('requests.get') + def test_list_dataset_documents_all(self, mock_get, mock_authenticate): + mock_authenticate.return_value = 'fake_token' + + # Page 1 response + resp1 = Mock() + resp1.status_code = 200 + resp1.json.return_value = [{'id': 'doc1'}, {'id': 'doc2'}] + + # Page 2 response (partial page) + resp2 = Mock() + resp2.status_code = 200 + resp2.json.return_value = [{'id': 'doc3'}] + + mock_get.side_effect = [resp1, resp2] + + # page_size=2 to force 2 pages + success, answer, _, _ = list_dataset_documents_all('ds1', page_size=2) + + self.assertTrue(success) + self.assertEqual(len(answer), 3) + self.assertEqual(answer[0]['id'], 'doc1') + self.assertEqual(answer[2]['id'], 'doc3') + self.assertEqual(mock_get.call_count, 2) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_cloud_sync_download_new.py b/tests/test_cloud_sync_download_new.py new file mode 100644 index 0000000..ed1cbc1 --- /dev/null +++ b/tests/test_cloud_sync_download_new.py @@ -0,0 +1,98 @@ +import unittest +from unittest.mock import MagicMock, patch +import sys + +# We need to ensure dependencies are handled. +# If 'did' is not installed, we can mock it safely using patch.dict +# But 'download_new_impl' imports it at module level (via get_cloud_dataset_id... -> query -> did.query) +# So we need to mock it *before* importing download_new_impl if it's missing. 
+ +needs_did_mock = False +try: + import did +except ImportError: + needs_did_mock = True + +class TestCloudSyncDownloadNew(unittest.TestCase): + + def setUp(self): + self.mock_dataset = MagicMock() + self.mock_dataset.path = '/tmp/fake_dataset' + + if needs_did_mock: + self.did_patcher = patch.dict('sys.modules', { + 'did': MagicMock(), + 'did.query': MagicMock(), + 'did.document': MagicMock() + }) + self.did_patcher.start() + + # Re-import to ensure mocks are used if patched + # Or import inside test methods + + def tearDown(self): + if needs_did_mock: + self.did_patcher.stop() + + @patch('ndi.cloud.sync.download_new_impl.get_cloud_dataset_id_for_local_dataset') + @patch('ndi.cloud.sync.download_new_impl.read_sync_index') + @patch('ndi.cloud.sync.download_new_impl.list_remote_document_ids') + @patch('ndi.cloud.sync.download_new_impl.download_ndi_documents') + @patch('ndi.cloud.sync.download_new_impl.list_local_documents') + @patch('ndi.cloud.sync.download_new_impl.update_sync_index') + def test_download_new_success(self, mock_update_index, mock_list_local, mock_download_docs, mock_list_remote, mock_read_index, mock_get_cloud_id): + # Delayed import to allow setUp to patch sys.modules if needed + from ndi.cloud.sync.download_new_impl import download_new + from ndi.cloud.sync.sync_options import SyncOptions + + # Setup mocks + mock_get_cloud_id.return_value = ('cloud-id-123', []) + mock_read_index.return_value = {'remoteDocumentIdsLastSync': ['doc1']} + mock_list_remote.return_value = {'ndiId': ['doc1', 'doc2'], 'apiId': ['api1', 'api2']} + + mock_doc = MagicMock() + mock_doc.document_properties = {'base': {'id': 'doc2'}} + mock_download_docs.return_value = [mock_doc] + + mock_list_local.return_value = ([], ['doc1', 'doc2']) + + sync_options = SyncOptions(Verbose=False) + + # Execute + success, msg, report = download_new(self.mock_dataset, sync_options) + + # Verify + self.assertTrue(success) + self.assertEqual(report['downloaded_document_ids'], ['doc2']) + + mock_download_docs.assert_called_once() + args, _ = mock_download_docs.call_args + self.assertEqual(args[0], 'cloud-id-123') + self.assertEqual(args[1], ['api2']) # doc2 is new, corresponds to api2 + + mock_update_index.assert_called_once() + + @patch('ndi.cloud.sync.download_new_impl.get_cloud_dataset_id_for_local_dataset') + @patch('ndi.cloud.sync.download_new_impl.read_sync_index') + @patch('ndi.cloud.sync.download_new_impl.list_remote_document_ids') + @patch('ndi.cloud.sync.download_new_impl.download_ndi_documents') + @patch('ndi.cloud.sync.download_new_impl.update_sync_index') + def test_download_new_no_changes(self, mock_update_index, mock_download_docs, mock_list_remote, mock_read_index, mock_get_cloud_id): + # Delayed import + from ndi.cloud.sync.download_new_impl import download_new + from ndi.cloud.sync.sync_options import SyncOptions + + mock_get_cloud_id.return_value = ('cloud-id-123', []) + mock_read_index.return_value = {'remoteDocumentIdsLastSync': ['doc1']} + mock_list_remote.return_value = {'ndiId': ['doc1'], 'apiId': ['api1']} + + sync_options = SyncOptions(Verbose=False) + success, msg, report = download_new(self.mock_dataset, sync_options) + + self.assertTrue(success) + self.assertEqual(report['downloaded_document_ids'], []) + mock_download_docs.assert_not_called() + mock_update_index.assert_called_once() + +if __name__ == '__main__': + unittest.main()
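
For reference, a minimal usage sketch of the new sync entry point added above. The dataset path is a placeholder and open_dataset is a hypothetical stand-in for however an NDI dataset object is obtained in this codebase; everything else (download_new, SyncOptions, the report keys) comes from the code in this diff:

    from ndi.cloud.sync import download_new, SyncOptions

    ndi_dataset = open_dataset('/path/to/local/dataset')  # hypothetical helper
    options = SyncOptions(SyncFiles=True, Verbose=True, DryRun=False)
    success, error_message, report = download_new(ndi_dataset, options)
    if success:
        print(f"Downloaded {len(report['downloaded_document_ids'])} new document(s)")
    else:
        print(f"Sync failed: {error_message}")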