diff --git a/scripts/export_to_onnx.py b/scripts/export_to_onnx.py index 8420db2b..38417a45 100644 --- a/scripts/export_to_onnx.py +++ b/scripts/export_to_onnx.py @@ -1,11 +1,9 @@ -import os -import sys import argparse from pathlib import Path import torch -from src.inference.utils.inference_factory import InferenceFactory, InferenceConfig, Backend, ModelArch +from src.inference.utils.inference_factory import Backend, InferenceConfig, InferenceFactory, ModelArch from src.path_utils import ensure_clean_directory NUM_CLASSES = 83 @@ -13,6 +11,7 @@ DEVICE = "cpu" MODELS_DIR_PATH = Path("models") + def main(model_name: str): arch = ModelArch.RESNET if "resnet" in model_name.lower() else ModelArch.MOBILENET backend = Backend.PYTORCH diff --git a/scripts/onnx_validation.py b/scripts/onnx_validation.py index 6fe64c81..e98c4312 100644 --- a/scripts/onnx_validation.py +++ b/scripts/onnx_validation.py @@ -1,5 +1,4 @@ import os -import argparse from pathlib import Path import numpy as np @@ -7,9 +6,9 @@ from PIL import Image from dataset.optimal_class_mapping import MODEL_NAMES as ID_TO_NAME -from src.inference.utils.inference_factory import InferenceFactory, InferenceConfig, Backend, ModelArch from src.inference.base.classifier_inference_base import ClassifierInferenceBase from src.inference.base.classifier_inference_base_onnx import OnnxClassifierInferenceBase +from src.inference.utils.inference_factory import Backend, InferenceConfig, InferenceFactory, ModelArch NUM_CLASSES = 83 INPUT_SIZE = (256, 256) diff --git a/services/consumer/consumer.py b/services/consumer/consumer.py index ec2b3cf6..981362c2 100644 --- a/services/consumer/consumer.py +++ b/services/consumer/consumer.py @@ -1,28 +1,32 @@ -import paho.mqtt.client as mqtt -import pymongo import json from datetime import datetime +import paho.mqtt.client as mqtt +import pymongo + + def on_connect(client, userdata, flags, rc): print(f"šŸ”— Connected: {rc}") client.subscribe("deepstream/predictions") + def on_message(client, userdata, msg): try: payload = json.loads(msg.payload.decode()) payload["received_at"] = datetime.now().isoformat() - + mongo_client = pymongo.MongoClient("mongodb://agstream_mongo:27017/") db = mongo_client["agstream"] collection = db["predictions"] - + collection.insert_one(payload) - print(f"šŸ’¾ Saved to MongoDB!") + print("šŸ’¾ Saved to MongoDB!") mongo_client.close() - + except Exception as e: print(f"āŒ Error: {e}") + client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message diff --git a/services/consumer/mqtt_consumer.py b/services/consumer/mqtt_consumer.py index bdafae45..28bc3010 100644 --- a/services/consumer/mqtt_consumer.py +++ b/services/consumer/mqtt_consumer.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 -import paho.mqtt.client as mqtt -import pymongo import json from datetime import datetime +import paho.mqtt.client as mqtt +import pymongo + MQTT_BROKER = "agstream_mosquitto" MQTT_PORT = 1883 MQTT_TOPIC = "deepstream/predictions" @@ -11,49 +12,49 @@ MONGO_DB = "agstream" MONGO_COLLECTION = "predictions" + def on_connect(client, userdata, flags, rc): print(f"āœ… Connected to MQTT broker with result code {rc}") client.subscribe(MQTT_TOPIC) + def on_message(client, userdata, msg): try: payload = json.loads(msg.payload.decode()) payload["received_at"] = datetime.now().isoformat() - + # Extract classification data from nvmsgbroker format obj = payload.get("object", {}) if obj.get("id") != "0" and obj.get("id"): class_id = int(obj["id"]) confidence = obj.get("confidence", 0) - + # Add extracted classification - payload["classification"] = { - "class_id": class_id, - "confidence": confidence - } + payload["classification"] = {"class_id": class_id, "confidence": confidence} print(f"🌱 FOUND CLASSIFICATION: ID {class_id}, confidence {confidence:.3f}") - + mongo_client = pymongo.MongoClient(MONGO_URI) db = mongo_client[MONGO_DB] collection = db[MONGO_COLLECTION] - + result = collection.insert_one(payload) - + if "classification" in payload: - print(f"āœ… Saved classification to MongoDB!") + print("āœ… Saved classification to MongoDB!") else: - print(f"āœ… Saved: No classification") - + print("āœ… Saved: No classification") + mongo_client.close() - + except Exception as e: print(f"āŒ Error: {e}") + if __name__ == "__main__": client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message - + print("šŸš€ Starting Enhanced MQTT Consumer...") client.connect(MQTT_BROKER, MQTT_PORT, 60) client.loop_forever() diff --git a/src/deepstream/helpers/load_class_labels.py b/src/deepstream/helpers/load_class_labels.py index 1b347215..4e71d735 100644 --- a/src/deepstream/helpers/load_class_labels.py +++ b/src/deepstream/helpers/load_class_labels.py @@ -2,6 +2,7 @@ PLANT_LABELS = "/workspace/configs/crop_and_weed_83_classes.txt" + # Load class labels def load_class_labels() -> List[str]: try: diff --git a/src/deepstream/helpers/meta_tensor_extractor.py b/src/deepstream/helpers/meta_tensor_extractor.py index 486b58c9..1addc756 100644 --- a/src/deepstream/helpers/meta_tensor_extractor.py +++ b/src/deepstream/helpers/meta_tensor_extractor.py @@ -1,8 +1,10 @@ import ctypes -import numpy.typing as npt + import numpy as np +import numpy.typing as npt import pyds + class TensorExtractor: def extract_logits(self, tensor_meta) -> npt.NDArray[np.float32]: """ @@ -13,10 +15,7 @@ def extract_logits(self, tensor_meta) -> npt.NDArray[np.float32]: dims = [layer.dims.d[i] for i in range(layer.dims.numDims)] numel = int(np.prod(dims)) - ptr = ctypes.cast( - pyds.get_ptr(layer.buffer), - ctypes.POINTER(ctypes.c_float) - ) + ptr = ctypes.cast(pyds.get_ptr(layer.buffer), ctypes.POINTER(ctypes.c_float)) logits = np.ctypeslib.as_array(ptr, shape=(numel,)) # Copy so we are not tied to DeepStream's memory lifetime diff --git a/src/deepstream/helpers/plant_msg_meta_builder.py b/src/deepstream/helpers/plant_msg_meta_builder.py index e8867686..433b7497 100644 --- a/src/deepstream/helpers/plant_msg_meta_builder.py +++ b/src/deepstream/helpers/plant_msg_meta_builder.py @@ -1,14 +1,12 @@ import sys -from pydantic import BaseModel import gi -gi.require_version("Gst", "1.0") - -from gi.repository import Gst import pyds +from pydantic import BaseModel from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction + class PlantEvent(BaseModel): frame_id: int plant_id: str diff --git a/src/deepstream/helpers/remove_background.py b/src/deepstream/helpers/remove_background.py index f5e08d36..992c3165 100644 --- a/src/deepstream/helpers/remove_background.py +++ b/src/deepstream/helpers/remove_background.py @@ -1,8 +1,9 @@ -import numpy as np import cv2 +import numpy as np MORPH_KERNEL = 34 + def remove_background(frame_bgr: np.ndarray) -> np.ndarray: rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB).astype(np.float32) R, G, B = rgb[..., 0], rgb[..., 1], rgb[..., 2] @@ -12,10 +13,7 @@ def remove_background(frame_bgr: np.ndarray) -> np.ndarray: _, mask = cv2.threshold(exg_norm, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) - kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, - (MORPH_KERNEL, MORPH_KERNEL) - ) + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (MORPH_KERNEL, MORPH_KERNEL)) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) out = np.zeros_like(frame_bgr) diff --git a/src/deepstream/helpers/should_skip_frame.py b/src/deepstream/helpers/should_skip_frame.py index d22900ac..38f459a7 100644 --- a/src/deepstream/helpers/should_skip_frame.py +++ b/src/deepstream/helpers/should_skip_frame.py @@ -1,19 +1,22 @@ -from typing import Any from enum import Enum +from typing import Any + +import cv2 import gi import numpy as np import pyds -import cv2 gi.require_version("Gst", "1.0") from gi.repository import Gst from src.frame_comparison.frame_change_detector import FrameChangeDetector + class FrameProcessDecision(str, Enum): PROCESS = "process" SKIP = "skip" + def should_skip_frame(info: Any, frame_meta: Any, batch_meta: Any, frame_change_detector: FrameChangeDetector) -> int: """Pad probe to drop frames based on frame difference analysis.""" gst_buffer = info.get_buffer() @@ -25,12 +28,8 @@ def should_skip_frame(info: Any, frame_meta: Any, batch_meta: Any, frame_change_ frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR) should_process, metrics = frame_change_detector.should_process(frame_bgr) - - decision = ( - FrameProcessDecision.PROCESS - if should_process - else FrameProcessDecision.SKIP - ) + + decision = FrameProcessDecision.PROCESS if should_process else FrameProcessDecision.SKIP print( f"Frame {frame_meta.frame_num:05d}: {decision.value} " diff --git a/src/deepstream/helpers/softmax_topk_classifier.py b/src/deepstream/helpers/softmax_topk_classifier.py index cd10e149..20f47f54 100644 --- a/src/deepstream/helpers/softmax_topk_classifier.py +++ b/src/deepstream/helpers/softmax_topk_classifier.py @@ -1,5 +1,6 @@ -import numpy as np from typing import List + +import numpy as np from pydantic import BaseModel @@ -19,7 +20,7 @@ def predict_from_logits(self, logits: np.ndarray) -> List[ClassificationPredicti exp = np.exp(logits - np.max(logits)) probs = exp / np.sum(exp) - top_idx = np.argsort(probs)[-self.top_k:][::-1] + top_idx = np.argsort(probs)[-self.top_k :][::-1] results: List[ClassificationPrediction] = [] for idx in top_idx: diff --git a/src/deepstream/pipelines/access_metadata.py b/src/deepstream/pipelines/access_metadata.py index fe70d0f8..4a800c90 100644 --- a/src/deepstream/pipelines/access_metadata.py +++ b/src/deepstream/pipelines/access_metadata.py @@ -10,7 +10,7 @@ import os import pyds -from gi.repository import GLib, Gst +from gi.repository import Gst from src.deepstream.helpers.pipeline_runner import run_pipeline @@ -22,6 +22,7 @@ MESSAGE_TEMPLATE = "Frame processed: {}" os.makedirs(OUTPUT_DIR, exist_ok=True) + def modify_metadata(frame_meta, batch_meta, message_template=MESSAGE_TEMPLATE): """ Modify the metadata by adding user-defined metadata. @@ -79,9 +80,7 @@ def read_metadata(frame_meta): class_id = label_info.result_class_id confidence = label_info.result_prob - print( - f"[Frame {frame_meta.frame_num}] Class ID: {class_id}, Confidence: {confidence:.4f}" - ) + print(f"[Frame {frame_meta.frame_num}] Class ID: {class_id}, Confidence: {confidence:.4f}") def access_metadata(pad, info, u_data): @@ -189,8 +188,8 @@ def on_pad_added_decode(src, pad): return pipeline + if __name__ == "__main__": Gst.init(None) pipeline = build_pipeline() run_pipeline(pipeline) - \ No newline at end of file diff --git a/src/deepstream/pipelines/access_raw_frames_pipeline.py b/src/deepstream/pipelines/access_raw_frames_pipeline.py index 5541debb..f033c27f 100644 --- a/src/deepstream/pipelines/access_raw_frames_pipeline.py +++ b/src/deepstream/pipelines/access_raw_frames_pipeline.py @@ -11,7 +11,7 @@ import gi import numpy as np import pyds -from gi.repository import GLib, Gst +from gi.repository import Gst gi.require_version("Gst", "1.0") @@ -36,6 +36,7 @@ def frames_manipulation(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeRet np.copyto(np.array(surface, copy=False, order="C"), frame) return Gst.PadProbeReturn.OK + def build_pipeline() -> Gst.Pipeline: pipeline: Gst.Pipeline = Gst.Pipeline.new("simple-pipeline") @@ -71,9 +72,7 @@ def build_pipeline() -> Gst.Pipeline: rtspsrc.set_property("location", RTSP_URL) rtspsrc.set_property("latency", 200) - capsfilter.set_property( - "caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA") - ) + capsfilter.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")) streammux.set_property("batch-size", 1) streammux.set_property("width", 640) streammux.set_property("height", 480) @@ -121,6 +120,7 @@ def on_decode_pad_added(src: Gst.Element, new_pad: Gst.Pad) -> None: srcpad.add_probe(Gst.PadProbeType.BUFFER, frames_manipulation) return pipeline + def main() -> None: os.makedirs(OUTPUT_DIR, exist_ok=True) Gst.init(None) diff --git a/src/deepstream/pipelines/cpu_frames_skipping_pipeline.py b/src/deepstream/pipelines/cpu_frames_skipping_pipeline.py index 08926515..0817c3ff 100644 --- a/src/deepstream/pipelines/cpu_frames_skipping_pipeline.py +++ b/src/deepstream/pipelines/cpu_frames_skipping_pipeline.py @@ -1,19 +1,14 @@ -import sys import os - from typing import Any import gi -import numpy as np -import pyds gi.require_version("Gst", "1.0") -import cv2 -from gi.repository import GLib, Gst +from gi.repository import Gst +from src.deepstream.helpers.pipeline_runner import run_pipeline from src.deepstream.probes.frame_comparison.cpu.frame_skipping_probe import frame_skip_probe from src.frame_comparison.frame_change_detector import FrameChangeDetector -from src.deepstream.helpers.pipeline_runner import run_pipeline rtsp_port = os.environ.get("RTSP_PORT", "8554") RTSP_URL = f"rtsp://127.0.0.1:{rtsp_port}/test" @@ -108,6 +103,7 @@ def on_pad_added_decode(src: Any, pad: Any) -> None: return pipeline + if __name__ == "__main__": Gst.init(None) pipeline = build_pipeline() diff --git a/src/deepstream/pipelines/deepstream_image_utils.py b/src/deepstream/pipelines/deepstream_image_utils.py index 05a6ec1b..baad28ed 100644 --- a/src/deepstream/pipelines/deepstream_image_utils.py +++ b/src/deepstream/pipelines/deepstream_image_utils.py @@ -4,6 +4,7 @@ from pathlib import Path from gi.repository import Gst + from src.path_utils import ensure_clean_directory # noqa: E402 Gst.init(None) diff --git a/src/deepstream/pipelines/deepstream_pipeline_background_removal_cpu.py b/src/deepstream/pipelines/deepstream_pipeline_background_removal_cpu.py index 03bc8768..8e135705 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_background_removal_cpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_background_removal_cpu.py @@ -1,20 +1,18 @@ -from typing import Optional import os -from datetime import datetime import gi -gi.require_version("Gst", "1.0") -from gi.repository import GLib, Gst +gi.require_version("Gst", "1.0") import cv2 import numpy as np import pyds +from gi.repository import GLib, Gst from src.deepstream.helpers.pipeline_runner import run_pipeline rtsp_port = os.environ.get("RTSP_PORT", "8554") RTSP_URL = f"rtsp://127.0.0.1:{rtsp_port}/test" -OUTPUT_DIR = f"/workspace/output/frames/" +OUTPUT_DIR = "/workspace/output/frames/" MORPH_KERNEL = 34 STRICT_ERRORS = True diff --git a/src/deepstream/pipelines/deepstream_pipeline_background_removal_gpu.py b/src/deepstream/pipelines/deepstream_pipeline_background_removal_gpu.py index 29c3278a..7381be79 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_background_removal_gpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_background_removal_gpu.py @@ -1,9 +1,8 @@ -import sys import os - -from typing import Optional from datetime import datetime + import gi + from src.deepstream.probes.background_removal.gpu import gpu_background_probe gi.require_version("Gst", "1.0") @@ -19,6 +18,7 @@ os.makedirs(OUTPUT_DIR, exist_ok=True) + def _post_error_from_pad(pad: Gst.Pad, text: str, debug: str = "") -> None: """Post an error message to the GStreamer bus from a pad.""" elem = pad.get_parent_element() @@ -28,6 +28,7 @@ def _post_error_from_pad(pad: Gst.Pad, text: str, debug: str = "") -> None: msg = Gst.Message.new_error(elem, gerr, debug) elem.post_message(msg) + def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: """GPU pad probe that applies background removal to each frame.""" gst_buffer = info.get_buffer() @@ -40,7 +41,7 @@ def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.Pa try: buffer_ptr: int = hash(gst_buffer) batch_id: int = 0 - + success: bool = gpu_background_probe.remove_background_gpu_probe_pipeline(buffer_ptr, batch_id) if not success: _post_error_from_pad(pad, "GPU background removal failed") @@ -56,6 +57,7 @@ def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.Pa return Gst.PadProbeReturn.OK + def build_pipeline() -> Gst.Pipeline: """Build the GStreamer pipeline for RTSP → GPU background removal → output.""" pipeline = Gst.Pipeline.new("rtsp-gpu-background-removal") @@ -116,6 +118,7 @@ def on_pad_added_rtspsrc(src: Gst.Element, pad: Gst.Pad) -> None: return pipeline + if __name__ == "__main__": Gst.init(None) pipeline = build_pipeline() diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu.py index 9f4b15b0..6c7c6533 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_cpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu.py @@ -1,26 +1,21 @@ import os -from datetime import datetime -from typing import Any, Dict, List +from typing import Any + import gi -import numpy as np -import cv2 -import pyds gi.require_version("Gst", "1.0") -from gi.repository import GLib, Gst - -from src.frame_comparison.frame_change_detector import FrameChangeDetector -from src.model_conversion.onnx_to_trt import build_engine_if_missing +from gi.repository import Gst from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor -from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier -from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent from src.deepstream.helpers.pipeline_runner import run_pipeline - +from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder +from src.deepstream.helpers.softmax_topk_classifier import SoftmaxTopKClassifier from src.deepstream.probes.background_removal.cpu.background_removal_probe import remove_background_probe from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe from src.deepstream.probes.frame_comparison.cpu.frame_skipping_probe import frame_skip_probe +from src.frame_comparison.frame_change_detector import FrameChangeDetector +from src.model_conversion.onnx_to_trt import build_engine_if_missing # Configuration RTSP_PORT = os.environ.get("RTSP_PORT", "8554") @@ -33,6 +28,7 @@ CLASS_LABELS = load_class_labels() + def build_pipeline() -> Gst.Pipeline: """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" pipeline = Gst.Pipeline.new("final-cpu-pipeline") @@ -127,6 +123,7 @@ def on_pad_added_decode(src: Any, pad: Any) -> None: return pipeline + if __name__ == "__main__": Gst.init(None) build_engine_if_missing(CONFIG_FILE) diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py index 14dded4f..189570cf 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py @@ -1,26 +1,23 @@ import os -from datetime import datetime -from typing import Any, Dict, List +from typing import Any, Dict + import gi -import numpy as np -import cv2 -import pyds gi.require_version("Gst", "1.0") -from gi.repository import GLib, Gst - -from src.frame_comparison.frame_change_detector import FrameChangeDetector -from src.model_conversion.onnx_to_trt import build_engine_if_missing +from gi.repository import Gst from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor -from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier -from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent from src.deepstream.helpers.pipeline_runner import run_pipeline - +from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder +from src.deepstream.helpers.softmax_topk_classifier import SoftmaxTopKClassifier from src.deepstream.probes.background_removal.cpu.background_removal_probe import remove_background_probe from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe -from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import frame_skipping_probe, GPUFrameChangeDetector +from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import ( + GPUFrameChangeDetector, + frame_skipping_probe, +) +from src.model_conversion.onnx_to_trt import build_engine_if_missing # Configuration RTSP_PORT = os.environ.get("RTSP_PORT", "8554") @@ -35,19 +32,21 @@ stats: Dict[str, int] = {"total": 0, "skipped": 0, "processed": 0} + def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn: buffer_ptr: int = hash(info.get_buffer()) batch_id: int = 0 should_process: bool = frame_skipping_probe(buffer_ptr, batch_id, detector) if should_process: - stats["processed"] += 1 - print(f"āœ… PROCESSING frame {stats['total']}") - return Gst.PadProbeReturn.OK + stats["processed"] += 1 + print(f"āœ… PROCESSING frame {stats['total']}") + return Gst.PadProbeReturn.OK else: stats["skipped"] += 1 print(f"ā­ļø SKIPPING frame {stats['total']}") return Gst.PadProbeReturn.DROP + def build_pipeline() -> Gst.Pipeline: """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" pipeline = Gst.Pipeline.new("final-cpu-pipeline") @@ -142,6 +141,7 @@ def on_pad_added_decode(src: Any, pad: Any) -> None: return pipeline + if __name__ == "__main__": Gst.init(None) build_engine_if_missing(CONFIG_FILE) diff --git a/src/deepstream/pipelines/deepstream_pipeline_gpu.py b/src/deepstream/pipelines/deepstream_pipeline_gpu.py index 0c37da05..463fe6f5 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_gpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_gpu.py @@ -1,26 +1,23 @@ import os -from datetime import datetime -from typing import Any, Dict, List +from typing import Any, Dict + import gi -import numpy as np -import cv2 -import pyds gi.require_version("Gst", "1.0") from gi.repository import GLib, Gst -from src.frame_comparison.frame_change_detector import FrameChangeDetector -from src.model_conversion.onnx_to_trt import build_engine_if_missing - from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor -from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier -from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent from src.deepstream.helpers.pipeline_runner import run_pipeline - +from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder +from src.deepstream.helpers.softmax_topk_classifier import SoftmaxTopKClassifier from src.deepstream.probes.background_removal.gpu import gpu_background_probe from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe -from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import frame_skipping_probe, GPUFrameChangeDetector +from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import ( + GPUFrameChangeDetector, + frame_skipping_probe, +) +from src.model_conversion.onnx_to_trt import build_engine_if_missing # Configuration RTSP_PORT = os.environ.get("RTSP_PORT", "8554") @@ -35,19 +32,21 @@ stats: Dict[str, int] = {"total": 0, "skipped": 0, "processed": 0} + def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn: buffer_ptr: int = hash(info.get_buffer()) batch_id: int = 0 should_process: bool = frame_skipping_probe(buffer_ptr, batch_id, detector) if should_process: - stats["processed"] += 1 - print(f"āœ… PROCESSING frame {stats['total']}") - return Gst.PadProbeReturn.OK + stats["processed"] += 1 + print(f"āœ… PROCESSING frame {stats['total']}") + return Gst.PadProbeReturn.OK else: stats["skipped"] += 1 print(f"ā­ļø SKIPPING frame {stats['total']}") return Gst.PadProbeReturn.DROP + def _post_error_from_pad(pad: Gst.Pad, text: str, debug: str = "") -> None: """Post an error message to the GStreamer bus from a pad.""" elem = pad.get_parent_element() @@ -57,6 +56,7 @@ def _post_error_from_pad(pad: Gst.Pad, text: str, debug: str = "") -> None: msg = Gst.Message.new_error(elem, gerr, debug) elem.post_message(msg) + def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: """GPU pad probe that applies background removal to each frame.""" gst_buffer = info.get_buffer() @@ -69,7 +69,7 @@ def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.Pa try: buffer_ptr: int = hash(gst_buffer) batch_id: int = 0 - + success: bool = gpu_background_probe.remove_background_gpu_probe_pipeline(buffer_ptr, batch_id) if not success: _post_error_from_pad(pad, "GPU background removal failed") @@ -85,6 +85,7 @@ def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.Pa return Gst.PadProbeReturn.OK + def build_pipeline() -> Gst.Pipeline: """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" pipeline = Gst.Pipeline.new("final-cpu-pipeline") @@ -179,6 +180,7 @@ def on_pad_added_decode(src: Any, pad: Any) -> None: return pipeline + if __name__ == "__main__": Gst.init(None) build_engine_if_missing(CONFIG_FILE) diff --git a/src/deepstream/pipelines/gpu_frames_skipping_pipeline.py b/src/deepstream/pipelines/gpu_frames_skipping_pipeline.py index 19c731f0..32c684e1 100644 --- a/src/deepstream/pipelines/gpu_frames_skipping_pipeline.py +++ b/src/deepstream/pipelines/gpu_frames_skipping_pipeline.py @@ -1,14 +1,16 @@ import os +from typing import Dict + import gi -import cv2 -import pyds -import ctypes -from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import frame_skipping_probe, GPUFrameChangeDetector -from typing import Any, Dict, Optional +from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import ( + GPUFrameChangeDetector, + frame_skipping_probe, +) gi.require_version("Gst", "1.0") -from gi.repository import GLib, Gst +from gi.repository import Gst + from src.deepstream.helpers.pipeline_runner import run_pipeline # Configurations @@ -20,23 +22,25 @@ stats: Dict[str, int] = {"total": 0, "skipped": 0, "processed": 0} + def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn: buffer_ptr: int = hash(info.get_buffer()) batch_id: int = 0 should_process: bool = frame_skipping_probe(buffer_ptr, batch_id, detector) if should_process: - stats["processed"] += 1 - print(f"āœ… PROCESSING frame {stats['total']}") - return Gst.PadProbeReturn.OK + stats["processed"] += 1 + print(f"āœ… PROCESSING frame {stats['total']}") + return Gst.PadProbeReturn.OK else: stats["skipped"] += 1 print(f"ā­ļø SKIPPING frame {stats['total']}") return Gst.PadProbeReturn.DROP + def build_pipeline() -> Gst.Pipeline: """Build GPU-compatible pipeline""" pipeline: Gst.Pipeline = Gst.Pipeline.new("gpu-frame-skipping-pipeline") - + # Elements rtspsrc: Gst.Element = Gst.ElementFactory.make("rtspsrc", "source") depay: Gst.Element = Gst.ElementFactory.make("rtph264depay", "depay") @@ -51,11 +55,24 @@ def build_pipeline() -> Gst.Pipeline: nvvideoconvert2: Gst.Element = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert2") jpegenc: Gst.Element = Gst.ElementFactory.make("jpegenc", "jpegenc") sink: Gst.Element = Gst.ElementFactory.make("multifilesink", "sink") - - for e in [rtspsrc, depay, parse, decode, convert, nvvideoconvert, - capsfilter, streammux, nvinfer, nvosd, nvvideoconvert2, jpegenc, sink]: + + for e in [ + rtspsrc, + depay, + parse, + decode, + convert, + nvvideoconvert, + capsfilter, + streammux, + nvinfer, + nvosd, + nvvideoconvert2, + jpegenc, + sink, + ]: pipeline.add(e) - + # Properties rtspsrc.set_property("location", RTSP_URL) streammux.set_property("batch-size", 1) @@ -63,10 +80,10 @@ def build_pipeline() -> Gst.Pipeline: streammux.set_property("height", 480) nvinfer.set_property("config-file-path", CONFIG_FILE) sink.set_property("location", os.path.join(OUTPUT_DIR, "frame_%05d.jpg")) - + caps: Gst.Caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA") capsfilter.set_property("caps", caps) - + # Links depay.link(parse) parse.link(decode) @@ -77,33 +94,34 @@ def build_pipeline() -> Gst.Pipeline: nvosd.link(nvvideoconvert2) nvvideoconvert2.link(jpegenc) jpegenc.link(sink) - + # Dynamic pads def on_pad_added_rtspsrc(src: Gst.Element, pad: Gst.Pad) -> None: sinkpad: Gst.Pad = depay.get_static_pad("sink") if not sinkpad.is_linked(): pad.link(sinkpad) - + def on_pad_added_decode(src: Gst.Element, pad: Gst.Pad) -> None: sinkpad: Gst.Pad = convert.get_static_pad("sink") if not sinkpad.is_linked(): pad.link(sinkpad) - + rtspsrc.connect("pad-added", on_pad_added_rtspsrc) decode.connect("pad-added", on_pad_added_decode) - + srcpad: Gst.Pad = capsfilter.get_static_pad("src") sinkpad: Gst.Pad = streammux.get_request_pad("sink_0") srcpad.link(sinkpad) - + # Add probe detector = GPUFrameChangeDetector() streammux_src_pad: Gst.Pad = streammux.get_static_pad("src") streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, gpu_frame_skip_probe, detector) - + return pipeline + if __name__ == "__main__": Gst.init(None) pipeline: Gst.Pipeline = build_pipeline() @@ -111,7 +129,7 @@ def on_pad_added_decode(src: Gst.Element, pad: Gst.Pad) -> None: if stats["total"] > 0: skip_ratio: float = stats["skipped"] / stats["total"] - print(f"\nšŸ”„ Final Stats:") + print("\nšŸ”„ Final Stats:") print(f" Total: {stats['total']}") print(f" Processed: {stats['processed']}") print(f" Skipped: {stats['skipped']}") diff --git a/src/deepstream/pipelines/nvmsgbroker_pipeline.py b/src/deepstream/pipelines/nvmsgbroker_pipeline.py index 72da2d9f..5b0bdd9f 100644 --- a/src/deepstream/pipelines/nvmsgbroker_pipeline.py +++ b/src/deepstream/pipelines/nvmsgbroker_pipeline.py @@ -4,7 +4,7 @@ import gi import pyds -from gi.repository import GLib, Gst +from gi.repository import Gst from pydantic import BaseModel gi.require_version("Gst", "1.0") @@ -12,9 +12,9 @@ from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor +from src.deepstream.helpers.pipeline_runner import run_pipeline from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier from src.model_conversion.onnx_to_trt import build_engine_if_missing -from src.deepstream.helpers.pipeline_runner import run_pipeline RTSP_PORT = os.environ.get("RTSP_PORT", "8554") RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test" @@ -224,5 +224,6 @@ def main() -> None: pipeline = build_pipeline() run_pipeline(pipeline) + if __name__ == "__main__": main() diff --git a/src/deepstream/pipelines/pipeline_drop_frames.py b/src/deepstream/pipelines/pipeline_drop_frames.py index c37b2e07..4b53e341 100644 --- a/src/deepstream/pipelines/pipeline_drop_frames.py +++ b/src/deepstream/pipelines/pipeline_drop_frames.py @@ -10,7 +10,7 @@ import pyds gi.require_version("Gst", "1.0") -from gi.repository import GLib, Gst # noqa: E402 +from gi.repository import Gst # noqa: E402 from src.deepstream.helpers.pipeline_runner import run_pipeline @@ -126,6 +126,7 @@ def on_pad_added_decode(src, pad): return pipeline + if __name__ == "__main__": Gst.init(None) pipeline = build_pipeline() diff --git a/src/deepstream/pipelines/pipeline_main.py b/src/deepstream/pipelines/pipeline_main.py index 69a35ab8..0f12feed 100644 --- a/src/deepstream/pipelines/pipeline_main.py +++ b/src/deepstream/pipelines/pipeline_main.py @@ -2,7 +2,7 @@ from datetime import datetime from pathlib import Path -from gi.repository import GLib, Gst +from gi.repository import Gst from src.deepstream.helpers.pipeline_runner import run_pipeline from src.model_conversion.onnx_to_trt import build_engine_if_missing @@ -103,7 +103,7 @@ def main(): print("šŸš€ Starting DeepStream End-to-End Pipeline!") print(f"šŸ“ Output directory: {OUTPUT_DIR}") - print(f"šŸ“” RTSP → Decode → Convert → Classification Model → OSD → Save") + print("šŸ“” RTSP → Decode → Convert → Classification Model → OSD → Save") pipeline = build_pipeline() @@ -113,7 +113,7 @@ def main(): run_pipeline(pipeline) frame_count = len([f for f in OUTPUT_DIR.iterdir() if f.suffix == ".jpg"]) - print(f"āœ… Pipeline completed!") + print("āœ… Pipeline completed!") print(f"šŸ“Š Generated {frame_count} frames") print(f"šŸ“ Check results: ls -la {OUTPUT_DIR}") diff --git a/src/deepstream/pipelines/pipeline_onnx_real_input.py b/src/deepstream/pipelines/pipeline_onnx_real_input.py index e5b82ed9..060e7ac9 100644 --- a/src/deepstream/pipelines/pipeline_onnx_real_input.py +++ b/src/deepstream/pipelines/pipeline_onnx_real_input.py @@ -10,10 +10,10 @@ from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor +from src.deepstream.helpers.pipeline_runner import run_pipeline from src.deepstream.helpers.softmax_topk_classifier import ( SoftmaxTopKClassifier, ) -from src.deepstream.helpers.pipeline_runner import run_pipeline def list_image_files(folder: str) -> List[str]: diff --git a/src/deepstream/pipelines/pipeline_onnx_test.py b/src/deepstream/pipelines/pipeline_onnx_test.py index c530df78..4c987fda 100755 --- a/src/deepstream/pipelines/pipeline_onnx_test.py +++ b/src/deepstream/pipelines/pipeline_onnx_test.py @@ -10,12 +10,12 @@ from src.deepstream.helpers.load_class_labels import load_class_labels from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor +from src.deepstream.helpers.pipeline_runner import run_pipeline from src.deepstream.helpers.softmax_topk_classifier import ( ClassificationPrediction, SoftmaxTopKClassifier, ) from src.model_conversion.onnx_to_trt import build_engine_if_missing -from src.deepstream.helpers.pipeline_runner import run_pipeline CLASS_LABELS = load_class_labels() @@ -62,6 +62,7 @@ def buffer_probe(pad, info, user_data): return Gst.PadProbeReturn.OK + def build_pipeline(config_file: str) -> Gst.Pipeline: pipeline = Gst.Pipeline() @@ -137,6 +138,7 @@ def build_pipeline(config_file: str) -> Gst.Pipeline: infer_src_pad.add_probe(Gst.PadProbeType.BUFFER, buffer_probe, None) return pipeline + def main(config_file: str) -> int: # Make sure TRT engine exists for this config build_engine_if_missing(config_file) diff --git a/src/deepstream/probes/db_message_meta_probe.py b/src/deepstream/probes/db_message_meta_probe.py index 0edf235a..8810b221 100644 --- a/src/deepstream/probes/db_message_meta_probe.py +++ b/src/deepstream/probes/db_message_meta_probe.py @@ -1,14 +1,20 @@ -from typing import Any, List +from typing import List + import pyds from gi.repository import Gst from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor -from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent +from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier class DbMessageMetaProbe: - def __init__(self, tensor_extractor : TensorExtractor, classifier: SoftmaxTopKClassifier, plant_event_builder: NvdsPlantEventBuilder): + def __init__( + self, + tensor_extractor: TensorExtractor, + classifier: SoftmaxTopKClassifier, + plant_event_builder: NvdsPlantEventBuilder, + ): self.tensor_extractor = tensor_extractor self.classifier = classifier self.plant_event_builder = plant_event_builder @@ -21,7 +27,9 @@ def extract_tensor_output(self, frame_meta) -> List[ClassificationPrediction]: logits = self.tensor_extractor.extract_logits(tensor_meta) return self.classifier.predict_from_logits(logits) - def attach_msgmeta(self, batch_meta, frame_meta, frame_number: int, predictions: List[ClassificationPrediction]) -> None: + def attach_msgmeta( + self, batch_meta, frame_meta, frame_number: int, predictions: List[ClassificationPrediction] + ) -> None: best = predictions[0] event = PlantEvent( frame_id=frame_number, diff --git a/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py b/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py index 8f769b7a..995dd314 100644 --- a/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py +++ b/src/deepstream/probes/frame_comparison/cpu/frame_skipping_probe.py @@ -1,14 +1,13 @@ from typing import Any + import gi -import numpy as np import pyds -import cv2 gi.require_version("Gst", "1.0") from gi.repository import Gst -from src.frame_comparison.frame_change_detector import FrameChangeDetector from src.deepstream.helpers.should_skip_frame import should_skip_frame +from src.frame_comparison.frame_change_detector import FrameChangeDetector def frame_skip_probe(pad: Any, info: Any, frame_change_detector: FrameChangeDetector) -> int: diff --git a/src/deepstream/speed_benchmark.py b/src/deepstream/speed_benchmark.py index f94e44f1..f4d2b0dd 100644 --- a/src/deepstream/speed_benchmark.py +++ b/src/deepstream/speed_benchmark.py @@ -8,8 +8,8 @@ import pandas as pd from pydantic import BaseModel -from src.deepstream.pipelines.pipeline_main import build_pipeline from src.deepstream.helpers.pipeline_runner import run_pipeline +from src.deepstream.pipelines.pipeline_main import build_pipeline gi.require_version("Gst", "1.0") from gi.repository import Gst diff --git a/src/evaluation/base/evaluator.py b/src/evaluation/base/evaluator.py index 4ba072ca..f070ac37 100644 --- a/src/evaluation/base/evaluator.py +++ b/src/evaluation/base/evaluator.py @@ -1,16 +1,14 @@ -import os -import sys import time from dataclasses import dataclass from pathlib import Path from typing import List, Optional, Tuple - import torch from PIL import Image -from src.inference.base.classifier_inference_base import ClassifierInferenceBase from src.evaluation.metrics.metrics_api import ClassificationReport, compute_metrics +from src.inference.base.classifier_inference_base import ClassifierInferenceBase + @dataclass class EvaluationConfig: @@ -53,8 +51,6 @@ def evaluate_model(self, model: ClassifierInferenceBase) -> ClassificationReport end_time = time.perf_counter() latencies.append(end_time - start_time) predictions.append(result[0]["class_id"]) - report = compute_metrics( - y_true=true_labels, y_pred=predictions, latencies=latencies - ) + report = compute_metrics(y_true=true_labels, y_pred=predictions, latencies=latencies) print("āœ… Evaluation completed!") return report diff --git a/src/evaluation/run_evaluation.py b/src/evaluation/run_evaluation.py index a32c5e84..f1ddd976 100644 --- a/src/evaluation/run_evaluation.py +++ b/src/evaluation/run_evaluation.py @@ -1,20 +1,18 @@ -import os -import sys -import time -import json import argparse -import pandas as pd -from dataclasses import dataclass +import json from pathlib import Path -from typing import List, Optional, Tuple -from dataset.optimal_class_mapping import MODEL_NAMES as class_mapping, map_prediction -from src.inference.utils.inference_factory import InferenceFactory, InferenceConfig, Backend, ModelArch +import pandas as pd + +from dataset.optimal_class_mapping import MODEL_NAMES as class_mapping +from dataset.optimal_class_mapping import map_prediction from src.evaluation.base.evaluator import EvaluationConfig, ModelEvaluator +from src.inference.utils.inference_factory import Backend, InferenceConfig, InferenceFactory, ModelArch from src.path_utils import ensure_clean_directory MODELS_DIR_PATH = Path("models") + class MappedModelWrapper: """Wrapper that adds mapping between 83 model classes to 76 dataset classes""" @@ -33,6 +31,7 @@ def infer(self, image): } ] + def create_model(model_name: str, model_type: str, device: str): weights_path = MODELS_DIR_PATH / model_type / f"{model_name}" @@ -59,26 +58,24 @@ def create_model(model_name: str, model_type: str, device: str): base_model = InferenceFactory.create(config) return MappedModelWrapper(base_model) + def parse_arguments(): - parser = argparse.ArgumentParser( - description="Evaluate models on classification dataset" - ) + parser = argparse.ArgumentParser(description="Evaluate models on classification dataset") parser.add_argument("--dataset", type=str, required=True) parser.add_argument("--models", type=str, default="resnet18,mobilenet") - parser.add_argument( - "--model_type", type=str, choices=["pytorch", "onnx"], required=True - ) + parser.add_argument("--model_type", type=str, choices=["pytorch", "onnx"], required=True) parser.add_argument("--device", type=str, default="cpu") parser.add_argument("--output", type=str, default="outputs/evaluation_results.json") return parser.parse_args() + def main(): args = parse_arguments() models_to_eval = [m.strip() for m in args.models.split(",")] config = EvaluationConfig(dataset_path=Path(args.dataset), device=args.device) evaluator = ModelEvaluator(config) results = {} - + print("šŸ“Š EVALUATION STARTING") print(f"Dataset: {args.dataset}, Models: {models_to_eval}, Device: {args.device}") print("=" * 50) @@ -89,7 +86,7 @@ def main(): report = evaluator.evaluate_model(model) result_key = f"{model_name}_{args.model_type}" results[result_key] = report.dict() - + print(f"\nāœ… {model_name.upper()} Results:") print(f" Accuracy: {report.accuracy:.4f}") print(f" Precision (Micro): {report.precision_micro:.4f}") @@ -123,5 +120,6 @@ def main(): print("=" * 80) print(df.to_string(index=False)) + if __name__ == "__main__": main() diff --git a/src/evaluation/run_hierarchical_evaluation.py b/src/evaluation/run_hierarchical_evaluation.py index f5b8e4b8..4a3c5f38 100644 --- a/src/evaluation/run_hierarchical_evaluation.py +++ b/src/evaluation/run_hierarchical_evaluation.py @@ -1,15 +1,15 @@ -import os -import sys import json import time from pathlib import Path +from metrics.metrics_api import compute_metrics from PIL import Image + +from dataset.optimal_class_mapping import MODEL_NAMES as class_mapping +from dataset.optimal_class_mapping import map_prediction from dataset.utilities.datasets import DATASETS -from metrics.metrics_api import compute_metrics from src.evaluation.base.evaluator import EvaluationConfig, ModelEvaluator -from src.inference.utils.inference_factory import InferenceFactory, InferenceConfig, Backend, ModelArch -from dataset.optimal_class_mapping import MODEL_NAMES as class_mapping, map_prediction +from src.inference.utils.inference_factory import Backend, InferenceConfig, InferenceFactory, ModelArch class MappedModelWrapper: @@ -29,7 +29,7 @@ def evaluate_all_hierarchies(y_true, y_pred, latencies): report = compute_metrics(y_true, y_pred, latencies) results["CropAndWeed"] = report.model_dump() - # Fine24, CropsOrWeed9, CropOrWeed2 + # Fine24, CropsOrWeed9, CropOrWeed2 for name in ["Fine24", "CropsOrWeed9", "CropOrWeed2"]: try: dataset = DATASETS[name] @@ -74,7 +74,7 @@ def main(): device="cpu", class_mapping=class_mapping, ) - + resnet = InferenceFactory.create(resnet_config) wrapped_resnet = MappedModelWrapper(resnet) @@ -120,7 +120,7 @@ def main(): with open("src/evaluation/results/hierarchical_evaluation_results.json", "w") as f: json.dump(results, f, indent=2) - print("\nšŸ“Š תוצאות:") + print("\nresults:") for model, model_results in results.items(): print(f"\n{model.upper()}:") for hierarchy, metrics in model_results.items(): diff --git a/src/frame_comparison/compare_frames_methods.py b/src/frame_comparison/compare_frames_methods.py index 5b4d067a..1c5e1083 100644 --- a/src/frame_comparison/compare_frames_methods.py +++ b/src/frame_comparison/compare_frames_methods.py @@ -1,6 +1,7 @@ +import time + import cv2 import numpy as np -import time from numpy.typing import NDArray @@ -30,35 +31,27 @@ def simple_ssim(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8]) -> float: C2 = (K2 * L) ** 2 num = (2 * mu_x * mu_y + C1) * (2 * sigma_xy + C2) - den = (mu_x ** 2 + mu_y ** 2 + C1) * (sigma_x ** 2 + sigma_y ** 2 + C2) + den = (mu_x**2 + mu_y**2 + C1) * (sigma_x**2 + sigma_y**2 + C2) return float(num / den) def histogram_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8]) -> float: - histA = cv2.calcHist([imgA], [0, 1, 2], None, [8, 8, 8], - [0, 256, 0, 256, 0, 256]) - histB = cv2.calcHist([imgB], [0, 1, 2], None, [8, 8, 8], - [0, 256, 0, 256, 0, 256]) + histA = cv2.calcHist([imgA], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]) + histB = cv2.calcHist([imgB], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]) cv2.normalize(histA, histA) cv2.normalize(histB, histB) return float(cv2.compareHist(histA, histB, cv2.HISTCMP_CORREL)) -def optical_flow_diff( - imgA: NDArray[np.uint8], - imgB: NDArray[np.uint8] -) -> float: +def optical_flow_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8]) -> float: grayA = cv2.cvtColor(imgA, cv2.COLOR_BGR2GRAY) grayB = cv2.cvtColor(imgB, cv2.COLOR_BGR2GRAY) - flow = cv2.calcOpticalFlowFarneback( - grayA, grayB, None, 0.5, 3, 15, 3, 5, 1.2, 0 - ) + flow = cv2.calcOpticalFlowFarneback(grayA, grayB, None, 0.5, 3, 15, 3, 5, 1.2, 0) mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1]) return float(np.mean(mag)) -def motion_mask_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8], - threshold: int = 25) -> float: +def motion_mask_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8], threshold: int = 25) -> float: grayA = cv2.cvtColor(imgA, cv2.COLOR_BGR2GRAY) grayB = cv2.cvtColor(imgB, cv2.COLOR_BGR2GRAY) diff = cv2.absdiff(grayA, grayB) @@ -67,8 +60,7 @@ def motion_mask_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8], return float(motion_ratio) -def orb_feature_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8], - max_features: int = 500) -> float: +def orb_feature_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8], max_features: int = 500) -> float: orb = cv2.ORB_create(nfeatures=max_features) kp1, des1 = orb.detectAndCompute(imgA, None) kp2, des2 = orb.detectAndCompute(imgB, None) @@ -83,9 +75,14 @@ def orb_feature_diff(imgA: NDArray[np.uint8], imgB: NDArray[np.uint8], return float(match_ratio) -def should_skip_inference(mse_val: float, ssim_val: float, flow_val: float, - mse_thresh: float = 50, ssim_thresh: float = 0.995, - flow_thresh: float = 0.1) -> bool: +def should_skip_inference( + mse_val: float, + ssim_val: float, + flow_val: float, + mse_thresh: float = 50, + ssim_thresh: float = 0.995, + flow_thresh: float = 0.1, +) -> bool: return mse_val < mse_thresh and ssim_val > ssim_thresh and flow_val < flow_thresh diff --git a/src/frame_comparison/create_sample_videos.py b/src/frame_comparison/create_sample_videos.py index 561564e6..47759f7a 100644 --- a/src/frame_comparison/create_sample_videos.py +++ b/src/frame_comparison/create_sample_videos.py @@ -1,9 +1,9 @@ -import cv2 import os + +import cv2 import numpy as np from numpy.typing import NDArray - OUTPUT_DIR = "assets/videos/frame_change_videos/" os.makedirs(OUTPUT_DIR, exist_ok=True) @@ -58,7 +58,7 @@ def save_video(frames: list[NDArray[np.uint8]], filename: str, fps: int = 2) -> return h, w, _ = frames[0].shape path = os.path.join(OUTPUT_DIR, filename) - writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)) + writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h)) for f in frames: writer.write(f) writer.release() diff --git a/src/frame_comparison/cv_frame_skip_debug.py b/src/frame_comparison/cv_frame_skip_debug.py index faafbbc3..810ffe1f 100644 --- a/src/frame_comparison/cv_frame_skip_debug.py +++ b/src/frame_comparison/cv_frame_skip_debug.py @@ -1,10 +1,12 @@ -import cv2 import os + +import cv2 from frame_change_detector import FrameChangeDetector VIDEO_DIR = "/workspace/assets/videos/frame_change_videos" VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv") + def analyze_video_skip_decisions(video_path: str) -> None: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): diff --git a/src/inference/base/classifier_inference_base.py b/src/inference/base/classifier_inference_base.py index 4176d284..56de908e 100644 --- a/src/inference/base/classifier_inference_base.py +++ b/src/inference/base/classifier_inference_base.py @@ -9,6 +9,7 @@ from torch import Tensor from torchvision import transforms + class ClassifierInferenceBase(ABC): """ Abstract base for image classification inference. @@ -24,14 +25,10 @@ def __init__( transform: Optional[transforms.Compose] = None, ) -> None: self.device = torch.device(device) - self.weights_path = ( - Path(str(weights_path)) if weights_path is not None else None - ) + self.weights_path = Path(str(weights_path)) if weights_path is not None else None self.class_mapping = class_mapping self.topk = max(1, int(topk)) - self._transform = ( - transform if transform is not None else self._build_preprocess() - ) + self._transform = transform if transform is not None else self._build_preprocess() self._initialize_model() @abstractmethod @@ -45,7 +42,7 @@ def _forward(self, x: Tensor) -> Tensor: raise NotImplementedError def _build_preprocess(self) -> transforms.Compose: - """Default preprocessing - ×Ŗ×•×× לאימון המודל""" + """Default preprocessing for the model.""" return transforms.Compose( [ transforms.Resize((256, 256)), diff --git a/src/inference/base/classifier_inference_base_onnx.py b/src/inference/base/classifier_inference_base_onnx.py index 1c635cfd..bd08fcad 100644 --- a/src/inference/base/classifier_inference_base_onnx.py +++ b/src/inference/base/classifier_inference_base_onnx.py @@ -1,9 +1,6 @@ -import os -import sys from pathlib import Path from typing import Dict, Optional, Union - import numpy as np import onnxruntime as ort import torch @@ -69,7 +66,7 @@ def _initialize_model(self) -> None: so.graph_optimization_level = level_map.get( self._opt_level.upper(), ort.GraphOptimizationLevel.ORT_ENABLE_ALL ) - + so.add_session_config_entry("session.disable_mem_pattern", "1") so.add_session_config_entry("session.disable_mem_reuse", "1") so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL diff --git a/src/inference/pytorch/mobilenet_inference.py b/src/inference/pytorch/mobilenet_inference.py index ef745901..57ade41d 100644 --- a/src/inference/pytorch/mobilenet_inference.py +++ b/src/inference/pytorch/mobilenet_inference.py @@ -1,5 +1,3 @@ -import os -import sys from pathlib import Path from typing import Dict, Optional, Union @@ -9,6 +7,7 @@ from src.inference.base.classifier_inference_base import ClassifierInferenceBase + class MobileNetInference(ClassifierInferenceBase): """MobileNetV2 classifier that strictly requires a checkpoint at `weights_path`.""" @@ -50,9 +49,7 @@ def _initialize_model(self) -> None: if isinstance(ckpt, dict): state_dict = ckpt.get("model_state_dict") or ckpt.get("state_dict") or ckpt else: - raise RuntimeError( - "Unrecognized checkpoint format (expected dict/state_dict)" - ) + raise RuntimeError("Unrecognized checkpoint format (expected dict/state_dict)") if any(k.startswith("model.") for k in state_dict.keys()): state_dict = {k.replace("model.", "", 1): v for k, v in state_dict.items()} diff --git a/src/inference/pytorch/resnet_inference.py b/src/inference/pytorch/resnet_inference.py index 087c44e3..b61154b5 100644 --- a/src/inference/pytorch/resnet_inference.py +++ b/src/inference/pytorch/resnet_inference.py @@ -1,5 +1,3 @@ -import os -import sys from pathlib import Path from typing import Dict, Optional, Union @@ -49,9 +47,7 @@ def _initialize_model(self) -> None: if isinstance(ckpt, dict): state_dict = ckpt.get("model_state_dict") or ckpt.get("state_dict") or ckpt else: - raise RuntimeError( - "Unrecognized checkpoint format (expected dict/state_dict)" - ) + raise RuntimeError("Unrecognized checkpoint format (expected dict/state_dict)") if any(k.startswith("model.") for k in state_dict.keys()): state_dict = {k.replace("model.", "", 1): v for k, v in state_dict.items()} diff --git a/src/inference/utils/inference_factory.py b/src/inference/utils/inference_factory.py index 4c397f21..f5b0c93c 100644 --- a/src/inference/utils/inference_factory.py +++ b/src/inference/utils/inference_factory.py @@ -1,11 +1,12 @@ from enum import Enum -from pydantic import BaseModel from pathlib import Path from typing import Optional, Type, Union -from src.inference.pytorch.resnet_inference import ResNetInference -from src.inference.pytorch.mobilenet_inference import MobileNetInference +from pydantic import BaseModel + from src.inference.base.classifier_inference_base_onnx import OnnxClassifierInferenceBase +from src.inference.pytorch.mobilenet_inference import MobileNetInference +from src.inference.pytorch.resnet_inference import ResNetInference class Backend(str, Enum): diff --git a/src/inference/utils/onnx_predict_images.py b/src/inference/utils/onnx_predict_images.py index 9bda588d..66e73acf 100644 --- a/src/inference/utils/onnx_predict_images.py +++ b/src/inference/utils/onnx_predict_images.py @@ -1,11 +1,12 @@ import argparse import os import pathlib + import numpy as np import torch from PIL import Image -from src.inference.utils.inference_factory import InferenceFactory, InferenceConfig, Backend +from src.inference.utils.inference_factory import Backend, InferenceConfig, InferenceFactory IMAGE_SIZE = (256, 256) COLOR_MODE = "RGB" diff --git a/src/inference/utils/run_inference_main.py b/src/inference/utils/run_inference_main.py index b493ac60..4952cd9c 100644 --- a/src/inference/utils/run_inference_main.py +++ b/src/inference/utils/run_inference_main.py @@ -1,15 +1,14 @@ +import argparse +import json import os -import sys import time -import json -import argparse from pathlib import Path -from PIL import Image import torch +from PIL import Image from dataset.optimal_class_mapping import MODEL_NAMES as ID_TO_NAME -from src.inference.utils.inference_factory import InferenceFactory, InferenceConfig, Backend, ModelArch +from src.inference.utils.inference_factory import Backend, InferenceConfig, InferenceFactory, ModelArch # Main image processing logic @@ -29,7 +28,6 @@ def run_inference(model_type: str, model_path: str, image_dir: str, device: str) class_mapping=class_mapping, ) - clf = InferenceFactory.create(config) results = {} @@ -56,10 +54,7 @@ def run_inference(model_type: str, model_path: str, image_dir: str, device: str) "inference_time_ms": inference_time, } - print( - f" {predictions[0]['class_name']} " - f"({predictions[0]['probability']:.1%})" - ) + print(f" {predictions[0]['class_name']} " f"({predictions[0]['probability']:.1%})") total_time += inference_time image_count += 1 diff --git a/src/model_conversion/onnx_to_trt.py b/src/model_conversion/onnx_to_trt.py index 643aa4b6..b5576f00 100644 --- a/src/model_conversion/onnx_to_trt.py +++ b/src/model_conversion/onnx_to_trt.py @@ -42,13 +42,7 @@ def build_engine_if_missing(config_path: str) -> str: print(f"[INFO] Engine file not found. Generating {engine_path} from {onnx_path} ...") os.makedirs(os.path.dirname(engine_path), exist_ok=True) - cmd = [ - "trtexec", - f"--onnx={onnx_path}", - f"--saveEngine={engine_path}", - "--memPoolSize=workspace:2048", - "--fp16" -] + cmd = ["trtexec", f"--onnx={onnx_path}", f"--saveEngine={engine_path}", "--memPoolSize=workspace:2048", "--fp16"] subprocess.run(cmd, check=True) # Verify created file diff --git a/src/model_conversion/pytorch_to_onnx.py b/src/model_conversion/pytorch_to_onnx.py index 1f83057d..106551f2 100644 --- a/src/model_conversion/pytorch_to_onnx.py +++ b/src/model_conversion/pytorch_to_onnx.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import argparse from pathlib import Path -import subprocess import torch diff --git a/src/model_conversion/validate_onnx_vs_pytorch.py b/src/model_conversion/validate_onnx_vs_pytorch.py index d2f044e3..b8397dff 100644 --- a/src/model_conversion/validate_onnx_vs_pytorch.py +++ b/src/model_conversion/validate_onnx_vs_pytorch.py @@ -9,9 +9,9 @@ from dataset.optimal_class_mapping import MODEL_NAMES as ID_TO_NAME from src.inference.base.classifier_inference_base import ClassifierInferenceBase +from src.inference.base.classifier_inference_base_onnx import OnnxClassifierInferenceBase from src.inference.pytorch.mobilenet_inference import MobileNetInference from src.inference.pytorch.resnet_inference import ResNetInference -from src.inference.base.classifier_inference_base_onnx import OnnxClassifierInferenceBase NUM_CLASSES = 83 INPUT_SIZE = (256, 256) diff --git a/src/model_conversion/validate_trt_vs_onnx.py b/src/model_conversion/validate_trt_vs_onnx.py index 2d7a3c5d..c5059c82 100644 --- a/src/model_conversion/validate_trt_vs_onnx.py +++ b/src/model_conversion/validate_trt_vs_onnx.py @@ -1,5 +1,4 @@ import argparse -import ctypes from pathlib import Path from threading import Timer from typing import Dict, Optional, Tuple diff --git a/src/rtsp/rtsp_server.py b/src/rtsp/rtsp_server.py index dbd63c95..103c792d 100644 --- a/src/rtsp/rtsp_server.py +++ b/src/rtsp/rtsp_server.py @@ -1,4 +1,5 @@ import os + import gi gi.require_version("Gst", "1.0") @@ -7,6 +8,7 @@ VIDEO_PATH = "/workspace/assets/videos/test_videos/video.mp4" + def main(): # Initialize GStreamer Gst.init(None) @@ -38,7 +40,7 @@ def main(): # Attach the server to the default main context and run the loop server.attach(None) print(f"RTSP server is up at rtsp://0.0.0.0:{port}/test") - + loop = GObject.MainLoop() loop.run()