Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 85 additions & 78 deletions sync-microservice/app/utils/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
stop_event = threading.Event()
watched_folders: List[FolderIdPath] = []
folder_id_map: Dict[str, str] = {}
state_lock = threading.RLock()


def watcher_util_get_folder_id_if_watched(file_path: str) -> Optional[str]:
Expand Down Expand Up @@ -235,7 +236,8 @@ def watcher_util_get_existing_folders(

def watcher_util_is_watcher_running() -> bool:
"""Check if the watcher thread is running."""
return watcher_thread is not None and watcher_thread.is_alive()
with state_lock:
return watcher_thread is not None and watcher_thread.is_alive()


def watcher_util_start_folder_watcher() -> bool:
Expand All @@ -247,85 +249,88 @@ def watcher_util_start_folder_watcher() -> bool:
"""
global watcher_thread, watched_folders, folder_id_map

if watcher_util_is_watcher_running():
logger.info("Watcher is already running.")
return False

logger.info("Initializing folder watcher...")
logger.debug("Debug logging is enabled")

try:
# Simple synchronous database call
folders = db_get_all_folders_with_ids()
if not folders:
logger.info("No folders found in database")
# Make the "check then mutate global watcher state" atomic.
with state_lock:
if watcher_util_is_watcher_running():
logger.info("Watcher is already running.")
return False

logger.info(f"Found {len(folders)} folders in database")
logger.info("Initializing folder watcher...")
logger.debug("Debug logging is enabled")

# Simple synchronous file system checks
existing_folders = watcher_util_get_existing_folders(folders)
if not existing_folders:
logger.info("No existing folders to watch")
try:
# Simple synchronous database call
folders = db_get_all_folders_with_ids()
if not folders:
logger.info("No folders found in database")
return False

logger.info(f"Found {len(folders)} folders in database")

# Simple synchronous file system checks
existing_folders = watcher_util_get_existing_folders(folders)
if not existing_folders:
logger.info("No existing folders to watch")
return False

watched_folders = existing_folders
folder_id_map = {
folder_path: folder_id for folder_id, folder_path in existing_folders
}

folder_paths = [folder_path for _, folder_path in existing_folders]

logger.info(f"Starting to watch {len(folder_paths)} folders:")
for folder_id, folder_path in existing_folders:
logger.info(f" - {folder_path} (ID: {folder_id})")

# Reset stop event and start background thread
stop_event.clear()
watcher_thread = threading.Thread(
target=watcher_util_watcher_worker,
args=(folder_paths,),
daemon=True, # Dies when main program exits
)
watcher_thread.start()

logger.info("Folder watcher started successfully")
return True

except Exception as e:
logger.error(f"Error starting folder watcher: {e}")
return False

watched_folders = existing_folders
folder_id_map = {
folder_path: folder_id for folder_id, folder_path in existing_folders
}

folder_paths = [folder_path for _, folder_path in existing_folders]

logger.info(f"Starting to watch {len(folder_paths)} folders:")
for folder_id, folder_path in existing_folders:
logger.info(f" - {folder_path} (ID: {folder_id})")

# Reset stop event and start background thread
stop_event.clear()
watcher_thread = threading.Thread(
target=watcher_util_watcher_worker,
args=(folder_paths,),
daemon=True, # Dies when main program exits
)
watcher_thread.start()

logger.info("Folder watcher started successfully")
return True

except Exception as e:
logger.error(f"Error starting folder watcher: {e}")
return False


def watcher_util_stop_folder_watcher() -> None:
"""Stop the folder watcher."""
global watcher_thread, watched_folders, folder_id_map

if not watcher_util_is_watcher_running():
logger.info("Watcher is not running")
return
with state_lock:
if not watcher_util_is_watcher_running():
logger.info("Watcher is not running")
return

try:
logger.info("Stopping folder watcher...")
try:
logger.info("Stopping folder watcher...")

# Signal the watcher to stop
stop_event.set()
# Signal the watcher to stop
stop_event.set()

# Wait for thread to finish
watcher_thread.join(timeout=5.0)
# Wait for thread to finish
watcher_thread.join(timeout=5.0)

if watcher_thread.is_alive():
logger.warning("Warning: Watcher thread did not stop gracefully")
else:
logger.info("Watcher stopped successfully")
if watcher_thread.is_alive():
logger.warning("Warning: Watcher thread did not stop gracefully")
else:
logger.info("Watcher stopped successfully")

except Exception as e:
logger.error(f"Error stopping watcher: {e}")
finally:
watcher_thread = None
# Clear state
watched_folders = []
folder_id_map = {}
except Exception as e:
logger.error(f"Error stopping watcher: {e}")
finally:
watcher_thread = None
# Clear state
watched_folders = []
folder_id_map = {}


def watcher_util_restart_folder_watcher() -> bool:
Expand All @@ -335,23 +340,25 @@ def watcher_util_restart_folder_watcher() -> bool:
Returns:
True if restart was successful, False otherwise
"""
logger.info("Restarting folder watcher...")
watcher_util_stop_folder_watcher()
return watcher_util_start_folder_watcher()
with state_lock:
logger.info("Restarting folder watcher...")
watcher_util_stop_folder_watcher()
return watcher_util_start_folder_watcher()


def watcher_util_get_watcher_info() -> dict:
"""Get information about the current watcher state."""
return {
"is_running": watcher_util_is_watcher_running(),
"folders_count": len(watched_folders),
"thread_alive": watcher_thread.is_alive() if watcher_thread else False,
"thread_id": watcher_thread.ident if watcher_thread else None,
"watched_folders": [
{"id": folder_id, "path": folder_path}
for folder_id, folder_path in watched_folders
],
}
with state_lock:
return {
"is_running": watcher_util_is_watcher_running(),
"folders_count": len(watched_folders),
"thread_alive": watcher_thread.is_alive() if watcher_thread else False,
"thread_id": watcher_thread.ident if watcher_thread else None,
"watched_folders": [
{"id": folder_id, "path": folder_path}
for folder_id, folder_path in watched_folders
],
}


def watcher_util_wait_for_watcher() -> None:
Expand Down
Loading