Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions src/deepstream/pipelines/deepstream_pipeline_gpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import os
from datetime import datetime
from typing import Any, Dict, List
import gi
import numpy as np
import cv2
import pyds

gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst

from src.frame_comparison.frame_change_detector import FrameChangeDetector
from src.model_conversion.onnx_to_trt import build_engine_if_missing

from src.deepstream.helpers.load_class_labels import load_class_labels
from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor
from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier
from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent
from src.deepstream.helpers.pipeline_runner import run_pipeline

from src.deepstream.probes.background_removal.gpu import gpu_background_probe
from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe
from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import frame_skipping_probe, GPUFrameChangeDetector

# Configuration
RTSP_PORT = os.environ.get("RTSP_PORT", "8554")
RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test"

CONFIG_FILE: str = "/workspace/configs/resnet18.txt"
MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt"
MQTT_CONN_STR = "172.17.0.1;1883;agstream-client"
MQTT_TOPIC = "deepstream/predictions"

CLASS_LABELS = load_class_labels()

stats: Dict[str, int] = {"total": 0, "skipped": 0, "processed": 0}

def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn:
buffer_ptr: int = hash(info.get_buffer())
batch_id: int = 0
should_process: bool = frame_skipping_probe(buffer_ptr, batch_id, detector)
if should_process:
stats["processed"] += 1
print(f"✅ PROCESSING frame {stats['total']}")
return Gst.PadProbeReturn.OK
else:
stats["skipped"] += 1
print(f"⏭️ SKIPPING frame {stats['total']}")
return Gst.PadProbeReturn.DROP

def _post_error_from_pad(pad: Gst.Pad, text: str, debug: str = "") -> None:
"""Post an error message to the GStreamer bus from a pad."""
elem = pad.get_parent_element()
if not elem:
return
gerr = GLib.Error(text)
msg = Gst.Message.new_error(elem, gerr, debug)
elem.post_message(msg)

def background_removal_gpu_probe(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn:
"""GPU pad probe that applies background removal to each frame."""
gst_buffer = info.get_buffer()
if not gst_buffer:
_post_error_from_pad(pad, "missing GstBuffer in probe", "background_removal_gpu_probe: buf is None")
if STRICT_ERRORS:
raise RuntimeError("background_removal_gpu_probe: missing GstBuffer")
return Gst.PadProbeReturn.DROP

try:
buffer_ptr: int = hash(gst_buffer)
batch_id: int = 0

success: bool = gpu_background_probe.remove_background_gpu_probe_pipeline(buffer_ptr, batch_id)
if not success:
_post_error_from_pad(pad, "GPU background removal failed")
if STRICT_ERRORS:
raise RuntimeError("GPU background removal failed")
return Gst.PadProbeReturn.DROP

except Exception as e:
_post_error_from_pad(pad, "GPU processing error", str(e))
if STRICT_ERRORS:
raise
return Gst.PadProbeReturn.DROP

return Gst.PadProbeReturn.OK

def build_pipeline() -> Gst.Pipeline:
"""Build DeepStream pipeline with background removal, frame skipping, and message broker."""
pipeline = Gst.Pipeline.new("final-cpu-pipeline")

# Elements
rtspsrc = Gst.ElementFactory.make("rtspsrc", "source")
depay = Gst.ElementFactory.make("rtph264depay", "depay")
parse = Gst.ElementFactory.make("h264parse", "parse")
decode = Gst.ElementFactory.make("decodebin", "decode")
convert = Gst.ElementFactory.make("videoconvert", "convert")
nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert")
capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")
streammux = Gst.ElementFactory.make("nvstreammux", "streammux")
nvinfer = Gst.ElementFactory.make("nvinfer", "nvinfer")
nvmsgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsgconv")
nvmsgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsgbroker")

for e in [
rtspsrc,
depay,
parse,
decode,
convert,
nvvideoconvert,
capsfilter,
streammux,
nvinfer,
nvmsgconv,
nvmsgbroker,
]:
assert e is not None, f"Failed to create element {e}"
pipeline.add(e)

# Configure elements
rtspsrc.set_property("location", RTSP_URL)
rtspsrc.set_property("latency", 200)
streammux.set_property("batch-size", 1)
streammux.set_property("width", 256)
streammux.set_property("height", 256)
nvinfer.set_property("config-file-path", CONFIG_FILE)
caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
capsfilter.set_property("caps", caps)
nvmsgconv.set_property("config", MSGCONV_CONFIG)
nvmsgconv.set_property("payload-type", 0)
nvmsgbroker.set_property("proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so")
nvmsgbroker.set_property("conn-str", MQTT_CONN_STR)
nvmsgbroker.set_property("topic", MQTT_TOPIC)
nvmsgbroker.set_property("sync", False)

# Dynamic pad linking
def on_pad_added_rtspsrc(src: Any, pad: Any) -> None:
sinkpad = depay.get_static_pad("sink")
if not sinkpad.is_linked():
pad.link(sinkpad)

rtspsrc.connect("pad-added", on_pad_added_rtspsrc)

def on_pad_added_decode(src: Any, pad: Any) -> None:
sinkpad = convert.get_static_pad("sink")
if not sinkpad.is_linked():
pad.link(sinkpad)

decode.connect("pad-added", on_pad_added_decode)

# Link capsfilter → streammux
depay.link(parse)
parse.link(decode)
convert.link(nvvideoconvert)
nvvideoconvert.link(capsfilter)
srcpad = capsfilter.get_static_pad("src")
sinkpad = streammux.get_request_pad("sink_0")
srcpad.link(sinkpad)

streammux.link(nvinfer)
nvinfer.link(nvmsgconv)
nvmsgconv.link(nvmsgbroker)

detector = GPUFrameChangeDetector()

streammux_src_pad: Gst.Pad = streammux.get_static_pad("src")
streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, gpu_frame_skip_probe, detector)

streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, background_removal_gpu_probe)

tensor_extractor = TensorExtractor()
classifier = SoftmaxTopKClassifier(CLASS_LABELS)
plant_event_builder = NvdsPlantEventBuilder()
db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder)

nvinfer_src_pad = nvinfer.get_static_pad("src")
nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.pad_probe)

return pipeline

if __name__ == "__main__":
Gst.init(None)
build_engine_if_missing(CONFIG_FILE)
pipeline = build_pipeline()
run_pipeline(pipeline)
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
#include <cmath>
#include <vector>

const double MSE_THRESH = 20.0;
const double SSIM_THRESH = 0.998;
const double MSE_THRESH = 99.0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to change it?

const double SSIM_THRESH = 0.996;
const double FLOW_THRESH = 0.5;
const double OPTICAL_FLOW_ACTIVE_THRESH = 0.5;

Expand Down
Binary file not shown.
Loading