From f9c885218c13a9e23141efd58a2e1d9fe190e6af Mon Sep 17 00:00:00 2001 From: Vadim Peretokin Date: Fri, 30 Jan 2026 12:10:21 +0100 Subject: [PATCH] Persist MediaPipe model across camera cycles --- postured/app.py | 2 +- postured/pose_detector.py | 69 ++++++++++++++++++++++++--------------- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/postured/app.py b/postured/app.py index 439b8f3..3e6b62d 100644 --- a/postured/app.py +++ b/postured/app.py @@ -556,7 +556,7 @@ def _on_screen_removed(self, screen: QScreen): def shutdown(self): """Clean up resources for graceful shutdown.""" - self.pose_detector.stop() + self.pose_detector.close() self.overlay.cleanup() def _quit(self): diff --git a/postured/pose_detector.py b/postured/pose_detector.py index b432709..a45bb2d 100644 --- a/postured/pose_detector.py +++ b/postured/pose_detector.py @@ -1,4 +1,5 @@ import threading +import time import cv2 import mediapipe as mp @@ -30,9 +31,11 @@ class PoseWorker(QObject): MIN_FRAME_VARIANCE = 20.0 # detect blank frames (e.g. hardware privacy switch) MIN_CONFIDENCE = 0.5 - def __init__(self, model_path: Path, camera_index: int, debug: bool = False): + def __init__( + self, landmarker: PoseLandmarker, camera_index: int, debug: bool = False + ): super().__init__() - self.model_path = model_path + self.landmarker = landmarker self.camera_index = camera_index self.debug = debug self._stop_event = threading.Event() @@ -41,32 +44,12 @@ def __init__(self, model_path: Path, camera_index: int, debug: bool = False): def run(self): """Main loop - runs in background thread.""" - if not self.model_path.exists(): - self.error.emit(f"Model file not found: {self.model_path}") - return - - options = PoseLandmarkerOptions( - base_options=BaseOptions(model_asset_path=str(self.model_path)), - running_mode=RunningMode.VIDEO, - num_poses=1, - min_pose_detection_confidence=self.MIN_CONFIDENCE, - min_pose_presence_confidence=self.MIN_CONFIDENCE, - min_tracking_confidence=self.MIN_CONFIDENCE, - ) - try: - landmarker = PoseLandmarker.create_from_options(options) - except Exception as e: - self.error.emit(f"Failed to load pose model: {e}") - return - capture = cv2.VideoCapture(self.camera_index) if not capture.isOpened(): self.error.emit("Failed to open camera") - landmarker.close() return self._stop_event.clear() - frame_timestamp = 0 consecutive_failures = 0 camera_lost = False @@ -103,8 +86,8 @@ def run(self): rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame) - frame_timestamp += int(self.FRAME_INTERVAL_S * 1000) - results = landmarker.detect_for_video(mp_image, frame_timestamp) + frame_timestamp = int(time.monotonic() * 1000) + results = self.landmarker.detect_for_video(mp_image, frame_timestamp) if results.pose_landmarks: landmarks = results.pose_landmarks[0] @@ -118,7 +101,6 @@ def run(self): self._stop_event.wait(self.FRAME_INTERVAL_S) capture.release() - landmarker.close() def stop(self): self._stop_event.set() @@ -147,16 +129,42 @@ def __init__(self, parent=None, debug: bool = False): self.thread: QThread | None = None self.worker: PoseWorker | None = None self.debug = debug - self.model_path = ( + self._model_path = ( Path(__file__).parent / "resources" / "pose_landmarker_lite.task" ) + self._landmarker: PoseLandmarker | None = None + + def _get_or_create_landmarker(self) -> PoseLandmarker: + """Lazily create the landmarker on first use.""" + if self._landmarker is None: + if not self._model_path.exists(): + raise FileNotFoundError(f"Model file not found: {self._model_path}") + options = PoseLandmarkerOptions( + base_options=BaseOptions(model_asset_path=str(self._model_path)), + running_mode=RunningMode.VIDEO, + num_poses=1, + min_pose_detection_confidence=PoseWorker.MIN_CONFIDENCE, + min_pose_presence_confidence=PoseWorker.MIN_CONFIDENCE, + min_tracking_confidence=PoseWorker.MIN_CONFIDENCE, + ) + self._landmarker = PoseLandmarker.create_from_options(options) + return self._landmarker def start(self, camera_index: int = 0): if self.thread is not None: self.stop() + try: + landmarker = self._get_or_create_landmarker() + except FileNotFoundError as e: + self.camera_error.emit(str(e)) + return + except Exception as e: + self.camera_error.emit(f"Failed to load pose model: {e}") + return + self.thread = QThread() - self.worker = PoseWorker(self.model_path, camera_index, self.debug) + self.worker = PoseWorker(landmarker, camera_index, self.debug) self.worker.moveToThread(self.thread) self.thread.started.connect(self.worker.run) @@ -177,6 +185,13 @@ def stop(self): self.thread = None self.worker = None + def close(self): + """Close the landmarker (call on app shutdown).""" + self.stop() + if self._landmarker: + self._landmarker.close() + self._landmarker = None + @staticmethod def available_cameras() -> list[tuple[int, str]]: """Return list of (index, name) for available cameras."""