From cfaa0d054fcd91cd22ebeeab7ebf32dbb52923b3 Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Sun, 7 Dec 2025 00:15:31 +0200 Subject: [PATCH 1/2] basic implementation --- .../pipelines/deepstream_pipeline_gpu.py | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 src/deepstream/pipelines/deepstream_pipeline_gpu.py diff --git a/src/deepstream/pipelines/deepstream_pipeline_gpu.py b/src/deepstream/pipelines/deepstream_pipeline_gpu.py new file mode 100644 index 0000000..87b5f51 --- /dev/null +++ b/src/deepstream/pipelines/deepstream_pipeline_gpu.py @@ -0,0 +1,184 @@ +import os +from datetime import datetime +from typing import Any, Dict, List +import gi +import numpy as np +import cv2 +import pyds + +gi.require_version("Gst", "1.0") +from gi.repository import GLib, Gst + +from src.frame_comparison.frame_change_detector import FrameChangeDetector +from src.model_conversion.onnx_to_trt import build_engine_if_missing + +from src.deepstream.helpers.load_class_labels import load_class_labels +from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor +from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier +from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent +from src.deepstream.helpers.pipeline_runner import run_pipeline + +from src.deepstream.probes.background_removal.gpu import gpu_background_probe +from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe +from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import frame_skipping_probe, GPUFrameChangeDetector + +# Configuration +RTSP_PORT = os.environ.get("RTSP_PORT", "8554") +RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test" + +CONFIG_FILE: str = "/workspace/configs/resnet18.txt" +MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt" +MQTT_CONN_STR = "172.17.0.1;1883;agstream-client" +MQTT_TOPIC = "deepstream/predictions" + +CLASS_LABELS = load_class_labels() + +def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn: + buffer_ptr: int = hash(info.get_buffer()) + batch_id: int = 0 + should_process: bool = frame_skipping_probe(buffer_ptr, batch_id, detector) + if should_process: + stats["processed"] += 1 + print(f"✅ PROCESSING frame {stats['total']}") + return Gst.PadProbeReturn.OK + else: + stats["skipped"] += 1 + print(f"⏭️ SKIPPING frame {stats['total']}") + return Gst.PadProbeReturn.DROP + +def _post_error_from_pad(pad: Gst.Pad, text: str, debug: str = "") -> None: + """Post an error message to the GStreamer bus from a pad.""" + elem = pad.get_parent_element() + if not elem: + return + gerr = GLib.Error(text) + msg = Gst.Message.new_error(elem, gerr, debug) + elem.post_message(msg) + +def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn: + """GPU pad probe that applies background removal to each frame.""" + gst_buffer = info.get_buffer() + if not gst_buffer: + _post_error_from_pad(pad, "missing GstBuffer in probe", "background_removal_gpu_probe: buf is None") + if STRICT_ERRORS: + raise RuntimeError("background_removal_gpu_probe: missing GstBuffer") + return Gst.PadProbeReturn.DROP + + try: + buffer_ptr: int = hash(gst_buffer) + batch_id: int = 0 + + success: bool = gpu_background_probe.remove_background_gpu_probe_pipeline(buffer_ptr, batch_id) + if not success: + _post_error_from_pad(pad, "GPU background removal failed") + if STRICT_ERRORS: + raise RuntimeError("GPU background removal failed") + return Gst.PadProbeReturn.DROP + + except Exception as e: + _post_error_from_pad(pad, "GPU processing error", str(e)) + if STRICT_ERRORS: + raise + return Gst.PadProbeReturn.DROP + + return Gst.PadProbeReturn.OK + +def build_pipeline() -> Gst.Pipeline: + """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" + pipeline = Gst.Pipeline.new("final-cpu-pipeline") + + # Elements + rtspsrc = Gst.ElementFactory.make("rtspsrc", "source") + depay = Gst.ElementFactory.make("rtph264depay", "depay") + parse = Gst.ElementFactory.make("h264parse", "parse") + decode = Gst.ElementFactory.make("decodebin", "decode") + convert = Gst.ElementFactory.make("videoconvert", "convert") + nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert") + capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter") + streammux = Gst.ElementFactory.make("nvstreammux", "streammux") + nvinfer = Gst.ElementFactory.make("nvinfer", "nvinfer") + nvmsgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsgconv") + nvmsgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsgbroker") + + for e in [ + rtspsrc, + depay, + parse, + decode, + convert, + nvvideoconvert, + capsfilter, + streammux, + nvinfer, + nvmsgconv, + nvmsgbroker, + ]: + assert e is not None, f"Failed to create element {e}" + pipeline.add(e) + + # Configure elements + rtspsrc.set_property("location", RTSP_URL) + rtspsrc.set_property("latency", 200) + streammux.set_property("batch-size", 1) + streammux.set_property("width", 256) + streammux.set_property("height", 256) + nvinfer.set_property("config-file-path", CONFIG_FILE) + caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA") + capsfilter.set_property("caps", caps) + nvmsgconv.set_property("config", MSGCONV_CONFIG) + nvmsgconv.set_property("payload-type", 0) + nvmsgbroker.set_property("proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so") + nvmsgbroker.set_property("conn-str", MQTT_CONN_STR) + nvmsgbroker.set_property("topic", MQTT_TOPIC) + nvmsgbroker.set_property("sync", False) + + # Dynamic pad linking + def on_pad_added_rtspsrc(src: Any, pad: Any) -> None: + sinkpad = depay.get_static_pad("sink") + if not sinkpad.is_linked(): + pad.link(sinkpad) + + rtspsrc.connect("pad-added", on_pad_added_rtspsrc) + + def on_pad_added_decode(src: Any, pad: Any) -> None: + sinkpad = convert.get_static_pad("sink") + if not sinkpad.is_linked(): + pad.link(sinkpad) + + decode.connect("pad-added", on_pad_added_decode) + + # Link capsfilter → streammux + depay.link(parse) + parse.link(decode) + convert.link(nvvideoconvert) + nvvideoconvert.link(capsfilter) + srcpad = capsfilter.get_static_pad("src") + sinkpad = streammux.get_request_pad("sink_0") + srcpad.link(sinkpad) + + streammux.link(nvinfer) + nvinfer.link(nvmsgconv) + nvmsgconv.link(nvmsgbroker) + + detector = GPUFrameChangeDetector() + + streammux_src_pad: Gst.Pad = streammux.get_static_pad("src") + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, gpu_frame_skip_probe, detector) + + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, background_removal_gpu_probe) + + tensor_extractor = TensorExtractor() + classifier = SoftmaxTopKClassifier(CLASS_LABELS) + plant_event_builder = NvdsPlantEventBuilder() + db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder) + + nvinfer_src_pad = nvinfer.get_static_pad("src") + nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.pad_probe) + + return pipeline + +if __name__ == "__main__": + Gst.init(None) + build_engine_if_missing(CONFIG_FILE) + pipeline = build_pipeline() + run_pipeline(pipeline) From b624131cd075bd292a000a0eced1cacfc53d1142 Mon Sep 17 00:00:00 2001 From: r83575 Date: Sun, 7 Dec 2025 13:27:27 +0200 Subject: [PATCH 2/2] Finalize pipeline and verify runtime execution Co-authored-by: Sarah Gershuni Co-authored-by: ruti_cohen --- .../pipelines/deepstream_pipeline_gpu.py | 2 ++ .../gpu/frame_change_detector.cpp | 4 ++-- ...ing_probe.cpython-310-aarch64-linux-gnu.so | Bin 236992 -> 236992 bytes 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/deepstream/pipelines/deepstream_pipeline_gpu.py b/src/deepstream/pipelines/deepstream_pipeline_gpu.py index 87b5f51..0c37da0 100644 --- a/src/deepstream/pipelines/deepstream_pipeline_gpu.py +++ b/src/deepstream/pipelines/deepstream_pipeline_gpu.py @@ -33,6 +33,8 @@ CLASS_LABELS = load_class_labels() +stats: Dict[str, int] = {"total": 0, "skipped": 0, "processed": 0} + def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn: buffer_ptr: int = hash(info.get_buffer()) batch_id: int = 0 diff --git a/src/deepstream/probes/frame_comparison/gpu/frame_change_detector.cpp b/src/deepstream/probes/frame_comparison/gpu/frame_change_detector.cpp index 9e252b7..06130cd 100644 --- a/src/deepstream/probes/frame_comparison/gpu/frame_change_detector.cpp +++ b/src/deepstream/probes/frame_comparison/gpu/frame_change_detector.cpp @@ -5,8 +5,8 @@ #include #include -const double MSE_THRESH = 20.0; -const double SSIM_THRESH = 0.998; +const double MSE_THRESH = 99.0; +const double SSIM_THRESH = 0.996; const double FLOW_THRESH = 0.5; const double OPTICAL_FLOW_ACTIVE_THRESH = 0.5; diff --git a/src/deepstream/probes/frame_comparison/gpu/frame_skipping_probe.cpython-310-aarch64-linux-gnu.so b/src/deepstream/probes/frame_comparison/gpu/frame_skipping_probe.cpython-310-aarch64-linux-gnu.so index da15fdba9bfcf0a36cbc55ad02e1a3a920b89719..ec5c22918d74029c759e89343dc5edd57524f064 100755 GIT binary patch delta 63 zcmV-F0KosiyAHs+4zLgc6rCrj>s$?tZl{Mh_m+*rJmjW}PlF@^ha>_4w_4w