diff --git a/sync-microservice/app/utils/watcher.py b/sync-microservice/app/utils/watcher.py index 902a50cf5..b882dd996 100644 --- a/sync-microservice/app/utils/watcher.py +++ b/sync-microservice/app/utils/watcher.py @@ -26,6 +26,7 @@ stop_event = threading.Event() watched_folders: List[FolderIdPath] = [] folder_id_map: Dict[str, str] = {} +state_lock = threading.RLock() def watcher_util_get_folder_id_if_watched(file_path: str) -> Optional[str]: @@ -235,7 +236,8 @@ def watcher_util_get_existing_folders( def watcher_util_is_watcher_running() -> bool: """Check if the watcher thread is running.""" - return watcher_thread is not None and watcher_thread.is_alive() + with state_lock: + return watcher_thread is not None and watcher_thread.is_alive() def watcher_util_start_folder_watcher() -> bool: @@ -247,85 +249,88 @@ def watcher_util_start_folder_watcher() -> bool: """ global watcher_thread, watched_folders, folder_id_map - if watcher_util_is_watcher_running(): - logger.info("Watcher is already running.") - return False - - logger.info("Initializing folder watcher...") - logger.debug("Debug logging is enabled") - - try: - # Simple synchronous database call - folders = db_get_all_folders_with_ids() - if not folders: - logger.info("No folders found in database") + # Make the "check then mutate global watcher state" atomic. + with state_lock: + if watcher_util_is_watcher_running(): + logger.info("Watcher is already running.") return False - logger.info(f"Found {len(folders)} folders in database") + logger.info("Initializing folder watcher...") + logger.debug("Debug logging is enabled") - # Simple synchronous file system checks - existing_folders = watcher_util_get_existing_folders(folders) - if not existing_folders: - logger.info("No existing folders to watch") + try: + # Simple synchronous database call + folders = db_get_all_folders_with_ids() + if not folders: + logger.info("No folders found in database") + return False + + logger.info(f"Found {len(folders)} folders in database") + + # Simple synchronous file system checks + existing_folders = watcher_util_get_existing_folders(folders) + if not existing_folders: + logger.info("No existing folders to watch") + return False + + watched_folders = existing_folders + folder_id_map = { + folder_path: folder_id for folder_id, folder_path in existing_folders + } + + folder_paths = [folder_path for _, folder_path in existing_folders] + + logger.info(f"Starting to watch {len(folder_paths)} folders:") + for folder_id, folder_path in existing_folders: + logger.info(f" - {folder_path} (ID: {folder_id})") + + # Reset stop event and start background thread + stop_event.clear() + watcher_thread = threading.Thread( + target=watcher_util_watcher_worker, + args=(folder_paths,), + daemon=True, # Dies when main program exits + ) + watcher_thread.start() + + logger.info("Folder watcher started successfully") + return True + + except Exception as e: + logger.error(f"Error starting folder watcher: {e}") return False - watched_folders = existing_folders - folder_id_map = { - folder_path: folder_id for folder_id, folder_path in existing_folders - } - - folder_paths = [folder_path for _, folder_path in existing_folders] - - logger.info(f"Starting to watch {len(folder_paths)} folders:") - for folder_id, folder_path in existing_folders: - logger.info(f" - {folder_path} (ID: {folder_id})") - - # Reset stop event and start background thread - stop_event.clear() - watcher_thread = threading.Thread( - target=watcher_util_watcher_worker, - args=(folder_paths,), - daemon=True, # Dies when main program exits - ) - watcher_thread.start() - - logger.info("Folder watcher started successfully") - return True - - except Exception as e: - logger.error(f"Error starting folder watcher: {e}") - return False - def watcher_util_stop_folder_watcher() -> None: """Stop the folder watcher.""" global watcher_thread, watched_folders, folder_id_map - if not watcher_util_is_watcher_running(): - logger.info("Watcher is not running") - return + with state_lock: + if not watcher_util_is_watcher_running(): + logger.info("Watcher is not running") + return - try: - logger.info("Stopping folder watcher...") + try: + logger.info("Stopping folder watcher...") - # Signal the watcher to stop - stop_event.set() + # Signal the watcher to stop + stop_event.set() - # Wait for thread to finish - watcher_thread.join(timeout=5.0) + # Wait for thread to finish + watcher_thread.join(timeout=5.0) - if watcher_thread.is_alive(): - logger.warning("Warning: Watcher thread did not stop gracefully") - else: - logger.info("Watcher stopped successfully") + if watcher_thread.is_alive(): + logger.warning("Warning: Watcher thread did not stop gracefully") + else: + logger.info("Watcher stopped successfully") - except Exception as e: - logger.error(f"Error stopping watcher: {e}") - finally: - watcher_thread = None - # Clear state - watched_folders = [] - folder_id_map = {} + except Exception as e: + logger.error(f"Error stopping watcher: {e}") + finally: + watcher_thread = None + # Clear state + watched_folders = [] + folder_id_map = {} def watcher_util_restart_folder_watcher() -> bool: @@ -335,23 +340,25 @@ def watcher_util_restart_folder_watcher() -> bool: Returns: True if restart was successful, False otherwise """ - logger.info("Restarting folder watcher...") - watcher_util_stop_folder_watcher() - return watcher_util_start_folder_watcher() + with state_lock: + logger.info("Restarting folder watcher...") + watcher_util_stop_folder_watcher() + return watcher_util_start_folder_watcher() def watcher_util_get_watcher_info() -> dict: """Get information about the current watcher state.""" - return { - "is_running": watcher_util_is_watcher_running(), - "folders_count": len(watched_folders), - "thread_alive": watcher_thread.is_alive() if watcher_thread else False, - "thread_id": watcher_thread.ident if watcher_thread else None, - "watched_folders": [ - {"id": folder_id, "path": folder_path} - for folder_id, folder_path in watched_folders - ], - } + with state_lock: + return { + "is_running": watcher_util_is_watcher_running(), + "folders_count": len(watched_folders), + "thread_alive": watcher_thread.is_alive() if watcher_thread else False, + "thread_id": watcher_thread.ident if watcher_thread else None, + "watched_folders": [ + {"id": folder_id, "path": folder_path} + for folder_id, folder_path in watched_folders + ], + } def watcher_util_wait_for_watcher() -> None: