diff --git a/demos/point_cloud/point_cloud_loader.py b/demos/point_cloud/point_cloud_loader.py deleted file mode 100644 index 69c2604..0000000 --- a/demos/point_cloud/point_cloud_loader.py +++ /dev/null @@ -1,29 +0,0 @@ -from simpub.core.node_manager import init_node -from simpub.xr_device.xr_device import XRDevice -from simpub.core.node_manager import Publisher -import numpy as np - - -def generate_random_point_cloud(num_points=10000): - xyz = np.random.uniform(-1, 1, (num_points, 3)).astype(np.float32) - rgb = np.random.uniform(0, 1, (num_points, 3)).astype(np.float32) - size = (np.ones((num_points, 1)) * 0.01).astype(np.float32) - cloud = np.hstack((xyz, rgb, size)) - return cloud.astype(np.float32).tobytes() - - -if __name__ == "__main__": - - net_manager = init_node("192.168.0.134", "PointCloud") - net_manager.start_node_broadcast() - unity_editor = XRDevice("UnityNode") - publisher = Publisher("PointCloud") - try: - while True: - pointcloud_bytes = generate_random_point_cloud() - publisher.publish_bytes(pointcloud_bytes) - except KeyboardInterrupt: - pass - - # data = simpublisher.generate_point_cloud_data(rgb_image, depth_image) - # simpublisher.join() diff --git a/demos/point_cloud/streamer/Actions.py b/demos/point_cloud/streamer/Actions.py new file mode 100644 index 0000000..0a78205 --- /dev/null +++ b/demos/point_cloud/streamer/Actions.py @@ -0,0 +1,79 @@ +import time, threading +from typing import Optional, Tuple +import numpy as np +import cv2 +import zmq + +from net.PacketV2 import PacketV2Writer + +# ====================== Preview (RGB + Depth) ====================== +def _resolve_colormap_code(name_or_code): + if isinstance(name_or_code, int): + return int(name_or_code) + name = str(name_or_code).strip().upper() + return getattr(cv2, f"COLORMAP_{name}", cv2.COLORMAP_JET) + +def _colorize_depth_mm(depth_u16: np.ndarray, dmin: int, dmax: int, cmap_code: int) -> np.ndarray: + d = np.asarray(depth_u16, dtype=np.float32) + valid = d > 0 + lo, hi = float(dmin), float(max(dmax, dmin + 1)) + d = np.clip(d, lo, hi) + norm = (d - lo) * (255.0 / (hi - lo)) + norm[~valid] = 0.0 + img8 = norm.astype(np.uint8) + cm = cv2.applyColorMap(img8, cmap_code) + cm[~valid] = (0, 0, 0) + return cm + +class PreviewHub(threading.Thread): + def __init__(self): + super().__init__(daemon=True) + self.enabled_rgb = False + self.enabled_depth = False + self._run = False + self._lock = threading.Lock() + self._rgb = {} + self._depth = {} + + def enable(self, rgb: bool, depth: bool): + self.enabled_rgb = bool(rgb) + self.enabled_depth = bool(depth) + if (self.enabled_rgb or self.enabled_depth) and not self._run: + self._run = True + self.start() + + def stop(self): + self._run = False + + + def update(self, cam_id: int, bgr: np.ndarray | None, depth_bgr: np.ndarray | None): + if not self._run: + return + with self._lock: + if self.enabled_rgb and bgr is not None: + self._rgb[cam_id] = bgr + if self.enabled_depth and depth_bgr is not None: + self._depth[cam_id] = depth_bgr + + def run(self): + while self._run: + imgs = [] + with self._lock: + if self.enabled_rgb: + imgs.extend(("RGB", cid, img) for cid, img in self._rgb.items()) + if self.enabled_depth: + imgs.extend(("Depth", cid, img) for cid, img in self._depth.items()) + + for kind, cid, img in imgs: + cv2.imshow(f"{kind} - Cam {cid}", img) + + key = cv2.waitKey(1) & 0xFF + if key == 27: # ESC aborts preview loop + self._run = False + break + + cv2.destroyAllWindows() + + +PREVIEW = PreviewHub() + diff --git a/demos/point_cloud/streamer/Datasources.py 
b/demos/point_cloud/streamer/Datasources.py new file mode 100644 index 0000000..bcb55e6 --- /dev/null +++ b/demos/point_cloud/streamer/Datasources.py @@ -0,0 +1,9 @@ +from dataclasses import dataclass + +@dataclass +class CameraConfig: + # Default from a realsense camera with resolution 640x480 + fx: float = 591.4252 + fy: float = 591.4252 + cx: float = 320.1326 + cy: float = 239.1477 \ No newline at end of file diff --git a/demos/point_cloud/streamer/ProcessingStep.py b/demos/point_cloud/streamer/ProcessingStep.py new file mode 100644 index 0000000..147c600 --- /dev/null +++ b/demos/point_cloud/streamer/ProcessingStep.py @@ -0,0 +1,179 @@ +from abc import ABC, abstractmethod +import numpy as np +from skimage.measure import block_reduce +import cv2 + + + +class ProcessingStep(ABC): + def __init__(self, next_step=None): + self.next_step = next_step + + def set_next(self, next_step): + self.next_step = next_step + return next_step # chaining helper + + def process(self, rgb_frame, depth_frame): + rgb_out, depth_out = self._process(rgb_frame, depth_frame) + if self.next_step: + return self.next_step.process(rgb_out, depth_out) + else: + return rgb_out, depth_out + + @abstractmethod + def _process(self, rgb_frame, depth_frame): + pass + + +class DepthClampAndMask(ProcessingStep): + """ + Clamp depth to a valid metric range and set everything else to 0 (invalid). + Incoming depth assumed uint16 in millimeters (0 = invalid). If float (m), convert. + Constructor expects meters. + """ + def __init__(self, z_min_m=0.25, z_max_m=3.5): + super().__init__() + self.min_mm = int(max(0, round(z_min_m * 1000.0))) + self.max_mm = int(max(0, round(z_max_m * 1000.0))) + + def _process(self, rgb_frame, depth_frame): + if depth_frame is None: + return rgb_frame, depth_frame + + if np.issubdtype(depth_frame.dtype, np.floating): + depth_mm = (np.clip(depth_frame, 0.0, 65.535) * 1000.0).astype(np.uint16) + else: + depth_mm = depth_frame + + valid = depth_mm > 0 + out = depth_mm.copy() + out[valid & (out < self.min_mm)] = 0 + out[valid & (out > self.max_mm)] = 0 + return rgb_frame, out + + +class LocalMedianReject(ProcessingStep): + """ + Suppress isolated depth spikes by comparing to a local median. + win: odd kernel (3,5,7), thr_mm: reject if |d - median| > thr. + """ + def __init__(self, win=5, thr_mm=60): + super().__init__() + self.win = int(win if win % 2 == 1 else win + 1) + self.thr = int(thr_mm) + + def _process(self, rgb, depth_u16): + if depth_u16 is None: + return rgb, depth_u16 + med = cv2.medianBlur(depth_u16, self.win) + diff = cv2.absdiff(depth_u16, med) + out = depth_u16.copy() + valid = depth_u16 > 0 + out[valid & (diff > self.thr)] = 0 + return rgb, out + + +class CropROI(ProcessingStep): + """ + Crop a rectangular ROI (x0,y0,w,h) on RGB and depth. + NOTE: this step does NOT know intrinsics; Streamer.py will adjust cx,cy. 
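+
+    Worked example (hypothetical numbers): with roi_xywh = [200, 120, 960, 540]
+    and downsample_stride = 2, Streamer.py shifts a principal point
+    (cx, cy) = (640.0, 360.0) to (440.0, 240.0) for the crop, then halves it
+    (along with fx, fy) for the stride, giving (220.0, 120.0).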
+ """ + def __init__(self, x0: int, y0: int, w: int, h: int): + super().__init__() + self.x0 = int(max(0, x0)) + self.y0 = int(max(0, y0)) + self.w = int(max(1, w)) + self.h = int(max(1, h)) + + def _process(self, rgb, depth): + if depth is not None: + H, W = depth.shape[:2] + elif rgb is not None: + H, W = rgb.shape[:2] + else: + return rgb, depth + + x1 = min(self.x0 + self.w, W) + y1 = min(self.y0 + self.h, H) + x0 = min(self.x0, x1 - 1) + y0 = min(self.y0, y1 - 1) + + if rgb is not None: + rgb = rgb[y0:y1, x0:x1].copy() + if depth is not None: + depth = depth[y0:y1, x0:x1].copy() + return rgb, depth + + +class EncodeRGBAsJPEG(ProcessingStep): + """Optional: turn RGB into a JPEG buffer (uint8).""" + def _process(self, rgb_frame, depth_frame): + if rgb_frame is None: + return None, depth_frame + if rgb_frame.dtype != np.uint8: + rgb_frame = rgb_frame.astype(np.uint8) + ret, rgb_buf = cv2.imencode('.jpg', rgb_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) + return rgb_buf, depth_frame + + +class DownSampling(ProcessingStep): + """ + Block downsample using skimage.block_reduce. + mode='avg' for RGB, 'min' for depth (keeps close structure). + NOTE: Streamer.py will scale fx,fy,cx,cy; this step only resizes arrays. + """ + def __init__(self, blocksize): + super().__init__() + self.block_size = int(max(1, blocksize)) + + def _process(self, rgb_frame, depth_frame): + depth = self.downsample(depth_frame, mode='min') if depth_frame is not None else None + rgb = self.downsample(rgb_frame, mode='avg') if rgb_frame is not None else None + return rgb, depth + + def downsample(self, img, mode='avg'): + func = np.mean if mode == 'avg' else np.min + if img.ndim == 3: # H×W×C + return block_reduce(img, block_size=(self.block_size, self.block_size, 1), func=func).astype(img.dtype) + else: + return block_reduce(img, block_size=(self.block_size, self.block_size), func=func).astype(img.dtype) + + + +Clamp = DepthClampAndMask +Median = LocalMedianReject +ROICrop = CropROI + +class Downsample(DownSampling): + def __init__(self, stride=2): + super().__init__(blocksize=int(stride)) + + + +def build_default_steps(global_cfg: dict): + """ + Clamp → (Median) → (ROI) → (Downsample) + YAML keys: + depth_min_mm, depth_max_mm, median_ksize, roi_xywh: [x,y,w,h], downsample_stride + """ + steps = [] + dmin_mm = global_cfg.get("depth_min_mm") + dmax_mm = global_cfg.get("depth_max_mm") + if dmin_mm is not None or dmax_mm is not None: + zmin_m = (dmin_mm or 0) / 1000.0 + zmax_m = (dmax_mm or 65535) / 1000.0 + steps.append(Clamp(z_min_m=zmin_m, z_max_m=zmax_m)) + + k = int(global_cfg.get("median_ksize", 0) or 0) + if k > 0: + steps.append(Median(win=k, thr_mm=60)) + + if "roi_xywh" in global_cfg and global_cfg["roi_xywh"]: + x, y, w, h = map(int, global_cfg["roi_xywh"]) + steps.append(ROICrop(x, y, w, h)) + + s = int(global_cfg.get("downsample_stride", 1) or 1) + if s > 1: + steps.append(Downsample(stride=s)) + + return steps \ No newline at end of file diff --git a/demos/point_cloud/streamer/Source.py b/demos/point_cloud/streamer/Source.py new file mode 100644 index 0000000..f704ef6 --- /dev/null +++ b/demos/point_cloud/streamer/Source.py @@ -0,0 +1,670 @@ +from abc import ABC, abstractmethod +import cv2 +import zmq +import struct +import socket +import numpy as np +import Datasources as ds +import time +import math +import depthai as dai + + +from pyk4a import PyK4A, Config, CalibrationType, ColorResolution, DepthMode +from pyk4a.calibration import Calibration + +class Source(ABC): + @abstractmethod + def connect(self): + 
+        pass
+
+    @abstractmethod
+    def get_frame(self):
+        pass
+
+    @abstractmethod
+    def close(self):
+        pass
+
+
+class CameraStrategy(Source):
+    @abstractmethod
+    def apply_filters(self, depth_frame):
+        pass
+
+    @abstractmethod
+    def get_intrinsics(self):
+        pass
+
+
+# Optional vendor SDKs: guarded imports so one missing SDK does not break the others.
+try:
+    import depthai as dai
+except ImportError:
+    dai = None
+
+try:
+    import pyrealsense2 as rs
+except ImportError:
+    rs = None
+
+class RealSenseCameraStrategy(CameraStrategy):
+    def __init__(self, width=640, height=480):
+        if rs is None:
+            raise RuntimeError("RealSense SDK is not installed.")
+        self.width = width
+        self.height = height
+        self.pipeline = None
+        self.profile = None
+
+    def connect(self):
+        self.pipeline = rs.pipeline()
+        config = rs.config()
+        config.enable_stream(rs.stream.depth, self.width, self.height, rs.format.z16, 30)
+        config.enable_stream(rs.stream.color, self.width, self.height, rs.format.bgr8, 30)
+        self.profile = self.pipeline.start(config)
+        self.align = rs.align(rs.stream.color)
+
+    def get_frame(self):
+        frames = self.pipeline.wait_for_frames()
+        aligned = self.align.process(frames)
+        depth_frame = aligned.get_depth_frame()
+        color_frame = aligned.get_color_frame()
+        if not depth_frame or not color_frame:
+            # CameraContext unpacks three values, so keep the arity consistent.
+            return None, None, None
+        #depth_frame = self.apply_filters(depth_frame)
+
+        o = self.get_intrinsics()
+        return np.asanyarray(color_frame.get_data()), np.asanyarray(depth_frame.get_data()), ds.CameraConfig(o["fx"], o["fy"], o["ppx"], o["ppy"])
+
+    def apply_filters(self, depth_frame):
+        #filtered = self.spatial.process(depth_frame)
+        #filtered = self.temporal.process(filtered)
+        #filtered = self.hole_filling.process(filtered)
+        pass
+
+    def get_intrinsics(self):
+        if self.profile is None:
+            raise RuntimeError("Camera not connected.")
+
+        depth_stream = self.profile.get_stream(rs.stream.depth).as_video_stream_profile()
+        intr = depth_stream.get_intrinsics()
+        return {
+            "width": intr.width,
+            "height": intr.height,
+            "fx": intr.fx,
+            "fy": intr.fy,
+            "ppx": intr.ppx,
+            "ppy": intr.ppy,
+            "model": str(intr.model),
+            "distortion_coeffs": list(intr.coeffs)
+        }
+
+    def close(self):
+        self.pipeline.stop()
+
+class LuxonisCameraStrategy:
+    """
+    Multi-device OAK-D strategy.
+    Returns (bgr, depth_u16_mm, cfg) where cfg has: fx, fy, cx, cy, timestamp_us.
+    Depth is aligned to RGB and sized exactly to (width, height).
+ """ + + def __init__(self, + width: int = 1280, + height: int = 720, + color_res=None, # optional [W,H] from YAML + mxid: str | None = None, # select device by MXID + device_index: int = 0, # fallback to index if no MXID + usb2mode: bool = False, # diagnostic: force USB2 if needed + fps: int = 30, + align_to_color: bool = True, # kept for API symmetry + # Quality/robustness knobs (sane defaults) + enable_lrc: bool = True, # REQUIRED when aligning depth to RGB/CENTER on FW 1.2.x + enable_subpixel: bool = True, + enable_extended: bool = False, + confidence: int = 180, # + median_kernel: str = "KERNEL_5x5", # "OFF", "KERNEL_3x3", "KERNEL_5x5", "KERNEL_7x7" + **_): + if color_res and len(color_res) == 2: + width, height = int(color_res[0]), int(color_res[1]) + + self.width = int(width) + self.height = int(height) + self.fps = int(fps) + + self.mxid = mxid + self.device_index = int(device_index) + self.usb2mode = bool(usb2mode) + + self.enable_lrc = bool(enable_lrc) + self.enable_subpixel = bool(enable_subpixel) + self.enable_extended = bool(enable_extended) + self.confidence = int(max(0, min(255, confidence))) + self.median_kernel = str(median_kernel).upper() + + self.dev = None + self.qRgb = None + self.qDepth = None + self._intr = None # np.array([fx,fy,cx,cy], float32) + + # API aliases for your CameraContext + def open(self): return self._connect() + def init(self): return self._connect() + def connect(self): return self._connect() + def disconnect(self): return self.close() + + def _connect(self): + # Enumerate & print devices (helps catch wrong MXID) + devs = dai.Device.getAllAvailableDevices() + found = [d.getMxId() for d in devs] + print(f"[OAK] Available devices: {found if found else '[]'}") + if self.mxid and self.mxid not in found: + raise RuntimeError(f"MXID '{self.mxid}' not found. 
Detected: {found}") + + # ------- Build pipeline ------- + p = dai.Pipeline() + + # Color camera (RGB) + camRgb = p.create(dai.node.ColorCamera) + rgb_sock = getattr(dai.CameraBoardSocket, "RGB", + getattr(dai.CameraBoardSocket, "CAM_A")) + camRgb.setBoardSocket(rgb_sock) + camRgb.setPreviewSize(self.width, self.height) + camRgb.setFps(self.fps) + camRgb.setInterleaved(False) + camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR) + + # Stereo inputs (mono L/R) — OV7251 supports 400p/480p only + monoL = p.create(dai.node.MonoCamera) + monoR = p.create(dai.node.MonoCamera) + monoL.setBoardSocket(dai.CameraBoardSocket.LEFT) + monoR.setBoardSocket(dai.CameraBoardSocket.RIGHT) + monoL.setResolution(dai.MonoCameraProperties.SensorResolution.THE_480_P) + monoR.setResolution(dai.MonoCameraProperties.SensorResolution.THE_480_P) + monoL.setFps(self.fps) + monoR.setFps(self.fps) + + # Stereo depth (quality + stability) + stereo = p.create(dai.node.StereoDepth) + try: + PM = dai.node.StereoDepth.PresetMode # modern path + except AttributeError: + PM = dai.StereoDepth.PresetMode + # Version-safe dense preset + if hasattr(PM, "HIGH_DENSITY"): + preset = PM.HIGH_DENSITY + elif hasattr(PM, "MEDIUM_DENSITY"): + preset = PM.MEDIUM_DENSITY + elif hasattr(PM, "DEFAULT"): + preset = PM.DEFAULT + else: + # last resort: fall back to any attr that exists + preset = [v for k,v in PM.__dict__.items() if k.isupper()][0] + stereo.setDefaultProfilePreset(preset) + + # Confidence (cleaner, fewer speckles) + stereo.initialConfig.setConfidenceThreshold(self.confidence) + + cfg = stereo.initialConfig.get() + + cfg.postProcessing.speckleFilter.enable = True + cfg.postProcessing.speckleFilter.speckleRange = 20 + + cfg.postProcessing.spatialFilter.enable = True + cfg.postProcessing.spatialFilter.holeFillingRadius = 2 + cfg.postProcessing.spatialFilter.numIterations = 2 + + cfg.postProcessing.temporalFilter.enable = True + + stereo.initialConfig.set(cfg) + + # Median filter (small kernel smooths speckle nicely) + kernel_map = { + "OFF": dai.MedianFilter.MEDIAN_OFF, + "KERNEL_3X3": dai.MedianFilter.KERNEL_3x3, + "KERNEL_5X5": dai.MedianFilter.KERNEL_5x5, + "KERNEL_7X7": dai.MedianFilter.KERNEL_7x7, + } + stereo.setMedianFilter(kernel_map.get(self.median_kernel, dai.MedianFilter.KERNEL_5x5)) + + # Align to RGB and match sizes exactly. + # NOTE: On FW 1.2.x, CENTER/RGB alignment REQUIRES LRC to be enabled. 
+ stereo.setDepthAlign(rgb_sock) + stereo.setLeftRightCheck(True) + + # Keep disparity boosters off (reduce artifacts / avoid old warnings) + stereo.setExtendedDisparity(bool(self.enable_extended)) + stereo.setSubpixel(bool(self.enable_subpixel)) + + stereo.setOutputSize(self.width, self.height) + + # Links + monoL.out.link(stereo.left) + monoR.out.link(stereo.right) + + xoutRgb = p.create(dai.node.XLinkOut); xoutRgb.setStreamName("rgb") + xoutDepth = p.create(dai.node.XLinkOut); xoutDepth.setStreamName("depth") + camRgb.preview.link(xoutRgb.input) + stereo.depth.link(xoutDepth.input) + + # ------- Open specific device (critical for multi-camera) ------- + if self.mxid: + info = dai.DeviceInfo(self.mxid) + self.dev = dai.Device(p, info, usb2Mode=self.usb2mode) + print(f"[OAK] Opening device MXID={self.mxid} usb2={self.usb2mode}") + else: + if not devs: + raise RuntimeError("No OAK devices found.") + info = devs[min(self.device_index, len(devs)-1)] + self.dev = dai.Device(p, info, usb2Mode=self.usb2mode) + print(f"[OAK] Opening device MXID={info.getMxId()} (index={self.device_index}) usb2={self.usb2mode}") + + # Small, non-blocking queues for low latency + self.qRgb = self.dev.getOutputQueue("rgb", maxSize=1, blocking=False) + self.qDepth = self.dev.getOutputQueue("depth", maxSize=1, blocking=False) + + # Intrinsics for the RGB camera at (width,height) + calib = self.dev.readCalibration() + K = calib.getCameraIntrinsics(rgb_sock, self.width, self.height) + fx, fy, cx, cy = float(K[0][0]), float(K[1][1]), float(K[0][2]), float(K[1][2]) + self._intr = np.array([fx, fy, cx, cy], dtype=np.float32) + + def get_frame(self): + """ + Returns (bgr, depth_u16_mm, cfg). Never returns None; if packets are late, + it blocks briefly and falls back gracefully. + """ + if self.qRgb is None or self.qDepth is None: + time.sleep(0.005) + return self._blank_frame() + + try: + # Try-get with a short deadline to keep UI responsive + deadline = time.time() + 0.20 + rgb_pkt = None + d_pkt = None + while time.time() < deadline and (rgb_pkt is None or d_pkt is None): + if rgb_pkt is None: + rgb_pkt = self.qRgb.tryGet() + if d_pkt is None: + d_pkt = self.qDepth.tryGet() + if rgb_pkt is None or d_pkt is None: + time.sleep(0.001) + + if rgb_pkt is None: + rgb_pkt = self.qRgb.get() # block if still missing + if d_pkt is None: + d_pkt = self.qDepth.get() # block if still missing + + bgr = rgb_pkt.getCvFrame() # HxWx3 uint8 (BGR) + depth = d_pkt.getFrame().copy() # HxW uint16 (mm) + + if depth.shape[:2] != bgr.shape[:2]: + depth = cv2.resize(depth, (bgr.shape[1], bgr.shape[0]), interpolation=cv2.INTER_NEAREST) + + class _Cfg: pass + cfg = _Cfg() + cfg.fx, cfg.fy, cfg.cx, cfg.cy = self._intr + cfg.timestamp_us = int(time.time() * 1e6) + return bgr, depth.astype(np.uint16), cfg + + except Exception as e: + # Device glitch or link error; return a blank frame so the pipeline keeps running + print(f"[OAK] queue error: {e}") + return self._blank_frame() + + def _blank_frame(self): + # Minimal fallback frame to keep upstream safe + bgr = np.zeros((self.height, self.width, 3), np.uint8) + depth = np.zeros((self.height, self.width), np.uint16) + class _Cfg: pass + cfg = _Cfg() + if self._intr is None: + fx = fy = 1.0; cx = self.width * 0.5; cy = self.height * 0.5 + intr = np.array([fx, fy, cx, cy], dtype=np.float32) + else: + intr = self._intr + cfg.fx, cfg.fy, cfg.cx, cfg.cy = intr + cfg.timestamp_us = int(time.time() * 1e6) + return bgr, depth, cfg + + def get_intrinsics(self): + if self._intr is None: + return {"fx": 0.0, "fy": 
0.0, "ppx": 0.0, "ppy": 0.0, "model": "perspective", "distortion_coeffs": []} + fx, fy, cx, cy = map(float, self._intr) + return {"fx": fx, "fy": fy, "ppx": cx, "ppy": cy, "model": "perspective", "distortion_coeffs": []} + + def close(self): + if self.dev is not None: + try: + self.dev.close() + finally: + self.dev = None + self.qRgb = None + self.qDepth = None + + +class AzureKinectCameraStrategy(CameraStrategy): + def __init__(self, width=640, height=480, device_index=0): + self.width = width + self.height = height + self.device_index = int(device_index) + self.device = None + + def connect(self): + self.device = PyK4A( + Config( + color_resolution=self._get_color_resolution(), + depth_mode=self._get_depth_mode(), + synchronized_images_only=True + ), + device_id=self.device_index + ) + self.device.start() + + + def get_frame(self): + capture = self.device.get_capture() + if capture.color is None or capture.depth is None: + return None, None, None + + color = capture.color + depth = capture.transformed_depth # depth aligned to color + + # Ensure 3-channel BGR for JPEG encoding + if color.ndim == 3 and color.shape[2] == 4: + color = cv2.cvtColor(color, cv2.COLOR_BGRA2BGR) + + intr = self.get_intrinsics() + return color, depth, ds.CameraConfig(intr["fx"], intr["fy"], intr["ppx"], intr["ppy"]) + + def get_intrinsics(self): + calib: Calibration = self.device.calibration + cam = calib.get_camera_matrix(CalibrationType.COLOR) + fx = cam[0, 0] + fy = cam[1, 1] + ppx = cam[0, 2] + ppy = cam[1, 2] + return { + "fx": fx, + "fy": fy, + "ppx": ppx, + "ppy": ppy, + "matrix": cam.tolist() + } + + def close(self): + if self.device is not None: + self.device.stop() + + def _get_color_resolution(self): + # Map width x height to Azure Kinect color resolution + if self.width == 1280 and self.height == 720: + return ColorResolution.RES_720P + elif self.width == 1920 and self.height == 1080: + return ColorResolution.RES_1080P + else: + return ColorResolution.RES_720P # default + + def _get_depth_mode(self): + return DepthMode.NFOV_2X2BINNED # 640x576 (highest quality narrow FOV) + + def apply_filters(self, depth_frame): + pass + + + +class CameraContext: + def __init__(self, strategy: Source): + self.strategy = strategy + self.is_camera = isinstance(strategy, CameraStrategy) + + def init(self): + self.strategy.connect() + + def get_frame(self): + rgb, depth, config = self.strategy.get_frame() + return rgb, depth, config + + def apply_filters(self, depth_frame): + if self.is_camera: + return self.strategy.apply_filters(depth_frame) + else: + print("Filter not available for this source.") + return depth_frame + + def get_intrinsics(self): + if self.is_camera: + return self.strategy.get_intrinsics() + else: + return None + + def close(self): + self.strategy.close() + +class DummyCameraStrategy: + """ + Synthetic camera for testing multi-cam and poses without hardware. + + Output per frame: + (bgr: HxWx3 uint8, depth_u16: HxW uint16 mm, cfg: object with fx,fy,cx,cy,timestamp_us) + + __init__ signature mirrors other strategies so Streamer._safe_construct can pass + common kwargs like width/height/color_res/align_to_color. 
+ """ + def __init__(self, + width: int = 1280, + height: int = 720, + color_res=None, + fov_deg: float = 70.0, + pattern: str = "checker", # "checker" or "bars" + z_mm: int = 1500, # base Z in millimetres + amp_mm: int = 250, # wave amplitude in mm + period_s: float = 4.0, # animation period (seconds) + seed: int | None = None, + align_to_color: bool = True, # kept for API symmetry (not used) + **_): + # Resolve output size using color_res if provided + if color_res is not None and len(color_res) == 2: + width, height = int(color_res[0]), int(color_res[1]) + self.w, self.h = int(width), int(height) + + self.fov_deg = float(fov_deg) + self.pattern = str(pattern).lower() + self.z_base = int(z_mm) + self.amp = int(amp_mm) + self.period = float(period_s) + + self.t0 = None + self.rng = np.random.RandomState(None if seed is None else int(seed)) + self._intr = self._compute_intrinsics(self.w, self.h, self.fov_deg) + + # ---------------- API expected by CameraContext ---------------- + + def open(self): + """Open/init the synthetic source.""" + self.t0 = time.time() + + # Some contexts call these names instead; alias to open/close for consistency. + def init(self): return self.open() + def connect(self): return self.open() + def close(self): return None + def disconnect(self): return self.close() + + # ---------------- Internals ---------------- + + @staticmethod + def _compute_intrinsics(w, h, fov_deg): + """ + Compute simple pinhole intrinsics with fx=fy from horizontal FOV, + and principal point at the image center. + """ + fx = (0.5 * w) / math.tan(math.radians(fov_deg) * 0.5) + fy = fx + cx = 0.5 * w + cy = 0.5 * h + return np.array([fx, fy, cx, cy], dtype=np.float32) + + def _rgb_frame(self, t): + """Generate a synthetic RGB frame (uint8 BGR).""" + if self.pattern == "bars": + u = (np.arange(self.w, dtype=np.float32)[None, :] / 16.0) + img = np.zeros((self.h, self.w, 3), np.uint8) + phase = (t / self.period) * 255.0 + img[..., 0] = np.mod(u * 10.0 + phase, 255).astype(np.uint8) + img[..., 1] = np.mod(u * 20.0 + phase, 255).astype(np.uint8) + img[..., 2] = np.mod(u * 30.0 + phase, 255).astype(np.uint8) + return img + + # default: checker + u = (np.arange(self.w, dtype=np.float32)[None, :] / 32.0) + v = (np.arange(self.h, dtype=np.float32)[:, None] / 32.0) + phase = 2.0 * math.pi * (t / self.period) + checker = ((np.floor(u + phase) + np.floor(v)) % 2).astype(np.float32) + + img = np.empty((self.h, self.w, 3), dtype=np.uint8) + img[..., 0] = (checker * 220 + 20).astype(np.uint8) # B + img[..., 1] = ((1.0 - checker) * 220 + 20).astype(np.uint8) # G + img[..., 2] = (40 + 30*np.sin(u*0.5 + v*0.25 + phase)).astype(np.uint8) # R + return img + + def _depth_frame(self, t): + """Generate a synthetic depth map (uint16 millimetres).""" + xs = np.linspace(-1.0, 1.0, self.w, dtype=np.float32)[None, :] + ys = np.linspace(-1.0, 1.0, self.h, dtype=np.float32)[:, None] + phase = 2.0 * math.pi * (t / self.period) + + z = (self.z_base + + self.amp * np.sin(xs * 2.0 + phase) + + 0.5 * self.amp * np.sin(ys * 1.5 - phase * 0.7)) + + z[z < 300.0] = 0.0 # invalidate too-close values like a real sensor + depth = z.astype(np.uint16) + + # Sparse random holes to mimic stereo confidence dropouts + holes = (self.rng.rand(self.h, self.w) < 0.002) + depth[holes] = 0 + return depth + + # ---------------- Frame grab ---------------- + + def get_frame(self): + """ + Return a frame tuple like real strategies do: + (bgr: HxWx3 uint8, depth_u16: HxW uint16, cfg: object with fx,fy,cx,cy,timestamp_us) + """ + if self.t0 is 
None:
+            self.open()
+        t = time.time() - self.t0
+
+        bgr = self._rgb_frame(t)
+        depth = self._depth_frame(t)
+
+        # Pack intrinsics / timestamp in a simple attribute object
+        class _Cfg: pass
+        cfg = _Cfg()
+        cfg.fx, cfg.fy, cfg.cx, cfg.cy = self._intr
+        cfg.timestamp_us = int(time.time() * 1e6)
+
+        return bgr, depth, cfg
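Taken together, a strategy, the CameraContext wrapper, and the default step chain compose into a short capture loop. The following minimal sketch, assuming the modules above are importable (note Source.py still imports pyk4a unconditionally) and using the hardware-free DummyCameraStrategy, shows the intended call pattern; the config dict and its values are illustrative:

    from Source import CameraContext, DummyCameraStrategy
    from ProcessingStep import build_default_steps

    # Hypothetical config using the same keys build_default_steps reads from YAML
    cfg = {"depth_min_mm": 300, "depth_max_mm": 2000, "downsample_stride": 2}
    steps = build_default_steps(cfg)              # -> [Clamp, Downsample]

    ctx = CameraContext(DummyCameraStrategy(width=640, height=480))
    ctx.init()                                    # calls the strategy's connect()
    try:
        bgr, depth_u16, intr = ctx.get_frame()    # depth is uint16 millimetres
        for s in steps:                           # same per-step loop as CameraPipeline.run()
            bgr, depth_u16 = s.process(bgr, depth_u16)
        print(bgr.shape, depth_u16.shape, intr.fx, intr.fy)
    finally:
        ctx.close()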
diff --git a/demos/point_cloud/streamer/Streamer.py b/demos/point_cloud/streamer/Streamer.py new file mode 100644 index 0000000..4df7a77 --- /dev/null +++ b/demos/point_cloud/streamer/Streamer.py @@ -0,0 +1,774 @@ +import sys +import time +import threading +import argparse +from pathlib import Path +import inspect +import json +import yaml +import numpy as np +import cv2 + +# Point cloud processing steps +from ProcessingStep import build_default_steps +from net.PacketV2 import PacketV2Writer +from Actions import PREVIEW, _resolve_colormap_code, _colorize_depth_mm + +# SimPublisher core +from simpub.core.simpub_server import ServerBase +from simpub.core.net_component import Streamer as BaseStreamer +from simpub.core.utils import get_zmq_socket_url, send_request_with_addr_async +from simpub.parser.simdata import SimScene, SimObject + +# Camera strategies (always required) +from Source import ( + CameraContext, + AzureKinectCameraStrategy, + LuxonisCameraStrategy, + DummyCameraStrategy, +) +# ==================== Pose loading ===================== +def load_pose_4x4(pose_path: str | None, align_path: str | None = None) -> np.ndarray | None: + """ + Load a 4x4 cam_to_world matrix from pose_path. + If align_path is given and exists, apply: + + T_final = A_align @ T_base + """ + if not pose_path: + return None + + p = Path(pose_path) + if not p.exists(): + print(f"[Pose] Not found: {p}") + return None + + T = None + + try: + # ---- 1) Load base cam_to_world into T ---- + if p.suffix.lower() == ".npz": + data = np.load(p) + if "cam_to_world" in data and data["cam_to_world"].shape == (4, 4): + T = data["cam_to_world"].astype(np.float32) + print(f"[Pose] {p.name}: loaded base cam_to_world.") + else: + print(f"[Pose] {p.name}: no 4x4 'cam_to_world' in NPZ.") + return None + else: + # txt / npy fallback + try: + T = np.loadtxt(p, dtype=np.float32).reshape(4, 4) + print(f"[Pose] {p.name}: loaded 4x4 from text.") + except Exception: + T = np.load(p).astype(np.float32).reshape(4, 4) + print(f"[Pose] {p.name}: loaded 4x4 from npy.") + + + if T is None: + print(f"[Pose] {p.name}: failed to load base pose.") + return None + + # ---- 2) If an alignment matrix exists, multiply it with T + if align_path: + ap = Path(align_path) + if ap.exists(): + try: + A = np.load(ap).astype(np.float32) + if A.shape == (4, 4): + T = A @ T + print(f"[Pose] Applied alignment {ap.name} to {p.name}.") + else: + print(f"[Pose] {ap.name}: not 4x4, ignoring alignment.") + except Exception as e: + print(f"[Pose] Failed to load alignment {ap}: {e}") + else: + print(f"[Pose] Alignment file not found: {ap}, ignoring.") + + return T + + except Exception as e: + print(f"[Pose] Failed to load {p}: {e}") + return None + + + +# ================ Coordinate conversion ================= +def pose_to_unity_coords(T_cam_to_cal: np.ndarray | None) -> np.ndarray | None: + """Convert calibration/world pose into Unity/IRIS coords.""" + if T_cam_to_cal is None: + return None + + S_Unity = np.array( + [ + [1, 0, 0, 0], + [0, 0, 1, 0], + [0, 1, 0, 0], + [0, 0, 0, 1], + ], + dtype=np.float32, + ) + + # camera frame Y flip to match unity + F_y = np.array( + [ + [1, 0, 0, 0], + [0, -1, 0, 0], + [0, 0, 1, 0], + [0, 0, 0, 1], + ], + dtype=np.float32, + ) + T = T_cam_to_cal.astype(np.float32) + + T_cam_to_cal_flipped = T @ F_y + + T_unity = S_Unity @ T_cam_to_cal_flipped + + # return (S_Unity @ T_cam_to_cal).astype(np.float32) + return T_unity.astype(np.float32) + + +# ================= Intrinsics helpers ================== +def _intr_get(i): + if i is 
None: + return None + try: + return float(i.fx), float(i.fy), float(i.cx), float(i.cy), "obj" + except AttributeError: + return ( + float(i["fx"]), + float(i["fy"]), + float(i["cx"]), + float(i["cy"]), + "dict", + ) + + +def _intr_set(i, fx, fy, cx, cy, kind): + if i is None: + return None + if kind == "obj": + i.fx, i.fy, i.cx, i.cy = fx, fy, cx, cy + else: + i["fx"], i["fy"], i["cx"], i["cy"] = fx, fy, cx, cy + return i + + +def _adjust_intrinsics_for_roi_stride(intr, global_cfg): + tup = _intr_get(intr) + if tup is None: + return intr + + fx, fy, cx, cy, kind = tup + roi = global_cfg.get("roi_xywh") + stride = int(global_cfg.get("downsample_stride", 1) or 1) + + if roi: + x0, y0 = int(roi[0]), int(roi[1]) + cx -= x0 + cy -= y0 + + if stride > 1: + fx /= stride + fy /= stride + cx /= stride + cy /= stride + + return _intr_set(intr, fx, fy, cx, cy, kind) + + +# ================= Camera strategies =================== +def _safe_construct(cls, **kwargs): + sig = inspect.signature(cls.__init__) + allowed = {k: v for k, v in kwargs.items() if k in sig.parameters} + return cls(**allowed) if allowed else cls() + + +def make_strategy(cam_cfg: dict, global_cfg: dict): + """Build the capture strategy.""" + cam_type = str(cam_cfg.get("type", "azure")).lower() + color_res = tuple(cam_cfg.get("color_res", [1280, 720])) + align_to_color = bool(global_cfg.get("align_to_color", True)) + usb2mode = bool(cam_cfg.get("usb2mode", False)) + + if cam_type in ("azure", "k4a"): + cam_id = cam_cfg.get("id", "?") + print(f"[Cam{cam_id}] Using AzureKinectCameraStrategy (type={cam_type})") + w, h = color_res + return _safe_construct( + AzureKinectCameraStrategy, + width=w, + height=h, + color_res=color_res, + device_index=cam_cfg.get("device", 0), + align_to_color=align_to_color, + ) + + if cam_type in ("oak", "oakd", "luxonis"): + w, h = color_res + return _safe_construct( + LuxonisCameraStrategy, + width=w, + height=h, + color_res=color_res, + mxid=cam_cfg.get("mxid"), + device_index=cam_cfg.get("device", 0), + usb2mode=usb2mode, + ) + + if cam_type in ("dummy", "sim", "test"): + w, h = color_res + return _safe_construct( + DummyCameraStrategy, + width=w, + height=h, + color_res=color_res, + fov_deg=cam_cfg.get("fov_deg", 70.0), + pattern=cam_cfg.get("pattern", "checker"), + z_mm=cam_cfg.get("z_mm", 1500), + amp_mm=cam_cfg.get("amp_mm", 250), + period_s=cam_cfg.get("period_s", 4.0), + seed=cam_cfg.get("seed"), + ) + + raise ValueError(f"Unknown camera type: {cam_type}") + + +# ================== Shared frame cache ================= +FRAME_CACHE: dict[int, dict] = {} +FRAME_LOCK = threading.Lock() + + +# ============== Camera pipeline thread ================= +class CameraPipeline(threading.Thread): + def __init__( + self, + cam_cfg: dict, + global_cfg: dict, + preview_rgb: bool, + preview_depth: bool, + depth_min_mm: int, + depth_max_mm: int, + depth_cmap_code: int, + ): + super().__init__(daemon=True) + self._running = True + + self.cam_id = int(cam_cfg["id"]) + self.global_cfg = global_cfg + + self.fps_max = int(global_cfg.get("fps_max", 30)) + self.frame_period = 1.0 / self.fps_max if self.fps_max > 0 else 0.0 + + self.preview_rgb = bool(preview_rgb) + self.preview_depth = bool(preview_depth) + self.depth_min_mm = int(depth_min_mm) + self.depth_max_mm = int(depth_max_mm) + self.depth_cmap_code = int(depth_cmap_code) + + # Pose (extrinsics + optional alignment -> Unity coords) + pose_raw = None + pose_file = cam_cfg.get("pose_file") + if pose_file: + pose_dir = Path(global_cfg.get("pose_dir", ".")) + p = 
Path(pose_file) + if not p.is_absolute(): + p = pose_dir / p + + align_path = None + align_file = cam_cfg.get("align_file") + if align_file: + ap = Path(align_file) + if not ap.is_absolute(): + ap = pose_dir / ap + align_path = str(ap) + + pose_raw = load_pose_4x4(str(p), align_path) + + # Convert to Unity coords (or identity) + self.pose = pose_to_unity_coords(pose_raw) + if self.pose is None: + self.pose = np.eye(4, dtype=np.float32) + + # Camera strategy + self.strategy = make_strategy(cam_cfg, global_cfg) + + self.ctx = CameraContext(self.strategy) + self._grab = self.ctx.get_frame + self._open = ( + getattr(self.ctx, "init", None) + or getattr(self.ctx, "connect", None) + or getattr(self.ctx, "open", None) + ) + self._close = getattr(self.ctx, "close", lambda: None) + + self.steps = build_default_steps(global_cfg) + self._next_deadline = time.time() + + def run(self): + try: + if self._open: + self._open() + except Exception as e: + print(f"[Cam{self.cam_id}] FAILED to open device: {e}") + return + + try: + while self._running: + if not self._grab: + break + + tup = self._grab() + if tup is None: + continue + + if isinstance(tup, tuple) and len(tup) == 3: + rgb, depth, cfg = tup + + # Processing steps + for s in self.steps: + try: + rgb, depth = s.process(rgb, depth) + except Exception: + pass + + # Adjust intrinsics + try: + cfg = _adjust_intrinsics_for_roi_stride( + cfg, self.global_cfg + ) + except Exception: + pass + + # Preview + depth_bgr = None + if self.preview_depth and depth is not None: + depth_bgr = _colorize_depth_mm( + depth, + self.depth_min_mm, + self.depth_max_mm, + self.depth_cmap_code, + ) + if self.preview_rgb: + PREVIEW.update(self.cam_id, rgb, depth_bgr) + + # Cache + ts_us = int(time.time() * 1e6) + with FRAME_LOCK: + FRAME_CACHE[self.cam_id] = { + "bgr": rgb, + "depth_u16": depth, + "intrinsics": np.array( + [cfg.fx, cfg.fy, cfg.cx, cfg.cy], + dtype=np.float32, + ), + "timestamp_us": ts_us, + "width": rgb.shape[1], + "height": rgb.shape[0], + "pose_Twc": self.pose, + } + else: + # Custom producer path directly populating cache + with FRAME_LOCK: + FRAME_CACHE[self.cam_id] = tup + + # FPS / throttling + if self.frame_period > 0: + self._next_deadline += self.frame_period + now = time.time() + if self._next_deadline > now: + time.sleep(self._next_deadline - now) + else: + self._next_deadline = now + + finally: + try: + self._close() + except Exception: + pass + + def stop(self): + self._running = False + +# ============== RGBD Streamer (SimPublisher wrapper) ============== +class RGBDStreamer(BaseStreamer): + def __init__( + self, + topic_name: str, + cam_id: int, + fps: float, + send_intrinsics: bool = True, + jpeg_quality: int = 80, + ): + self.cam_id = int(cam_id) + self.send_intrinsics = bool(send_intrinsics) + self.jpeg_quality = int(jpeg_quality) + self.packer = PacketV2Writer(send_intrinsics=self.send_intrinsics) + + super().__init__( + topic_name=topic_name, + update_func=self.get_update_bytes, + fps=float(fps), + start_streaming=False, + ) + + try: + endpoint = get_zmq_socket_url(self.socket) + print(f"[RGBD] {topic_name} publishing on {endpoint}") + except Exception: + pass + + def get_update_bytes(self) -> bytes: + with FRAME_LOCK: + f = FRAME_CACHE.get(self.cam_id) + + if not f: + return b"" + + bgr = f.get("bgr") + depth = f.get("depth_u16") + intr = f.get("intrinsics") if self.send_intrinsics else None + pose_Twc = f.get("pose_Twc") + ts = int(f.get("timestamp_us", time.time() * 1e6)) + w = int(f.get("width", 0)) + h = int(f.get("height", 0)) + + if 
bgr is not None: + ok, enc = cv2.imencode( + ".jpg", + bgr, + [int(cv2.IMWRITE_JPEG_QUALITY), self.jpeg_quality], + ) + rgb_jpeg = enc.tobytes() if ok else b"" + else: + rgb_jpeg = b"" + + return self.packer.pack( + camera_id=self.cam_id, + timestamp_us=ts, + width=w, + height=h, + rgb_jpeg_bytes=rgb_jpeg, + depth_u16=depth, + intrinsics=intr, + pose_Twc=pose_Twc, + ) + + +# ============== SimPublisher ============== +class PointCloudSimPublisher(ServerBase): + def __init__( + self, + cfg_path: str, + ip_addr: str, + preview_rgb_flag: bool, + preview_depth_flag: bool, + depth_min_flag: int | None, + depth_max_flag: int | None, + depth_cmap_flag: str | None, + ): + self.cfg_path = cfg_path + self.cli_preview_rgb = preview_rgb_flag + self.cli_preview_depth = preview_depth_flag + self.cli_depth_min = depth_min_flag + self.cli_depth_max = depth_max_flag + self.cli_depth_cmap = depth_cmap_flag + + self.sim_scene = SimScene() + self.sim_scene.name = "SimScene" # <- must match Unity side + self.sim_scene.root = SimObject(name="Root") + + self.workers: list[CameraPipeline] = [] + self.streamers: list[RGBDStreamer] = [] + + super().__init__(ip_addr) + + def initialize(self): + cfg_path = Path(self.cfg_path) + if not cfg_path.exists(): + print(f"Config not found: {cfg_path}") + sys.exit(1) + + cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) + global_cfg = cfg.get("global", {}) + cameras = cfg.get("cameras", []) + if not cameras: + print("No cameras in config.") + sys.exit(1) + + # Resolve preview flags (CLI overrides YAML) + preview_rgb = self.cli_preview_rgb or bool(global_cfg.get("preview", False)) + preview_depth = self.cli_preview_depth or bool( + global_cfg.get("preview_depth", False) + ) + + # Resolve depth range + dmin = ( + int(self.cli_depth_min) + if self.cli_depth_min is not None + else int(global_cfg.get("depth_min_mm", 400)) + ) + dmax = ( + int(self.cli_depth_max) + if self.cli_depth_max is not None + else int(global_cfg.get("depth_max_mm", 6000)) + ) + + # Resolve colormap + cmap_name_or_code = ( + self.cli_depth_cmap + if self.cli_depth_cmap is not None + else global_cfg.get("depth_colormap", "JET") + ) + depth_cmap_code = _resolve_colormap_code(cmap_name_or_code) + + if preview_rgb or preview_depth: + PREVIEW.enable(preview_rgb, preview_depth) + + # Start camera pipelines + print("=== PointCloudSimPublisher: starting camera pipelines ===") + for c in cameras: + cam_id = int(c["id"]) + print( + f" - Cam{cam_id}: type={c.get('type', 'azure')} " + f"topic=RGBD/Cam{cam_id} pose={'yes' if c.get('pose_file') else 'no'}" + ) + w = CameraPipeline( + cam_cfg=c, + global_cfg=global_cfg, + preview_rgb=preview_rgb, + preview_depth=preview_depth, + depth_min_mm=dmin, + depth_max_mm=dmax, + depth_cmap_code=depth_cmap_code, + ) + self.workers.append(w) + w.start() + + # Create one RGBDStreamer per camera + fps = float(global_cfg.get("fps_max", 15)) + send_intr = bool(global_cfg.get("send_intrinsics", True)) + jpeg_q = int(global_cfg.get("jpeg_quality", 80)) + + for c in cameras: + cam_id = int(c["id"]) + topic = f"RGBD/Cam{cam_id}" + s = RGBDStreamer( + topic_name=topic, + cam_id=cam_id, + fps=fps, + send_intrinsics=send_intr, + jpeg_quality=jpeg_q, + ) + s.start_streaming() + self.streamers.append(s) + print(f"[SimPub] Streaming '{topic}' at {fps:.1f} Hz") + + # Background task: announce streams to XR nodes + self.node_manager.submit_asyncio_task(self._auto_subscribe_loop) + + async def _auto_subscribe_loop(self): + from asyncio import sleep as asyncio_sleep + + announced: 
set[tuple[str, int]] = set() # (node_id, cam_id) already subscribed + spawned: set[str] = set() # node_ids that already got SpawnSimScene + + while True: + try: + registry = getattr(self.node_manager, "xr_nodes", None) + infos = registry.registered_infos() if registry is not None else [] + + for info in infos: + services = info.get("serviceList", []) or info.get("services", []) + node_id = info.get("nodeID") + ip = info.get("ip") + port = info.get("port") + if not node_id or not ip or not port: + continue + + addr = f"tcp://{ip}:{int(port)}" + + # --- 1) SpawnSimScene first (once per node) --- + if "SpawnSimScene" in services and node_id not in spawned: + try: + # Delete previous scene if any + await send_request_with_addr_async( + [b"DeleteSimScene", self.sim_scene.name.encode("utf-8")], + addr, + ) + except Exception: + # ignore if no scene existed yet + pass + + try: + scene_payload = self.sim_scene.serialize().encode("utf-8") + reply = await send_request_with_addr_async( + [b"SpawnSimScene", scene_payload], + addr, + ) + print( + f"[SimPub] SpawnSimScene on node " + f"{info.get('name', node_id)}: {reply}" + ) + spawned.add(node_id) + except Exception as e: + print(f"[SimPub] Failed to call SpawnSimScene on {addr}: {e}") + # if spawn failed, don't try SubscribePointCloudStream yet + continue + + # --- 2) After scene is spawned, subscribe point cloud streams --- + if "SubscribePointCloudStream" not in services: + continue + + for s in self.streamers: + key = (node_id, s.cam_id) + if key in announced: + continue + + try: + url = get_zmq_socket_url(s.socket) + except Exception: + continue + + payload = { + "camId": s.cam_id, + "topic": s.topic_name, + "url": url, + } + msg = [ + b"SubscribePointCloudStream", + json.dumps(payload).encode("utf-8"), + ] + + try: + rep = await send_request_with_addr_async(msg, addr) + print( + f"[SimPub] Announced {s.topic_name} ({url}) " + f"to node {info.get('name', node_id)}: {rep}" + ) + announced.add(key) + except Exception as e: + print(f"[SimPub] Failed to announce to {addr}: {e}") + + except Exception as e: + print(f"[SimPub] auto_subscribe_loop error: {e}") + + await asyncio_sleep(1.0) + + def shutdown(self): + # Make shutdown safe to call multiple times + if getattr(self, "_is_shutdown", False): + return + self._is_shutdown = True + + print("[SimPub] Shutting down streamers and cameras...") + + # Stop camera threads + for w in self.workers: + try: + w.stop() + except Exception: + pass + + for w in self.workers: + try: + w.join(timeout=1.0) + except Exception: + pass + + # Stop preview thread / windows + try: + if hasattr(PREVIEW, "stop"): + PREVIEW.stop() + except Exception: + pass + + # Extra safety: close all OpenCV windows + try: + cv2.destroyAllWindows() + except Exception: + pass + + # Let ServerBase shut down XRNodeManager / executor + try: + super().shutdown() + except Exception: + pass + + print("[SimPub] Shutdown complete.") + + +# ======================= CLI + main ======================= +def parse_args(): + ap = argparse.ArgumentParser() + ap.add_argument( + "--config", + "-c", + type=str, + default="config/multicam.yaml", + help="YAML config file for cameras.", + ) + ap.add_argument( + "--ip", + type=str, + default="10.2.0.2", + help="IP address to bind SimPublisher server (used by XRNodeManager).", + ) + # Preview / depth flags (override YAML) + ap.add_argument( + "--preview", + action="store_true", + help="Enable RGB preview.", + ) + ap.add_argument( + "--preview-depth", + action="store_true", + help="Enable depth preview.", + ) + 
ap.add_argument( + "--depth-min", + type=int, + default=None, + help="Override depth min (mm).", + ) + ap.add_argument( + "--depth-max", + type=int, + default=None, + help="Override depth max (mm).", + ) + ap.add_argument( + "--depth-cmap", + type=str, + default=None, + help="Override depth colormap (e.g. JET, TURBO).", + ) + return ap.parse_args() + + +def main(): + args = parse_args() + + server = PointCloudSimPublisher( + cfg_path=args.config, + ip_addr=args.ip, + preview_rgb_flag=args.preview, + preview_depth_flag=args.preview_depth, + depth_min_flag=args.depth_min, + depth_max_flag=args.depth_max, + depth_cmap_flag=args.depth_cmap, + ) + + try: + server.spin() + except KeyboardInterrupt: + print("\n[SimPub] KeyboardInterrupt - stopping.") + finally: + server.shutdown() + + +if __name__ == "__main__": + main() + +# python .\Streamer.py --preview --preview-depth --depth-min 400 --depth-max 4000 --depth-cmap TURBO diff --git a/demos/point_cloud/streamer/__init__.py b/demos/point_cloud/streamer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demos/point_cloud/streamer/config/multicam.yaml b/demos/point_cloud/streamer/config/multicam.yaml new file mode 100644 index 0000000..5ea4eed --- /dev/null +++ b/demos/point_cloud/streamer/config/multicam.yaml @@ -0,0 +1,80 @@ +global: + + jpeg_quality: 80 + send_intrinsics: true + align_to_color: true + preview: false + pose_dir: ./poses + depth_min_mm: 300 + depth_max_mm: 2000 + roi_xywh: [200, 120, 960, 540] + downsample_stride: 2 + median_ksize: 2 + +cameras: + # - id: 1 + # type: azure + # device: 0 + # port: 5555 + # color_res: [1280, 720] + # pose_file: "C:/Users/yadhu/PointsPublisher/poses/cam1_Twc.txt" + + # - id: 2 + # type: azure + # device: 1 + # port: 5556 + # color_res: [1280, 720] + # pose_file: "C:/Users/yadhu/PointsPublisher/poses/cam2_Twc.txt" + + - id: 1 + type: oak + mxid: "1844301051D9B50F00" + port: 5555 + color_res: [1280, 720] + pose_file: "extrinsics_1844301051D9B50F00.npz" + # align_file: "point_cloud_alignment_1844301051D9B50F00.npy" + + - id: 2 + type: oak + mxid: "184430102111900E00" + port: 5556 + color_res: [1280, 720] + pose_file: "extrinsics_184430102111900E00.npz" + align_file: "point_cloud_alignment_184430102111900E00.npy" + + + # - id: 1 + # type: dummy + # port: 5555 + # color_res: [1280, 720] + # fov_deg: 70 + # pattern: checker + # z_mm: 1500 + # amp_mm: 300 + # period_s: 5.0 + # # Optional test pose (push left) + # pose_file: "cam1_Twc.txt" + + # - id: 2 + # type: dummy + # port: 5556 + # color_res: [1280, 720] + # fov_deg: 70 + # pattern: checker + # z_mm: 1700 + # amp_mm: 200 + # period_s: 6.0 + # # Optional test pose (push right) + # pose_file: "cam2_Twc.txt" + + # - id: 3 + # type: dummy + # port: 5557 + # color_res: [1280, 720] + # fov_deg: 70 + # pattern: checker + # z_mm: 1700 + # amp_mm: 200 + # period_s: 6.0 + # # Optional test pose (push right) + # pose_file: "cam3_Twc.txt" \ No newline at end of file diff --git a/demos/point_cloud/streamer/net/PacketV2.py b/demos/point_cloud/streamer/net/PacketV2.py new file mode 100644 index 0000000..6e2e4fb --- /dev/null +++ b/demos/point_cloud/streamer/net/PacketV2.py @@ -0,0 +1,61 @@ +import struct +import numpy as np + +MAGIC = 0xABCD1234 +VERSION = 2 +FLAG_POSE = 1 +FLAG_INTR = 2 + +class PacketV2Writer: + """ + Packs a message: + [Header 36B] + [Intrinsics 16B if flags&2] + [RGB JPEG rgb_len B] + [Depth U16 depth_len B] + [Pose 4x4 64B if flags&1] + """ + def __init__(self, send_intrinsics: bool = False): + self.send_intrinsics = 
send_intrinsics + + def pack( + self, + camera_id: int, + timestamp_us: int, + width: int, + height: int, + rgb_jpeg_bytes: bytes, + depth_u16: np.ndarray, + intrinsics: np.ndarray | None, + pose_Twc: np.ndarray | None + ) -> bytes: + flags = 0 + if pose_Twc is not None: + flags |= FLAG_POSE + if self.send_intrinsics and intrinsics is not None: + flags |= FLAG_INTR + + depth_u16 = np.asarray(depth_u16, dtype=np.uint16) + depth_bytes = depth_u16.tobytes(order="C") + + header = struct.pack( + "
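PacketV2Writer's struct format string is cut off at the end of this hunk, so the exact field order is not visible here. For reference, a reader sketch matching the documented layout, assuming a little-endian 36-byte header packed as magic:u32, version:u16, flags:u16, camera_id:u32, timestamp_us:u64, width:u32, height:u32, rgb_len:u32, depth_len:u32 (one plausible arrangement that fits the stated sizes, not confirmed by the diff):

    import struct
    import numpy as np

    HEADER_FMT = "<IHHIQIIII"            # assumed field order; 36 bytes total
    HEADER_SIZE = struct.calcsize(HEADER_FMT)

    def unpack_packet_v2(buf: bytes):
        (magic, version, flags, cam_id, ts_us,
         width, height, rgb_len, depth_len) = struct.unpack_from(HEADER_FMT, buf, 0)
        assert magic == 0xABCD1234 and version == 2
        off = HEADER_SIZE
        intr = pose = None
        if flags & 2:                    # FLAG_INTR: fx, fy, cx, cy as float32
            intr = np.frombuffer(buf, np.float32, 4, off)
            off += 16
        rgb_jpeg = buf[off:off + rgb_len]
        off += rgb_len
        # Assumes depth matches the header width/height; the streamer caches them together.
        depth = np.frombuffer(buf, np.uint16, depth_len // 2, off).reshape(height, width)
        off += depth_len
        if flags & 1:                    # FLAG_POSE: row-major 4x4 float32, 64 bytes
            pose = np.frombuffer(buf, np.float32, 16, off).reshape(4, 4)
        return cam_id, ts_us, rgb_jpeg, depth, intr, pose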