diff --git a/comparison_table.md b/comparison_table.md
index 864b4f3..dc32a71 100644
--- a/comparison_table.md
+++ b/comparison_table.md
@@ -135,32 +135,32 @@
 | ndi.cloud.internal.getActiveToken | No | No |
 | ndi.cloud.internal.getCloudDatasetIdForLocalDataset | No | No |
 | ndi.cloud.internal.getTokenExpiration | No | No |
-| ndi.cloud.internal.getUploadedDocumentIds | No | No |
-| ndi.cloud.internal.getUploadedFileIds | No | No |
-| ndi.cloud.internal.getWeboptionsWithAuthHeader | No | No |
-| ndi.cloud.sync.enum.SyncMode | No | No |
-| ndi.cloud.sync.internal.index.createSyncIndexStruct | No | No |
-| ndi.cloud.sync.internal.index.getIndexFilepath | No | No |
-| ndi.cloud.sync.internal.index.readSyncIndex | No | No |
-| ndi.cloud.sync.internal.index.updateSyncIndex | No | No |
-| ndi.cloud.sync.internal.index.writeSyncIndex | No | No |
-| ndi.cloud.sync.internal.Constants | No | No |
-| ndi.cloud.sync.internal.datasetSessionIdFromDocs | No | No |
-| ndi.cloud.sync.internal.deleteLocalDocuments | No | No |
-| ndi.cloud.sync.internal.deleteRemoteDocuments | No | No |
-| ndi.cloud.sync.internal.downloadNdiDocuments | No | No |
+| ndi.cloud.internal.getUploadedDocumentIds | Yes | Yes |
+| ndi.cloud.internal.getUploadedFileIds | Yes | Yes |
+| ndi.cloud.internal.getWeboptionsWithAuthHeader | Yes | Yes |
+| ndi.cloud.sync.enum.SyncMode | Yes | Yes |
+| ndi.cloud.sync.internal.index.createSyncIndexStruct | Yes | Yes |
+| ndi.cloud.sync.internal.index.getIndexFilepath | Yes | Yes |
+| ndi.cloud.sync.internal.index.readSyncIndex | Yes | Yes |
+| ndi.cloud.sync.internal.index.updateSyncIndex | Yes | Yes |
+| ndi.cloud.sync.internal.index.writeSyncIndex | Yes | Yes |
+| ndi.cloud.sync.internal.Constants | Yes | Yes |
+| ndi.cloud.sync.internal.datasetSessionIdFromDocs | Yes | Yes |
+| ndi.cloud.sync.internal.deleteLocalDocuments | Yes | Yes |
+| ndi.cloud.sync.internal.deleteRemoteDocuments | Yes | Yes |
+| ndi.cloud.sync.internal.downloadNdiDocuments | Yes | Yes |
 | ndi.cloud.sync.internal.filesNotYetUploaded | No | No |
-| ndi.cloud.sync.internal.getFileUidsFromDocuments | No | No |
-| ndi.cloud.sync.internal.listLocalDocuments | No | No |
-| ndi.cloud.sync.internal.listRemoteDocumentIds | No | No |
-| ndi.cloud.sync.internal.updateFileInfoForLocalFiles | No | No |
-| ndi.cloud.sync.internal.updateFileInfoForRemoteFiles | No | No |
-| ndi.cloud.sync.internal.uploadFilesForDatasetDocuments | No | No |
-| ndi.cloud.sync.SyncOptions | No | No |
-| ndi.cloud.sync.downloadNew | No | No |
-| ndi.cloud.sync.mirrorFromRemote | No | No |
-| ndi.cloud.sync.mirrorToRemote | No | No |
-| ndi.cloud.sync.twoWaySync | No | No |
+| ndi.cloud.sync.internal.getFileUidsFromDocuments | Yes | Yes |
+| ndi.cloud.sync.internal.listLocalDocuments | Yes | Yes |
+| ndi.cloud.sync.internal.listRemoteDocumentIds | Yes | Yes |
+| ndi.cloud.sync.internal.updateFileInfoForLocalFiles | Yes | Yes |
+| ndi.cloud.sync.internal.updateFileInfoForRemoteFiles | Yes | Yes |
+| ndi.cloud.sync.internal.uploadFilesForDatasetDocuments | Yes | Yes |
+| ndi.cloud.sync.SyncOptions | Yes | Yes |
+| ndi.cloud.sync.downloadNew | Yes | Yes |
+| ndi.cloud.sync.mirrorFromRemote | Yes | Yes |
+| ndi.cloud.sync.mirrorToRemote | Yes | Yes |
+| ndi.cloud.sync.twoWaySync | Yes | Yes |
 | ndi.cloud.sync.uploadNew | No | No |
 | ndi.cloud.sync.validate | No | No |
 | ndi.cloud.ui.dialog.selectCloudDataset | No | No |
diff --git a/src/ndi/cloud/api/files/__init__.py b/src/ndi/cloud/api/files/__init__.py
index e69de29..adf9b86 100644
--- a/src/ndi/cloud/api/files/__init__.py
+++ b/src/ndi/cloud/api/files/__init__.py
@@ -0,0 +1,2 @@
+from .get_file_details import get_file_details
+from .get_file_upload_url import get_file_upload_url
diff --git a/src/ndi/cloud/api/files/get_file_upload_url.py b/src/ndi/cloud/api/files/get_file_upload_url.py
new file mode 100644
index 0000000..66c92bb
--- /dev/null
+++ b/src/ndi/cloud/api/files/get_file_upload_url.py
@@ -0,0 +1,16 @@
+from ..implementation.files.get_file_upload_url import GetFileUploadURL as GetFileUploadURLImpl
+
+def get_file_upload_url(dataset_id, file_uid, organization_id=None):
+    """
+    Gets the upload URL for a file in a dataset.
+
+    Args:
+        dataset_id (str): The ID of the dataset.
+        file_uid (str): The UID of the file.
+        organization_id (str, optional): The ID of the organization.
+
+    Returns:
+        tuple: (success, answer, response, url)
+    """
+    api_call = GetFileUploadURLImpl(dataset_id, file_uid, organization_id)
+    return api_call.execute()
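A quick usage sketch for the new public wrapper (the IDs are made up; this assumes an authenticated session and that the response carries the presigned target under 'url', as the uploader later in this diff expects):

```python
# Sketch: request a presigned upload URL for one file (hypothetical IDs).
from ndi.cloud.api.files import get_file_upload_url

success, answer, _, _ = get_file_upload_url(
    dataset_id='ds-0001',    # hypothetical dataset ID
    file_uid='f-abcd-1234'   # hypothetical file UID
)
if success:
    print('PUT target:', answer.get('url'))
else:
    print('Request failed:', answer)
```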
+ """ + token = authenticate() + + # Pass organization_id if available, otherwise url.get_url will try env var + kwargs = { + 'dataset_id': self.dataset_id, + 'file_uid': self.file_uid + } + if self.organization_id: + kwargs['organization_id'] = self.organization_id + + try: + api_url = url.get_url(self.endpoint_name, **kwargs) + except ValueError as e: + # Likely missing organizationId + return False, str(e), None, None + + headers = { + 'Accept': 'application/json', + 'Authorization': f'Bearer {token}' + } + + response = requests.get(api_url, headers=headers) + + if response.status_code == 200: + return True, response.json(), response, api_url + else: + try: + answer = response.json() + except json.JSONDecodeError: + answer = response.text + return False, answer, response, api_url diff --git a/src/ndi/cloud/api/url.py b/src/ndi/cloud/api/url.py index 202fb9f..519fe0a 100644 --- a/src/ndi/cloud/api/url.py +++ b/src/ndi/cloud/api/url.py @@ -30,7 +30,7 @@ def get_url(endpoint_name, **kwargs): 'uid': kwargs.get('file_uid'), 'datasetId': kwargs.get('dataset_id'), 'documentId': kwargs.get('document_id'), - 'organizationId': kwargs.get('organization_id'), + 'organizationId': kwargs.get('organization_id') or os.environ.get('NDI_CLOUD_ORGANIZATION_ID'), 'userId': kwargs.get('user_id'), 'pageSize': kwargs.get('page_size', 20), 'page': kwargs.get('page', 1) diff --git a/src/ndi/cloud/internal/__init__.py b/src/ndi/cloud/internal/__init__.py index e69de29..c034591 100644 --- a/src/ndi/cloud/internal/__init__.py +++ b/src/ndi/cloud/internal/__init__.py @@ -0,0 +1,4 @@ +from .get_cloud_dataset_id_for_local_dataset import get_cloud_dataset_id_for_local_dataset +from .get_weboptions_with_auth_header import get_weboptions_with_auth_header +from .get_uploaded_document_ids import get_uploaded_document_ids +from .get_uploaded_file_ids import get_uploaded_file_ids diff --git a/src/ndi/cloud/internal/get_uploaded_document_ids.py b/src/ndi/cloud/internal/get_uploaded_document_ids.py new file mode 100644 index 0000000..094e28b --- /dev/null +++ b/src/ndi/cloud/internal/get_uploaded_document_ids.py @@ -0,0 +1,34 @@ +def get_uploaded_document_ids(dataset_id, verbose=False): + """ + Returns a dictionary of uploaded document IDs from the cloud. + + Args: + dataset_id (str): The cloud dataset ID. + verbose (bool): Whether to print progress messages. + + Returns: + dict: A dictionary with keys 'ndiId' and 'apiId', containing lists of NDI and API document IDs. 
+ """ + from ..api.documents import list_dataset_documents_all + + if verbose: + print(f'Fetching complete remote document list for dataset {dataset_id}...') + + success, all_documents, _, _ = list_dataset_documents_all(dataset_id) + + if not success: + error_msg = all_documents.get('message', 'Unknown error') if isinstance(all_documents, dict) else all_documents + raise RuntimeError(f"Failed to list remote documents: {error_msg}") + + if not all_documents: + if verbose: + print('No remote documents found.') + return {'ndiId': [], 'apiId': []} + + all_ndi_ids = [doc.get('ndiId') for doc in all_documents] + all_api_ids = [doc.get('id') for doc in all_documents] + + if verbose: + print(f'Total remote documents processed: {len(all_ndi_ids)}.') + + return {'ndiId': all_ndi_ids, 'apiId': all_api_ids} diff --git a/src/ndi/cloud/internal/get_uploaded_file_ids.py b/src/ndi/cloud/internal/get_uploaded_file_ids.py new file mode 100644 index 0000000..3c2f599 --- /dev/null +++ b/src/ndi/cloud/internal/get_uploaded_file_ids.py @@ -0,0 +1,40 @@ +def get_uploaded_file_ids(dataset_id, verbose=False): + """ + Returns a list of uploaded file IDs from the cloud. + + Args: + dataset_id (str): The cloud dataset ID. + verbose (bool): Whether to print progress messages. + + Returns: + list: A list of unique file UIDs. + """ + from ..api.documents import list_dataset_documents_all + + if verbose: + print(f'Listing uploaded files for dataset {dataset_id}...') + + success, all_documents, _, _ = list_dataset_documents_all(dataset_id) + + if not success: + error_msg = all_documents.get('message', 'Unknown error') if isinstance(all_documents, dict) else all_documents + raise RuntimeError(f"Failed to list remote documents for files: {error_msg}") + + file_uids = set() + if all_documents: + for document in all_documents: + # Assuming document structure from cloud: properties are usually top-level or under 'document_properties' depending on API + # Cloud usually returns the document object itself as dict + # Check if 'files' property exists + file_info = document.get('files', {}).get('file_info', []) + for info in file_info: + locations = info.get('locations', []) + for location in locations: + uid = location.get('uid') + if uid: + file_uids.add(uid) + + if verbose: + print(f'Found {len(file_uids)} unique file UIDs.') + + return list(file_uids) diff --git a/src/ndi/cloud/internal/get_weboptions_with_auth_header.py b/src/ndi/cloud/internal/get_weboptions_with_auth_header.py new file mode 100644 index 0000000..2268225 --- /dev/null +++ b/src/ndi/cloud/internal/get_weboptions_with_auth_header.py @@ -0,0 +1,27 @@ +def get_weboptions_with_auth_header(token=None): + """ + Returns the web options (headers) with the authentication header. + + Args: + token (str, optional): The authentication token. If not provided, it will be retrieved using authenticate(). + + Returns: + dict: A dictionary containing the headers. + """ + if token is None: + from ..authenticate import authenticate + token = authenticate() + + if not token: + # Should we raise error? + # Matlab might return empty or error. + # For now, return empty headers or raise error? + # authenticate() prints error if interactive login fails. 
diff --git a/src/ndi/cloud/internal/get_weboptions_with_auth_header.py b/src/ndi/cloud/internal/get_weboptions_with_auth_header.py
new file mode 100644
index 0000000..2268225
--- /dev/null
+++ b/src/ndi/cloud/internal/get_weboptions_with_auth_header.py
@@ -0,0 +1,25 @@
+def get_weboptions_with_auth_header(token=None):
+    """
+    Returns the web options (headers) with the authentication header.
+
+    Args:
+        token (str, optional): The authentication token. If not provided, it will be retrieved using authenticate().
+
+    Returns:
+        dict: A dictionary containing the headers.
+    """
+    if token is None:
+        from ..authenticate import authenticate
+        token = authenticate()
+
+    if not token:
+        # authenticate() reports its own failure details (e.g. interactive
+        # login errors), so fail loudly rather than emit a bogus header.
+        raise RuntimeError('No authentication token available.')
+
+    headers = {
+        'Authorization': f'Bearer {token}',
+        'Content-Type': 'application/json'
+    }
+
+    return headers
diff --git a/src/ndi/cloud/sync/__init__.py b/src/ndi/cloud/sync/__init__.py
index b083b0b..29c71ab 100644
--- a/src/ndi/cloud/sync/__init__.py
+++ b/src/ndi/cloud/sync/__init__.py
@@ -1,3 +1,6 @@
 from .download_new_impl import download_new
 from .sync_options import SyncOptions
 from .enum.sync_mode import SyncMode
+from .mirror_from_remote import mirror_from_remote
+from .mirror_to_remote import mirror_to_remote
+from .two_way_sync import two_way_sync
diff --git a/src/ndi/cloud/sync/internal/__init__.py b/src/ndi/cloud/sync/internal/__init__.py
new file mode 100644
index 0000000..2532f9f
--- /dev/null
+++ b/src/ndi/cloud/sync/internal/__init__.py
@@ -0,0 +1,8 @@
+from .dataset_session_id_from_docs import dataset_session_id_from_docs
+from .delete_local_documents import delete_local_documents
+from .delete_remote_documents import delete_remote_documents
+from .download_ndi_documents import download_ndi_documents
+from .file_utils import get_file_uids_from_documents, update_file_info_for_local_files, update_file_info_for_remote_files
+from .upload_files_for_dataset_documents import upload_files_for_dataset_documents
+from .constants import Constants
+from .document_utils import list_local_documents, list_remote_document_ids
diff --git a/src/ndi/cloud/sync/internal/dataset_session_id_from_docs.py b/src/ndi/cloud/sync/internal/dataset_session_id_from_docs.py
new file mode 100644
index 0000000..d119f37
--- /dev/null
+++ b/src/ndi/cloud/sync/internal/dataset_session_id_from_docs.py
@@ -0,0 +1,33 @@
+def dataset_session_id_from_docs(documents):
+    """
+    Extracts the session ID from a list of documents.
+
+    Args:
+        documents (list): A list of NDI documents (objects or dicts).
+
+    Returns:
+        str: The session ID, or None if not found.
+    """
+    for doc in documents:
+        # Accept either a plain dict or an object exposing document_properties.
+        if isinstance(doc, dict):
+            props = doc
+        elif hasattr(doc, 'document_properties'):
+            props = doc.document_properties
+        else:
+            continue
+
+        # Preferred location: 'session' -> 'id'.
+        if 'session' in props:
+            if isinstance(props['session'], dict) and 'id' in props['session']:
+                return props['session']['id']
+
+        # Fallback: some documents store it as 'base' -> 'session_id'.
+        if 'base' in props and 'session_id' in props['base']:
+            return props['base']['session_id']
+
+        # Fallback: the document itself may be a session document.
+        if 'ndi_document' in props and props['ndi_document'].get('type') == 'ndi_session':
+            return props.get('id') or props.get('base', {}).get('id')
+
+    return None
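dataset_session_id_from_docs is a pure function, so it can be exercised directly with dict-shaped documents. This sketch hits the 'base' fallback path (toy IDs throughout):

```python
from ndi.cloud.sync.internal import dataset_session_id_from_docs

docs = [
    {'name': 'no session info here'},
    {'base': {'id': 'doc-2', 'session_id': 'sess-42'}},  # matched by the 'base' fallback
    {'session': {'id': 'sess-99'}},                      # would match the primary path
]
# Documents are scanned in order, so the 'base' fallback wins here.
assert dataset_session_id_from_docs(docs) == 'sess-42'
```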
diff --git a/src/ndi/cloud/sync/internal/delete_local_documents.py b/src/ndi/cloud/sync/internal/delete_local_documents.py
new file mode 100644
index 0000000..2e1cb9c
--- /dev/null
+++ b/src/ndi/cloud/sync/internal/delete_local_documents.py
@@ -0,0 +1,21 @@
+def delete_local_documents(ndi_dataset, document_ids):
+    """
+    Deletes documents from the local dataset.
+
+    Args:
+        ndi_dataset (ndi.dataset.Dataset): The local dataset object.
+        document_ids (list): A list of document IDs to delete.
+    """
+    if not document_ids:
+        return
+
+    # database_rm is the public interface for removing documents; there is
+    # no supported fallback if the dataset object lacks it.
+    if hasattr(ndi_dataset, 'database_rm'):
+        for doc_id in document_ids:
+            try:
+                ndi_dataset.database_rm(doc_id)
+            except Exception as e:
+                print(f"Warning: Failed to delete local document {doc_id}: {e}")
+    else:
+        print("Warning: database_rm method not found on dataset object. Cannot delete local documents.")
diff --git a/src/ndi/cloud/sync/internal/delete_remote_documents.py b/src/ndi/cloud/sync/internal/delete_remote_documents.py
new file mode 100644
index 0000000..ce45ea1
--- /dev/null
+++ b/src/ndi/cloud/sync/internal/delete_remote_documents.py
@@ -0,0 +1,41 @@
+def delete_remote_documents(cloud_dataset_id, document_ids, verbose=False):
+    """
+    Deletes documents from the remote cloud dataset.
+
+    Args:
+        cloud_dataset_id (str): The cloud dataset ID.
+        document_ids (list): A list of cloud document IDs to delete.
+        verbose (bool): Whether to print progress messages.
+
+    Returns:
+        tuple: (success, deleted_ids)
+    """
+    from ...api.documents import delete_document
+
+    if not document_ids:
+        return True, []
+
+    if verbose:
+        print(f'Deleting {len(document_ids)} documents from remote dataset {cloud_dataset_id}...')
+
+    success_all = True
+    deleted_ids = []
+
+    for doc_id in document_ids:
+        try:
+            success, _, _, _ = delete_document(cloud_dataset_id, doc_id)
+            if success:
+                deleted_ids.append(doc_id)
+            else:
+                success_all = False
+                if verbose:
+                    print(f'Failed to delete remote document: {doc_id}')
+        except Exception as e:
+            success_all = False
+            if verbose:
+                print(f'Error deleting remote document {doc_id}: {e}')
+
+    if verbose:
+        print(f'Deleted {len(deleted_ids)} documents.')
+
+    return success_all, deleted_ids
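Both delete helpers are intentionally forgiving: failures are logged and reported rather than raised, so the sync index can still be updated with what actually succeeded. A caller that wants all-or-nothing semantics can layer that on top, as in this hedged sketch (IDs hypothetical):

```python
from ndi.cloud.sync.internal import delete_remote_documents

ok, deleted = delete_remote_documents('ds-1', ['api-1', 'api-2'], verbose=True)
if not ok:
    # deleted lists the IDs that were actually removed, which is what the
    # sync index update needs in order to stay accurate.
    missing = {'api-1', 'api-2'} - set(deleted)
    raise RuntimeError(f'Remote delete incomplete; not removed: {sorted(missing)}')
```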
+ print(f"Failed to upload document {doc_id}: {answer}") + success_all = False + except Exception as e: + print(f"Error uploading document: {e}") + success_all = False + + if verbose: + print(f"Uploaded {len(uploaded_ids)} documents.") + + return success_all, uploaded_ids diff --git a/src/ndi/cloud/sync/internal/upload_files_for_dataset_documents.py b/src/ndi/cloud/sync/internal/upload_files_for_dataset_documents.py new file mode 100644 index 0000000..46a37aa --- /dev/null +++ b/src/ndi/cloud/sync/internal/upload_files_for_dataset_documents.py @@ -0,0 +1,110 @@ +import os +import requests +from ...api.files import get_file_upload_url +from ...internal.get_uploaded_file_ids import get_uploaded_file_ids + +def upload_files_for_dataset_documents(ndi_dataset, cloud_dataset_id, documents, verbose=False): + """ + Uploads files associated with the given documents to the cloud dataset. + + Args: + ndi_dataset (ndi.dataset.Dataset): The local dataset. + cloud_dataset_id (str): The cloud dataset ID. + documents (list): List of NDI documents. + verbose (bool): Whether to print progress messages. + + Returns: + tuple: (success, uploaded_file_uids) + """ + if not documents: + return True, [] + + # Get existing remote files to avoid re-uploading + try: + remote_file_uids = set(get_uploaded_file_ids(cloud_dataset_id, verbose=verbose)) + except Exception as e: + if verbose: + print(f"Warning: Could not fetch remote file list: {e}") + remote_file_uids = set() + + uploaded_file_uids = [] + success_all = True + + for doc in documents: + # Access document properties + if hasattr(doc, 'document_properties'): + props = doc.document_properties + elif isinstance(doc, dict): + props = doc + else: + continue + + file_info = props.get('files', {}).get('file_info', []) + + for info in file_info: + locations = info.get('locations', []) + file_uid = None + local_path = None + + # Find local location and UID + for loc in locations: + if 'uid' in loc: + file_uid = loc['uid'] + + # Check for local file path + # Assuming location_type 'file' or just checking if 'location' is a path + if loc.get('location_type') == 'file' or loc.get('location_type') == 'local': # 'local' is guess + path = loc.get('location') + if path: + if os.path.isfile(path): + local_path = path + elif os.path.isfile(os.path.join(ndi_dataset.path, path)): + local_path = os.path.join(ndi_dataset.path, path) + + if file_uid and local_path: + if file_uid in remote_file_uids: + if verbose: + print(f"File {file_uid} already exists remotely. Skipping.") + continue + + if verbose: + print(f"Uploading file {file_uid} from {local_path}...") + + try: + # Get upload URL + success, answer, response, _ = get_file_upload_url(cloud_dataset_id, file_uid) + + if success: + upload_url = answer.get('url') # Assuming API returns 'url' + if upload_url: + with open(local_path, 'rb') as f: + # PUT to presigned URL + # Content-Type should match? Or generic? + # Usually S3 presigned URLs don't enforce unless signed. + # But let's try generic binary. + upload_response = requests.put(upload_url, data=f) + + if upload_response.status_code in [200, 201]: + if verbose: + print(f"Successfully uploaded file {file_uid}.") + uploaded_file_uids.append(file_uid) + else: + print(f"Failed to upload content for file {file_uid}. Status: {upload_response.status_code}") + success_all = False + else: + print(f"Failed to get upload URL for file {file_uid}. Response missing 'url'.") + success_all = False + else: + print(f"Failed to get upload URL for file {file_uid}. 
diff --git a/src/ndi/cloud/sync/internal/upload_files_for_dataset_documents.py b/src/ndi/cloud/sync/internal/upload_files_for_dataset_documents.py
new file mode 100644
index 0000000..46a37aa
--- /dev/null
+++ b/src/ndi/cloud/sync/internal/upload_files_for_dataset_documents.py
@@ -0,0 +1,108 @@
+import os
+import requests
+from ...api.files import get_file_upload_url
+from ...internal.get_uploaded_file_ids import get_uploaded_file_ids
+
+def upload_files_for_dataset_documents(ndi_dataset, cloud_dataset_id, documents, verbose=False):
+    """
+    Uploads files associated with the given documents to the cloud dataset.
+
+    Args:
+        ndi_dataset (ndi.dataset.Dataset): The local dataset.
+        cloud_dataset_id (str): The cloud dataset ID.
+        documents (list): List of NDI documents.
+        verbose (bool): Whether to print progress messages.
+
+    Returns:
+        tuple: (success, uploaded_file_uids)
+    """
+    if not documents:
+        return True, []
+
+    # Get existing remote files to avoid re-uploading.
+    try:
+        remote_file_uids = set(get_uploaded_file_ids(cloud_dataset_id, verbose=verbose))
+    except Exception as e:
+        if verbose:
+            print(f"Warning: Could not fetch remote file list: {e}")
+        remote_file_uids = set()
+
+    uploaded_file_uids = []
+    success_all = True
+
+    for doc in documents:
+        # Access document properties (object or dict).
+        if hasattr(doc, 'document_properties'):
+            props = doc.document_properties
+        elif isinstance(doc, dict):
+            props = doc
+        else:
+            continue
+
+        file_info = props.get('files', {}).get('file_info', [])
+
+        for info in file_info:
+            locations = info.get('locations', [])
+            file_uid = None
+            local_path = None
+
+            # Find the file UID and a resolvable local path. Both 'file' and
+            # 'local' location types are accepted; paths may be absolute or
+            # relative to the dataset root.
+            for loc in locations:
+                if 'uid' in loc:
+                    file_uid = loc['uid']
+
+                if loc.get('location_type') in ('file', 'local'):
+                    path = loc.get('location')
+                    if path:
+                        if os.path.isfile(path):
+                            local_path = path
+                        elif os.path.isfile(os.path.join(ndi_dataset.path, path)):
+                            local_path = os.path.join(ndi_dataset.path, path)
+
+            if file_uid and local_path:
+                if file_uid in remote_file_uids:
+                    if verbose:
+                        print(f"File {file_uid} already exists remotely. Skipping.")
+                    continue
+
+                if verbose:
+                    print(f"Uploading file {file_uid} from {local_path}...")
+
+                try:
+                    # Get the presigned upload URL.
+                    success, answer, response, _ = get_file_upload_url(cloud_dataset_id, file_uid)
+
+                    if success:
+                        upload_url = answer.get('url')  # assumes the API returns 'url'
+                        if upload_url:
+                            with open(local_path, 'rb') as f:
+                                # PUT to the presigned URL. Presigned URLs
+                                # generally do not enforce a Content-Type
+                                # unless one was signed, so send raw binary.
+                                upload_response = requests.put(upload_url, data=f)
+
+                            if upload_response.status_code in [200, 201]:
+                                if verbose:
+                                    print(f"Successfully uploaded file {file_uid}.")
+                                uploaded_file_uids.append(file_uid)
+                            else:
+                                print(f"Failed to upload content for file {file_uid}. Status: {upload_response.status_code}")
+                                success_all = False
+                        else:
+                            print(f"Failed to get upload URL for file {file_uid}. Response missing 'url'.")
+                            success_all = False
+                    else:
+                        print(f"Failed to get upload URL for file {file_uid}. API Error: {answer}")
+                        success_all = False
+
+                except Exception as e:
+                    print(f"Error uploading file {file_uid}: {e}")
+                    success_all = False
+            elif file_uid and not local_path:
+                # Referenced by the document but absent locally;
+                # treated as non-fatal and skipped.
+                pass
+
+    return success_all, uploaded_file_uids
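The uploader sends raw binary, relying on presigned URLs not enforcing a Content-Type. If the signing scheme ever includes one, a variant like the following (standard-library mimetypes plus requests) would be needed; treat it as an optional sketch, not current behavior:

```python
import mimetypes
import requests

def put_with_content_type(upload_url, local_path):
    # Guess the MIME type from the filename; fall back to generic binary.
    content_type, _ = mimetypes.guess_type(local_path)
    headers = {'Content-Type': content_type or 'application/octet-stream'}
    with open(local_path, 'rb') as f:
        return requests.put(upload_url, data=f, headers=headers)
```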
diff --git a/src/ndi/cloud/sync/mirror_from_remote.py b/src/ndi/cloud/sync/mirror_from_remote.py
new file mode 100644
index 0000000..38f5cf7
--- /dev/null
+++ b/src/ndi/cloud/sync/mirror_from_remote.py
@@ -0,0 +1,94 @@
+from .internal.document_utils import list_remote_document_ids, list_local_documents
+from .internal.download_ndi_documents import download_ndi_documents
+from .internal.delete_local_documents import delete_local_documents
+from .internal.index.index_utils import update_sync_index
+from ..internal.get_cloud_dataset_id_for_local_dataset import get_cloud_dataset_id_for_local_dataset
+from .sync_options import SyncOptions
+
+def mirror_from_remote(ndi_dataset, sync_options=None):
+    """
+    Mirrors the remote dataset to the local dataset.
+    This will delete local documents that are not present in the remote dataset,
+    and download documents from the remote dataset that are missing locally.
+
+    Args:
+        ndi_dataset (ndi.dataset.Dataset): The local dataset.
+        sync_options (SyncOptions, optional): Sync options.
+
+    Returns:
+        tuple: (success, error_message, report)
+    """
+    if sync_options is None:
+        sync_options = SyncOptions()
+
+    success = True
+    error_message = ''
+    report = {'downloaded_document_ids': [], 'deleted_document_ids': []}
+
+    try:
+        if sync_options.Verbose:
+            print(f'Mirroring from remote for dataset "{ndi_dataset.path}"...')
+
+        cloud_dataset_id, _ = get_cloud_dataset_id_for_local_dataset(ndi_dataset)
+
+        # List remote documents
+        remote_docs_map = list_remote_document_ids(cloud_dataset_id, verbose=sync_options.Verbose)
+        remote_ndi_ids = set(remote_docs_map['ndiId'])
+        remote_api_ids = remote_docs_map['apiId']
+
+        # List local documents
+        _, local_ndi_ids = list_local_documents(ndi_dataset)
+        local_ndi_ids_set = set(local_ndi_ids)
+
+        # Calculate differences
+        to_delete = list(local_ndi_ids_set - remote_ndi_ids)
+        to_download_ndi = list(remote_ndi_ids - local_ndi_ids_set)
+
+        # Find API IDs for download
+        to_download_api = []
+        for ndi_id in to_download_ndi:
+            try:
+                idx = remote_docs_map['ndiId'].index(ndi_id)
+                to_download_api.append(remote_api_ids[idx])
+            except ValueError:
+                pass
+
+        if sync_options.Verbose:
+            print(f'Found {len(to_delete)} documents to delete locally.')
+            print(f'Found {len(to_download_ndi)} documents to download from remote.')
+
+        if sync_options.DryRun:
+            print('[DryRun] Would delete local documents:', to_delete)
+            print('[DryRun] Would download remote documents:', to_download_ndi)
+        else:
+            # Delete local
+            if to_delete:
+                delete_local_documents(ndi_dataset, to_delete)
+                report['deleted_document_ids'] = to_delete
+
+            # Download remote
+            if to_download_api:
+                downloaded_docs = download_ndi_documents(
+                    cloud_dataset_id,
+                    to_download_api,
+                    ndi_dataset,
+                    sync_options
+                )
+                if downloaded_docs:
+                    report['downloaded_document_ids'] = [doc.document_properties['base']['id'] for doc in downloaded_docs]
+
+            # Update index
+            update_sync_index(
+                ndi_dataset,
+                cloud_dataset_id,
+                local_document_ids=list(set(local_ndi_ids) - set(to_delete) | set(report['downloaded_document_ids'])),
+                remote_document_ids=list(remote_ndi_ids)
+            )
+
+    except Exception as e:
+        success = False
+        error_message = str(e)
+        if sync_options.Verbose:
+            print(f'Error in mirrorFromRemote: {error_message}')
+
+    return success, error_message, report
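Because mirror_from_remote deletes local documents, a dry run first is cheap insurance. A sketch, assuming SyncOptions exposes the Verbose/DryRun attributes used above and my_dataset stands for an opened ndi.dataset.Dataset:

```python
from ndi.cloud.sync import mirror_from_remote, SyncOptions

opts = SyncOptions()
opts.Verbose = True
opts.DryRun = True           # print planned deletes/downloads only

success, err, report = mirror_from_remote(my_dataset, opts)
if success:
    opts.DryRun = False      # re-run for real once the plan looks right
    success, err, report = mirror_from_remote(my_dataset, opts)
```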
diff --git a/src/ndi/cloud/sync/mirror_to_remote.py b/src/ndi/cloud/sync/mirror_to_remote.py
new file mode 100644
index 0000000..c3d6e4e
--- /dev/null
+++ b/src/ndi/cloud/sync/mirror_to_remote.py
@@ -0,0 +1,102 @@
+from .internal.document_utils import list_remote_document_ids, list_local_documents
+from .internal.delete_remote_documents import delete_remote_documents
+from .internal.upload_documents import upload_documents
+from .internal.upload_files_for_dataset_documents import upload_files_for_dataset_documents
+from .internal.index.index_utils import update_sync_index
+from ..internal.get_cloud_dataset_id_for_local_dataset import get_cloud_dataset_id_for_local_dataset
+from .sync_options import SyncOptions
+
+def mirror_to_remote(ndi_dataset, sync_options=None):
+    """
+    Mirrors the local dataset to the remote dataset.
+    This will delete remote documents that are not present locally,
+    and upload documents/files from the local dataset that are missing remotely.
+
+    Args:
+        ndi_dataset (ndi.dataset.Dataset): The local dataset.
+        sync_options (SyncOptions, optional): Sync options.
+
+    Returns:
+        tuple: (success, error_message, report)
+    """
+    if sync_options is None:
+        sync_options = SyncOptions()
+
+    success = True
+    error_message = ''
+    report = {'uploaded_document_ids': [], 'deleted_document_ids': []}
+
+    try:
+        if sync_options.Verbose:
+            print(f'Mirroring to remote for dataset "{ndi_dataset.path}"...')
+
+        cloud_dataset_id, _ = get_cloud_dataset_id_for_local_dataset(ndi_dataset)
+
+        # List remote documents
+        remote_docs_map = list_remote_document_ids(cloud_dataset_id, verbose=sync_options.Verbose)
+        remote_ndi_ids = set(remote_docs_map['ndiId'])
+        remote_api_ids = remote_docs_map['apiId']
+
+        # List local documents
+        local_docs, local_ndi_ids = list_local_documents(ndi_dataset)
+        local_ndi_ids_set = set(local_ndi_ids)
+
+        # Calculate differences
+        to_delete_remote_ndi = list(remote_ndi_ids - local_ndi_ids_set)
+        to_upload_local_ndi = list(local_ndi_ids_set - remote_ndi_ids)
+
+        # Find API IDs for remote deletion
+        to_delete_remote_api = []
+        for ndi_id in to_delete_remote_ndi:
+            try:
+                idx = remote_docs_map['ndiId'].index(ndi_id)
+                to_delete_remote_api.append(remote_api_ids[idx])
+            except ValueError:
+                pass
+
+        # Find local document objects for upload.
+        # list_local_documents returns docs and ids in matching order,
+        # but selecting by base id is more robust than indexing.
+        to_upload_docs = []
+        for doc in local_docs:
+            doc_id = doc.document_properties['base']['id']
+            if doc_id in to_upload_local_ndi:
+                to_upload_docs.append(doc)
+
+        if sync_options.Verbose:
+            print(f'Found {len(to_delete_remote_ndi)} documents to delete remotely.')
+            print(f'Found {len(to_upload_local_ndi)} documents to upload from local.')
+
+        if sync_options.DryRun:
+            print('[DryRun] Would delete remote documents:', to_delete_remote_ndi)
+            print('[DryRun] Would upload local documents:', to_upload_local_ndi)
+        else:
+            # Delete remote
+            if to_delete_remote_api:
+                delete_remote_documents(cloud_dataset_id, to_delete_remote_api, verbose=sync_options.Verbose)
+                report['deleted_document_ids'] = to_delete_remote_ndi
+
+            # Upload local
+            if to_upload_docs:
+                success_upload, uploaded_ids = upload_documents(ndi_dataset, cloud_dataset_id, to_upload_docs, verbose=sync_options.Verbose)
+                if success_upload:
+                    report['uploaded_document_ids'] = uploaded_ids
+
+                if sync_options.SyncFiles:
+                    upload_files_for_dataset_documents(ndi_dataset, cloud_dataset_id, to_upload_docs, verbose=sync_options.Verbose)
+
+            # Update index
+            update_sync_index(
+                ndi_dataset,
+                cloud_dataset_id,
+                local_document_ids=list(local_ndi_ids),
+                remote_document_ids=list(set(remote_ndi_ids) - set(to_delete_remote_ndi) | set(report['uploaded_document_ids']))
+            )
+
+    except Exception as e:
+        success = False
+        error_message = str(e)
+        if sync_options.Verbose:
+            print(f'Error in mirrorToRemote: {error_message}')
+
+    return success, error_message, report
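mirror_to_remote splits metadata upload from file upload behind the SyncFiles flag, so a fast metadata-only pass is possible. A sketch under the same SyncOptions assumptions as above:

```python
from ndi.cloud.sync import mirror_to_remote, SyncOptions

opts = SyncOptions()
opts.Verbose = True
opts.SyncFiles = False       # push document metadata only; skip binary uploads

success, err, report = mirror_to_remote(my_dataset, opts)
print('uploaded:', len(report['uploaded_document_ids']),
      'deleted remotely:', len(report['deleted_document_ids']))
```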
diff --git a/src/ndi/cloud/sync/two_way_sync.py b/src/ndi/cloud/sync/two_way_sync.py
new file mode 100644
index 0000000..d20d54a
--- /dev/null
+++ b/src/ndi/cloud/sync/two_way_sync.py
@@ -0,0 +1,158 @@
+from .internal.document_utils import list_remote_document_ids, list_local_documents
+from .internal.download_ndi_documents import download_ndi_documents
+from .internal.delete_local_documents import delete_local_documents
+from .internal.delete_remote_documents import delete_remote_documents
+from .internal.upload_documents import upload_documents
+from .internal.upload_files_for_dataset_documents import upload_files_for_dataset_documents
+from .internal.index.index_utils import read_sync_index, update_sync_index
+from ..internal.get_cloud_dataset_id_for_local_dataset import get_cloud_dataset_id_for_local_dataset
+from .sync_options import SyncOptions
+
+def two_way_sync(ndi_dataset, sync_options=None):
+    """
+    Performs a two-way synchronization between local and remote datasets.
+
+    Args:
+        ndi_dataset (ndi.dataset.Dataset): The local dataset.
+        sync_options (SyncOptions, optional): Sync options.
+
+    Returns:
+        tuple: (success, error_message, report)
+    """
+    if sync_options is None:
+        sync_options = SyncOptions()
+
+    success = True
+    error_message = ''
+    report = {
+        'downloaded_document_ids': [],
+        'uploaded_document_ids': [],
+        'deleted_local_document_ids': [],
+        'deleted_remote_document_ids': []
+    }
+
+    try:
+        if sync_options.Verbose:
+            print(f'Starting two-way sync for dataset "{ndi_dataset.path}"...')
+
+        cloud_dataset_id, _ = get_cloud_dataset_id_for_local_dataset(ndi_dataset)
+
+        # Current state
+        remote_docs_map = list_remote_document_ids(cloud_dataset_id, verbose=sync_options.Verbose)
+        current_remote_ids = set(remote_docs_map['ndiId'])
+        remote_api_ids = remote_docs_map['apiId']
+
+        local_docs, current_local_ids = list_local_documents(ndi_dataset)
+        current_local_ids_set = set(current_local_ids)
+
+        # Last sync state
+        sync_index = read_sync_index(ndi_dataset)
+        last_remote_ids = set(sync_index.get('remoteDocumentIdsLastSync', []))
+        last_local_ids = set(sync_index.get('localDocumentIdsLastSync', []))
+
+        # Calculate deltas relative to the last sync
+        added_remote = current_remote_ids - last_remote_ids
+        deleted_remote = last_remote_ids - current_remote_ids
+
+        added_local = current_local_ids_set - last_local_ids
+        deleted_local = last_local_ids - current_local_ids_set
+
+        # Determine actions
+        to_download = list(added_remote)
+        to_upload = list(added_local)
+
+        # Propagate deletions: a remote deletion removes the local copy and
+        # vice versa, unless the same document was re-added on the other side.
+        to_delete_local = list(deleted_remote - added_local)
+        to_delete_remote = list(deleted_local - added_remote)
+
+        # Conflict handling: documents added on both sides since the last sync.
+        conflicts = set(to_download) & set(to_upload)
+        if conflicts:
+            print(f"Warning: {len(conflicts)} documents added on both sides. Skipping these to avoid overwrite.")
+            to_download = list(set(to_download) - conflicts)
+            to_upload = list(set(to_upload) - conflicts)
+            # Keeping the local copy untouched is the safer default;
+            # downloading here would silently overwrite local changes.
+
+        if sync_options.Verbose:
+            print(f"To Download: {len(to_download)}")
+            print(f"To Upload: {len(to_upload)}")
+            print(f"To Delete Local: {len(to_delete_local)}")
+            print(f"To Delete Remote: {len(to_delete_remote)}")
+
+        if sync_options.DryRun:
+            pass  # Counts already printed above.
+        else:
+            # Execute local deletes
+            if to_delete_local:
+                delete_local_documents(ndi_dataset, to_delete_local)
+                report['deleted_local_document_ids'] = to_delete_local
+
+            # Execute remote deletes
+            if to_delete_remote:
+                # A locally deleted document can only be removed remotely if
+                # it still exists in the current remote set; filter first,
+                # then map the surviving NDI IDs to API IDs.
+                real_delete_remote = []
+                for nid in to_delete_remote:
+                    if nid in current_remote_ids:
+                        real_delete_remote.append(nid)
+
+                api_ids_to_delete = []
+                for nid in real_delete_remote:
+                    try:
+                        idx = remote_docs_map['ndiId'].index(nid)
+                        api_ids_to_delete.append(remote_api_ids[idx])
+                    except ValueError:
+                        pass
+
+                if api_ids_to_delete:
+                    delete_remote_documents(cloud_dataset_id, api_ids_to_delete, verbose=sync_options.Verbose)
+                    report['deleted_remote_document_ids'] = real_delete_remote
+
+            # Execute downloads
+            if to_download:
+                api_ids_to_download = []
+                for nid in to_download:
+                    try:
+                        idx = remote_docs_map['ndiId'].index(nid)
+                        api_ids_to_download.append(remote_api_ids[idx])
+                    except ValueError:
+                        pass
+
+                if api_ids_to_download:
+                    downloaded = download_ndi_documents(cloud_dataset_id, api_ids_to_download, ndi_dataset, sync_options)
+                    if downloaded:
+                        report['downloaded_document_ids'] = [d.document_properties['base']['id'] for d in downloaded]
+
+            # Execute uploads
+            if to_upload:
+                docs_to_upload = [d for d in local_docs if d.document_properties['base']['id'] in to_upload]
+                success_up, uploaded_ids = upload_documents(ndi_dataset, cloud_dataset_id, docs_to_upload, verbose=sync_options.Verbose)
+                if success_up:
+                    report['uploaded_document_ids'] = uploaded_ids
+                if sync_options.SyncFiles:
+                    upload_files_for_dataset_documents(ndi_dataset, cloud_dataset_id, docs_to_upload, verbose=sync_options.Verbose)
+
+            # Update index: construct the expected post-sync state from the
+            # known changes rather than re-listing both sides.
+            new_local = set(current_local_ids) - set(report['deleted_local_document_ids']) | set(report['downloaded_document_ids'])
+            new_remote = set(current_remote_ids) - set(report['deleted_remote_document_ids']) | set(report['uploaded_document_ids'])
+
+            update_sync_index(
+                ndi_dataset,
+                cloud_dataset_id,
+                local_document_ids=list(new_local),
+                remote_document_ids=list(new_remote)
+            )
+
+    except Exception as e:
+        success = False
+        error_message = str(e)
+        if sync_options.Verbose:
+            print(f'Error in twoWaySync: {error_message}')
+            import traceback
+            traceback.print_exc()
+
+    return success, error_message, report
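The heart of two_way_sync is plain set algebra against the last-sync snapshot. This self-contained example reproduces the delta rules with toy IDs, including the both-sides-added conflict case and the filter that keeps remote deletes restricted to documents still present remotely:

```python
# Toy snapshot of the last sync and the current state on both sides.
last_local = last_remote = {'a', 'b'}
cur_local, cur_remote = {'a', 'c', 'x'}, {'a', 'c', 'y'}  # 'b' deleted on both, 'c' added on both

added_remote, deleted_remote = cur_remote - last_remote, last_remote - cur_remote
added_local, deleted_local = cur_local - last_local, last_local - cur_local

conflicts = added_remote & added_local            # {'c'}: added on both sides, skipped
to_download = added_remote - conflicts            # {'y'}
to_upload = added_local - conflicts               # {'x'}
to_delete_local = deleted_remote - added_local    # {'b'} (already gone locally: a no-op delete)
to_delete_remote = (deleted_local - added_remote) & cur_remote  # {'b'} filtered to empty

assert (to_download, to_upload, to_delete_remote) == ({'y'}, {'x'}, set())
```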