diff --git a/skills/deepstream/deepstream-dev/.claude-plugin/plugin.json b/skills/deepstream/deepstream-dev/.claude-plugin/plugin.json deleted file mode 100644 index c20b6595..00000000 --- a/skills/deepstream/deepstream-dev/.claude-plugin/plugin.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "deepstream-dev", - "description": "NVIDIA DeepStream SDK 9.0 development with Python pyservicemaker API. Use when building video analytics pipelines, GStreamer-based video processing, TensorRT inference integration, object detection/tracking, or Kafka/message broker integration.", - "author": "NVIDIA CORPORATION", - "skills": "./" -} diff --git a/skills/deepstream/deepstream-dev/BENCHMARK.md b/skills/deepstream/deepstream-dev/BENCHMARK.md deleted file mode 100644 index 2627a581..00000000 --- a/skills/deepstream/deepstream-dev/BENCHMARK.md +++ /dev/null @@ -1,111 +0,0 @@ -# Evaluation Report - -Evaluation of the `deepstream-dev` skill before publication through NVSkills-Eval. - -This benchmark summarizes 3-Tier Evaluation from NVSkills-Eval results for the skill. The goal is to document whether the skill is safe, discoverable, effective, and useful for agents before it is published for broader workflow use. - -## Evaluation Summary - -- Skill: `deepstream-dev` -- Evaluation date: 2026-05-28 -- NVSkills-Eval profile: `external` -- Environment: `local` -- Dataset: 7 evaluation tasks -- Attempts per task: 2 -- Pass threshold: 50% -- Overall verdict: FAIL - -## Agents Used - -- `claude-code` -- `codex` - -## Metrics Used - -Reported benchmark dimensions: - -- Security: checks whether skill-assisted execution avoids unsafe behavior such as secret leakage, destructive commands, or unauthorized access. -- Correctness: checks whether the agent follows the expected workflow and produces the correct final output. -- Discoverability: checks whether the agent loads the skill when relevant and avoids using it when irrelevant. -- Effectiveness: checks whether the agent performs measurably better with the skill than without it. -- Efficiency: checks whether the agent uses fewer tokens and avoids redundant work. - -Underlying evaluation signals used in this run: - -- `skill_execution` (Skill Execution): verifies that the agent loaded the expected skill and workflow. -- `skill_efficiency` (Efficiency): checks routing quality, decoy avoidance, and redundant tool usage. -- `accuracy` (Accuracy): grades final-answer correctness against the reference answer. -- `goal_accuracy` (Goal Accuracy): checks whether the overall user task completed successfully. -- `behavior_check` (Behavior Check): verifies expected behavior steps, including safety expectations. -- `token_efficiency` (Token Efficiency): compares token usage with and without the skill. - -## Test Tasks - -The benchmark dataset contained 7 evaluation tasks: - -- Positive tasks: 5 tasks where the skill was expected to activate. -- Negative tasks: 2 tasks where no skill was expected. -- Unlabeled tasks: 0 tasks where positive/negative intent could not be inferred. - -Task composition is derived from the evaluation dataset when possible. Entries with `expected_skill` set are treated as positive skill-activation cases, while entries with `expected_skill: null` are treated as negative activation cases. - -## Results - -| Dimension | Num | `claude-code` | `codex` | -|---|---:|---:|---:| -| Security | 8 | 74% (+9%) | 57% (-2%) | -| Correctness | 8 | 94% (+6%) | 88% (+9%) | -| Discoverability | 8 | 86% (+11%) | 76% (+9%) | -| Effectiveness | 8 | 81% (+6%) | 78% (+9%) | -| Efficiency | 8 | 72% (+12%) | 64% (+9%) | - -Score values show skill-assisted performance. Values in parentheses show uplift versus the no-skill baseline when baseline data is available. - -## Tier 1: Static Validation Summary - -Tier 1 validation passed with observations. NVSkills-Eval ran 9 checks and found 34 total findings. - -Top findings: - -- MEDIUM PII/gps_coordinates: GPS coordinates (location information) (`references/service_maker_api.md:804`) -- MEDIUM PII/gps_coordinates: GPS coordinates (location information) (`references/service_maker_api.md:827`) -- MEDIUM PII/gps_coordinates: GPS coordinates (location information) (`references/service_maker_api.md:829`) -- MEDIUM PII/gps_coordinates: GPS coordinates (location information) (`references/service_maker_api.md:1279`) -- MEDIUM PII/gps_coordinates: GPS coordinates (location information) (`references/use_cases_pipelines.md:842`) - -## Tier 2: Deduplication Summary - -Tier 2 validation reported findings. NVSkills-Eval ran 2 checks and found 34 total findings. - -Top findings: - -- HIGH DUPLICATE/duplicate: Duplicate content found within references/metamux_config.md: - "# default pts-tolerance is 60 ms." in references/metamux_config.md (lines 67-72) - vs "# default pts-tolerance is 60 ms." in references/metamux_config.md (lines 125-130) (`references/metamux_config.md:67`) -- HIGH DUPLICATE/duplicate: Duplicate content found across references/buffer_apis.md and references/kafka_messaging.md and references/service_maker_api.md and references/use_cases_pipelines.md and references/utilities_config.md: - "### Pattern 3: Selective Frame Capture" in references/buffer_apis.md (lines 1198-1199) - vs "### Pattern 5: Frame Analysis and Logging" in references/buffer_apis.md (lines 1339-1340) - vs "#### Example 2: Pipeline with Both Kafka and Display (Using Tee)" in references/kafka_messaging.md (lines 167-168) - vs "#### Custom Kafka Producer Probe" in references/kafka_messaging.md (lines 581-582) - vs "# Enable tensor output in nvinfer" in references/service_maker_api.md (lines 1329-1333) - vs "#### Approach 3: Custom Postprocessing with Tensor Metadata" in references/use_cases_pipelines.md (lines 837-841) - vs "### Pattern 3: Custom Postprocessing" in references/utilities_config.md (lines 1275-1279) (`references/buffer_apis.md:1198`) -- HIGH DUPLICATE/duplicate: Duplicate content found across references/buffer_apis.md and references/kafka_messaging.md and references/use_cases_pipelines.md and references/utilities_config.md: - "# from multiprocessing import Queue # Use this for MULTIPROCESSING!" in references/buffer_apis.md (lines 1059-1063) - vs "### Pattern 3: Selective Frame Capture" in references/buffer_apis.md (lines 1195-1197) - vs "### Pattern 5: Frame Analysis and Logging" in references/buffer_apis.md (lines 1336-1338) - vs "#### Example 2: Pipeline with Both Kafka and Display (Using Tee)" in references/kafka_messaging.md (lines 162-166) - vs "#### Custom Kafka Producer Probe" in references/kafka_messaging.md (lines 576-580) - vs "#### Approach 3: Custom Postprocessing with Tensor Metadata" in references/use_cases_pipelines.md (lines 832-836) - vs "### Pattern 3: Custom Postprocessing" in references/utilities_config.md (lines 1272-1274) (`references/buffer_apis.md:1059`) -- HIGH DUPLICATE/duplicate: Duplicate content found within references/utilities_config.md: - "### Pattern 1: Load and Use Source Configuration" in references/utilities_config.md (lines 1107-1109) - vs "### Pattern 1: Load and Use Source Configuration" in references/utilities_config.md (lines 1127-1128) - vs "### Pattern 1: Load and Use Source Configuration" in references/utilities_config.md (lines 1142-1143) (`references/utilities_config.md:1107`) -- HIGH DUPLICATE/duplicate: Duplicate content found within references/metamux_config.md: - "# mux all source if don't set it." in references/metamux_config.md (lines 74-78) - vs "# mux all source if don't set it." in references/metamux_config.md (lines 132-136) (`references/metamux_config.md:74`) - -## Publication Recommendation - -The skill should be reviewed before NVSkills-Eval publication. Skill owners should address the findings above and rerun NVSkills-Eval to refresh this benchmark. diff --git a/skills/deepstream/deepstream-dev/SKILL.md b/skills/deepstream/deepstream-dev/SKILL.md deleted file mode 100644 index 033844a1..00000000 --- a/skills/deepstream/deepstream-dev/SKILL.md +++ /dev/null @@ -1,180 +0,0 @@ ---- -name: deepstream-dev -description: NVIDIA DeepStream SDK 9.0 development with Python pyservicemaker API. Use when building video analytics pipelines, GStreamer-based video processing, TensorRT inference integration, object detection/tracking, or Kafka/message broker integration. -owner: NVIDIA CORPORATION -service: deepstream -version: 1.1.0 -reviewed: 2026-04-24 -license: CC-BY-4.0 AND Apache-2.0 ---- - -# DeepStream Development Skill - -When this skill is active, **ALWAYS read the relevant reference documents** before generating code. Do NOT rely on memory - the reference documents contain critical details about exact property names, correct API usage, and common pitfalls. - -## SDK and Architecture Quick Reference - -### DeepStream SDK 9.0 Version Requirements - -- **GStreamer**: 1.24.2 -- **NVIDIA Driver**: 590+ -- **CUDA**: 13.1 -- **TensorRT**: 10.14.1.48 -- **Platforms**: Ubuntu 24.04 (x86_64 and ARM64/Jetson) - -### Typical Pipeline Flow - -``` -Source → Stream Muxer → Inference → [Tracker] → OSD → Renderer -``` -Components in `[brackets]` are **optional** -- only add them when the user explicitly requests them. - -| Stage | Role | Key Element(s) | Required? | -|-------|------|-----------------|-----------| -| Source | Input from files, RTSP, cameras | `nvurisrcbin` (preferred), `nvmultiurisrcbin`, `filesrc` | Yes | -| Stream Muxer | Batches streams for inference | `nvstreammux` | Yes | -| Inference | TensorRT model execution | `nvinfer`, `nvinferserver` | Yes | -| Tracker | Multi-object tracking across frames | `nvtracker` | **Only if requested** | -| OSD | Draws bounding boxes, labels, overlays | `nvosdbin` | Yes (for visualization) | -| Renderer | Display or save output | `nveglglessink`, `nv3dsink`, `filesink` | Yes | - -### Memory Model - -DeepStream uses NVIDIA Video Memory Manager (NVMM) for zero-copy GPU buffer transfers. Caps strings use `memory:NVMM` to indicate GPU memory (e.g., `video/x-raw(memory:NVMM), format=NV12`). - -## Critical Rules - -1. **Only Add Requested Components**: Do NOT add pipeline elements the user did not ask for. - - **Tracker (`nvtracker`)**: Only add when the user explicitly requests tracking or object IDs across frames - - **Secondary GIEs**: Only add when the user requests classification or attribute extraction - - **Analytics (`nvdsanalytics`)**: Only add when the user requests line crossing, ROI counting, etc. - - **Message broker (`nvmsgbroker`/`nvmsgconv`)**: Only add when the user requests Kafka/cloud messaging - - When in doubt, build the **minimal working pipeline** and let the user ask for additions - -2. **Default to `nvurisrcbin` for Sources**: When the user says "camera", "stream", "video", or provides a file path: - - Always use `nvurisrcbin` -- it handles RTSP, HTTP, and local files (`file://`) transparently - - Only use `filesrc` + `qtdemux` + parser when the user explicitly needs raw file source control - - For RTSP/live sources, also set `live-source=1` on `nvstreammux` and `sync=0` on the sink - - Convert local paths to URI: `"file://" + os.path.abspath(path)` - -3. **Metadata Iteration**: Use `.frame_items` and `.object_items` (returns iterators, NOT lists) - - NEVER use `len()` on these - iterate to count - - Iterator can only be consumed once - -4. **Request Pad Syntax**: Use `"sink_%u"` template, NEVER literal pad names - ```python - pipeline.link(("decoder", "mux"), ("", "sink_%u")) # CORRECT - # pipeline.link(("decoder", "mux"), ("", "sink_0")) # WRONG - will fail - ``` - -5. **Platform Detection for Sinks**: - ```python - import platform - sink_type = "nv3dsink" if platform.processor() == "aarch64" else "nveglglessink" - ``` - -6. **Buffer Cloning**: Always clone buffers for async processing - ```python - tensor = buffer.extract(0).clone() # CRITICAL - ``` - -7. **Queue Types**: - - `queue.Queue` → Use with `threading.Thread` - - `multiprocessing.Queue` → Use with `multiprocessing.Process` - - Using wrong type causes silent data loss! - -8. **nvinfer Config Format**: - - YAML: Use `property:` section (NOT `model:`), `key: value` with space after colon - - INI: Use `[property]` section, `key=value` with equals sign - - Section MUST be named `property` - -9. **nvmsgbroker is a SINK**: Cannot have downstream elements - use `tee` to split pipeline - -10. **ALL Sinks Need async=0 for Tee Splits or Dynamic Sources**: CRITICAL for state transitions - ```python - # When using tee splits OR dynamic sources, ALL sinks MUST have async=0 - pipeline.add("nveglglessink", "sink", { - "sync": 0, "qos": 0, - "async": 0 # CRITICAL - prevents state transition deadlock - }) - ``` - **Symptom if missing**: Pipeline stays in PAUSED state, no video displays. - -11. **Built-in Probe Attachment**: `measure_fps_probe` can only be attached to processing elements (e.g., `nvinfer`, `nvosdbin`), **NOT** to sink elements. Attaching to a sink raises `RuntimeError: Probe failure`. - -12. **Dynamic ONNX Models Require `infer-dims`**: When the ONNX model has dynamic input shapes (e.g., exported with `dynamic=True` in Ultralytics YOLO, or with dynamic batch/height/width axes), you **MUST** add `infer-dims=C;H;W` to the nvinfer config. Without it, TensorRT sees `-1` for dynamic dimensions and fails with `setDimensions: Error Code 3`. Common values: - - YOLO models (640 input): `infer-dims=3;640;640` - - Models with 416 input: `infer-dims=3;416;416` - - Models with 1280 input: `infer-dims=3;1280;1280` - -13. **Ultralytics YOLO Output Format Depends on Model Generation** — newer models (v10+/v26+) output post-NMS results; older models (v8/v11) output raw pre-NMS tensors. The custom parser and `cluster-mode` **must** match the actual output: - - | Model generation | Output tensor shape | Fields | `cluster-mode` | - |------------------|--------------------|---------------------------------|----------------| - | v8 / v11 | `[batch, 84, 8400]` | `[features(4+80), anchors]` — raw cx/cy/w/h + class scores, no NMS | `2` (NMS) | - | v10 / v26+ | `[batch, 300, 6]` | `[max_det, (x1,y1,x2,y2,conf,cls)]` — already post-NMS, pixel coords | `4` (none) | - - **How to identify at runtime**: log `inferDims.d[0]` and `inferDims.d[1]` inside the custom parser. - - `d={84, 8400}` → pre-NMS (v8/v11 style) - - `d={300, 6}` → post-NMS (v10/v26+ style) - - **Symptom of mismatch**: If `cluster-mode: 2` is used with a post-NMS `[N, 6]` output, bounding boxes appear shifted by 45° or 135° from the actual objects (DeepStream's NMS incorrectly re-processes already-final coordinates). - If you see tilted or rotated boxes, also check the OBB / `rotation_angle` note in `references/nvinfer_config.md`: for non-OBB models, value-initialize `NvDsInferObjectDetectionInfo` with `obj{}` and keep `rotation_angle = 0`; plain `NvDsInferObjectDetectionInfo obj;` leaves fields uninitialized. - -14. **Virtual Environment Must Include pyservicemaker**: `pyservicemaker` is installed system-wide but is NOT accessible from a standard Python virtual environment. When a task requires a venv (e.g., for model download/conversion pip dependencies), **always install `pyservicemaker` and `pyyaml` inside the venv**. The venv setup in generated code and README must always include: - ```bash - python3 -m venv venv - source venv/bin/activate - pip install /opt/nvidia/deepstream/deepstream/service-maker/python/pyservicemaker*.whl pyyaml - pip install -r requirements.txt # other dependencies - ``` - **Symptom if missing**: `ModuleNotFoundError: No module named 'pyservicemaker'` when running the app inside the venv. - -## Key Paths (DeepStream 9.0) - -- Models: `/opt/nvidia/deepstream/deepstream/samples/models/` -- Primary Detector: `/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx` -- Tracker lib: `/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so` -- Kafka lib: `/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so` -- Sample configs: `/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/` - -## Reference Documents - -**IMPORTANT**: Always read these documents for complete details. Do NOT generate code from memory. - -| Document | Use When | -|----------|----------| -| [references/gstreamer_plugins.md](references/gstreamer_plugins.md) | Looking up plugin properties, ALL properties listed | -| [references/service_maker_api.md](references/service_maker_api.md) | Using Pipeline/Flow API, metadata access, probes, EventMessageUserMetadata | -| [references/use_cases_pipelines.md](references/use_cases_pipelines.md) | Building pipelines: simple playback, multi-inference, cascaded GIE | -| [references/kafka_messaging.md](references/kafka_messaging.md) | Kafka/message broker setup, nvmsgconv/nvmsgbroker config, msg2p-newapi | -| [references/best_practices.md](references/best_practices.md) | Design patterns, common pitfalls, anti-patterns | -| [references/buffer_apis.md](references/buffer_apis.md) | BufferProvider/Feeder (injection), BufferRetriever/Receiver (extraction) | -| [references/media_extractor_advanced.md](references/media_extractor_advanced.md) | MediaExtractor, MediaChunk, FrameSampler | -| [references/utilities_config.md](references/utilities_config.md) | PerfMonitor, EngineFileMonitor, SourceConfig, SensorInfo, SmartRecordConfig | -| [references/nvinfer_config.md](references/nvinfer_config.md) | nvinfer config file format, ALL parameters | -| [references/tracker_config.md](references/tracker_config.md) | nvtracker config, NvDCF/IOU/DeepSORT/NvSORT | -| [references/troubleshooting.md](references/troubleshooting.md) | Error messages and solutions | -| [references/rest_api_dynamic.md](references/rest_api_dynamic.md) | REST API, dynamic source add/remove, nvmultiurisrcbin | -| [references/metamux_config.md](references/metamux_config.md) | nvdsmetamux config, parallel multi-model inference, metadata merging, source ID filtering | -| [references/docker_containers.md](references/docker_containers.md) | Docker images, Dockerfile examples, pyservicemaker install, container run commands | - -## Quick Error Reference - -| Error | Solution | -|-------|----------| -| `iterator has no len()` | Iterate to count, don't use `len()` | -| `pad template not found` | Use `"sink_%u"` not `"sink_0"` | -| Queue data loss | Use `multiprocessing.Queue` with `Process` | -| Config parse failed | Use `property:` not `model:` in YAML | -| `is-classifier` deprecation warning | Use `network-type: 1` instead of `is-classifier: 1` for classifiers; omit both for detectors | -| `min-boxes` unknown key warning | Use `minBoxes` (camelCase) in `class-attrs-*` sections, not `min-boxes` | -| Secondary GIE inactive | Set `process-mode: 2`, check `operate-on-gie-id` | -| Tee/dynamic source stuck PAUSED | Set `async: 0` on **ALL** sink elements | -| RTSP no data/reconnecting | Test URL with ffplay, check credentials | -| `RuntimeError: Probe failure` | `measure_fps_probe` cannot attach to sink elements; use `nvinfer` or `nvosdbin` instead | -| `setDimensions` negative dims / engine build failed | Add `infer-dims=C;H;W` for dynamic ONNX models (e.g., `infer-dims=3;640;640`) | -| `No module named 'pyservicemaker'` in venv | `pip install /opt/nvidia/deepstream/deepstream/service-maker/python/pyservicemaker*.whl pyyaml` inside the venv | -| `AttributeError: object has no attribute 'obj_label'` | Use `obj_meta.label` not `obj_meta.obj_label` in pyservicemaker (C API name differs from Python binding) | - - diff --git a/skills/deepstream/deepstream-dev/evals/evals.json b/skills/deepstream/deepstream-dev/evals/evals.json deleted file mode 100644 index 91564ad8..00000000 --- a/skills/deepstream/deepstream-dev/evals/evals.json +++ /dev/null @@ -1,97 +0,0 @@ -[ - { - "id": "deepstream-dev-001", - "question": "Using DeepStream SDK 9.0 and the pyservicemaker Python API, generate a pipeline that reads a local video file, runs primary inference with nvinfer using the ResNet18 TrafficCamNet detector shipped with DeepStream, draws bounding boxes with nvosdbin, and renders to the screen. The user did not ask for tracking or Kafka.", - "expected_skill": "deepstream-dev", - "expected_script": null, - "ground_truth": "A minimal pipeline using nvurisrcbin, nvstreammux, nvinfer, nvosdbin, and a platform-appropriate sink. It must avoid nvtracker, secondary GIEs, nvmsgbroker, and other optional components that were not requested.", - "expected_behavior": [ - "Use nvurisrcbin as the source for a local video file.", - "Batch streams through nvstreammux.", - "Use the sink_%u request-pad template when linking sources into nvstreammux.", - "Reference the bundled ResNet18 TrafficCamNet ONNX model path.", - "Do not add nvtracker because tracking was not requested.", - "Do not add nvmsgbroker or Kafka messaging because messaging was not requested." - ] - }, - { - "id": "deepstream-dev-002", - "question": "Build a DeepStream 9.0 pyservicemaker pipeline that ingests two RTSP cameras, runs primary detection, tracks objects across frames, displays the result in a tiled view, and publishes detection metadata to a Kafka broker. Cover the live-source and tee-split requirements.", - "expected_skill": "deepstream-dev", - "expected_script": null, - "ground_truth": "The pipeline uses nvurisrcbin for each RTSP source, sets live-source=1 on nvstreammux, includes nvtracker because tracking was requested, splits display and broker output with tee, sends metadata to nvmsgbroker, and sets async=0 on sinks.", - "expected_behavior": [ - "Configure nvstreammux with live-source=1 for RTSP input.", - "Include nvtracker because the user explicitly requested tracking.", - "Use tee to feed both display and broker branches.", - "Use nvmsgbroker for Kafka publishing.", - "Set async=0 on sinks in the tee branches to avoid state-transition deadlocks.", - "Use sync=0 on the live renderer path." - ] - }, - { - "id": "deepstream-dev-003", - "question": "Generate an nvinfer YAML config for a YOLOv11 model with 640x640 input exported from Ultralytics with dynamic=True. The model outputs a raw pre-NMS tensor of shape [batch, 84, 8400].", - "expected_skill": "deepstream-dev", - "expected_script": null, - "ground_truth": "The nvinfer YAML uses a property section, sets infer-dims=3;640;640 so TensorRT does not see dynamic -1 dimensions, and uses cluster-mode: 2 for DeepStream NMS because the output tensor is pre-NMS.", - "expected_behavior": [ - "Use the property section for the nvinfer YAML.", - "Set infer-dims to 3;640;640 for the dynamic ONNX input shape.", - "Use cluster-mode: 2 because YOLOv11 output is pre-NMS.", - "Do not set is-classifier for an object detector." - ] - }, - { - "id": "deepstream-dev-004", - "question": "Write a DeepStream pipeline that just plays a video file through inference and shows it on screen. Keep it as minimal as possible.", - "expected_skill": "deepstream-dev", - "expected_script": null, - "ground_truth": "A minimal video inference pipeline with nvurisrcbin, nvstreammux, nvinfer, nvosdbin, and a renderer. It should not add tracking, analytics, secondary classifiers, metadata brokers, or other optional elements that the user did not request.", - "expected_behavior": [ - "Do not add nvtracker when tracking was not requested.", - "Do not add nvdsanalytics when line crossing, ROI, or analytics were not requested.", - "Do not add a secondary GIE when secondary classification was not requested.", - "Do not add nvmsgbroker or nvmsgconv when messaging was not requested.", - "Still include nvinfer for the requested inference stage." - ] - }, - { - "id": "deepstream-dev-005", - "question": "My pyservicemaker probe runs len(frame.object_items) to count detections and I am installing my app inside a fresh python3 -m venv. It fails with ModuleNotFoundError: pyservicemaker and the probe raises 'iterator has no len()'. Fix both.", - "expected_skill": "deepstream-dev", - "expected_script": null, - "ground_truth": "Explain that frame.object_items and frame.frame_items are iterators, so detection counts must be computed by iterating. Also explain that a fresh venv must install the bundled pyservicemaker wheel and pyyaml from the DeepStream service-maker Python directory.", - "expected_behavior": [ - "State that object_items and frame_items are iterators and cannot be counted with len().", - "Show or describe counting by iterating over object_items.", - "Tell the user to install the bundled pyservicemaker wheel inside the venv.", - "Reference the DeepStream service-maker Python wheel directory under /opt/nvidia/deepstream/deepstream/service-maker/python/.", - "Also install pyyaml in the venv so YAML nvinfer configs can load." - ] - }, - { - "id": "deepstream-dev-006-negative", - "question": "Train a custom image classifier from scratch in PyTorch and export it to CoreML for iOS. I do not need any DeepStream pipeline setup.", - "expected_skill": null, - "expected_script": null, - "ground_truth": "The deepstream-dev skill should not be selected for this request because it is outside DeepStream pipeline and SDK usage scope.", - "expected_behavior": [ - "Do not activate deepstream-dev for this request.", - "Avoid DeepStream-specific pipeline guidance and plugin recommendations.", - "Respond with a generic fallback or suggest a more relevant non-DeepStream path." - ] - }, - { - "id": "deepstream-dev-007-negative", - "question": "How do I configure a MySQL replication slave on Ubuntu 22.04?", - "expected_skill": null, - "expected_script": null, - "ground_truth": "The deepstream-dev skill should not be selected because this request is unrelated to DeepStream SDK development or pipeline operations.", - "expected_behavior": [ - "Do not activate deepstream-dev for this request.", - "State that the request is outside DeepStream scope and avoid pipeline or plugin guidance.", - "Suggest a MySQL-focused resource or workflow." - ] - } -] diff --git a/skills/deepstream/deepstream-dev/references/best_practices.md b/skills/deepstream/deepstream-dev/references/best_practices.md deleted file mode 100644 index 783f1130..00000000 --- a/skills/deepstream/deepstream-dev/references/best_practices.md +++ /dev/null @@ -1,1169 +0,0 @@ -# DeepStream Best Practices and Design Patterns - -## Overview - -This document provides comprehensive best practices, design patterns, and optimization strategies for building production-grade DeepStream applications. These guidelines help ensure performance, reliability, maintainability, and scalability. - ---- - -## 1. Pipeline Design Patterns - -### Pattern 1: Modular Pipeline Construction - -**Best Practice**: Build pipelines in modular, reusable functions. - -```python -def create_source_pipeline(video_path, num_streams=1): - """Create reusable source pipeline""" - sources = [] - for i in range(num_streams): - sources.extend([ - {"element": "filesrc", "name": f"src{i}", "props": {"location": video_path}}, - {"element": "h264parse", "name": f"parser{i}"}, - {"element": "nvv4l2decoder", "name": f"decoder{i}"} - ]) - return sources - -def create_inference_pipeline(config_files): - """Create reusable inference pipeline""" - inference_elements = [] - for idx, config in enumerate(config_files): - unique_id = idx + 1 - inference_elements.append({ - "element": "nvinfer", - "name": f"infer{idx}", - "props": { - "config-file-path": config, - "unique-id": unique_id - } - }) - return inference_elements - -def build_complete_pipeline(video_path, infer_configs): - """Compose complete pipeline from modules""" - pipeline = Pipeline("modular-pipeline") - - # Add source modules - sources = create_source_pipeline(video_path) - for src_config in sources: - pipeline.add(src_config["element"], src_config["name"], src_config.get("props", {})) - - # Add inference modules - infer_elements = create_inference_pipeline(infer_configs) - for infer_config in infer_elements: - pipeline.add(infer_config["element"], infer_config["name"], infer_config.get("props", {})) - - # Link modules - # ... linking logic ... - - return pipeline -``` - -### Pattern 2: Configuration-Driven Pipelines - -**Best Practice**: Use YAML/JSON configuration files for pipeline definition. - -```python -import yaml - -def load_pipeline_config(config_path): - """Load pipeline configuration from YAML""" - with open(config_path, 'r') as f: - return yaml.safe_load(f) - -def build_pipeline_from_config(config): - """Build pipeline from configuration""" - pipeline = Pipeline(config["pipeline"]["name"]) - - # Add elements from config - for elem_config in config["pipeline"]["elements"]: - pipeline.add( - elem_config["type"], - elem_config["name"], - elem_config.get("properties", {}) - ) - - # Link elements from config - for link_group in config["pipeline"]["links"]: - pipeline.link(*link_group) - - return pipeline -``` - -### Pattern 3: Factory Pattern for Element Creation - -**Best Practice**: Use factory functions for element creation with validation. - -```python -def create_decoder(platform="x86"): - """Factory function for decoder creation""" - decoder_props = {} - - if platform == "jetson": - decoder_props["device"] = "/dev/video0" - - return { - "element": "nvv4l2decoder", - "name": "decoder", - "props": decoder_props - } - -def create_sink(platform="x86", window_config=None): - """Factory function for sink creation""" - sink_type = "nv3dsink" if platform == "jetson" else "nveglglessink" - sink_props = {"sync": 1} - - if window_config: - sink_props.update(window_config) - - return { - "element": sink_type, - "name": "sink", - "props": sink_props - } -``` - -### Pattern 4: Strategy Pattern for Processing - -**Best Practice**: Use strategy pattern for different processing approaches. - -```python -class ProcessingStrategy: - """Base class for processing strategies""" - def process(self, batch_meta): - raise NotImplementedError - -class DetectionStrategy(ProcessingStrategy): - """Strategy for object detection""" - def process(self, batch_meta): - # Detection-specific processing - pass - -class ClassificationStrategy(ProcessingStrategy): - """Strategy for classification""" - def process(self, batch_meta): - # Classification-specific processing - pass - -class PipelineBuilder: - """Pipeline builder with strategy pattern""" - def __init__(self, strategy: ProcessingStrategy): - self.strategy = strategy - - def build(self): - pipeline = Pipeline("strategy-pipeline") - # Build pipeline based on strategy - return pipeline -``` - ---- - -## 2. Performance Optimization - -### Optimization 1: Batch Size Tuning - -**Best Practice**: Optimize batch sizes based on GPU memory and model complexity. - -```python -def calculate_optimal_batch_size( - num_streams, - gpu_memory_gb, - model_complexity="medium", - resolution=(1920, 1080) -): - """ - Calculate optimal batch size - - Args: - num_streams: Number of input streams - gpu_memory_gb: Available GPU memory in GB - model_complexity: "low", "medium", "high" - resolution: (width, height) tuple - """ - # Base memory per stream (GB) - base_memory = { - (1920, 1080): 1.0, - (1280, 720): 0.5, - (640, 480): 0.25 - }.get(resolution, 1.0) - - # Model complexity multiplier - complexity_mult = { - "low": 1.0, - "medium": 1.5, - "high": 2.0 - }.get(model_complexity, 1.5) - - # Calculate max batch size - memory_per_stream = base_memory * complexity_mult - max_batch = int(gpu_memory_gb / memory_per_stream) - - # Clamp to number of streams and use power of 2 - optimal_batch = min(max_batch, num_streams) - optimal_batch = 2 ** (optimal_batch.bit_length() - 1) # Round down to power of 2 - - return max(1, optimal_batch) -``` - -### Optimization 2: Inference Precision Selection - -**Best Practice**: Use appropriate precision based on accuracy requirements. - -```python -def get_inference_config(precision="fp16", model_path=None): - """ - Get inference configuration with optimal precision - - Args: - precision: "fp32", "fp16", "int8" - model_path: Path to model file - """ - precision_map = { - "fp32": 0, # Highest accuracy, slowest - "fp16": 1, # Good balance (recommended) - "int8": 2 # Fastest, may need calibration - } - - config = { - "network-mode": precision_map.get(precision, 1), - "model-engine-file": model_path - } - - if precision == "int8": - config["calibration-file"] = model_path.replace(".engine", "_calibration.bin") - - return config -``` - -### Optimization 3: Pipeline Parallelism - -**Best Practice**: Run multiple pipelines on different GPUs for scalability. - -```python -from multiprocessing import Process - -def run_pipeline_on_gpu(pipeline_config, gpu_id): - """Run pipeline on specific GPU""" - import os - os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) - - pipeline = build_pipeline(pipeline_config) - pipeline.start().wait() - -def run_multi_gpu_pipelines(pipeline_configs): - """Run pipelines on multiple GPUs""" - processes = [] - - for idx, config in enumerate(pipeline_configs): - gpu_id = idx % get_num_gpus() # Distribute across GPUs - process = Process( - target=run_pipeline_on_gpu, - args=(config, gpu_id) - ) - process.start() - processes.append(process) - - # Wait for all processes - for process in processes: - process.join() -``` - -### Optimization 4: Memory Pool Configuration - -**Best Practice**: Configure appropriate buffer pool sizes. - -```python -def configure_buffer_pools(pipeline, num_streams, batch_size): - """Configure buffer pools for optimal performance""" - # Calculate buffer pool size - # Rule: pool_size >= (num_streams / batch_size) * 2 - pool_size = max(4, (num_streams // batch_size) * 2) - - # Configure queues - for elem in pipeline.elements: - if elem.name.startswith("queue"): - elem.set_property("max-size-buffers", pool_size * 10) - elem.set_property("max-size-time", 0) # Unlimited time - elem.set_property("leaky", 2) # Leaky downstream -``` - ---- - -## 3. Memory Management - -### Best Practice 1: Proper Cleanup - -```python -class ManagedPipeline: - """Pipeline with proper resource management""" - def __init__(self, pipeline): - self.pipeline = pipeline - self.probes = [] - - def add_probe(self, element_name, probe): - """Add probe and track for cleanup""" - self.pipeline.attach(element_name, probe) - self.probes.append(probe) - - def start(self): - """Start pipeline""" - self.pipeline.start() - - def stop(self): - """Stop pipeline and cleanup""" - self.pipeline.set_state(GST_STATE_NULL) - - # Cleanup probes - for probe in self.probes: - if hasattr(probe, 'close'): - probe.close() - if hasattr(probe, 'flush'): - probe.flush() - - def __enter__(self): - self.start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.stop() -``` - -### Best Practice 2: Memory Monitoring - -```python -import pynvml - -class MemoryMonitor: - """Monitor GPU memory usage""" - def __init__(self): - pynvml.nvmlInit() - self.handle = pynvml.nvmlDeviceGetHandleByIndex(0) - - def get_memory_info(self): - """Get current GPU memory usage""" - info = pynvml.nvmlDeviceGetMemoryInfo(self.handle) - return { - "total": info.total / (1024**3), # GB - "used": info.used / (1024**3), # GB - "free": info.free / (1024**3) # GB - } - - def check_memory_pressure(self, threshold=0.9): - """Check if memory usage exceeds threshold""" - info = self.get_memory_info() - usage_ratio = info["used"] / info["total"] - return usage_ratio > threshold - -# Usage in pipeline -monitor = MemoryMonitor() -if monitor.check_memory_pressure(): - print("Warning: High GPU memory usage!") -``` - ---- - -## 4. Error Handling and Resilience - -### Pattern 1: Retry Logic - -```python -import time -from functools import wraps - -def retry(max_attempts=3, delay=1.0, backoff=2.0): - """Retry decorator with exponential backoff""" - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - attempts = 0 - current_delay = delay - - while attempts < max_attempts: - try: - return func(*args, **kwargs) - except Exception as e: - attempts += 1 - if attempts >= max_attempts: - raise - print(f"Attempt {attempts} failed: {e}. Retrying in {current_delay}s...") - time.sleep(current_delay) - current_delay *= backoff - return wrapper - return decorator - -@retry(max_attempts=3, delay=1.0) -def initialize_kafka_producer(config): - """Initialize Kafka producer with retry""" - return KafkaProducer(bootstrap_servers=config["servers"]) -``` - -### Pattern 2: Circuit Breaker - -```python -class CircuitBreaker: - """Circuit breaker pattern for external services""" - def __init__(self, failure_threshold=5, timeout=60): - self.failure_threshold = failure_threshold - self.timeout = timeout - self.failure_count = 0 - self.last_failure_time = None - self.state = "closed" # closed, open, half_open - - def call(self, func, *args, **kwargs): - """Execute function with circuit breaker""" - if self.state == "open": - if time.time() - self.last_failure_time > self.timeout: - self.state = "half_open" - else: - raise Exception("Circuit breaker is OPEN") - - try: - result = func(*args, **kwargs) - self.on_success() - return result - except Exception as e: - self.on_failure() - raise - - def on_success(self): - """Reset on success""" - self.failure_count = 0 - self.state = "closed" - - def on_failure(self): - """Track failures""" - self.failure_count += 1 - self.last_failure_time = time.time() - - if self.failure_count >= self.failure_threshold: - self.state = "open" -``` - -### Pattern 3: Graceful Shutdown - -```python -import signal -import sys - -class GracefulShutdown: - """Handle graceful shutdown signals""" - def __init__(self): - self.shutdown_requested = False - signal.signal(signal.SIGINT, self._signal_handler) - signal.signal(signal.SIGTERM, self._signal_handler) - - def _signal_handler(self, signum, frame): - """Handle shutdown signals""" - print(f"\nReceived signal {signum}. Initiating graceful shutdown...") - self.shutdown_requested = True - - def is_shutdown_requested(self): - """Check if shutdown was requested""" - return self.shutdown_requested - -# Usage -shutdown_handler = GracefulShutdown() - -def run_pipeline_with_graceful_shutdown(pipeline): - """Run pipeline with graceful shutdown handling""" - try: - pipeline.start() - - while not shutdown_handler.is_shutdown_requested(): - time.sleep(0.1) - # Check pipeline state, process messages, etc. - - print("Shutting down pipeline...") - pipeline.stop() - except Exception as e: - print(f"Error: {e}") - pipeline.stop() -``` - ---- - -## 5. Code Organization and Maintainability - -### Pattern 1: Separation of Concerns - -```python -# config.py - Configuration management -class PipelineConfig: - def __init__(self, config_path): - self.config = self._load_config(config_path) - - def get_source_config(self): - return self.config["source"] - - def get_inference_config(self): - return self.config["inference"] - -# pipeline_builder.py - Pipeline construction -class PipelineBuilder: - def __init__(self, config: PipelineConfig): - self.config = config - - def build(self): - pipeline = Pipeline("main") - # Build pipeline from config - return pipeline - -# processors.py - Processing logic -class MetadataProcessor: - def process(self, batch_meta): - # Processing logic - pass - -# main.py - Application entry point -def main(): - config = PipelineConfig("config.yml") - builder = PipelineBuilder(config) - pipeline = builder.build() - pipeline.start().wait() -``` - -### Pattern 2: Dependency Injection - -```python -class PipelineService: - """Service class with dependency injection""" - def __init__(self, - source_factory, - inference_factory, - sink_factory, - processor_factory): - self.source_factory = source_factory - self.inference_factory = inference_factory - self.sink_factory = sink_factory - self.processor_factory = processor_factory - - def create_pipeline(self): - """Create pipeline using injected factories""" - pipeline = Pipeline("service-pipeline") - - # Use factories to create elements - source = self.source_factory.create() - inference = self.inference_factory.create() - sink = self.sink_factory.create() - - # Build pipeline - # ... - - return pipeline -``` - ---- - -## 6. Testing Strategies - -### Unit Testing - -```python -import unittest -from unittest.mock import Mock, patch - -class TestMetadataProcessor(unittest.TestCase): - def setUp(self): - self.processor = MetadataProcessor() - - def test_process_empty_batch(self): - """Test processing empty batch""" - batch_meta = Mock() - batch_meta.frame_items = [] - - # Should not raise exception - self.processor.process(batch_meta) - - def test_process_with_objects(self): - """Test processing batch with objects""" - batch_meta = Mock() - frame_meta = Mock() - frame_meta.object_items = [Mock(), Mock()] - batch_meta.frame_items = [frame_meta] - - self.processor.process(batch_meta) - # Assert expected behavior -``` - -### Integration Testing - -```python -class TestPipelineIntegration(unittest.TestCase): - def test_pipeline_creation(self): - """Test pipeline creation""" - config = PipelineConfig("test_config.yml") - builder = PipelineBuilder(config) - pipeline = builder.build() - - self.assertIsNotNone(pipeline) - self.assertEqual(len(pipeline.elements), expected_count) - - def test_pipeline_linking(self): - """Test pipeline element linking""" - pipeline = create_test_pipeline() - - # Verify links are correct - # ... -``` - -### Performance Testing - -```python -import time - -class PerformanceTest: - def test_fps_measurement(self, pipeline, duration=10): - """Measure FPS of pipeline""" - start_time = time.time() - frame_count = 0 - - def frame_callback(batch_meta): - nonlocal frame_count - frame_count += len(batch_meta.frame_items) - - pipeline.attach("infer", Probe("fps", frame_callback)) - pipeline.start() - - time.sleep(duration) - pipeline.stop() - - elapsed = time.time() - start_time - fps = frame_count / elapsed - - print(f"Measured FPS: {fps:.2f}") - return fps -``` - ---- - -## 7. Deployment Considerations - -### Configuration Management - -```python -import os -from pathlib import Path - -class EnvironmentConfig: - """Load configuration based on environment""" - def __init__(self): - self.env = os.getenv("DEEPSTREAM_ENV", "development") - self.config_dir = Path("/etc/deepstream") / self.env - - def get_config_path(self, config_name): - """Get configuration file path""" - return self.config_dir / f"{config_name}.yml" - - def get_model_path(self, model_name): - """Get model file path""" - return Path("/opt/models") / self.env / model_name -``` - -### Logging Best Practices - -```python -import logging -import sys - -def setup_logging(level=logging.INFO, log_file=None): - """Setup logging configuration""" - handlers = [logging.StreamHandler(sys.stdout)] - - if log_file: - handlers.append(logging.FileHandler(log_file)) - - logging.basicConfig( - level=level, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=handlers - ) - -# Usage -logger = logging.getLogger(__name__) -logger.info("Pipeline started") -logger.error("Error occurred", exc_info=True) -``` - ---- - -## 8. Security Best Practices - -### Secure Configuration - -```python -import os -from cryptography.fernet import Fernet - -class SecureConfig: - """Handle sensitive configuration securely""" - def __init__(self): - self.key = os.getenv("CONFIG_ENCRYPTION_KEY") - self.cipher = Fernet(self.key) if self.key else None - - def get_secret(self, secret_name): - """Get decrypted secret""" - encrypted = os.getenv(secret_name) - if self.cipher and encrypted: - return self.cipher.decrypt(encrypted.encode()).decode() - return encrypted -``` - -### Input Validation - -```python -def validate_video_path(path): - """Validate video file path""" - if not os.path.exists(path): - raise ValueError(f"Video file not found: {path}") - - allowed_extensions = ['.h264', '.h265', '.mp4', '.mkv'] - if not any(path.endswith(ext) for ext in allowed_extensions): - raise ValueError(f"Unsupported video format: {path}") - - return path - -def validate_config_file(config_path): - """Validate configuration file""" - if not os.path.exists(config_path): - raise ValueError(f"Config file not found: {config_path}") - - # Additional validation - # ... - - return config_path -``` - ---- - -## 9. Monitoring and Observability - -### Metrics Collection - -```python -from prometheus_client import Counter, Histogram, Gauge - -# Define metrics -frames_processed = Counter('deepstream_frames_processed_total', 'Total frames processed') -inference_latency = Histogram('deepstream_inference_latency_seconds', 'Inference latency') -gpu_memory_usage = Gauge('deepstream_gpu_memory_bytes', 'GPU memory usage') - -class MetricsCollector(BatchMetadataOperator): - """Collect metrics from pipeline""" - def handle_metadata(self, batch_meta): - for frame_meta in batch_meta.frame_items: - frames_processed.inc() - - # Record inference latency if available - if hasattr(frame_meta, 'inference_time'): - inference_latency.observe(frame_meta.inference_time) -``` - ---- - -## 10. Common Anti-Patterns to Avoid - -### Anti-Pattern 1: Blocking Operations in Probes - -**Bad**: -```python -class BadProbe(BatchMetadataOperator): - def handle_metadata(self, batch_meta): - # Blocking network call in probe - response = requests.get("http://api.example.com/data") - # This blocks the pipeline! -``` - -**Good**: -```python -import queue -import threading - -class GoodProbe(BatchMetadataOperator): - def __init__(self): - super().__init__() - self.queue = queue.Queue() - self.worker = threading.Thread(target=self._process_queue) - self.worker.start() - - def handle_metadata(self, batch_meta): - # Non-blocking: add to queue - self.queue.put(batch_meta) - - def _process_queue(self): - while True: - batch_meta = self.queue.get() - # Process asynchronously - response = requests.get("http://api.example.com/data") -``` - -### Anti-Pattern 2: Ignoring Memory Limits - -**Bad**: -```python -# No batch size limits -pipeline.add("nvstreammux", "mux", {"batch-size": 100}) # Too large! -``` - -**Good**: -```python -# Calculate optimal batch size -optimal_batch = calculate_optimal_batch_size(num_streams, gpu_memory) -pipeline.add("nvstreammux", "mux", {"batch-size": optimal_batch}) -``` - -### Anti-Pattern 3: Not Handling Errors - -**Bad**: -```python -pipeline.start().wait() # No error handling -``` - -**Good**: -```python -try: - pipeline.start().wait() -except Exception as e: - logger.error(f"Pipeline error: {e}", exc_info=True) - pipeline.stop() - raise -``` - -### Anti-Pattern 4: Missing async=0 on All Sinks (Tee/Dynamic Sources) - -**CRITICAL**: When using `tee` to split a pipeline into multiple branches OR using dynamic sources (nvmultiurisrcbin), **ALL sink elements** must have `async: 0`. This is the most common cause of pipelines stuck in PAUSED state. - -**Bad** - Pipeline stuck in PAUSED: -```python -# ❌ WRONG - Only display sink has async=0, Kafka sink is missing it -# Pipeline will be STUCK IN PAUSED STATE! - -# Tee split -pipeline.add("tee", "tee") - -# Metadata branch - MISSING async=0! -pipeline.add("nvmsgbroker", "msgbroker", { - "proto-lib": "/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so", - "conn-str": "localhost;9092", - "sync": 0, - # async: 0 is MISSING! Pipeline will hang! -}) - -# Video branch - has async=0 but it's not enough -pipeline.add("nveglglessink", "sink", { - "sync": 0, - "async": 0 # This alone is NOT enough - ALL sinks need it! -}) -``` - -**Good** - All sinks have async=0: -```python -# ✅ CORRECT - ALL sinks have async=0 - -# Tee split -pipeline.add("tee", "tee") - -# Metadata branch - Kafka sink with async=0 -pipeline.add("nvmsgbroker", "msgbroker", { - "proto-lib": "/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so", - "conn-str": "localhost;9092", - "sync": 0, - "async": 0 # CRITICAL: Required on ALL sinks! -}) - -# Video branch - display sink with async=0 -pipeline.add("nveglglessink", "sink", { - "sync": 0, - "qos": 0, - "async": 0 # CRITICAL: Required on ALL sinks! -}) -``` - -**Symptoms of this bug**: -- Camera shows "added successfully" in logs -- Pipeline elements transition to READY, then PAUSED -- Pipeline never transitions to PLAYING -- No video display, no data flowing -- No error messages (silent failure) - -**Rule**: When using `tee` or dynamic sources, ALWAYS set `async: 0` on EVERY sink element in the pipeline. - -### Anti-Pattern 5: Using threading.Queue with multiprocessing.Process - -**CRITICAL**: This is a common and subtle bug that causes data loss! - -When using `multiprocessing.Process` to run pipelines in separate processes, you MUST use `multiprocessing.Queue` for inter-process communication. A regular `queue.Queue` (from the `queue` module) only works within a single process. - -**Bad** - Data silently lost: -```python -from multiprocessing import Process -from queue import Queue # WRONG! This is a threading queue - -class MultiStreamProcessor: - def __init__(self): - # This queue WILL NOT work across process boundaries! - self.batch_queue = Queue() # BAD: threading.Queue - - def start(self, use_multiprocessing=True): - for stream in self.streams: - if use_multiprocessing: - # Child process gets a COPY of the queue - # Any data put into it never reaches the parent! - process = Process( - target=self._run_pipeline, - args=(stream, self.batch_queue) - ) - process.start() -``` - -**Good** - Use multiprocessing.Queue for inter-process communication: -```python -from multiprocessing import Process, Queue as MPQueue # Correct! -from queue import Queue as ThreadQueue - -class MultiStreamProcessor: - def __init__(self, use_multiprocessing=True): - # Choose the right queue type based on usage - if use_multiprocessing: - self.batch_queue = MPQueue() # CORRECT: multiprocessing.Queue - else: - self.batch_queue = ThreadQueue() # For single-process/threading - - def start(self, use_multiprocessing=True): - for stream in self.streams: - if use_multiprocessing: - # multiprocessing.Queue properly shares data across processes - process = Process( - target=self._run_pipeline, - args=(stream, self.batch_queue) - ) - process.start() -``` - -**Alternative - Use threading instead of multiprocessing**: -```python -import threading -from queue import Queue # OK for threading - -class MultiStreamProcessor: - def __init__(self): - self.batch_queue = Queue() # OK: threading.Queue for threads - - def start(self): - for stream in self.streams: - # Threads share memory, so queue.Queue works fine - thread = threading.Thread( - target=self._run_pipeline, - args=(stream, self.batch_queue) - ) - thread.start() -``` - -**Key Rules**: -1. `queue.Queue` → Use with `threading.Thread` (same process) -2. `multiprocessing.Queue` → Use with `multiprocessing.Process` (cross-process) -3. When in doubt, set `use_multiprocessing=False` and use threads -4. Always add debug logs to verify data flows through queues correctly - -**Symptoms of this bug**: -- Pipeline appears to run normally -- No error messages -- Downstream processing (e.g., VLM, Kafka) never receives data -- Statistics show 0 batches/messages processed - ---- - -## 11. Common Pitfalls and Code Generation Errors - -This section documents common mistakes encountered when generating DeepStream code, to prevent them in future. - -### Pitfall 1: Using len() on Metadata Iterators - -**Problem**: `frame_meta.object_items`, `frame_meta.tensor_items`, and `frame_meta.user_items` return **iterators**, not lists. - -**Error**: -``` -TypeError: object of type 'iterator' has no len() -``` - -**Bad Code**: -```python -# ❌ WRONG - Causes crash -count = len(frame_meta.object_items) - -# ❌ WRONG - Second loop is empty (iterator already consumed) -for obj in frame_meta.object_items: - process(obj) -for obj in frame_meta.object_items: - count += 1 -``` - -**Correct Code**: -```python -# ✅ CORRECT - Count while iterating -obj_count = 0 -for obj in frame_meta.object_items: - obj_count += 1 - process(obj) -``` - -### Pitfall 2: Incorrect nvinfer Configuration Syntax - -**Problem**: nvinfer supports **both YAML and INI-style formats**, but the syntax must be correct for each format. - -**Error**: -``` -Configuration file parsing failed -``` - -**Common Mistakes**: -```yaml -# ❌ WRONG - Incorrect section name (should be 'property', not 'model') -model: - model-engine-file: /path/to/model.engine - batch-size: 1 - -# ❌ WRONG - Mixing formats (YAML syntax in .txt file or vice versa) -``` - -**Correct YAML Config** (`.yml`): -```yaml -# ✅ CORRECT YAML format -property: - gpu-id: 0 - onnx-file: /opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx - labelfile-path: /opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/labels.txt - batch-size: 1 - network-mode: 2 - num-detected-classes: 4 - process-mode: 1 - cluster-mode: 2 - -class-attrs-all: - topk: 20 - pre-cluster-threshold: 0.2 -``` - -**Correct INI-style Config** (`.txt`): -```ini -# ✅ CORRECT INI-style format -[property] -gpu-id=0 -onnx-file=/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx -labelfile-path=/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/labels.txt -batch-size=1 -network-mode=2 -num-detected-classes=4 -process-mode=1 -cluster-mode=2 - -[class-attrs-all] -topk=20 -pre-cluster-threshold=0.2 -``` - -**Key Rules**: -- YAML format: Use `property:` (no brackets), `key: value` with colon+space -- INI format: Use `[property]` (with brackets), `key=value` with equals sign -- Section must be named `property` (not `model` or other names) -- Don't mix formats in the same file - -### Pitfall 3: Using Wrong Model (ResNet10 vs ResNet18) - -**Problem**: DeepStream samples use **ResNet18** TrafficCamNet model, not ResNet10. - -**Correct Model Paths**: -``` -/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/ -├── resnet18_trafficcamnet_pruned.onnx # ✅ Use this ONNX model -├── labels.txt # Class labels -└── cal_trt.bin # INT8 calibration (optional) -``` - -**In nvinfer config**: -```ini -[property] -onnx-file=/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx -labelfile-path=/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/labels.txt -``` - -### Pitfall 4: nvv4l2decoder Output Format Assumption - -**Fact**: `nvv4l2decoder` outputs `video/x-raw(memory:NVMM)` - already in GPU memory format. - -**Common Mistake**: Adding unnecessary `nvvideoconvert` after decoder. - -**Unnecessary Code**: -```python -# ❌ UNNECESSARY - nvv4l2decoder already outputs NVMM format -pipeline.add("nvv4l2decoder", "decoder") -pipeline.add("nvvideoconvert", "conv") # Not needed! -pipeline.add("nvstreammux", "mux") -``` - -**Correct Code**: -```python -# ✅ CORRECT - Direct connection, no converter needed -pipeline.add("nvv4l2decoder", "decoder") -pipeline.add("nvstreammux", "mux") -pipeline.link(("decoder", "mux"), ("", "sink_%u")) -``` - -### Pitfall 5: Built-in Probe Usage - -**Fact**: `measure_fps_probe` is a valid built-in probe, but must be attached to the correct element. - -**Correct Usage**: -```python -# Attach to inference element for FPS measurement -pipeline.attach("infer", "measure_fps_probe", "fps-probe") -``` - -**If probe attachment fails**, implement custom FPS measurement: -```python -class FPSCounter(BatchMetadataOperator): - def __init__(self): - super().__init__() - self.start_time = None - self.frame_count = 0 - - def handle_metadata(self, batch_meta): - if self.start_time is None: - self.start_time = time.time() - self.frame_count += 1 - elapsed = time.time() - self.start_time - if elapsed > 0 and self.frame_count % 30 == 0: - print(f"FPS: {self.frame_count / elapsed:.2f}") - -pipeline.attach("infer", Probe("fps-counter", FPSCounter())) -``` - ---- - -## Summary - -Following these best practices and patterns will help you build robust, performant, and maintainable DeepStream applications. Key takeaways: - -1. **Design for modularity**: Use patterns like Factory, Strategy, and Dependency Injection -2. **Optimize performance**: Tune batch sizes, use appropriate precision, enable parallelism -3. **Manage resources**: Proper cleanup, memory monitoring, buffer pool configuration -4. **Handle errors gracefully**: Retry logic, circuit breakers, graceful shutdown -5. **Test thoroughly**: Unit tests, integration tests, performance tests -6. **Monitor and observe**: Metrics collection, logging, health checks -7. **Secure your application**: Input validation, secure configuration, access control -8. **Use correct Queue types**: - - `queue.Queue` → for threading (same process) - - `multiprocessing.Queue` → for multiprocessing (cross-process) - - **NEVER** use `queue.Queue` with `multiprocessing.Process` - data will be silently lost! -9. **Set async=0 on ALL sinks when using tee or dynamic sources**: - - When pipeline uses `tee` to split into multiple branches, ALL sink elements need `async: 0` - - When using dynamic sources (nvmultiurisrcbin), ALL sinks need `async: 0` - - **Symptom if missing**: Pipeline stuck in PAUSED state, no video/data flows - - This applies to display sinks, Kafka sinks, file sinks - ALL sinks! -10. **Avoid common code generation pitfalls**: - - **NEVER** use `len()` on metadata iterators (`object_items`, `tensor_items`, `user_items`) - - **USE** correct syntax for nvinfer config (YAML: `property:` with `: `, or INI: `[property]` with `=`) - - **USE** ResNet18 model (`resnet18_trafficcamnet_pruned.onnx`) from DeepStream samples - - **KNOW** that `nvv4l2decoder` outputs NVMM format (no converter needed before nvstreammux) - -These practices ensure your DeepStream applications are production-ready and scalable. - diff --git a/skills/deepstream/deepstream-dev/references/buffer_apis.md b/skills/deepstream/deepstream-dev/references/buffer_apis.md deleted file mode 100644 index 0c169d46..00000000 --- a/skills/deepstream/deepstream-dev/references/buffer_apis.md +++ /dev/null @@ -1,1670 +0,0 @@ -# Buffer Provider and Retriever APIs - -## Overview - -DeepStream Service Maker provides two complementary APIs for custom data injection and extraction: - -1. **Media Extractor (BufferProvider/Feeder)** - Inject custom data INTO pipelines -2. **Frame Selector (BufferRetriever/Receiver)** - Extract data FROM pipelines - -## When to Use Each API - -### Use BufferProvider/Feeder When: -- You need to inject custom video frames from non-standard sources -- You want to generate synthetic video data for testing -- You have pre-processed frames to feed into the pipeline -- You need to implement custom video sources beyond file/RTSP -- You want to transfer frames FROM another pipeline or system INTO DeepStream - -**See**: Part 1 below for detailed API reference and implementation patterns. - -### Use BufferRetriever/Receiver When: -- You need to extract frames for custom processing outside the pipeline -- You want to save specific frames to disk or external storage -- You need to collect inference results with frame data -- You want to implement custom frame selection logic -- You want to transfer frames FROM DeepStream TO another pipeline or system - -**See**: Part 2 below for detailed API reference and implementation patterns. - -## Common Patterns - -### Pattern 1: Pipeline-to-Pipeline Transfer -Transfer frames between two DeepStream pipelines. - -``` -Pipeline A -> BufferRetriever -> Queue -> BufferProvider -> Pipeline B -``` - -**Use Case**: Process video in one pipeline, then re-process results in another - -**Details**: See Part 1 Pattern 3 (Frame Queue Injection) and Part 2 Pattern 2 (Frame Queue Transfer) - -### Pattern 2: Custom Video Source -Read from custom camera or video source. - -``` -Custom Source -> BufferProvider -> appsrc -> DeepStream Pipeline -``` - -**Use Case**: Integrate non-standard cameras or video sources - -**Details**: See Part 1 Pattern 1 (File-Based Custom Video Source) - -### Pattern 3: Frame Extraction -Extract frames from pipeline for archival or analysis. - -``` -DeepStream Pipeline -> appsink -> BufferRetriever -> Save/Process -``` - -**Use Case**: Save frames at intervals, capture detection screenshots - -**Details**: See Part 2 Pattern 1 (Frame Extraction and Saving) - -### Pattern 4: Synthetic Data Generation -Generate test data for pipeline validation. - -``` -Synthetic Generator -> BufferProvider -> appsrc -> DeepStream Pipeline -``` - -**Use Case**: Testing, simulation, validation - -**Details**: See Part 1 Pattern 2 (Synthetic Frame Generation) - -### Pattern 5: Selective Frame Capture -Capture frames based on inference results. - -``` -Pipeline -> Inference -> Metadata Probe -> Trigger -> BufferRetriever -> Save -``` - -**Use Case**: Save frames only when specific objects detected - -**Details**: See Part 2 Pattern 3 (Selective Frame Capture) - -## API Comparison - -| Feature | BufferProvider/Feeder | BufferRetriever/Receiver | -|---------|----------------------|--------------------------| -| **Direction** | Data IN (injection) | Data OUT (extraction) | -| **GStreamer Element** | appsrc | appsink | -| **Signal** | need-data/enough-data | new-sample | -| **Method to Implement** | `generate(size)` | `consume(buffer)` | -| **Return Value** | Buffer object | int (1=success, 0=error) | -| **EOS Handling** | Return empty Buffer() | Return -1 | -| **Properties** | format, width, height, framerate, device | None (configured on appsink) | - -## Quick Start Examples - -### Inject Custom Frames (BufferProvider) - -```python -from pyservicemaker import Pipeline, BufferProvider, Feeder, as_tensor, ColorFormat, Buffer -import torch # pip install torch torchvision (not in base DS container) - -class MyProvider(BufferProvider): - def __init__(self): - super().__init__() - self.format = "RGB" - self.width = 1280 - self.height = 720 - self.framerate = 30 - self.device = 'gpu' - - def generate(self, size): - # Your custom frame generation logic - frame = get_custom_frame() # Your function - if frame is None: - return Buffer() # EOS - - torch_tensor = torch.from_numpy(frame).cuda() - ds_tensor = as_tensor(torch_tensor, "HWC") - return ds_tensor.wrap(ColorFormat.RGB) - -pipeline = Pipeline("inject-pipeline") -caps = "video/x-raw(memory:NVMM), format=RGB, width=1280, height=720, framerate=30/1" -pipeline.add("appsrc", "src", {"caps": caps, "do-timestamp": True}) -# ... add more elements ... -pipeline.attach("src", Feeder("feeder", MyProvider()), tips="need-data/enough-data") -pipeline.start().wait() -``` - -### Extract Frames (BufferRetriever) - -```python -from pyservicemaker import Pipeline, BufferRetriever, Receiver -import torch # pip install torch torchvision (not in base DS container) - -class MyRetriever(BufferRetriever): - def __init__(self): - super().__init__() - self.count = 0 - - def consume(self, buffer): - tensor = buffer.extract(0).clone() # Always clone! - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - - # Your custom processing logic - process_frame(torch_tensor) # Your function - - self.count += 1 - return 1 # Success - -pipeline = Pipeline("extract-pipeline") -# ... add source and processing elements ... -pipeline.add("appsink", "sink", {"emit-signals": True, "sync": False}) -pipeline.attach("sink", Receiver("receiver", MyRetriever()), tips="new-sample") -pipeline.start().wait() -``` - -## Key Concepts - -### BufferProvider/Feeder -- **Purpose**: Custom data injection -- **Element**: Works with `appsrc` -- **Flow**: Your code -> BufferProvider -> Pipeline -- **Control**: Pipeline pulls data when needed -- **Properties**: Must set format, width, height, framerate, device - -### BufferRetriever/Receiver -- **Purpose**: Custom data extraction -- **Element**: Works with `appsink` -- **Flow**: Pipeline -> BufferRetriever -> Your code -- **Control**: Pipeline pushes data when available -- **Critical**: Always call `.clone()` on extracted tensors - -## Best Practices Summary - -### For BufferProvider: -1. Set all required properties (format, width, height, framerate, device) -2. Return empty `Buffer()` to signal end of stream -3. Use GPU memory (`device='gpu'`) for best performance -4. Set `do-timestamp=True` on appsrc for proper sync -5. Use `tips="need-data/enough-data"` when attaching - -### For BufferRetriever: -1. **Always** call `.clone()` on extracted tensors -2. Set `emit-signals=True` on appsink -3. Use `tips="new-sample"` when attaching -4. Return 1 for success, 0 for error (continue), -1 for fatal error -5. Set `sync=False` for non-real-time extraction - -## Common Pitfalls - -### BufferProvider Issues: -- Forgetting to set format properties -> Pipeline fails to negotiate caps -- Not returning empty Buffer() for EOS -> Pipeline hangs -- Mismatched caps between provider and appsrc -> Format errors - -### BufferRetriever Issues: -- Not calling `.clone()` -> Data corruption in async processing -- Forgetting `emit-signals=True` -> No frames received -- Slow processing in consume() -> Frame drops -- Not handling exceptions -> Pipeline crashes - -## Performance Tips - -### BufferProvider: -- Use GPU memory for zero-copy transfers -- Pre-allocate buffers when possible -- Avoid CPU<->GPU transfers in hot path -- Consider buffer pooling for high frame rates - -### BufferRetriever: -- Set `sync=False` if you don't need real-time pacing -- Process frames asynchronously if possible -- Limit buffer accumulation to prevent memory issues -- Use batch processing when extracting multiple streams - -## Example Applications - -The service-maker package includes sample applications demonstrating these APIs: - -**Pipeline API Examples**: -- `/opt/nvidia/deepstream/deepstream/service-maker/sources/apps/python/pipeline_api/deepstream_appsrc_test_app/` - -**Flow API Examples**: -- `/opt/nvidia/deepstream/deepstream/service-maker/sources/apps/python/flow_api/deepstream_appsrc_test_app/` - -## Goal-Based API Selection - -| Goal | Use This API | Section | -|------|-------------|---------| -| Inject custom frames | BufferProvider/Feeder | Part 1 | -| Extract frames | BufferRetriever/Receiver | Part 2 | -| Pipeline-to-pipeline transfer | Both | Part 1 Pattern 3, Part 2 Pattern 2 | -| Custom video source | BufferProvider/Feeder | Part 1 Pattern 1 | -| Frame archival | BufferRetriever/Receiver | Part 2 Pattern 1 | -| Synthetic data generation | BufferProvider/Feeder | Part 1 Pattern 2 | -| Selective capture | BufferRetriever/Receiver | Part 2 Pattern 3 | - -Choose the right API based on your data flow direction: injection (BufferProvider) or extraction (BufferRetriever). - ---- - -# Part 1: BufferProvider / Feeder API (Media Extractor) - -## Overview - -The Media Extractor API (implemented through `BufferProvider` and `Feeder` classes) enables custom data injection into DeepStream pipelines. This is useful for: -- Injecting custom video frames from non-standard sources -- Generating synthetic video data for testing -- Feeding pre-processed frames into the pipeline -- Implementing custom video sources beyond file/RTSP streams - -## Core Concepts - -### BufferProvider -A `BufferProvider` is a user-implemented class that generates buffers on-demand. It works with GStreamer's `appsrc` element to inject data into the pipeline. - -### Feeder -A `Feeder` is a wrapper that connects a `BufferProvider` to an `appsrc` element. It manages the signal handling for "need-data" and "enough-data" events. - -### Data Flow -``` -BufferProvider.generate() -> Feeder -> appsrc -> Pipeline -``` - -## API Reference - -### BufferProvider Class - -Base class for implementing custom media providers. - -**Methods to Override**: - -#### `generate(size)` -Generate a buffer when the pipeline needs data. - -**Parameters**: -- `size` (int): Number of bytes requested by the pipeline - -**Returns**: `Buffer` object containing the data, or empty `Buffer()` to signal EOS - -**Properties to Set**: -- `format` (str): Video format (e.g., "RGB", "NV12") -- `width` (int): Frame width in pixels -- `height` (int): Frame height in pixels -- `framerate` (int): Frame rate -- `device` (str): 'gpu' or 'cpu' - -**Example**: -```python -from pyservicemaker import BufferProvider, as_tensor, ColorFormat, Buffer -import torch # pip install torch torchvision (not in base DS container) - -class MyBufferProvider(BufferProvider): - def __init__(self, video_source): - super().__init__() - self.source = video_source - self.format = "RGB" - self.width = 1920 - self.height = 1080 - self.framerate = 30 - self.device = 'gpu' - self.frame_count = 0 - - def generate(self, size): - # Get frame from your custom source - frame = self.source.get_next_frame() - - if frame is None: - # Signal end of stream - return Buffer() - - # Convert to torch tensor (on GPU if needed) - torch_tensor = torch.from_numpy(frame).cuda() - - # Convert to DeepStream tensor format - ds_tensor = as_tensor(torch_tensor, "HWC") # Height, Width, Channels - - # Wrap in buffer with color format - buffer = ds_tensor.wrap(ColorFormat.RGB) - - self.frame_count += 1 - return buffer -``` - -### Feeder Class - -Wrapper for attaching a BufferProvider to a pipeline element. - -**Constructor**: -```python -from pyservicemaker import Feeder - -feeder = Feeder("feeder-name", buffer_provider_instance) -``` - -**Parameters**: -- `name` (str): Name of the feeder -- `provider` (BufferProvider): BufferProvider instance - -### Helper Functions - -#### `as_tensor(torch_tensor, layout)` -Convert a PyTorch tensor to DeepStream tensor format. - -**Parameters**: -- `torch_tensor`: PyTorch tensor -- `layout` (str): Tensor layout - "HWC" (Height, Width, Channels) or "CHW" - -**Returns**: DeepStream tensor object - -#### ColorFormat Enum -Specifies the pixel format for buffers. - -**Values**: -- `ColorFormat.RGB`: RGB format -- `ColorFormat.RGBA`: RGBA format -- `ColorFormat.NV12`: NV12 format (YUV 4:2:0) -- `ColorFormat.GRAY`: Grayscale - -### Buffer Class - -Container for video frame data. - -**Constructor**: -```python -buffer = Buffer() # Empty buffer (signals EOS) -``` - -**Methods**: -- `extract(index)`: Extract tensor at index from buffer -- `clone()`: Create a copy of the buffer - -## Implementation Patterns - -### Pattern 1: File-Based Custom Video Source - -Read frames from custom file format and inject into pipeline. - -```python -from pyservicemaker import Pipeline, BufferProvider, Feeder, as_tensor, ColorFormat, Buffer -import cv2 # pip install opencv-python-headless (not in base DS container) -import torch # pip install torch torchvision (not in base DS container) -import platform - -class CustomVideoFileProvider(BufferProvider): - def __init__(self, video_path): - super().__init__() - self.cap = cv2.VideoCapture(video_path) - - # Set buffer properties - self.format = "RGB" - self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - self.framerate = int(self.cap.get(cv2.CAP_PROP_FPS)) - self.device = 'gpu' - self.frame_count = 0 - - def generate(self, size): - ret, frame = self.cap.read() - - if not ret: - # End of video - self.cap.release() - return Buffer() - - # Convert BGR to RGB - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - # Convert to torch tensor and move to GPU - torch_tensor = torch.from_numpy(frame_rgb).cuda() - - # Convert to DeepStream tensor - ds_tensor = as_tensor(torch_tensor, "HWC") - - self.frame_count += 1 - print(f"Generated frame {self.frame_count}") - - return ds_tensor.wrap(ColorFormat.RGB) - -def main(video_path): - pipeline = Pipeline("custom-video-source") - - # Create appsrc with appropriate capabilities - caps = f"video/x-raw(memory:NVMM), format=RGB, width=1920, height=1080, framerate=30/1" - pipeline.add("appsrc", "src", { - "caps": caps, - "do-timestamp": True, - "format": 3 # GST_FORMAT_TIME - }) - - # Add processing elements - pipeline.add("nvvideoconvert", "convert", { - "nvbuf-memory-type": 2, # NVBUF_MEM_CUDA_DEVICE - "compute-hw": 1 - }) - pipeline.add("capsfilter", "caps", {"caps": "video/x-raw(memory:NVMM), format=NV12"}) - pipeline.add("nvstreammux", "mux", { - "batch-size": 1, - "width": 1920, - "height": 1080 - }) - - # Add inference (optional) - pipeline.add("nvinfer", "infer", { - "config-file-path": "/path/to/config.yml" - }) - - # Add display - pipeline.add("nvosdbin", "osd") - sink_type = "nv3dsink" if platform.processor() == "aarch64" else "nveglglessink" - pipeline.add(sink_type, "sink", {"sync": False}) - - # Link elements - pipeline.link("src", "convert") - pipeline.link(("convert", "mux"), ("", "sink_%u")) - pipeline.link("mux", "infer", "osd", "sink") - - # Attach feeder to appsrc - provider = CustomVideoFileProvider(video_path) - pipeline.attach("src", Feeder("feeder", provider), tips="need-data/enough-data") - - # Start pipeline - pipeline.start().wait() - -if __name__ == "__main__": - import sys - main(sys.argv[1]) -``` - -### Pattern 2: Synthetic Frame Generation - -Generate synthetic frames for testing or simulation. - -```python -from pyservicemaker import Pipeline, BufferProvider, Feeder, as_tensor, ColorFormat, Buffer -import torch # pip install torch torchvision (not in base DS container) -import numpy as np - -class SyntheticFrameProvider(BufferProvider): - def __init__(self, num_frames=100, width=1280, height=720, fps=30): - super().__init__() - self.format = "RGB" - self.width = width - self.height = height - self.framerate = fps - self.device = 'gpu' - self.num_frames = num_frames - self.frame_idx = 0 - - def generate(self, size): - if self.frame_idx >= self.num_frames: - return Buffer() - - # Generate synthetic frame (moving gradient) - x = np.linspace(0, 255, self.width, dtype=np.uint8) - y = np.linspace(0, 255, self.height, dtype=np.uint8) - - offset = (self.frame_idx * 5) % 255 - frame = np.zeros((self.height, self.width, 3), dtype=np.uint8) - frame[:, :, 0] = (x + offset) % 255 # Red channel - frame[:, :, 1] = (y + offset) % 255 # Green channel - frame[:, :, 2] = 128 # Blue channel - - # Convert to torch and move to GPU - torch_tensor = torch.from_numpy(frame).cuda() - ds_tensor = as_tensor(torch_tensor, "HWC") - - self.frame_idx += 1 - return ds_tensor.wrap(ColorFormat.RGB) - -def generate_test_video(): - pipeline = Pipeline("synthetic-video") - - provider = SyntheticFrameProvider(num_frames=300, width=1280, height=720, fps=30) - - caps = f"video/x-raw(memory:NVMM), format=RGB, width={provider.width}, height={provider.height}, framerate={provider.framerate}/1" - pipeline.add("appsrc", "src", {"caps": caps, "do-timestamp": True}) - pipeline.add("nvvideoconvert", "convert") - pipeline.add("nvv4l2h264enc", "encoder", {"bitrate": 4000000}) - pipeline.add("h264parse", "parser") - pipeline.add("mp4mux", "mux") - pipeline.add("filesink", "sink", {"location": "synthetic_output.mp4"}) - - pipeline.link("src", "convert", "encoder", "parser", "mux", "sink") - pipeline.attach("src", Feeder("feeder", provider), tips="need-data/enough-data") - - pipeline.start().wait() -``` - -### Pattern 3: Frame Queue Injection - -Transfer frames between two pipelines using a queue. - -```python -from pyservicemaker import Pipeline, BufferProvider, Feeder, as_tensor, ColorFormat, Buffer -from queue import Queue, Empty -import torch # pip install torch torchvision (not in base DS container) - -class QueuedBufferProvider(BufferProvider): - def __init__(self, frame_queue, width=1280, height=720): - super().__init__() - self.queue = frame_queue - self.format = "RGB" - self.width = width - self.height = height - self.framerate = 30 - self.device = 'gpu' - - def generate(self, size): - try: - # Wait up to 2 seconds for frame - tensor = self.queue.get(timeout=2) - - # Convert DLPack tensor to PyTorch - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - - # Convert to DeepStream tensor - ds_tensor = as_tensor(torch_tensor, "HWC") - - return ds_tensor.wrap(ColorFormat.RGB) - except Empty: - # Queue is empty, signal EOS - print("Queue empty, ending stream") - return Buffer() - -def pipeline_with_queue_injection(frame_queue): - pipeline = Pipeline("queue-injection") - - provider = QueuedBufferProvider(frame_queue, width=1280, height=720) - - caps = f"video/x-raw(memory:NVMM), format=RGB, width={provider.width}, height={provider.height}, framerate={provider.framerate}/1" - pipeline.add("appsrc", "src", {"caps": caps, "do-timestamp": True}) - pipeline.add("nvvideoconvert", "convert", {"nvbuf-memory-type": 2}) - pipeline.add("capsfilter", "caps", {"caps": "video/x-raw(memory:NVMM), format=NV12"}) - pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720}) - pipeline.add("nveglglessink", "sink", {"sync": False}) - - pipeline.link("src", "convert", "caps") - pipeline.link(("convert", "mux"), ("", "sink_%u")) - pipeline.link("mux", "sink") - - pipeline.attach("src", Feeder("feeder", provider), tips="need-data/enough-data") - pipeline.start().wait() -``` - -### Pattern 4: Flow API with Buffer Injection - -High-level Flow API for buffer injection. - -```python -from pyservicemaker import Pipeline, Flow, BufferProvider, ColorFormat, as_tensor, Buffer -import torch # pip install torch torchvision (not in base DS container) -import cv2 # pip install opencv-python-headless (not in base DS container) - -class SimpleVideoProvider(BufferProvider): - def __init__(self, video_path): - super().__init__() - self.cap = cv2.VideoCapture(video_path) - self.format = "RGB" - self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - self.framerate = int(self.cap.get(cv2.CAP_PROP_FPS)) - self.device = 'gpu' - - def generate(self, size): - ret, frame = self.cap.read() - if not ret: - return Buffer() - - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - torch_tensor = torch.from_numpy(frame_rgb).cuda() - ds_tensor = as_tensor(torch_tensor, "HWC") - return ds_tensor.wrap(ColorFormat.RGB) - -def flow_api_injection(video_path): - pipeline = Pipeline("flow-injection") - provider = SimpleVideoProvider(video_path) - - # Flow API: inject() -> infer() -> render() - flow = Flow(pipeline) - flow.inject([provider]) # Pass list of providers - flow.infer("/path/to/config.yml") # Optional: add inference - flow.render() # Add renderer - flow() # Execute -``` - -## Advanced Usage - -### Multi-Source Buffer Injection - -Inject from multiple custom sources simultaneously. - -```python -from pyservicemaker import Pipeline, BufferProvider, Feeder, as_tensor, ColorFormat, Buffer -import cv2 # pip install opencv-python-headless (not in base DS container) -import torch # pip install torch torchvision (not in base DS container) - -class MultiSourceProvider(BufferProvider): - def __init__(self, source_id, video_path): - super().__init__() - self.source_id = source_id - self.cap = cv2.VideoCapture(video_path) - self.format = "RGB" - self.width = 1280 - self.height = 720 - self.framerate = 30 - self.device = 'gpu' - - def generate(self, size): - ret, frame = self.cap.read() - if not ret: - return Buffer() - - # Resize to common size - frame = cv2.resize(frame, (self.width, self.height)) - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - torch_tensor = torch.from_numpy(frame_rgb).cuda() - ds_tensor = as_tensor(torch_tensor, "HWC") - return ds_tensor.wrap(ColorFormat.RGB) - -def multi_source_injection(video_paths): - pipeline = Pipeline("multi-source-injection") - - # Create multiple appsrc elements - for i, path in enumerate(video_paths): - caps = "video/x-raw(memory:NVMM), format=RGB, width=1280, height=720, framerate=30/1" - pipeline.add("appsrc", f"src{i}", {"caps": caps, "do-timestamp": True}) - pipeline.add("nvvideoconvert", f"convert{i}", {"nvbuf-memory-type": 2}) - - # Add muxer - pipeline.add("nvstreammux", "mux", { - "batch-size": len(video_paths), - "width": 1280, - "height": 720 - }) - - # Add inference and display - pipeline.add("nvinfer", "infer", {"config-file-path": "/path/to/config.yml"}) - pipeline.add("nvmultistreamtiler", "tiler", {"rows": 2, "columns": 2}) - pipeline.add("nvosdbin", "osd") - pipeline.add("nveglglessink", "sink") - - # Link sources to muxer - for i in range(len(video_paths)): - pipeline.link(f"src{i}", f"convert{i}") - pipeline.link((f"convert{i}", "mux"), ("", "sink_%u")) - - # Attach feeder - provider = MultiSourceProvider(i, video_paths[i]) - pipeline.attach(f"src{i}", Feeder(f"feeder{i}", provider), tips="need-data/enough-data") - - # Link processing chain - pipeline.link("mux", "infer", "tiler", "osd", "sink") - pipeline.start().wait() -``` - -## Part 1 Best Practices - -### 1. Memory Management -- Use GPU memory (`device='gpu'`) for best performance -- Release resources properly (close files, release capture devices) -- Avoid memory leaks by managing tensors correctly - -### 2. Buffer Format -- Always specify correct `format`, `width`, `height`, and `framerate` -- Match color format with pipeline requirements -- Use `ColorFormat.RGB` for most cases, `ColorFormat.NV12` for optimized pipelines - -### 3. Timestamping -- Set `"do-timestamp": True` on appsrc for proper synchronization -- Important for multi-stream applications - -### 4. Signal Handling -- Use `tips="need-data/enough-data"` when attaching Feeder -- This enables proper flow control and prevents buffer overflow - -### 5. End of Stream -- Return empty `Buffer()` to signal EOS -- Properly cleanup resources before returning EOS - -### 6. Error Handling -```python -class SafeBufferProvider(BufferProvider): - def __init__(self, source): - super().__init__() - self.source = source - self.format = "RGB" - self.width = 1280 - self.height = 720 - self.framerate = 30 - self.device = 'gpu' - - def generate(self, size): - try: - frame = self.source.get_frame() - if frame is None: - return Buffer() - - torch_tensor = torch.from_numpy(frame).cuda() - ds_tensor = as_tensor(torch_tensor, "HWC") - return ds_tensor.wrap(ColorFormat.RGB) - except Exception as e: - print(f"Error generating buffer: {e}") - return Buffer() # Signal EOS on error -``` - -## Part 1 Common Use Cases - -### 1. Custom Camera Integration -Integrate cameras not supported by standard GStreamer elements. - -### 2. Pre-processed Frame Injection -Inject frames that have been pre-processed by custom algorithms. - -### 3. Frame Rate Control -Control exact frame timing and rate for testing. - -### 4. Multi-Pipeline Communication -Transfer frames between multiple DeepStream pipelines. See also Part 2 Pattern 2 for the retriever side of pipeline-to-pipeline transfer. - -### 5. Synthetic Data Generation -Generate synthetic data for testing inference models. - -### 6. Image Sequence Processing -Process sequences of images as video streams. - -## Part 1 Troubleshooting - -### Issue 1: Frames Not Flowing -**Solution**: Check that `tips="need-data/enough-data"` is set, verify appsrc caps match buffer properties - -### Issue 2: Memory Errors -**Solution**: Ensure tensors are on correct device (GPU/CPU), check memory allocation - -### Issue 3: Format Mismatch -**Solution**: Verify color format matches between BufferProvider and appsrc caps - -### Issue 4: Timing Issues -**Solution**: Enable timestamping with `"do-timestamp": True` - -## Part 1 Summary - -The Media Extractor API (BufferProvider/Feeder) provides a powerful way to inject custom video data into DeepStream pipelines. Key points: - -1. Implement `BufferProvider.generate()` to create custom buffers -2. Use `Feeder` to attach provider to `appsrc` elements -3. Convert data to DeepStream format using `as_tensor()` and `wrap()` -4. Return empty `Buffer()` to signal end of stream -5. Always set correct format properties (`width`, `height`, `framerate`, etc.) -6. Use GPU memory for optimal performance - -This API enables seamless integration of custom video sources with DeepStream's powerful inference and analytics capabilities. - ---- - -# Part 2: BufferRetriever / Receiver API (Frame Selector) - -## Overview - -The Frame Selector API (implemented through `BufferRetriever` and `Receiver` classes) enables extraction of video frames and buffers from DeepStream pipelines. This is useful for: -- Extracting frames for custom processing outside the pipeline -- Saving frames to disk or sending to external systems -- Collecting inference results with frame data -- Implementing custom frame selection logic -- Transferring data between multiple pipelines - -## Core Concepts - -### BufferRetriever -A `BufferRetriever` is a user-implemented class that consumes buffers from the pipeline. It works with GStreamer's `appsink` element to extract data from the pipeline. - -### Receiver -A `Receiver` is a wrapper that connects a `BufferRetriever` to an `appsink` element. It manages the signal handling for "new-sample" events. - -### Data Flow -``` -Pipeline -> appsink -> Receiver -> BufferRetriever.consume() -``` - -## API Reference - -### BufferRetriever Class - -Base class for implementing custom buffer consumers. - -**Methods to Override**: - -#### `consume(buffer)` -Process a buffer received from the pipeline. - -**Parameters**: -- `buffer` (Buffer): Buffer object containing frame data - -**Returns**: int (1 for success, 0 or negative for error/stop) - -**Example**: -```python -from pyservicemaker import BufferRetriever -import torch # pip install torch torchvision (not in base DS container) - -class MyBufferRetriever(BufferRetriever): - def __init__(self): - super().__init__() - self.frame_count = 0 - - def consume(self, buffer): - # Extract tensor from buffer at index 0 - tensor = buffer.extract(0) - - # Clone to prevent data loss - tensor_copy = tensor.clone() - - # Convert to PyTorch for processing - torch_tensor = torch.utils.dlpack.from_dlpack(tensor_copy) - - # Process the frame - print(f"Received frame {self.frame_count}: shape={torch_tensor.shape}") - - self.frame_count += 1 - return 1 # Success -``` - -### Receiver Class - -Wrapper for attaching a BufferRetriever to a pipeline element. - -**Constructor**: -```python -from pyservicemaker import Receiver - -receiver = Receiver("receiver-name", buffer_retriever_instance) -``` - -**Parameters**: -- `name` (str): Name of the receiver -- `retriever` (BufferRetriever): BufferRetriever instance - -### Buffer Class Methods - -**Methods**: - -#### `extract(index)` -Extract tensor at specified index from the buffer. - -**Parameters**: -- `index` (int): Batch index (usually 0 for single-stream) - -**Returns**: Tensor object (DLPack format) - -#### `clone()` -Create a copy of the tensor to prevent data corruption. - -**Returns**: Cloned tensor - -**Example**: -```python -def consume(self, buffer): - # Extract and clone in one step - tensor = buffer.extract(0).clone() - - # Now safe to use tensor asynchronously - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - return 1 -``` - -## Implementation Patterns - -### Pattern 1: Frame Extraction and Saving - -Extract frames from pipeline and save to disk. - -```python -from pyservicemaker import Pipeline, BufferRetriever, Receiver -import torch # pip install torch torchvision (not in base DS container) -import cv2 # pip install opencv-python-headless (not in base DS container) -import numpy as np -import platform -from multiprocessing import Process - -class FrameSaver(BufferRetriever): - def __init__(self, output_dir="./frames", save_interval=30): - super().__init__() - self.output_dir = output_dir - self.save_interval = save_interval - self.frame_count = 0 - - import os - os.makedirs(output_dir, exist_ok=True) - - def consume(self, buffer): - # Extract and clone buffer - tensor = buffer.extract(0).clone() - - # Save every Nth frame - if self.frame_count % self.save_interval == 0: - # Convert to PyTorch tensor - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - - # Move to CPU and convert to numpy - frame_np = torch_tensor.cpu().numpy() - - # Convert RGB to BGR for OpenCV - frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR) - - # Save frame - filename = f"{self.output_dir}/frame_{self.frame_count:06d}.jpg" - cv2.imwrite(filename, frame_bgr) - print(f"Saved: {filename}") - - self.frame_count += 1 - return 1 - -def extract_frames(video_uri, output_dir): - pipeline = Pipeline("frame-extractor") - - # Source - pipeline.add("nvurisrcbin", "src", {"uri": video_uri}) - - # Muxer - pipeline.add("nvstreammux", "mux", { - "batch-size": 1, - "width": 1920, - "height": 1080 - }) - - # Convert to RGB for extraction - pipeline.add("nvvideoconvert", "converter") - pipeline.add("capsfilter", "caps", { - "caps": "video/x-raw(memory:NVMM), format=RGB" - }) - - # Sink for extraction - pipeline.add("appsink", "sink", { - "emit-signals": True, - "sync": False - }) - - # Link elements - pipeline.link(("src", "mux"), ("", "sink_%u")) - pipeline.link("mux", "converter", "caps", "sink") - - # Attach retriever - retriever = FrameSaver(output_dir, save_interval=30) - pipeline.attach("sink", Receiver("receiver", retriever), tips="new-sample") - - # Run - pipeline.start().wait() - -if __name__ == "__main__": - import sys - process = Process(target=extract_frames, args=(sys.argv[1], "./output_frames")) - try: - process.start() - process.join() - except KeyboardInterrupt: - process.terminate() -``` - -### Pattern 2: Frame Queue Transfer - -Transfer frames from one pipeline to another using a queue. - -> **CRITICAL WARNING: Queue Type Selection** -> -> When transferring data between **threads**, use `queue.Queue` (from `queue` module). -> When transferring data between **processes**, use `multiprocessing.Queue`. -> -> Using `queue.Queue` with `multiprocessing.Process` will silently fail - data put into the queue in a child process will NEVER reach the parent process! This is a common bug that causes pipelines to appear running but produce no output. -> -> See the Best Practices reference for Anti-Pattern 4 with detailed examples. - -```python -from pyservicemaker import Pipeline, BufferRetriever, Receiver, BufferProvider, Feeder -import torch # pip install torch torchvision (not in base DS container) -from queue import Queue, Empty # Use for THREADING only! -# from multiprocessing import Queue # Use this for MULTIPROCESSING! -import threading - -class QueuedRetriever(BufferRetriever): - def __init__(self, frame_queue): - super().__init__() - self.queue = frame_queue - self.count = 0 - - def consume(self, buffer): - # Extract and clone - tensor = buffer.extract(0).clone() - - # Put in queue for other pipeline - self.queue.put(tensor) - - self.count += 1 - print(f"Queued frame {self.count}") - return 1 - -class QueuedProvider(BufferProvider): - def __init__(self, frame_queue, width=1280, height=720): - super().__init__() - self.queue = frame_queue - self.format = "RGB" - self.width = width - self.height = height - self.framerate = 30 - self.device = 'gpu' - - def generate(self, size): - try: - tensor = self.queue.get(timeout=2) - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - - from pyservicemaker import as_tensor, ColorFormat - ds_tensor = as_tensor(torch_tensor, "HWC") - return ds_tensor.wrap(ColorFormat.RGB) - except Empty: - from pyservicemaker import Buffer - return Buffer() - -def source_pipeline(uri, queue): - """Extract frames from source and queue them""" - pipeline = Pipeline("source-pipeline") - - pipeline.add("nvurisrcbin", "src", {"uri": uri}) - pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720}) - pipeline.add("nvvideoconvert", "converter") - pipeline.add("capsfilter", "caps", {"caps": "video/x-raw(memory:NVMM), format=RGB"}) - pipeline.add("appsink", "sink", {"emit-signals": True, "sync": False}) - - pipeline.link(("src", "mux"), ("", "sink_%u")) - pipeline.link("mux", "converter", "caps", "sink") - - retriever = QueuedRetriever(queue) - pipeline.attach("sink", Receiver("receiver", retriever), tips="new-sample") - - pipeline.start().wait() - -def destination_pipeline(queue): - """Consume frames from queue and process""" - pipeline = Pipeline("dest-pipeline") - - provider = QueuedProvider(queue, width=1280, height=720) - - caps = "video/x-raw(memory:NVMM), format=RGB, width=1280, height=720, framerate=30/1" - pipeline.add("appsrc", "src", {"caps": caps, "do-timestamp": True}) - pipeline.add("nvvideoconvert", "convert", {"nvbuf-memory-type": 2}) - pipeline.add("capsfilter", "caps2", {"caps": "video/x-raw(memory:NVMM), format=NV12"}) - pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720}) - pipeline.add("nvinfer", "infer", {"config-file-path": "/path/to/config.yml"}) - pipeline.add("nvosdbin", "osd") - pipeline.add("nveglglessink", "sink") - - pipeline.link("src", "convert", "caps2") - pipeline.link(("convert", "mux"), ("", "sink_%u")) - pipeline.link("mux", "infer", "osd", "sink") - - pipeline.attach("src", Feeder("feeder", provider), tips="need-data/enough-data") - - pipeline.start().wait() - -def multi_pipeline_transfer(video_uri, use_multiprocessing=False): - """ - Transfer frames between pipelines. - - IMPORTANT: Queue type must match execution model: - - Threading: use queue.Queue - - Multiprocessing: use multiprocessing.Queue - - Args: - video_uri: Video source URI - use_multiprocessing: If True, use processes (requires multiprocessing.Queue) - """ - if use_multiprocessing: - from multiprocessing import Queue as MPQueue, Process - queue = MPQueue(maxsize=10) # MUST use multiprocessing.Queue! - - # Run pipelines in separate processes - proc1 = Process(target=source_pipeline, args=(video_uri, queue)) - proc2 = Process(target=destination_pipeline, args=(queue,)) - - proc1.start() - proc2.start() - - proc2.join() - proc1.join() - else: - # Threading approach - queue.Queue works fine here - queue = Queue(maxsize=10) - - # Run both pipelines in threads (same process, shared memory) - thread1 = threading.Thread(target=source_pipeline, args=(video_uri, queue)) - thread2 = threading.Thread(target=destination_pipeline, args=(queue,)) - - thread1.start() - thread2.start() - - thread2.join() - thread1.join() -``` - -### Pattern 3: Selective Frame Capture - -Capture frames based on inference results (e.g., when objects are detected). - -```python -from pyservicemaker import Pipeline, BufferRetriever, Receiver, BatchMetadataOperator, Probe -import torch # pip install torch torchvision (not in base DS container) -import cv2 # pip install opencv-python-headless (not in base DS container) -import numpy as np - -class SelectiveFrameCapture(BufferRetriever): - def __init__(self, output_dir="./captured", min_objects=1): - super().__init__() - self.output_dir = output_dir - self.min_objects = min_objects - self.frame_count = 0 - self.saved_count = 0 - self.capture_next = False - - import os - os.makedirs(output_dir, exist_ok=True) - - def set_capture_flag(self, should_capture): - """Called by metadata probe to signal capture""" - self.capture_next = should_capture - - def consume(self, buffer): - tensor = buffer.extract(0).clone() - - if self.capture_next: - # Save this frame - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - frame_np = torch_tensor.cpu().numpy() - frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR) - - filename = f"{self.output_dir}/capture_{self.saved_count:06d}.jpg" - cv2.imwrite(filename, frame_bgr) - print(f"Captured frame {self.frame_count} with objects -> {filename}") - - self.saved_count += 1 - self.capture_next = False - - self.frame_count += 1 - return 1 - -class ObjectDetectionTrigger(BatchMetadataOperator): - def __init__(self, frame_capture, min_objects=1): - super().__init__() - self.frame_capture = frame_capture - self.min_objects = min_objects - - def handle_metadata(self, batch_meta): - for frame_meta in batch_meta.frame_items: - # Note: object_items is an ITERATOR - cannot use len() directly - # Count by iterating - obj_count = sum(1 for _ in frame_meta.object_items) - - if obj_count >= self.min_objects: - # Signal frame capture to save this frame - self.frame_capture.set_capture_flag(True) - print(f"Detected {obj_count} objects, triggering capture") - -def selective_capture(video_uri, config_path, output_dir): - pipeline = Pipeline("selective-capture") - - # Source and muxer - pipeline.add("nvurisrcbin", "src", {"uri": video_uri}) - pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1920, "height": 1080}) - - # Inference - pipeline.add("nvinfer", "infer", {"config-file-path": config_path}) - - # Convert for extraction - pipeline.add("nvvideoconvert", "converter") - pipeline.add("capsfilter", "caps", {"caps": "video/x-raw(memory:NVMM), format=RGB"}) - - # Sink - pipeline.add("appsink", "sink", {"emit-signals": True, "sync": False}) - - # Link - pipeline.link(("src", "mux"), ("", "sink_%u")) - pipeline.link("mux", "infer", "converter", "caps", "sink") - - # Attach frame capture - frame_capture = SelectiveFrameCapture(output_dir, min_objects=2) - pipeline.attach("sink", Receiver("receiver", frame_capture), tips="new-sample") - - # Attach metadata processor to trigger capture - trigger = ObjectDetectionTrigger(frame_capture, min_objects=2) - pipeline.attach("infer", Probe("trigger", trigger)) - - pipeline.start().wait() -``` - -### Pattern 4: Flow API with Frame Retrieval - -High-level Flow API for frame extraction. - -```python -from pyservicemaker import Pipeline, Flow, BufferRetriever -import torch # pip install torch torchvision (not in base DS container) -import cv2 # pip install opencv-python-headless (not in base DS container) -import numpy as np - -class SimpleFrameRetriever(BufferRetriever): - def __init__(self, save_path="output.jpg"): - super().__init__() - self.save_path = save_path - self.count = 0 - - def consume(self, buffer): - if self.count == 0: # Save first frame only - tensor = buffer.extract(0).clone() - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - frame_np = torch_tensor.cpu().numpy() - frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR) - cv2.imwrite(self.save_path, frame_bgr) - print(f"Saved frame to {self.save_path}") - - self.count += 1 - return 1 - -def flow_api_retrieval(video_uri): - pipeline = Pipeline("flow-retrieval") - retriever = SimpleFrameRetriever("output_frame.jpg") - - # Flow API: batch_capture() -> retrieve() - flow = Flow(pipeline) - flow.batch_capture([video_uri]) - flow.retrieve(retriever) - flow() -``` - -### Pattern 5: Frame Analysis and Logging - -Extract frames with metadata for analysis. - -```python -from pyservicemaker import Pipeline, BufferRetriever, Receiver, BatchMetadataOperator, Probe -import torch # pip install torch torchvision (not in base DS container) -import json -from datetime import datetime - -class FrameAnalyzer(BufferRetriever): - def __init__(self, log_file="frame_analysis.json"): - super().__init__() - self.log_file = log_file - self.frame_count = 0 - self.metadata_cache = {} - - def set_metadata(self, frame_num, metadata): - """Called by metadata probe""" - self.metadata_cache[frame_num] = metadata - - def consume(self, buffer): - tensor = buffer.extract(0).clone() - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - - # Calculate frame statistics - mean_intensity = torch_tensor.float().mean().item() - std_intensity = torch_tensor.float().std().item() - - # Get metadata if available - metadata = self.metadata_cache.get(self.frame_count, {}) - - # Log analysis - analysis = { - "frame_number": self.frame_count, - "timestamp": datetime.now().isoformat(), - "mean_intensity": mean_intensity, - "std_intensity": std_intensity, - "shape": list(torch_tensor.shape), - "objects_detected": metadata.get("object_count", 0), - "object_classes": metadata.get("classes", []) - } - - with open(self.log_file, "a") as f: - f.write(json.dumps(analysis) + "\n") - - # Clear cached metadata - if self.frame_count in self.metadata_cache: - del self.metadata_cache[self.frame_count] - - self.frame_count += 1 - return 1 - -class MetadataExtractor(BatchMetadataOperator): - def __init__(self, frame_analyzer): - super().__init__() - self.frame_analyzer = frame_analyzer - - def handle_metadata(self, batch_meta): - for frame_meta in batch_meta.frame_items: - # Note: object_items is an ITERATOR - convert to list if you need - # to access it multiple times or use len() - objects = list(frame_meta.object_items) - metadata = { - "object_count": len(objects), - "classes": [obj.class_id for obj in objects], - "confidences": [obj.confidence for obj in objects] - } - self.frame_analyzer.set_metadata(frame_meta.frame_number, metadata) - -def analyze_frames(video_uri, config_path): - pipeline = Pipeline("frame-analyzer") - - # Source - pipeline.add("nvurisrcbin", "src", {"uri": video_uri}) - pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1920, "height": 1080}) - - # Inference - pipeline.add("nvinfer", "infer", {"config-file-path": config_path}) - - # Convert and extract - pipeline.add("nvvideoconvert", "converter") - pipeline.add("capsfilter", "caps", {"caps": "video/x-raw(memory:NVMM), format=RGB"}) - pipeline.add("appsink", "sink", {"emit-signals": True, "sync": False}) - - # Link - pipeline.link(("src", "mux"), ("", "sink_%u")) - pipeline.link("mux", "infer", "converter", "caps", "sink") - - # Attach analyzer - analyzer = FrameAnalyzer("analysis_log.json") - pipeline.attach("sink", Receiver("receiver", analyzer), tips="new-sample") - - # Attach metadata extractor - extractor = MetadataExtractor(analyzer) - pipeline.attach("infer", Probe("extractor", extractor)) - - pipeline.start().wait() -``` - -### Pattern 6: Real-time Frame Streaming - -Stream frames to external system (e.g., web server, cloud service). - -```python -from pyservicemaker import Pipeline, BufferRetriever, Receiver -import torch # pip install torch torchvision (not in base DS container) -import cv2 # pip install opencv-python-headless (not in base DS container) -import numpy as np -import base64 -import requests - -class FrameStreamer(BufferRetriever): - def __init__(self, endpoint_url, stream_interval=1): - super().__init__() - self.endpoint_url = endpoint_url - self.stream_interval = stream_interval - self.frame_count = 0 - - def consume(self, buffer): - # Stream every Nth frame - if self.frame_count % self.stream_interval == 0: - tensor = buffer.extract(0).clone() - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - frame_np = torch_tensor.cpu().numpy() - - # Encode as JPEG - frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR) - _, jpeg_buffer = cv2.imencode('.jpg', frame_bgr, [cv2.IMWRITE_JPEG_QUALITY, 85]) - - # Encode as base64 - jpeg_base64 = base64.b64encode(jpeg_buffer).decode('utf-8') - - # Send to endpoint - try: - response = requests.post( - self.endpoint_url, - json={ - "frame_number": self.frame_count, - "image": jpeg_base64 - }, - timeout=1 - ) - if response.status_code == 200: - print(f"Streamed frame {self.frame_count}") - except Exception as e: - print(f"Failed to stream frame {self.frame_count}: {e}") - - self.frame_count += 1 - return 1 - -def stream_frames(video_uri, endpoint_url): - pipeline = Pipeline("frame-streamer") - - pipeline.add("nvurisrcbin", "src", {"uri": video_uri}) - pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720}) - pipeline.add("nvvideoconvert", "converter") - pipeline.add("capsfilter", "caps", {"caps": "video/x-raw(memory:NVMM), format=RGB"}) - pipeline.add("appsink", "sink", {"emit-signals": True, "sync": False}) - - pipeline.link(("src", "mux"), ("", "sink_%u")) - pipeline.link("mux", "converter", "caps", "sink") - - streamer = FrameStreamer(endpoint_url, stream_interval=10) - pipeline.attach("sink", Receiver("receiver", streamer), tips="new-sample") - - pipeline.start().wait() -``` - -## Part 2 Best Practices - -### 1. Always Clone Buffers -```python -def consume(self, buffer): - # ALWAYS clone to prevent data corruption - tensor = buffer.extract(0).clone() - # Now safe to use asynchronously -``` - -### 2. Signal Configuration -```python -# Always use "new-sample" signal for appsink -pipeline.attach("sink", Receiver("receiver", retriever), tips="new-sample") - -# Enable signal emission on appsink -pipeline.add("appsink", "sink", {"emit-signals": True}) -``` - -### 3. Synchronization Control -```python -# For frame extraction, usually disable sync -pipeline.add("appsink", "sink", { - "emit-signals": True, - "sync": False # Don't block on frame rate -}) - -# For real-time processing, enable sync -pipeline.add("appsink", "sink", { - "emit-signals": True, - "sync": True # Maintain real-time pacing -}) -``` - -### 4. Return Value Handling -```python -def consume(self, buffer): - try: - # Process buffer - tensor = buffer.extract(0).clone() - # ... processing ... - return 1 # Success, continue processing - except Exception as e: - print(f"Error: {e}") - return 0 # Error, but continue - # return -1 # Fatal error, stop pipeline -``` - -### 5. Memory Management -```python -class EfficientRetriever(BufferRetriever): - def __init__(self): - super().__init__() - self.frame_buffer = [] - self.max_buffer_size = 100 - - def consume(self, buffer): - tensor = buffer.extract(0).clone() - - # Limit buffer size to prevent memory issues - if len(self.frame_buffer) >= self.max_buffer_size: - self.frame_buffer.pop(0) # Remove oldest - - self.frame_buffer.append(tensor) - return 1 -``` - -### 6. Thread Safety -```python -import threading - -class ThreadSafeRetriever(BufferRetriever): - def __init__(self): - super().__init__() - self.lock = threading.Lock() - self.frame_count = 0 - - def consume(self, buffer): - with self.lock: - tensor = buffer.extract(0).clone() - # Safe concurrent access - self.frame_count += 1 - return 1 -``` - -## Advanced Usage - -### Multi-Batch Frame Extraction - -Extract frames from multi-stream batches. - -```python -class MultiBatchRetriever(BufferRetriever): - def __init__(self, num_streams): - super().__init__() - self.num_streams = num_streams - self.frame_counts = [0] * num_streams - - def consume(self, buffer): - # Extract all streams in batch - for stream_idx in range(self.num_streams): - try: - tensor = buffer.extract(stream_idx).clone() - torch_tensor = torch.utils.dlpack.from_dlpack(tensor) - - # Process each stream - print(f"Stream {stream_idx}, Frame {self.frame_counts[stream_idx]}") - - self.frame_counts[stream_idx] += 1 - except Exception as e: - print(f"Error extracting stream {stream_idx}: {e}") - - return 1 - -def multi_stream_extraction(video_uris): - pipeline = Pipeline("multi-stream-extract") - - # Add sources - for i, uri in enumerate(video_uris): - pipeline.add("nvurisrcbin", f"src{i}", {"uri": uri}) - - # Muxer for batching - pipeline.add("nvstreammux", "mux", { - "batch-size": len(video_uris), - "width": 1280, - "height": 720 - }) - - # Convert and extract - pipeline.add("nvvideoconvert", "converter") - pipeline.add("capsfilter", "caps", {"caps": "video/x-raw(memory:NVMM), format=RGB"}) - pipeline.add("appsink", "sink", {"emit-signals": True, "sync": False}) - - # Link sources to muxer - for i in range(len(video_uris)): - pipeline.link((f"src{i}", "mux"), ("", "sink_%u")) - - pipeline.link("mux", "converter", "caps", "sink") - - # Attach multi-batch retriever - retriever = MultiBatchRetriever(len(video_uris)) - pipeline.attach("sink", Receiver("receiver", retriever), tips="new-sample") - - pipeline.start().wait() -``` - -## Part 2 Common Use Cases - -### 1. Frame Archival -Extract and save frames at regular intervals for archival purposes. - -### 2. Thumbnail Generation -Extract keyframes to generate video thumbnails. - -### 3. Object Detection Screenshots -Capture frames when specific objects are detected. - -### 4. Video Quality Analysis -Extract frames for quality metrics computation. - -### 5. Pipeline Debugging -Extract frames at various pipeline stages for debugging. - -### 6. Data Collection -Collect frames and metadata for training dataset creation. - -## Part 2 Troubleshooting - -### Issue 1: No Frames Received -**Solution**: Ensure `emit-signals=True` is set on appsink, verify `tips="new-sample"` is set - -### Issue 2: Data Corruption -**Solution**: Always call `.clone()` on extracted tensors before async processing - -### Issue 3: Memory Leaks -**Solution**: Limit buffer accumulation, properly release tensors - -### Issue 4: Performance Issues -**Solution**: Set `sync=False` on appsink, process frames asynchronously - -### Issue 5: Missing Frames -**Solution**: Check return value (return 1 for success), ensure processing is fast enough - -### Issue 6: Frames/Batches Not Reaching Downstream Processing (Queue Empty) -**Symptoms**: -- Pipeline runs without errors -- BufferRetriever.consume() is being called -- But downstream processing (VLM, Kafka, etc.) never receives data -- Queue appears to be empty in consumer thread/process - -**Root Cause**: Using `queue.Queue` with `multiprocessing.Process` - -**Solution**: -1. If using multiprocessing: Switch to `multiprocessing.Queue` -2. If process isolation not required: Use `threading.Thread` with `queue.Queue` -3. Set `use_multiprocessing=False` in your configuration - -```python -# WRONG: queue.Queue with multiprocessing -from multiprocessing import Process -from queue import Queue # Won't work across processes! - -# CORRECT Option 1: Use multiprocessing.Queue -from multiprocessing import Process, Queue - -# CORRECT Option 2: Use threading instead -import threading -from queue import Queue - -# See the Best Practices reference for Anti-Pattern 4 details -``` - -## Part 2 Summary - -The Frame Selector API (BufferRetriever/Receiver) provides powerful capabilities for extracting frames and data from DeepStream pipelines. Key points: - -1. Implement `BufferRetriever.consume()` to process extracted buffers -2. Use `Receiver` to attach retriever to `appsink` elements -3. Always call `buffer.extract(0).clone()` to safely extract tensors -4. Return `1` for success, `0` for error (continue), `-1` for fatal error -5. Set `emit-signals=True` on appsink and use `tips="new-sample"` -6. Consider `sync=False` for non-real-time extraction - -This API enables seamless extraction of frames, inference results, and metadata from DeepStream pipelines for custom processing, archival, or transfer to other systems. diff --git a/skills/deepstream/deepstream-dev/references/docker_containers.md b/skills/deepstream/deepstream-dev/references/docker_containers.md deleted file mode 100644 index f5bf6245..00000000 --- a/skills/deepstream/deepstream-dev/references/docker_containers.md +++ /dev/null @@ -1,273 +0,0 @@ -# DeepStream Docker Containers Reference - -## Overview - -DeepStream Docker images are hosted on the NVIDIA NGC container registry (`nvcr.io`). They package all SDK dependencies (GStreamer, TensorRT, CUDA, models, sample streams) and require the NVIDIA Container Toolkit (`nvidia-container-toolkit`) for GPU access. - -- **NGC catalog page**: https://catalog.ngc.nvidia.com/orgs/nvidia/containers/deepstream -- **Official docs**: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_docker_containers.html - ---- - -## Available Containers (DeepStream 9.0) - -### dGPU (x86_64) - -| Container | Pull Command | Description | -|-----------|-------------|-------------| -| **Samples** | `docker pull nvcr.io/nvidia/deepstream:9.0-samples-multiarch` | Runtime libraries, GStreamer plugins, reference apps, sample streams, models, configs. Best for running demos and deploying applications. | -| **Triton** | `docker pull nvcr.io/nvidia/deepstream:9.0-triton-multiarch` | Everything in samples + Triton Inference Server and dependencies + development environment. Use when Triton-based inference is needed or building custom DeepStream applications. | - -### Jetson (ARM64/aarch64) - -| Container | Pull Command | Description | -|-----------|-------------|-------------| -| **Samples** | `docker pull nvcr.io/nvidia/deepstream:9.0-samples-multiarch` | Runtime libraries, GStreamer plugins, reference apps, sample streams, models, configs. **Deployment only** — does not support development inside the container. | -| **Triton** | `docker pull nvcr.io/nvidia/deepstream:9.0-triton-multiarch` | Samples contents + devel libraries + Triton Inference Server backends. | - -### dGPU on ARM (GH200, GB200, SBSA) - -| Container | Pull Command | Description | -|-----------|-------------|-------------| -| **Triton ARM SBSA** | `docker pull nvcr.io/nvidia/deepstream:9.0-triton-arm-sbsa` | Triton Inference Server + development environment for ARM SBSA platforms. | - ---- - -## Choosing the Right Image - -| Use Case | Recommended Image | -|----------|-------------------| -| Running sample apps / demos | `9.0-samples-multiarch` | -| pyservicemaker Python applications | `9.0-triton-multiarch` | -| Triton Inference Server required | `9.0-triton-multiarch` | -| Custom Dockerfile base image | `9.0-samples-multiarch` (minimal) or `9.0-triton-multiarch` (with Triton) | - ---- - -## NGC Authentication - -Pulling images requires NGC authentication: - -```bash -# 1. Get an API key from https://ngc.nvidia.com -# 2. Log in to the NGC registry -docker login nvcr.io -# Username: $oauthtoken -# Password: -``` - ---- - -## Installing pyservicemaker Inside the Container - -The `pyservicemaker` Python wheel is **bundled** in the container but **NOT pre-installed**. You must install it explicitly: - -```bash -pip install /opt/nvidia/deepstream/deepstream/service-maker/python/pyservicemaker*.whl \ - pyyaml -``` - -In a Dockerfile: - -```dockerfile -RUN pip install --break-system-packages \ - /opt/nvidia/deepstream/deepstream/service-maker/python/pyservicemaker*.whl \ - pyyaml -``` - -> **Note**: The `--break-system-packages` flag is needed on Ubuntu 24.04 (Python 3.12) to install into the system Python environment. Alternatively, use a virtual environment. - ---- - -## Running Containers - -### Prerequisites - -1. **Docker**: Install `docker-ce` via [official instructions](https://docs.docker.com/engine/install) -2. **NVIDIA Container Toolkit**: Install via [install guide](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html) -3. **NVIDIA Driver**: 590+ for dGPU - -### Basic Run (with display) - -```bash -export DISPLAY=:0 -xhost + - -docker run -it --rm \ - --network=host \ - --gpus all \ - -e DISPLAY=$DISPLAY \ - -v /tmp/.X11-unix/:/tmp/.X11-unix \ - nvcr.io/nvidia/deepstream:9.0-triton-multiarch -``` - -### Headless Run (no display) - -```bash -docker run -it --rm \ - --gpus all \ - nvcr.io/nvidia/deepstream:9.0-triton-multiarch -``` - -> For headless mode, use `fakesink` instead of `nveglglessink`/`nv3dsink` in your pipeline, or output to a file with `filesink`. - -### Run with Custom Video File - -```bash -docker run -it --rm \ - --gpus all \ - -e DISPLAY=$DISPLAY \ - -v /tmp/.X11-unix/:/tmp/.X11-unix \ - -v /path/to/videos:/data \ - nvcr.io/nvidia/deepstream:9.0-triton-multiarch -``` - ---- - -## Building Custom Docker Images - -Use a DeepStream image as the base for your application: - -```dockerfile -FROM nvcr.io/nvidia/deepstream:9.0-triton-multiarch - -# Install pyservicemaker -RUN pip install --break-system-packages \ - /opt/nvidia/deepstream/deepstream/service-maker/python/pyservicemaker*.whl \ - pyyaml - -# Copy application files -WORKDIR /app -COPY my_app.py . -COPY my_config.yml . - -# Enable video driver libraries at runtime (encode/decode) -ENV NVIDIA_DRIVER_CAPABILITIES=${NVIDIA_DRIVER_CAPABILITIES},video - -ENTRYPOINT ["python3", "my_app.py"] -``` - -### Build and Run - -```bash -# Build -docker build -t my-ds-app . - -# Run with display -docker run --rm --gpus all \ - -e DISPLAY=$DISPLAY \ - -v /tmp/.X11-unix:/tmp/.X11-unix \ - my-ds-app - -# Run with RTSP source (no display needed) -docker run --rm --gpus all \ - my-ds-app rtsp://camera-ip/stream -``` - ---- - -## Additional Packages - -DeepStream 9.0 containers do **not** include certain multimedia libraries by default. Install them if needed: - -### Audio/Codec Support - -```bash -# Run the bundled install script for common multimedia packages -/opt/nvidia/deepstream/deepstream/user_additional_install.sh - -# Or install specific packages manually -apt-get install -y gstreamer1.0-libav gstreamer1.0-plugins-good \ - gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly -``` - -### ffmpeg (for sample video preparation scripts) - -```bash -apt-get install --reinstall libflac8 libmp3lame0 libxvidcore4 ffmpeg -``` - -### Kafka Support (librdkafka) - -```bash -apt-get install -y librdkafka-dev -``` - -### Tracker Support (libmosquitto) - -```bash -apt-get install -y libmosquitto1 -``` - ---- - -## Important Paths Inside the Container - -| Path | Contents | -|------|----------| -| `/opt/nvidia/deepstream/deepstream/` | DeepStream SDK root | -| `/opt/nvidia/deepstream/deepstream/samples/models/` | Sample models (Primary_Detector, Secondary_*, etc.) | -| `/opt/nvidia/deepstream/deepstream/samples/streams/` | Sample video streams (e.g., `sample_1080p_h264.mp4`) | -| `/opt/nvidia/deepstream/deepstream/samples/configs/` | Sample configuration files | -| `/opt/nvidia/deepstream/deepstream/lib/` | DeepStream libraries (GStreamer plugins, protocol adapters) | -| `/opt/nvidia/deepstream/deepstream/lib/gst-plugins/` | GStreamer plugin `.so` files | -| `/opt/nvidia/deepstream/deepstream/service-maker/python/` | pyservicemaker wheel file | - ---- - -## Environment Variables - -| Variable | Purpose | Example | -|----------|---------|---------| -| `GST_PLUGIN_PATH` | GStreamer plugin search path | `/opt/nvidia/deepstream/deepstream/lib/gst-plugins` | -| `LD_LIBRARY_PATH` | Shared library search path | `/opt/nvidia/deepstream/deepstream/lib:$LD_LIBRARY_PATH` | -| `GST_DEBUG` | GStreamer debug log level | `3` (INFO) or `nvinfer:5` (plugin-specific) | -| `NVIDIA_DRIVER_CAPABILITIES` | GPU capabilities exposed | `${NVIDIA_DRIVER_CAPABILITIES},video` | -| `DISPLAY` | X11 display for rendering sinks | `:0` | - ---- - -## Common Docker Issues - -### `ModuleNotFoundError: No module named 'pyservicemaker'` - -**Cause**: The wheel is bundled but not installed. - -**Fix**: Add to Dockerfile: -```dockerfile -RUN pip install --break-system-packages \ - /opt/nvidia/deepstream/deepstream/service-maker/python/pyservicemaker*.whl \ - pyyaml -``` - -### Display sinks fail with `Could not open display` - -**Cause**: X11 forwarding not configured. - -**Fix**: Pass display environment and socket: -```bash -docker run --rm --gpus all \ - -e DISPLAY=$DISPLAY \ - -v /tmp/.X11-unix:/tmp/.X11-unix \ - my-ds-app -``` - -Or use `fakesink` / `filesink` for headless operation. - -### `Failed to load plugin ... libnvds_kafka_proto.so` - -**Cause**: `librdkafka` not installed (not bundled in the container). - -**Fix**: Add to Dockerfile: -```dockerfile -RUN apt-get update && apt-get install -y librdkafka-dev && rm -rf /var/lib/apt/lists/* -``` - -### Warning about audio decoder not available - -**Cause**: Multimedia codec packages removed in DS 9.0 containers. - -**Fix**: -```dockerfile -RUN /opt/nvidia/deepstream/deepstream/user_additional_install.sh -``` diff --git a/skills/deepstream/deepstream-dev/references/gstreamer_plugins.md b/skills/deepstream/deepstream-dev/references/gstreamer_plugins.md deleted file mode 100644 index e3c7982f..00000000 --- a/skills/deepstream/deepstream-dev/references/gstreamer_plugins.md +++ /dev/null @@ -1,984 +0,0 @@ -# DeepStream GStreamer Plugins Overview - -## Introduction - -DeepStream provides a comprehensive set of custom GStreamer plugins optimized for NVIDIA GPUs. These plugins handle video decoding, inference, tracking, visualization, and various other video analytics tasks. Understanding these plugins is crucial for building effective DeepStream applications. - -## Plugin Categories - -### Source Plugins -Plugins that generate or capture video data from various sources. - -### Processing Plugins -Plugins that transform, analyze, or process video data. - -### Sink Plugins -Plugins that output video to displays, files, or network destinations. - ---- - -## Source Plugins - -### nvv4l2decoder -**Purpose**: Hardware-accelerated video decoder using NVIDIA V4L2 API (from nvvideo4linux2 plugin) - -**Key Properties**: -- `capture-io-mode`: Capture I/O mode for the sink pad (`auto`, `mmap`, `dmabuf-import`) -- `output-io-mode`: Output I/O mode for the src pad (`auto`, `mmap`, `dmabuf-import`) -- `cudadec-memtype`: CUDA buffer memory type (`memtype_device`, `memtype_pinned`, `memtype_unified`) -- `gpu-id`: GPU device ID used for decoding -- `drop-frame-interval`: Interval for dropping frames (0 keeps all frames) -- `num-extra-surfaces`: Additional decode surfaces to allocate -- `disable-dpb`: Disable DPB buffers to reduce latency -- `low-latency-mode`: Enable low-latency decoding for I/IPPP streams -- `skip-frames`: Frame skipping policy (`decode_all`, `decode_non_ref`, `decode_key`) -- `device`: Decoder device path (read-only, default `/dev/nvidia0`) - -**Usage**: -```bash -nvv4l2decoder output-io-mode=0 drop-frame-interval=0 -``` - -**Common Pipeline Pattern**: -``` -h264parse ! nvv4l2decoder ! ... -``` - -**Output Format**: -- Outputs `video/x-raw(memory:NVMM)` - GPU memory format -- This is already in NVMM format, so NO nvvideoconvert is needed before nvstreammux - -**Notes**: -- Essential for GPU-accelerated pipelines -- Supports H.264, H.265, VP8, VP9 codecs with zero-copy memory transfers -- Output is already in NVMM memory, compatible with nvstreammux and other DeepStream plugins - ---- - -### nvurisrcbin -**Purpose**: Source bin for handling URI-based sources (files, RTSP, HTTP) - -**Key Properties**: -- `uri`: Source URI (file://, rtsp://, http://, etc.) -- `num-buffers`: Number of buffers to process -- `drop-on-latency`: Drop frames on latency - -**Usage**: -```bash -nvurisrcbin uri=file:///path/to/video.mp4 -``` - -**Common Pipeline Pattern**: -``` -nvurisrcbin uri=rtsp://camera-ip/stream ! ... -``` - -**Notes**: -- Automatically handles demuxing and parsing for multiple protocols and formats - ---- - -### nvmultiurisrcbin -**Purpose**: Source bin with built-in REST API server for dynamic multi-stream management - -**Key Properties**: -| Property | Type | Description | -|----------|------|-------------| -| `uri-list` | string | Comma-separated list of initial URIs | -| `sensor-id-list` | string | Comma-separated sensor IDs (maps 1:1 with uri-list) | -| `sensor-name-list` | string | Comma-separated sensor names | -| `ip-address` | string | REST API server IP (default: localhost) | -| `port` | int | REST API server port (default: 9000, 0 to disable) | -| `max-batch-size` | int | Maximum number of sources | -| `batched-push-timeout` | int | Timeout in microseconds to push batch | -| `live-source` | int | Set to 1 for live/dynamic sources (REQUIRED) | -| `drop-pipeline-eos` | int | Set to 1 to keep pipeline alive when sources removed | -| `async-handling` | int | Set to 1 for async state changes | -| `select-rtp-protocol` | int | 0=UDP+TCP auto, 4=TCP only | -| `latency` | int | Jitterbuffer size in ms for RTSP | - -**Built-in REST API Endpoints**: -- `POST /api/v1/stream/add` - Add a stream dynamically -- `POST /api/v1/stream/remove` - Remove a stream -- `GET /api/v1/stream/get-stream-info` - Get current streams - -**Usage**: -```python -# Pipeline with built-in REST server on port 9000 -pipeline.add("nvmultiurisrcbin", "src", { - "port": 9000, - "max-batch-size": 16, - "live-source": 1, - "drop-pipeline-eos": 1, - "async-handling": 1, -}) -# REST API automatically available at http://localhost:9000/api/v1/ -``` - -**⚠️ CRITICAL for Dynamic Sources**: -When using dynamic source addition, the sink element MUST have `async=0`: -```python -pipeline.add("nveglglessink", "sink", { - "sync": 0, - "qos": 0, - "async": 0 # CRITICAL - prevents state transition deadlock -}) -``` - -**Notes**: -- Integrates nvds_rest_server, nvurisrcbin, and nvstreammux in one bin -- Do NOT implement custom Flask/FastAPI server - use built-in REST API -- See `rest_api_dynamic.md` for complete REST API documentation - ---- - -### nvdsdynamicsrcbin -**Purpose**: Source bin for programmatically adding and removing file/URI-based video sources at runtime. Unlike `nvmultiurisrcbin` (REST API / config-driven), `nvdsdynamicsrcbin` is controlled entirely through code using `SourceManager`. - -**CRITICAL**: `nvdsdynamicsrcbin` does **not** manage sources on its own. You **must** use `SourceManager` from `pyservicemaker._pydeepstream.signal` to add, remove, and terminate sources. Without `SourceManager`, the bin has no way to receive source URIs. - -**Key Properties**: -| Property | Type | Default | Description | -|----------|------|---------|-------------| -| `gpu-id` | uint | 0 | GPU Device ID to use for decoding | -| `message-forward` | bool | False | Forward all children messages to the pipeline bus (required for EOS detection) | -| `async-handling` | bool | False | Handle asynchronous state changes internally | -| `current-file` | string (read-only) | null | Currently processing file path | -| `current-id` | int (read-only) | -1 | ID of the chunk currently being processed | - -**Element Actions** (triggered via `SourceManager`): -| Action | Description | -|--------|-------------| -| `add-source` | Add a new file/URI source to the bin | -| `remove-source` | Remove a source by its unique ID | -| `terminate` | Signal no more sources will be added; sends EOS after all finish | - -**Internal Children**: Contains `parsebin`, `queue_parsebin`, and `decoder` — it automatically parses and decodes the added sources. - ---- - -### v4l2src -**Purpose**: Video4Linux2 source for USB cameras - -**Key Properties**: -- `device`: Device path (e.g., `/dev/video0`) -- `io-mode`: I/O mode -- `do-timestamp`: Enable timestamping - -**Usage**: -```bash -v4l2src device=/dev/video0 ! ... -``` - -**Notes**: -- Standard GStreamer plugin for USB webcams, may require format conversion - ---- - -### nvarguscamerasrc -**Purpose**: NVIDIA camera source for Jetson CSI cameras - -**Key Properties**: -- `sensor-id`: Sensor ID (0, 1, etc.) -- `sensor-mode`: Sensor mode -- `wbmode`: White balance mode -- `exposuretimerange`: Exposure time range -- `gainrange`: Gain range - -**Usage**: -```bash -nvarguscamerasrc sensor-id=0 ! ... -``` - -**Notes**: -- Jetson-specific plugin optimized for CSI cameras with hardware-accelerated capture - ---- - -## Processing Plugins - -### nvstreammux -**Purpose**: Batches multiple video streams into a single batch for efficient inference - -**IMPORTANT**: There are TWO versions of nvstreammux: -- **OLD nvstreammux**: Default, uses GObject properties for configuration -- **NEW nvstreammux**: Enabled with `USE_NEW_NVSTREAMMUX=yes`, uses config file for advanced settings - -**Key Properties (NEW nvstreammux - RECOMMENDED)**: -- `batch-size`: Maximum number of buffers in a batch -- `batched-push-timeout`: Timeout for batching in microseconds (default: 33000) -- `config-file-path`: Path to configuration file for advanced settings -- `num-surfaces-per-frame`: Number of surfaces per frame -- `attach-sys-ts`: Attach system timestamp as NTP timestamp (boolean) -- `max-latency`: Maximum latency in live mode (nanoseconds) -- `sync-inputs`: Force synchronization of input frames (boolean) -- `frame-num-reset-on-eos`: Reset frame numbers on EOS (boolean) -- `frame-num-reset-on-stream-reset`: Reset frame numbers on stream reset (boolean) -- `frame-duration`: Duration of input frames in milliseconds for NTP correction -- `drop-pipeline-eos`: Don't propagate EOS downstream when all pads are at EOS (boolean) - -**Key Properties (OLD nvstreammux - Legacy)**: -- `batch-size`: Number of streams to batch -- `width`: Output batch width -- `height`: Output batch height -- `gpu-id`: GPU ID for processing -- `batched-push-timeout`: Timeout for batching (microseconds) -- `enable-padding`: Enable padding for different resolutions -- `nvbuf-memory-type`: Memory type (0=default, 1=NVMM, 2=unified) - -**Usage**: -```bash -nvstreammux name=m batch-size=4 width=1920 height=1080 -``` - -**Common Pipeline Pattern**: -``` -source1 ! m.sink_0 source2 ! m.sink_1 nvstreammux name=m batch-size=2 ! ... -``` - -**Notes**: -- **Critical plugin** for multi-stream applications -- **NEW nvstreammux** (recommended): More flexible, uses config file for width/height/memory-type settings -- **OLD nvstreammux**: Uses GObject properties for width/height, may be deprecated in future -- To use NEW version: Set environment variable `USE_NEW_NVSTREAMMUX=yes` before running pipeline -- Batch size should match number of input streams -- NEW version infers output resolution from downstream elements or uses config file - ---- - -### nvstreamdemux -**Purpose**: Demultiplexes batched streams back to individual streams - -**Key Properties**: -- `name`: Element name (required for pad access) - -**Usage**: -```bash -nvstreamdemux name=d -``` - -**Common Pipeline Pattern**: -``` -nvstreammux name=m ! ... ! nvstreamdemux name=d d.src_0 ! ... d.src_1 ! ... -``` - -**Notes**: -- Used after processing batched streams -- Provides separate source pads for each stream -- Essential for per-stream rendering or processing - ---- - -### nvinfer -**Purpose**: TensorRT-based inference engine for deep learning models - -**Key Properties**: -- `config-file-path`: Path to inference configuration file (supports **both** INI-style text format and YAML format) -- `batch-size`: Batch size for inference -- `gpu-id`: GPU ID for inference -- `unique-id`: Unique identifier for this inference instance -- `process-mode`: Infer processing mode (primary or secondary) -- `interval`: Number of consecutive batches to skip for inference -- `infer-on-gie-id`: Infer on metadata from GIE with this unique ID (-1 for all) -- `infer-on-class-ids`: Operate on objects with specified class IDs -- `filter-out-class-ids`: Ignore metadata for objects of specified class IDs -- `model-engine-file`: Path to pre-generated TensorRT engine file -- `output-tensor-meta`: Output raw tensor metadata (0=no, 1=yes) -- `output-instance-mask`: Output instance mask in metadata (0=no, 1=yes) -- `input-tensor-meta`: Use tensor metadata from upstream (0=no, 1=yes) -- `clip-object-outside-roi`: Clip object bbox outside ROI from nvdspreprocess -- `crop-objects-to-roi-boundary`: Crop object bbox to ROI boundary -- `raw-output-file-write`: Write raw inference output to file -- `raw-output-generated-callback`: Callback for raw output -- `raw-output-generated-userdata`: Userdata for raw output callback - -**Configuration File Structure**: - -nvinfer supports **two configuration formats**: - -### Format 1: YAML Format (Recommended) - -```yaml -# Example: pgie_config.yml (Primary detector using ResNet18) -property: - gpu-id: 0 - net-scale-factor: 0.00392156862745098 - # Use ResNet18 TrafficCamNet model from DeepStream samples - onnx-file: /opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx - labelfile-path: /opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/labels.txt - batch-size: 1 - process-mode: 1 - model-color-format: 0 - # 0=FP32, 1=INT8, 2=FP16 - network-mode: 2 - num-detected-classes: 4 - interval: 0 - gie-unique-id: 1 - # 1=DBSCAN, 2=NMS, 3=DBSCAN+NMS, 4=None - cluster-mode: 2 - -class-attrs-all: - topk: 20 - nms-iou-threshold: 0.5 - pre-cluster-threshold: 0.2 -``` - -### Format 2: INI-style Text Format - -```ini -# Example: pgie_config.txt (Primary detector using ResNet18) -[property] -gpu-id=0 -net-scale-factor=0.00392156862745098 -onnx-file=/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx -labelfile-path=/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/labels.txt -batch-size=1 -process-mode=1 -model-color-format=0 -network-mode=2 -num-detected-classes=4 -interval=0 -gie-unique-id=1 -cluster-mode=2 - -[class-attrs-all] -topk=20 -nms-iou-threshold=0.5 -pre-cluster-threshold=0.2 -``` - -**Key Differences**: -| Aspect | YAML Format | INI Format | -|--------|-------------|------------| -| File extension | `.yml` or `.yaml` | `.txt` | -| Section headers | `property:` (no brackets) | `[property]` (with brackets) | -| Key-value separator | `: ` (colon + space) | `=` (equals) | -| Indentation | Required for nested values | Not used | - -**Usage**: -```bash -nvinfer config-file-path=/path/to/config.yml batch-size=4 -``` - -**Common Pipeline Pattern**: -``` -nvstreammux ! nvinfer config-file-path=pgie_config.txt ! ... -``` - -**Notes**: -- **Primary inference engine** for object detection/classification -- Supports TensorRT engines (.trt), ONNX models, and custom networks -- Can be used as Primary GIE (PGIE) or Secondary GIE (SGIE) -- Multiple instances can be cascaded for complex models -- `output-tensor-meta=1` enables custom postprocessing -- `input-tensor-meta=1` uses preprocessed tensors from nvdspreprocess -- **Note**: `enable-dbscan` is DEPRECATED and is a config file parameter, not a GObject property - ---- - -### nvinferserver -**Purpose**: Inference using Triton Inference Server backend - -**Key Properties**: -- `config-file-path`: Path to Triton configuration file -- `gpu-id`: GPU ID -- `unique-id`: Unique identifier -- `output-tensor-meta`: Output tensor metadata - -**Usage**: -```bash -nvinferserver config-file-path=/path/to/triton_config.txt -``` - -**Notes**: -- Alternative to nvinfer for Triton-based inference -- Supports remote inference servers -- Better for scalable deployments -- Requires Triton Inference Server setup - ---- - -### nvdspreprocess -**Purpose**: Custom preprocessing plugin for region-of-interest (ROI) preprocessing - -**Key Properties**: -- `config-file`: Path to preprocessing configuration file -- `gpu-id`: GPU ID - -**Configuration File Structure**: -```yaml -preprocess-config: - - preprocess-group: - target-unique-ids: [1] - roi-params-src: [0] - process-on-roi: 1 - network-input-shape: [1, 3, 544, 960] - tensor-format: 0 # 0=NCHW, 1=NHWC - maintain-aspect-ratio: 0 - custom-transform-function: "custom_transform" - custom-tensor-prep-function: "custom_tensor_prep" -``` - -**Usage**: -```bash -nvdspreprocess config-file=/path/to/preprocess_config.yml -``` - -**Common Pipeline Pattern**: -``` -nvstreammux ! nvdspreprocess config-file=preprocess.yml ! nvinfer input-tensor-meta=1 ! ... -``` - -**Notes**: -- Enables custom preprocessing before inference -- Processes ROIs or full frames -- Outputs tensor metadata for nvinfer -- Custom preprocessing library and functions are specified in the **config file**, not as GObject properties -- Optimal performance: batch-size should match total units in config - ---- - -### nvdspostprocess -**Purpose**: Custom postprocessing plugin for parsing model outputs - -**Key Properties**: -- `postprocesslib-name`: Path to postprocessing library (.so) -- `postprocesslib-config-file`: Path to postprocessing configuration file -- `gpu-id`: GPU ID - -**Configuration File Structure** (YAML): -```yaml -postprocess-config: - - postprocess-group: - target-unique-ids: [1] - custom-parse-function: "custom_parse" - custom-bbox-parse-function: "custom_bbox_parse" - output-format: 0 # 0=object detection, 1=classification -``` - -**Usage**: -```bash -nvdspostprocess postprocesslib-name=./libpostprocess.so postprocesslib-config-file=config.yml -``` - -**Common Pipeline Pattern**: -``` -nvinfer output-tensor-meta=1 ! nvdspostprocess postprocesslib-name=... ! ... -``` - -**Notes**: -- Parses raw tensor outputs from nvinfer -- Requires nvinfer with output-tensor-meta=1 -- Supports custom parsing functions -- Used for models not supported by nvinfer's built-in parsers - ---- - -### nvtracker -**Purpose**: Multi-object tracker for tracking objects across frames - -**Key Properties**: -- `ll-lib-file`: Path to low-level tracker library (.so) -- `ll-config-file`: Path to tracker configuration file -- `tracker-width`: Tracker input width -- `tracker-height`: Tracker input height -- `gpu-id`: GPU ID -- `input-tensor-meta`: Use tensor metadata (0=no, 1=yes) -- `tensor-meta-gie-id`: GIE ID for tensor metadata (used with input-tensor-meta) -- `display-tracking-id`: Display tracking ID in object text -- `tracking-id-reset-mode`: Tracking ID reset mode on stream reset/EOS -- `tracking-surface-type`: Selective tracking surface type -- `user-meta-pool-size`: Tracker user metadata buffer pool size -- `sub-batches`: Configuration of sub-batches for parallel processing -- `sub-batch-err-recovery-trial-cnt`: Max trials to reinitialize tracker on error - -**Configuration File Structure**: -```yaml -tracker: - ll-lib-file: /path/to/libnvds_nvmultiobjecttracker.so - ll-config-file: /path/to/tracker_config.yml - enable-batch-process: 1 - enable-past-frame: 1 - tracker-width: 1920 - tracker-height: 1080 -``` - -**Usage**: -```bash -nvtracker ll-lib-file=/path/to/libnvds_nvmultiobjecttracker.so ll-config-file=/path/to/config.yml -``` - -**Common Pipeline Pattern**: -``` -nvinfer ! nvtracker ll-lib-file=... ! ... -``` - -**Notes**: -- Tracks objects across video frames -- Assigns unique tracking IDs to objects -- Supports multiple tracking algorithms -- Requires object metadata from inference engine -- Tracker dimensions should match preprocess/infer dimensions when using input-tensor-meta=1 - ---- - -### nvdsosd (nvosdbin) -**Purpose**: On-Screen Display element (`nvdsosd`) and DeepStream convenience bin (`nvosdbin`) for drawing bounding boxes, labels, masks, and clocks - -**Key Properties**: -- `gpu-id`: GPU ID to render on -- `process-mode`: Rendering backend (0=CPU, 1=GPU) -- `display-text`: Enable text overlay (boolean) -- `display-bbox`: Enable bounding box display (boolean) -- `display-mask`: Enable instance mask display (boolean) -- `display-clock`: Enable clock display (boolean) -- `clock-font`: Font for clock text -- `clock-font-size`: Font size for clock -- `x-clock-offset`: X offset for clock position -- `y-clock-offset`: Y offset for clock position -- `clock-color`: Clock color (RGBA as uint) -- `blur-bbox`: Enable bbox blurring (boolean) -- `blur-on-gie-class-ids`: Blur bboxes for specific GIE unique ID and class ID - -**Note**: Text and bbox styling properties (like colors, borders) are controlled through object metadata, not as GObject properties on the plugin itself. - -**Usage**: -```bash -nvdsosd display-text=1 display-bbox=1 -``` - -**Common Pipeline Pattern**: -``` -nvtracker ! nvdsosd ! ... -``` - -**Notes**: -- Use `nvdsosd` for the raw transform element -- Supports tracking ID display, text overlays, and optional blur/clocks -- Keeps surfaces in NVMM for zero-copy rendering on GPU -- Object-specific styling (text colors, bbox colors, etc.) is set through NvDsMeta object metadata, not plugin properties - ---- - -### nvmultistreamtiler -**Purpose**: Tiles multiple video streams into a single output frame - -**Key Properties**: -- `width`: Output width -- `height`: Output height -- `rows`: Number of rows in tile layout -- `columns`: Number of columns in tile layout -- `gpu-id`: GPU ID -- `show-source`: Show source index (0=no, 1=yes) - -**Usage**: -```bash -nvmultistreamtiler width=1920 height=1080 rows=2 columns=2 -``` - -**Common Pipeline Pattern**: -``` -nvstreamdemux name=d d.src_0 ! ... d.src_1 ! ... ! nvmultistreamtiler ! ... -``` - -**Notes**: -- Combines multiple streams into a grid layout, useful for multi-stream visualization - ---- - -### nvvideoconvert -**Purpose**: Video format converter (color space conversion, scaling) - -**Key Properties**: -- `gpu-id`: GPU ID -- `nvbuf-memory-type`: Memory type -- `src-crop`: Source crop rectangle -- `dest-crop`: Destination crop rectangle - -**Usage**: -```bash -nvvideoconvert gpu-id=0 -``` - -**Common Pipeline Pattern**: -``` -nvdsosd ! nvvideoconvert ! nveglglessink -``` - -**Notes**: -- GPU-accelerated color format conversion (NV12, RGBA, etc.), often needed before rendering sinks - ---- - -### nvdsanalytics -**Purpose**: Video analytics plugin for motion detection, line crossing, etc. - -**Key Properties**: -- `config-file`: Path to analytics configuration file -- `enable`: Enable analytics (0=no, 1=yes) -- `gpu-id`: GPU ID - -**Configuration File Parameters**: -The config file **must** include a **property** group/section. Other groups define per-stream ROI, line-crossing, overcrowding, and direction rules. Stream index is given by the numeric suffix in the group name (e.g. `roi-filtering-stream-0` for stream 0). -- `property`: General group; Mandatory. - - `config-width`,`config-height`: Reference resolution width and height for analytics coordinate scaling. - - `enable`: Whether analytics is enabled (aligned with the element **enable** property). - - `display-font-size`: Optional; OSD font size. - - `osd-mode`: Optional; 0, 1, or 2. 0 = OSD off, 1 = labels only, 2 = full (default). - - `obj-cnt-win-in-ms`: Optional; object-count time window in milliseconds; range 1–1000000000. - - `display-obj-cnt`: Optional; whether to show per-class object counts on OSD. -- `roi-filtering-stream-`: ROI Filtering group per stream - - `enable`: Enable ROI filtering for this stream. - - `class-id`: Class IDs to include in ROI analytics (semicolon-separated integer list). - - `inverse-roi`: Whether treat as “outside ROI” for counting/filtering. - - `roi-