diff --git a/deployment/cli/main.py b/deployment/cli/main.py index 3a1906148..2ee48c360 100644 --- a/deployment/cli/main.py +++ b/deployment/cli/main.py @@ -11,6 +11,7 @@ import importlib import pkgutil import sys +import traceback from typing import List import deployment.projects as projects_pkg @@ -51,11 +52,13 @@ def build_parser() -> argparse.ArgumentParser: subparsers = parser.add_subparsers(dest="project", required=True) # Discover projects and import them so they can contribute args. + failed_projects: List[str] = [] for project_name in _discover_project_packages(): try: _import_and_register_project(project_name) - except Exception: - # Skip broken/incomplete project bundles rather than breaking the whole CLI. + except Exception as e: + tb = traceback.format_exc() + failed_projects.append(f"- {project_name}: {e}\n{tb}") continue try: @@ -68,6 +71,12 @@ def build_parser() -> argparse.ArgumentParser: adapter.add_args(sub) sub.set_defaults(_adapter_name=project_name) + if not project_registry.list(): + details = "\n".join(failed_projects) if failed_projects else "(no project packages discovered)" + raise RuntimeError( + "No deployment projects were registered. This usually means project imports failed.\n" f"{details}" + ) + return parser diff --git a/deployment/core/__init__.py b/deployment/core/__init__.py index afe64e8b4..1afb0be1a 100644 --- a/deployment/core/__init__.py +++ b/deployment/core/__init__.py @@ -1,15 +1,21 @@ """Core components for deployment framework.""" -from deployment.core.artifacts import Artifact +from deployment.core.artifacts import ( + Artifact, + get_component_files, + resolve_artifact_path, + resolve_engine_path, + resolve_onnx_path, +) from deployment.core.backend import Backend from deployment.core.config.base_config import ( - BackendConfig, BaseDeploymentConfig, DeviceConfig, EvaluationConfig, ExportConfig, ExportMode, RuntimeConfig, + TensorRTConfig, VerificationConfig, VerificationScenario, parse_base_args, @@ -26,6 +32,7 @@ BaseEvaluator, EvalResultDict, EvaluationDefaults, + InferenceInput, ModelSpec, TaskProfile, VerifyResultDict, @@ -56,7 +63,7 @@ "ExportConfig", "ExportMode", "RuntimeConfig", - "BackendConfig", + "TensorRTConfig", "DeviceConfig", "EvaluationConfig", "VerificationConfig", @@ -71,11 +78,16 @@ # Evaluation "BaseEvaluator", "TaskProfile", + "InferenceInput", "EvalResultDict", "VerifyResultDict", "VerificationMixin", # Artifacts "Artifact", + "resolve_artifact_path", + "resolve_onnx_path", + "resolve_engine_path", + "get_component_files", "ModelSpec", # Preprocessing "build_preprocessing_pipeline", diff --git a/deployment/core/artifacts.py b/deployment/core/artifacts.py index 985aa3bb1..d9bbfc3f3 100644 --- a/deployment/core/artifacts.py +++ b/deployment/core/artifacts.py @@ -1,18 +1,229 @@ -"""Artifact descriptors for deployment outputs.""" +""" +Artifact Path Resolution for Deployment Pipelines. + +This module provides: +1. Artifact dataclass - represents an exported model artifact +2. Path resolution functions - resolve artifact paths from deploy config + +Supports: +- Single-component models (YOLOX, Calibration): use component="model" +- Multi-component models (CenterPoint): use component="voxel_encoder", "backbone_head", etc. 
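A minimal usage sketch for the new resolution helpers (paths and filenames below are illustrative, not taken from this patch):

    from deployment.core.artifacts import resolve_onnx_path

    # base_dir must be an existing directory; the filename must be declared in the
    # deploy config under components.model.onnx_file and exist on disk.
    onnx_path = resolve_onnx_path(
        base_dir="work_dirs/yolox/onnx",
        components_cfg={"model": {"onnx_file": "yolox.onnx"}},
    )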
+""" from __future__ import annotations +import logging +import os import os.path as osp from dataclasses import dataclass +from typing import Any, Dict, Mapping, Optional + +logger = logging.getLogger(__name__) + + +# ============================================================================ +# Artifact Dataclass +# ============================================================================ @dataclass(frozen=True) class Artifact: - """Represents a produced deployment artifact such as ONNX or TensorRT outputs.""" + """ + Represents an exported model artifact (ONNX file, TensorRT engine, etc.). + + Attributes: + path: Filesystem path to the artifact (file or directory). + multi_file: True if artifact is a directory containing multiple files + (e.g., CenterPoint has voxel_encoder.onnx + backbone_head.onnx). + """ path: str multi_file: bool = False def exists(self) -> bool: - """Return True if the artifact path currently exists on disk.""" - return osp.exists(self.path) + """Check if the artifact exists on disk.""" + return os.path.exists(self.path) + + def is_directory(self) -> bool: + """Check if the artifact is a directory.""" + return os.path.isdir(self.path) + + def __str__(self) -> str: + return self.path + + +# ============================================================================ +# Path Resolution Functions +# ============================================================================ + +# File extension mapping +FILE_EXTENSIONS: Dict[str, str] = { + "onnx_file": ".onnx", + "engine_file": ".engine", +} + + +def resolve_artifact_path( + *, + base_dir: str, + components_cfg: Optional[Mapping[str, Any]], + component: str, + file_key: str, +) -> str: + """Resolve artifact path for any component. + + This is the entry point for artifact path resolution. + + Args: + base_dir: Base directory for artifacts (onnx_dir or tensorrt_dir), + or direct path to an artifact file. + components_cfg: The `components` dict from deploy_config. + Can be None for backwards compatibility. + component: Component name (e.g., 'model', 'voxel_encoder', 'backbone_head') + file_key: Key to look up ('onnx_file' or 'engine_file') + + Returns: + Resolved path to the artifact file + + Resolution strategy (single supported mode): + 1. `base_dir` must be a directory (e.g., `.../onnx` or `.../tensorrt`) + 2. Require `components_cfg[component][file_key]` to be set + - must be a relative path resolved under `base_dir` + 3. The resolved path must exist and be a file + + This function intentionally does NOT: + - scan directories for matching extensions + - fall back to default filenames + - accept `base_dir` as a file path + - accept absolute paths in `components` (enforces fully config-driven, workspace-relative artifacts) + + Examples: + # Single-component model (YOLOX) + resolve_artifact_path( + base_dir="work_dirs/yolox/onnx", + components_cfg={"model": {"onnx_file": "yolox.onnx"}}, + component="model", + file_key="onnx_file", + ) + + # Multi-component model (CenterPoint) + resolve_artifact_path( + base_dir="work_dirs/centerpoint/tensorrt", + components_cfg={"voxel_encoder": {"engine_file": "voxel.engine"}}, + component="voxel_encoder", + file_key="engine_file", + ) + """ + if not os.path.isdir(base_dir): + raise ValueError( + "Artifact resolution requires `base_dir` to be a directory. " + f"Got: {base_dir}. " + "Set evaluation.backends..{model_dir|engine_dir} to the artifact directory, " + "and set the artifact filename in deploy config under components.*.{onnx_file|engine_file}." 
+ ) + + # Require filename from components config + filename = _get_filename_from_config(components_cfg, component, file_key) + if not filename: + raise KeyError( + "Missing artifact filename in deploy config. " + f"Expected components['{component}']['{file_key}'] to be set." + ) + + if osp.isabs(filename): + raise ValueError( + "Absolute artifact paths are not allowed. " + f"Set components['{component}']['{file_key}'] to a relative filename under base_dir instead. " + f"(got: {filename})" + ) + + base_abs = osp.abspath(base_dir) + path = osp.abspath(osp.join(base_abs, filename)) + # Prevent escaping base_dir via '../' + if osp.commonpath([base_abs, path]) != base_abs: + raise ValueError( + "Artifact path must stay within base_dir. " + f"Got components['{component}']['{file_key}']={filename} which resolves to {path} outside {base_abs}." + ) + if not os.path.isfile(path): + raise FileNotFoundError( + f"Configured artifact file not found: {path}. " + f"(base_dir={base_dir}, component={component}, file_key={file_key})" + ) + return path + + +def _get_filename_from_config( + components_cfg: Optional[Mapping[str, Any]], + component: str, + file_key: str, +) -> Optional[str]: + """Extract filename from components config.""" + if not components_cfg: + return None + + comp_cfg = components_cfg.get(component, {}) + if not isinstance(comp_cfg, Mapping): + return None + + filename = comp_cfg.get(file_key) + if isinstance(filename, str) and filename: + return filename + return None + + +def get_component_files( + components_cfg: Mapping[str, Any], + file_key: str, +) -> Dict[str, str]: + """Get all component filenames for a given file type. + + Useful for multi-component models to enumerate all artifacts. + + Args: + components_cfg: The unified `components` dict from deploy_config + file_key: Key to look up ('onnx_file' or 'engine_file') + + Returns: + Dict mapping component name to filename + + Example: + >>> components = {"voxel_encoder": {"onnx_file": "voxel.onnx"}, + ... 
"backbone_head": {"onnx_file": "head.onnx"}} + >>> get_component_files(components, "onnx_file") + {"voxel_encoder": "voxel.onnx", "backbone_head": "head.onnx"} + """ + result = {} + for comp_name, comp_cfg in components_cfg.items(): + if isinstance(comp_cfg, Mapping) and file_key in comp_cfg: + result[comp_name] = comp_cfg[file_key] + return result + + +# Convenience aliases for common use cases +def resolve_onnx_path( + base_dir: str, + components_cfg: Optional[Mapping[str, Any]] = None, + component: str = "model", +) -> str: + """Convenience function for resolving ONNX paths.""" + return resolve_artifact_path( + base_dir=base_dir, + components_cfg=components_cfg, + component=component, + file_key="onnx_file", + ) + + +def resolve_engine_path( + base_dir: str, + components_cfg: Optional[Mapping[str, Any]] = None, + component: str = "model", +) -> str: + """Convenience function for resolving TensorRT engine paths.""" + return resolve_artifact_path( + base_dir=base_dir, + components_cfg=components_cfg, + component=component, + file_key="engine_file", + ) diff --git a/deployment/core/config/__init__.py b/deployment/core/config/__init__.py index 3197eb73d..7859ffe7e 100644 --- a/deployment/core/config/__init__.py +++ b/deployment/core/config/__init__.py @@ -1,13 +1,13 @@ """Configuration subpackage for deployment core.""" from deployment.core.config.base_config import ( - BackendConfig, BaseDeploymentConfig, EvaluationConfig, ExportConfig, ExportMode, PrecisionPolicy, RuntimeConfig, + TensorRTConfig, VerificationConfig, VerificationScenario, parse_base_args, @@ -16,7 +16,7 @@ from deployment.core.evaluation.base_evaluator import EVALUATION_DEFAULTS, EvaluationDefaults __all__ = [ - "BackendConfig", + "TensorRTConfig", "BaseDeploymentConfig", "EvaluationConfig", "ExportConfig", diff --git a/deployment/core/config/base_config.py b/deployment/core/config/base_config.py index a9f00e573..174202c90 100644 --- a/deployment/core/config/base_config.py +++ b/deployment/core/config/base_config.py @@ -12,7 +12,7 @@ from dataclasses import dataclass, field from enum import Enum from types import MappingProxyType -from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union +from typing import Any, Dict, Mapping, Optional, Tuple, Union import torch from mmengine.config import Config @@ -173,36 +173,40 @@ def from_dict(cls, config_dict: Mapping[str, Any]) -> RuntimeConfig: @dataclass(frozen=True) -class BackendConfig: - """Configuration for backend-specific settings.""" +class TensorRTConfig: + """ + Configuration for TensorRT backend-specific settings. + + Uses config structure: + tensorrt_config = dict(precision_policy="auto", max_workspace_size=1<<30) + + TensorRT profiles are defined in components.*.tensorrt_profile. + + Note: + The deploy config key for this section is **`tensorrt_config`**. + """ - common_config: Mapping[str, Any] = field(default_factory=_empty_mapping) - model_inputs: Tuple[TensorRTModelInputConfig, ...] = field(default_factory=tuple) + precision_policy: str = PrecisionPolicy.AUTO.value + max_workspace_size: int = DEFAULT_WORKSPACE_SIZE @classmethod - def from_dict(cls, config_dict: Mapping[str, Any]) -> BackendConfig: - common_config = dict(config_dict.get("common_config", {})) - model_inputs_raw: Iterable[Mapping[str, Any]] = config_dict.get("model_inputs", []) or [] - model_inputs: Tuple[TensorRTModelInputConfig, ...] 
= tuple( - TensorRTModelInputConfig.from_dict(item) for item in model_inputs_raw - ) + def from_dict(cls, config_dict: Mapping[str, Any]) -> "TensorRTConfig": return cls( - common_config=MappingProxyType(common_config), - model_inputs=model_inputs, + precision_policy=config_dict.get("precision_policy", PrecisionPolicy.AUTO.value), + max_workspace_size=config_dict.get("max_workspace_size", DEFAULT_WORKSPACE_SIZE), ) def get_precision_policy(self) -> str: """Get precision policy name.""" - return self.common_config.get("precision_policy", PrecisionPolicy.AUTO.value) + return self.precision_policy def get_precision_flags(self) -> Mapping[str, bool]: """Get TensorRT precision flags for the configured policy.""" - policy = self.get_precision_policy() - return PRECISION_POLICIES.get(policy, {}) + return PRECISION_POLICIES.get(self.precision_policy, {}) def get_max_workspace_size(self) -> int: """Get maximum workspace size for TensorRT.""" - return self.common_config.get("max_workspace_size", DEFAULT_WORKSPACE_SIZE) + return self.max_workspace_size @dataclass(frozen=True) @@ -315,7 +319,7 @@ def __init__(self, deploy_cfg: Config): # Initialize config sections self.export_config = ExportConfig.from_dict(deploy_cfg.get("export", {})) self.runtime_config = RuntimeConfig.from_dict(deploy_cfg.get("runtime_io", {})) - self.backend_config = BackendConfig.from_dict(deploy_cfg.get("backend_config", {})) + self.tensorrt_config = TensorRTConfig.from_dict(deploy_cfg.get("tensorrt_config", {}) or {}) self._evaluation_config = EvaluationConfig.from_dict(deploy_cfg.get("evaluation", {})) self._verification_config = VerificationConfig.from_dict(deploy_cfg.get("verification", {})) @@ -336,9 +340,8 @@ def _validate_config(self) -> None: raise ValueError(str(exc)) from exc # Validate precision policy if present - backend_cfg = self.deploy_cfg.get("backend_config", {}) - common_cfg = backend_cfg.get("common_config", {}) - precision_policy = common_cfg.get("precision_policy", PrecisionPolicy.AUTO.value) + tensorrt_config = self.deploy_cfg.get("tensorrt_config", {}) or {} + precision_policy = tensorrt_config.get("precision_policy", PrecisionPolicy.AUTO.value) if precision_policy not in PRECISION_POLICIES: raise ValueError( f"Invalid precision_policy '{precision_policy}'. " f"Must be one of {list(PRECISION_POLICIES.keys())}" @@ -453,72 +456,126 @@ def task_type(self) -> Optional[str]: def get_onnx_settings(self) -> ONNXExportConfig: """ - Get ONNX export settings. + Get ONNX export settings from unified components configuration. 
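An illustrative `components.model` block that this accessor consumes (input/output names, axes, and filename are placeholders, not part of the patch):

    components = dict(
        model=dict(
            onnx_file="model.onnx",
            io=dict(
                inputs=[dict(name="input")],
                outputs=[dict(name="boxes"), dict(name="scores")],
                dynamic_axes={"input": {0: "batch"}},
            ),
        ),
    )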
+ + Reads I/O from components.model.io.{inputs, outputs, dynamic_axes} Returns: ONNXExportConfig instance containing ONNX export parameters """ onnx_config = self.onnx_config - model_io = self.deploy_cfg.get("model_io", {}) - - # Get batch size and dynamic axes from model_io - batch_size = model_io.get("batch_size", None) - dynamic_axes = model_io.get("dynamic_axes", None) + components_io = self._get_model_io_from_components() - # If batch_size is set to a number, disable dynamic_axes - if batch_size is not None and isinstance(batch_size, int): - dynamic_axes = None + # Get input/output names from components + input_names = [inp.get("name", "input") for inp in components_io.get("inputs", [])] + output_names = [out.get("name", "output") for out in components_io.get("outputs", [])] - # Handle multiple inputs and outputs - input_names = [model_io.get("input_name", "input")] - output_names = [model_io.get("output_name", "output")] - - # Add additional inputs if specified - additional_inputs = model_io.get("additional_inputs", []) - for additional_input in additional_inputs: - if isinstance(additional_input, dict): - input_names.append(additional_input.get("name", "input")) - - # Add additional outputs if specified - additional_outputs = model_io.get("additional_outputs", []) - for additional_output in additional_outputs: - if isinstance(additional_output, str): - output_names.append(additional_output) + # Fallback to defaults if components not configured + if not input_names: + input_names = ["input"] + if not output_names: + output_names = ["output"] settings_dict = { "opset_version": onnx_config.get("opset_version", 16), "do_constant_folding": onnx_config.get("do_constant_folding", True), "input_names": tuple(input_names), "output_names": tuple(output_names), - "dynamic_axes": dynamic_axes, + "dynamic_axes": components_io.get("dynamic_axes"), "export_params": onnx_config.get("export_params", True), "keep_initializers_as_inputs": onnx_config.get("keep_initializers_as_inputs", False), "verbose": onnx_config.get("verbose", False), - "save_file": onnx_config.get("save_file", "model.onnx"), - "batch_size": batch_size, + "save_file": components_io.get("onnx_file") or onnx_config.get("save_file", "model.onnx"), + "batch_size": None, } - # Note: simplify is typically True by default, but can be overridden if "simplify" in onnx_config: settings_dict["simplify"] = onnx_config["simplify"] return ONNXExportConfig.from_mapping(settings_dict) + def _get_model_io_from_components(self) -> Dict[str, Any]: + """ + Extract model I/O configuration from components. + + For end-to-end models (single component), returns the io config + from components.model. + + Returns: + Dictionary with inputs, outputs, dynamic_axes, and onnx_file. + """ + components = self.deploy_cfg.get("components", {}) + if not components: + return {} + + # For single-component models, look for 'model' component + if "model" in components: + comp_cfg = components["model"] + io_cfg = comp_cfg.get("io", {}) + return { + "inputs": io_cfg.get("inputs", []), + "outputs": io_cfg.get("outputs", []), + "dynamic_axes": io_cfg.get("dynamic_axes"), + "onnx_file": comp_cfg.get("onnx_file"), + } + + return {} + def get_tensorrt_settings(self) -> TensorRTExportConfig: """ - Get TensorRT export settings with precision policy support. + Get TensorRT export settings from unified components configuration. + + TensorRT profiles are read from components.model.tensorrt_profile. 
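An illustrative profile block as consumed by `_build_model_inputs_from_components` (input name and shapes are placeholders):

    components = dict(
        model=dict(
            tensorrt_profile=dict(
                input=dict(
                    min_shape=[1, 3, 416, 416],
                    opt_shape=[1, 3, 960, 960],
                    max_shape=[1, 3, 1280, 1280],
                ),
            ),
        ),
    )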
Returns: TensorRTExportConfig instance containing TensorRT export parameters """ + model_inputs = self._build_model_inputs_from_components() + settings_dict = { - "max_workspace_size": self.backend_config.get_max_workspace_size(), - "precision_policy": self.backend_config.get_precision_policy(), - "policy_flags": self.backend_config.get_precision_flags(), - "model_inputs": self.backend_config.model_inputs, + "max_workspace_size": self.tensorrt_config.get_max_workspace_size(), + "precision_policy": self.tensorrt_config.get_precision_policy(), + "policy_flags": self.tensorrt_config.get_precision_flags(), + "model_inputs": model_inputs, } return TensorRTExportConfig.from_mapping(settings_dict) + def _build_model_inputs_from_components(self) -> Tuple[TensorRTModelInputConfig, ...]: + """ + Build model_inputs from components configuration. + + For end-to-end models (single component), extracts tensorrt_profile + from components.model and converts to TensorRTModelInputConfig format. + + Returns: + Tuple of TensorRTModelInputConfig, or empty tuple if not configured. + """ + components = self.deploy_cfg.get("components", {}) + if not components or "model" not in components: + return () + + comp_cfg = components["model"] + tensorrt_profile = comp_cfg.get("tensorrt_profile", {}) + + if not tensorrt_profile: + return () + + from deployment.exporters.common.configs import TensorRTProfileConfig + + input_shapes = {} + for input_name, shape_cfg in tensorrt_profile.items(): + if isinstance(shape_cfg, Mapping): + input_shapes[input_name] = TensorRTProfileConfig( + min_shape=tuple(shape_cfg.get("min_shape", [])), + opt_shape=tuple(shape_cfg.get("opt_shape", [])), + max_shape=tuple(shape_cfg.get("max_shape", [])), + ) + + if input_shapes: + return (TensorRTModelInputConfig(input_shapes=MappingProxyType(input_shapes)),) + + return () + def setup_logging(level: str = "INFO") -> logging.Logger: """ diff --git a/deployment/core/contexts.py b/deployment/core/contexts.py index 486caa1e5..8df2f8a23 100644 --- a/deployment/core/contexts.py +++ b/deployment/core/contexts.py @@ -1,18 +1,6 @@ """ Typed context objects for deployment workflows. -This module defines typed dataclasses that replace **kwargs with explicit, -type-checked parameters. This improves: -- Type safety: Catches mismatches at type-check time -- Discoverability: IDE autocomplete shows available parameters -- Refactoring safety: Renamed fields are caught by type checkers - -Design Principles: - 1. Base contexts define common parameters across all projects - 2. Project-specific contexts extend base with additional fields - 3. Optional fields have sensible defaults - 4. Contexts are immutable (frozen=True) for safety - Usage: # Create context for export ctx = ExportContext(sample_idx=0) diff --git a/deployment/core/evaluation/base_evaluator.py b/deployment/core/evaluation/base_evaluator.py index 23d251ae9..72ace770e 100644 --- a/deployment/core/evaluation/base_evaluator.py +++ b/deployment/core/evaluation/base_evaluator.py @@ -23,6 +23,7 @@ from deployment.core.backend import Backend from deployment.core.evaluation.evaluator_types import ( EvalResultDict, + InferenceInput, InferenceResult, LatencyBreakdown, LatencyStats, @@ -38,6 +39,7 @@ "EvalResultDict", "VerifyResultDict", "ModelSpec", + "InferenceInput", "InferenceResult", "LatencyStats", "LatencyBreakdown", @@ -160,8 +162,14 @@ def _prepare_input( sample: Mapping[str, Any], data_loader: BaseDataLoader, device: str, - ) -> Tuple[Any, Dict[str, Any]]: - """Prepare model input from a sample. 
Returns (input_data, inference_kwargs).""" + ) -> InferenceInput: + """Prepare model input from a sample. + + Returns: + InferenceInput containing: + - data: The actual input data (e.g., points tensor) + - metadata: Sample metadata forwarded to postprocess() + """ raise NotImplementedError @abstractmethod @@ -211,7 +219,7 @@ def _get_verification_input( sample_idx: int, data_loader: BaseDataLoader, device: str, - ) -> Tuple[Any, Dict[str, Any]]: + ) -> InferenceInput: """Get verification input.""" sample = data_loader.load_sample(sample_idx) return self._prepare_input(sample, data_loader, device) @@ -254,12 +262,12 @@ def evaluate( logger.info(f"Processing sample {idx + 1}/{actual_samples}") sample = data_loader.load_sample(idx) - input_data, infer_kwargs = self._prepare_input(sample, data_loader, model.device) + inference_input = self._prepare_input(sample, data_loader, model.device) - gt_data = data_loader.get_ground_truth(idx) + gt_data = sample.get("ground_truth", {}) ground_truths = self._parse_ground_truths(gt_data) - infer_result = pipeline.infer(input_data, **infer_kwargs) + infer_result = pipeline.infer(inference_input.data, metadata=inference_input.metadata) latencies.append(infer_result.latency_ms) if infer_result.breakdown: latency_breakdowns.append(infer_result.breakdown) diff --git a/deployment/core/evaluation/evaluator_types.py b/deployment/core/evaluation/evaluator_types.py index de800656f..d6ae9fabc 100644 --- a/deployment/core/evaluation/evaluator_types.py +++ b/deployment/core/evaluation/evaluator_types.py @@ -7,7 +7,7 @@ from __future__ import annotations -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field from typing import Any, Dict, Optional, TypedDict from deployment.core.artifacts import Artifact @@ -93,6 +93,19 @@ def to_dict(self) -> Dict[str, Dict[str, float]]: return {stage: stats.to_dict() for stage, stats in self.stages.items()} +@dataclass +class InferenceInput: + """Prepared input for pipeline inference. + + Attributes: + data: The actual input data (e.g., points tensor, image tensor). + metadata: Sample metadata forwarded to postprocess(). + """ + + data: Any + metadata: Dict[str, Any] = field(default_factory=dict) + + @dataclass(frozen=True) class InferenceResult: """Standard inference return payload.""" diff --git a/deployment/core/evaluation/verification_mixin.py b/deployment/core/evaluation/verification_mixin.py index 9b44c2110..45ef8dccd 100644 --- a/deployment/core/evaluation/verification_mixin.py +++ b/deployment/core/evaluation/verification_mixin.py @@ -16,7 +16,7 @@ import torch from deployment.core.backend import Backend -from deployment.core.evaluation.evaluator_types import ModelSpec, VerifyResultDict +from deployment.core.evaluation.evaluator_types import InferenceInput, ModelSpec, VerifyResultDict from deployment.core.io.base_data_loader import BaseDataLoader @@ -71,8 +71,14 @@ def _get_verification_input( sample_idx: int, data_loader: BaseDataLoader, device: str, - ) -> Tuple[Any, Dict[str, Any]]: - """Get input data for verification.""" + ) -> InferenceInput: + """Get input data for verification. 
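A minimal sketch of a concrete override returning the new `InferenceInput` type (the preprocessing call and metadata keys are assumptions, not part of this patch):

    def _get_verification_input(self, sample_idx, data_loader, device):
        sample = data_loader.load_sample(sample_idx)
        data = data_loader.preprocess(sample)  # task-specific tensor/dict
        return InferenceInput(data=data, metadata={"sample_idx": sample_idx})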
+ + Returns: + InferenceInput containing: + - data: The actual input data (e.g., points tensor) + - metadata: Sample metadata forwarded to postprocess() + """ raise NotImplementedError def _get_output_names(self) -> Optional[List[str]]: @@ -417,17 +423,25 @@ def _verify_single_sample( logger: logging.Logger, ) -> bool: """Verify a single sample.""" - input_data, metadata = self._get_verification_input(sample_idx, data_loader, ref_device) + inference_input = self._get_verification_input(sample_idx, data_loader, ref_device) ref_name = f"{ref_backend.value} ({ref_device})" logger.info(f"\nRunning {ref_name} reference...") - ref_result = ref_pipeline.infer(input_data, metadata, return_raw_outputs=True) + ref_result = ref_pipeline.infer( + inference_input.data, + metadata=inference_input.metadata, + return_raw_outputs=True, + ) logger.info(f" {ref_name} latency: {ref_result.latency_ms:.2f} ms") - test_input = self._move_input_to_device(input_data, test_device) + test_input = self._move_input_to_device(inference_input.data, test_device) test_name = f"{test_backend.value} ({test_device})" logger.info(f"\nRunning {test_name} test...") - test_result = test_pipeline.infer(test_input, metadata, return_raw_outputs=True) + test_result = test_pipeline.infer( + test_input, + metadata=inference_input.metadata, + return_raw_outputs=True, + ) logger.info(f" {test_name} latency: {test_result.latency_ms:.2f} ms") passed, _ = self._compare_backend_outputs(ref_result.output, test_result.output, tolerance, test_name, logger) diff --git a/deployment/core/io/base_data_loader.py b/deployment/core/io/base_data_loader.py index bdc94c066..88d946f74 100644 --- a/deployment/core/io/base_data_loader.py +++ b/deployment/core/io/base_data_loader.py @@ -68,7 +68,7 @@ def load_sample(self, index: int) -> SampleData: raise NotImplementedError @abstractmethod - def preprocess(self, sample: SampleData) -> torch.Tensor: + def preprocess(self, sample: SampleData) -> Any: """ Preprocess raw sample data into model input format. @@ -76,8 +76,8 @@ def preprocess(self, sample: SampleData) -> torch.Tensor: sample: Raw sample data returned by load_sample() Returns: - Preprocessed tensor ready for model inference. - Shape and format depend on the specific task. + Preprocessed model input ready for inference. Type/shape is task-specific. + (e.g., torch.Tensor, Dict[str, torch.Tensor], tuple, etc.) Raises: ValueError: If sample format is invalid @@ -94,20 +94,6 @@ def get_num_samples(self) -> int: """ raise NotImplementedError - @abstractmethod - def get_ground_truth(self, index: int) -> Mapping[str, Any]: - """ - Get ground truth annotations for a specific sample. - - Args: - index: Sample index whose annotations should be returned - - Returns: - Dictionary containing task-specific ground truth data. - Implementations should raise IndexError if the index is invalid. - """ - raise NotImplementedError - def get_shape_sample(self, index: int = 0) -> Any: """ Return a representative sample used for export shape configuration. diff --git a/deployment/core/metrics/base_metrics_interface.py b/deployment/core/metrics/base_metrics_interface.py index 37feb8be4..3d68245e3 100644 --- a/deployment/core/metrics/base_metrics_interface.py +++ b/deployment/core/metrics/base_metrics_interface.py @@ -59,24 +59,27 @@ def to_dict(self) -> Dict[str, Any]: @dataclass(frozen=True) class DetectionSummary: - """Structured summary for detection metrics (2D/3D).""" + """Structured summary for detection metrics (2D/3D). 
- mAP: float = 0.0 - per_class_ap: Dict[str, float] = field(default_factory=dict) + All matching modes computed by autoware_perception_evaluation are included. + The `mAP_by_mode` and `mAPH_by_mode` dicts contain results for each matching mode. + """ + + mAP_by_mode: Dict[str, float] = field(default_factory=dict) + mAPH_by_mode: Dict[str, float] = field(default_factory=dict) + per_class_ap_by_mode: Dict[str, Dict[str, float]] = field(default_factory=dict) num_frames: int = 0 detailed_metrics: Dict[str, float] = field(default_factory=dict) - mAPH: Optional[float] = None def to_dict(self) -> Dict[str, Any]: - data = { - "mAP": self.mAP, - "per_class_ap": dict(self.per_class_ap), + """Convert to dict.""" + return { + "mAP_by_mode": dict(self.mAP_by_mode), + "mAPH_by_mode": dict(self.mAPH_by_mode), + "per_class_ap_by_mode": {k: dict(v) for k, v in self.per_class_ap_by_mode.items()}, "num_frames": self.num_frames, "detailed_metrics": dict(self.detailed_metrics), } - if self.mAPH is not None: - data["mAPH"] = self.mAPH - return data class BaseMetricsInterface(ABC): @@ -125,7 +128,7 @@ def reset(self) -> None: pass @abstractmethod - def add_frame(self, *args, **kwargs) -> None: + def add_frame(self, *args) -> None: """ Add a frame of predictions and ground truths for evaluation. diff --git a/deployment/core/metrics/detection_2d_metrics.py b/deployment/core/metrics/detection_2d_metrics.py index fb9e73e5c..575dbb34f 100644 --- a/deployment/core/metrics/detection_2d_metrics.py +++ b/deployment/core/metrics/detection_2d_metrics.py @@ -452,27 +452,49 @@ def _process_metrics_score(self, metrics_score: MetricsScore) -> Dict[str, float return metric_dict def get_summary(self) -> DetectionSummary: - """Get a summary of the evaluation including mAP and per-class metrics.""" + """Get a summary of the evaluation including mAP and per-class metrics for all matching modes.""" metrics = self.compute_metrics() - # Extract primary metrics (first mAP value found) - primary_map = None - per_class_ap = {} - - for key, value in metrics.items(): - if key.startswith("mAP_") and primary_map is None: - primary_map = value - elif "_AP_" in key and not key.startswith("mAP"): - # Extract class name from key - parts = key.split("_AP_") - if len(parts) == 2: - class_name = parts[0] - if class_name not in per_class_ap: - per_class_ap[class_name] = value + # Extract matching modes from metrics + modes = [] + for k in metrics.keys(): + if k.startswith("mAP_") and k != "mAP": + modes.append(k[len("mAP_") :]) + modes = list(dict.fromkeys(modes)) # Remove duplicates while preserving order + + if not modes: + return DetectionSummary( + mAP_by_mode={}, + mAPH_by_mode={}, + per_class_ap_by_mode={}, + num_frames=self._frame_count, + detailed_metrics=metrics, + ) + + # Collect mAP and per-class AP for each matching mode + mAP_by_mode: Dict[str, float] = {} + per_class_ap_by_mode: Dict[str, Dict[str, float]] = {} + + for mode in modes: + map_value = metrics.get(f"mAP_{mode}", 0.0) + mAP_by_mode[mode] = float(map_value) + + # Collect AP values per class for this mode + per_class_ap_values: Dict[str, List[float]] = {} + ap_key_infix = f"_AP_{mode}_" + for key, value in metrics.items(): + if ap_key_infix not in key or key.startswith("mAP"): + continue + class_name = key.split("_AP_", 1)[0] + per_class_ap_values.setdefault(class_name, []).append(float(value)) + + if per_class_ap_values: + per_class_ap_by_mode[mode] = {k: float(np.mean(v)) for k, v in per_class_ap_values.items() if v} return DetectionSummary( - mAP=primary_map or 0.0, - 
per_class_ap=per_class_ap, + mAP_by_mode=mAP_by_mode, + mAPH_by_mode={}, # 2D detection doesn't have mAPH + per_class_ap_by_mode=per_class_ap_by_mode, num_frames=self._frame_count, detailed_metrics=metrics, ) diff --git a/deployment/core/metrics/detection_3d_metrics.py b/deployment/core/metrics/detection_3d_metrics.py index 235ab795b..6bfc6ddd5 100644 --- a/deployment/core/metrics/detection_3d_metrics.py +++ b/deployment/core/metrics/detection_3d_metrics.py @@ -25,9 +25,10 @@ """ import logging +import re import time -from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from dataclasses import dataclass +from typing import Any, Dict, List, Mapping, Optional import numpy as np from perception_eval.common.dataset import FrameGroundTruth @@ -130,30 +131,13 @@ def __post_init__(self): class Detection3DMetricsInterface(BaseMetricsInterface): + # TODO(vividf): refactor this class after refactoring T4MetricV2 """ Interface for computing 3D detection metrics using autoware_perception_evaluation. This interface provides a simplified interface for the deployment framework to compute mAP, mAPH, and other detection metrics that are consistent with the T4MetricV2 used during training. - - Example usage: - config = Detection3DMetricsConfig( - class_names=["car", "truck", "bus", "bicycle", "pedestrian"], - frame_id="base_link", - ) - interface = Detection3DMetricsInterface(config) - - # Add frames - for pred, gt in zip(predictions_list, ground_truths_list): - interface.add_frame( - predictions=pred, # List[Dict] with bbox_3d, label, score - ground_truths=gt, # List[Dict] with bbox_3d, label - ) - - # Compute metrics - metrics = interface.compute_metrics() - # Returns: {"mAP_center_distance_bev_0.5": 0.7, ...} """ _UNKNOWN = "unknown" @@ -176,38 +160,99 @@ def __init__( self.data_root = data_root self.result_root_directory = result_root_directory - # Create perception evaluation config - self.perception_eval_config = PerceptionEvaluationConfig( - dataset_paths=data_root, - frame_id=config.frame_id, - result_root_directory=result_root_directory, - evaluation_config_dict=config.evaluation_config_dict, - load_raw_data=False, - ) + cfg_dict = config.evaluation_config_dict or {} + self._evaluation_cfg_dict: Dict[str, Any] = dict(cfg_dict) - # Create critical object filter config - self.critical_object_filter_config = CriticalObjectFilterConfig( - evaluator_config=self.perception_eval_config, - **config.critical_object_filter_config, - ) + # Create multiple evaluators for different distance ranges (like T4MetricV2) + min_distance = cfg_dict.get("min_distance") + max_distance = cfg_dict.get("max_distance") - # Create frame pass fail config - self.frame_pass_fail_config = PerceptionPassFailConfig( - evaluator_config=self.perception_eval_config, - **config.frame_pass_fail_config, - ) + if isinstance(min_distance, (int, float)) and isinstance(max_distance, (int, float)): + min_distance = [float(min_distance)] + max_distance = [float(max_distance)] + elif not isinstance(min_distance, list) or not isinstance(max_distance, list): + raise ValueError( + "min_distance and max_distance must be either scalars (int/float) or lists for multi-evaluator mode. 
" + f"Got min_distance={type(min_distance)}, max_distance={type(max_distance)}" + ) - # Initialize evaluation manager (will be created on first use or reset) - self.evaluator: Optional[PerceptionEvaluationManager] = None + if len(min_distance) != len(max_distance): + raise ValueError( + f"min_distance and max_distance must have the same length. " + f"Got len(min_distance)={len(min_distance)}, len(max_distance)={len(max_distance)}" + ) + + if len(min_distance) == 0: + raise ValueError("min_distance and max_distance lists cannot be empty") + + # Create distance ranges and evaluators + self._bev_distance_ranges = list(zip(min_distance, max_distance)) + self.evaluators: Dict[str, Dict[str, Any]] = {} + self._create_evaluators(config) + + self.gt_count_total: int = 0 + self.pred_count_total: int = 0 + self.gt_count_by_label: Dict[str, int] = {} + self.pred_count_by_label: Dict[str, int] = {} + self._last_metrics_by_eval_name: Dict[str, MetricsScore] = {} + + def _create_evaluators(self, config: Detection3DMetricsConfig) -> None: + """Create multiple evaluators for different distance ranges (like T4MetricV2).""" + range_filter_name = "bev_center" + + for min_dist, max_dist in self._bev_distance_ranges: + # Create a copy of evaluation_config_dict with single distance values + eval_config_dict = dict(config.evaluation_config_dict or {}) + eval_config_dict["min_distance"] = min_dist + eval_config_dict["max_distance"] = max_dist + + # Create perception evaluation config for this range + evaluator_config = PerceptionEvaluationConfig( + dataset_paths=self.data_root, + frame_id=config.frame_id, + result_root_directory=self.result_root_directory, + evaluation_config_dict=eval_config_dict, + load_raw_data=False, + ) + + # Create critical object filter config + critical_object_filter_config = CriticalObjectFilterConfig( + evaluator_config=evaluator_config, + **config.critical_object_filter_config, + ) + + # Create frame pass fail config + frame_pass_fail_config = PerceptionPassFailConfig( + evaluator_config=evaluator_config, + **config.frame_pass_fail_config, + ) + + evaluator_name = f"{range_filter_name}_{min_dist}-{max_dist}" + + self.evaluators[evaluator_name] = { + "evaluator": None, # Will be created on reset + "evaluator_config": evaluator_config, + "critical_object_filter_config": critical_object_filter_config, + "frame_pass_fail_config": frame_pass_fail_config, + "bev_distance_range": (min_dist, max_dist), + } def reset(self) -> None: """Reset the interface for a new evaluation session.""" - self.evaluator = PerceptionEvaluationManager( - evaluation_config=self.perception_eval_config, - load_ground_truth=False, - metric_output_dir=None, - ) + # Reset all evaluators + for eval_name, eval_data in self.evaluators.items(): + eval_data["evaluator"] = PerceptionEvaluationManager( + evaluation_config=eval_data["evaluator_config"], + load_ground_truth=False, + metric_output_dir=None, + ) + self._frame_count = 0 + self.gt_count_total = 0 + self.pred_count_total = 0 + self.gt_count_by_label = {} + self.pred_count_by_label = {} + self._last_metrics_by_eval_name = {} def _convert_index_to_label(self, label_index: int) -> Label: """Convert a label index to a Label object. @@ -374,28 +419,57 @@ def add_frame( - num_lidar_pts: int (optional) frame_name: Optional name for the frame. 
""" - if self.evaluator is None: + needs_reset = any(eval_data["evaluator"] is None for eval_data in self.evaluators.values()) + if needs_reset: self.reset() unix_time = time.time() if frame_name is None: frame_name = str(self._frame_count) + self.pred_count_total += len(predictions) + self.gt_count_total += len(ground_truths) + + for p in predictions: + try: + label = int(p.get("label", -1)) + except Exception: + label = -1 + if 0 <= label < len(self.class_names): + name = self.class_names[label] + self.pred_count_by_label[name] = self.pred_count_by_label.get(name, 0) + 1 + + for g in ground_truths: + try: + label = int(g.get("label", -1)) + except Exception: + label = -1 + if 0 <= label < len(self.class_names): + name = self.class_names[label] + self.gt_count_by_label[name] = self.gt_count_by_label.get(name, 0) + 1 + # Convert predictions to DynamicObject estimated_objects = self._predictions_to_dynamic_objects(predictions, unix_time) # Convert ground truths to FrameGroundTruth frame_ground_truth = self._ground_truths_to_frame_ground_truth(ground_truths, unix_time, frame_name) - # Add frame result to evaluator + # Add frame result to all evaluators try: - self.evaluator.add_frame_result( - unix_time=unix_time, - ground_truth_now_frame=frame_ground_truth, - estimated_objects=estimated_objects, - critical_object_filter_config=self.critical_object_filter_config, - frame_pass_fail_config=self.frame_pass_fail_config, - ) + for eval_name, eval_data in self.evaluators.items(): + if eval_data["evaluator"] is None: + eval_data["evaluator"] = PerceptionEvaluationManager( + evaluation_config=eval_data["evaluator_config"], + load_ground_truth=False, + metric_output_dir=None, + ) + eval_data["evaluator"].add_frame_result( + unix_time=unix_time, + ground_truth_now_frame=frame_ground_truth, + estimated_objects=estimated_objects, + critical_object_filter_config=eval_data["critical_object_filter_config"], + frame_pass_fail_config=eval_data["frame_pass_fail_config"], + ) self._frame_count += 1 except Exception as e: logger.warning(f"Failed to add frame {frame_name}: {e}") @@ -405,22 +479,47 @@ def compute_metrics(self) -> Dict[str, float]: Returns: Dictionary of metrics with keys like: - - mAP_center_distance_bev_0.5 - - mAP_center_distance_bev_1.0 - - mAPH_center_distance_bev_0.5 - - car_AP_center_distance_bev_0.5 + - mAP_center_distance_bev (mean AP across all classes, no threshold) + - mAPH_center_distance_bev (mean APH across all classes, no threshold) + - car_AP_center_distance_bev_0.5 (per-class AP with threshold) + - car_AP_center_distance_bev_1.0 (per-class AP with threshold) + - car_APH_center_distance_bev_0.5 (per-class APH with threshold) - etc. + For multi-evaluator mode, metrics are prefixed with evaluator name: + - bev_center_0.0-50.0_mAP_center_distance_bev + - bev_center_0.0-50.0_car_AP_center_distance_bev_0.5 + - bev_center_50.0-90.0_mAP_center_distance_bev + - etc. + Note: mAP/mAPH keys do not include threshold; only per-class AP/APH keys do. 
""" - if self.evaluator is None or self._frame_count == 0: + if self._frame_count == 0: logger.warning("No frames to evaluate") return {} try: - # Get scene result (aggregated metrics) - metrics_score: MetricsScore = self.evaluator.get_scene_result() - - # Process metrics into a flat dictionary - return self._process_metrics_score(metrics_score) + # Cache scene results to avoid recomputing + scene_results = {} + for eval_name, eval_data in self.evaluators.items(): + evaluator = eval_data["evaluator"] + if evaluator is None: + continue + + try: + metrics_score = evaluator.get_scene_result() + scene_results[eval_name] = metrics_score + except Exception as e: + logger.warning(f"Error computing metrics for {eval_name}: {e}") + + # Process cached metrics with evaluator name prefix + all_metrics = {} + for eval_name, metrics_score in scene_results.items(): + eval_metrics = self._process_metrics_score(metrics_score, prefix=eval_name) + all_metrics.update(eval_metrics) + + # Cache results for reuse by format_last_report() and get_summary() + self._last_metrics_by_eval_name = scene_results + + return all_metrics except Exception as e: logger.error(f"Error computing metrics: {e}") @@ -429,16 +528,42 @@ def compute_metrics(self) -> Dict[str, float]: traceback.print_exc() return {} - def _process_metrics_score(self, metrics_score: MetricsScore) -> Dict[str, float]: + def format_last_report(self) -> str: + """Format the last metrics report using perception_eval's own __str__ implementation. + + For multi-evaluator mode, returns reports for all evaluators with distance range labels. + Uses cached results from compute_metrics() if available to avoid recomputation. + """ + # Use cached results if available, otherwise compute them + if not self._last_metrics_by_eval_name: + # Cache not available, compute now + self.compute_metrics() + + # Format reports for all evaluators using cached results + reports = [] + for eval_name, metrics_score in self._last_metrics_by_eval_name.items(): + try: + # Extract distance range from evaluator name (e.g., "bev_center_0.0-50.0" -> "0.0-50.0") + distance_range = eval_name.replace("bev_center_", "") + report = f"\n{'='*80}\nDistance Range: {distance_range} m\n{'='*80}\n{str(metrics_score)}" + reports.append(report) + except Exception as e: + logger.warning(f"Error formatting report for {eval_name}: {e}") + + return "\n".join(reports) if reports else "" + + def _process_metrics_score(self, metrics_score: MetricsScore, prefix: Optional[str] = None) -> Dict[str, float]: """Process MetricsScore into a flat dictionary. Args: metrics_score: MetricsScore instance from evaluator. + prefix: Optional prefix to add to metric keys (for multi-evaluator mode). Returns: Flat dictionary of metrics. 
""" metric_dict = {} + key_prefix = f"{prefix}_" if prefix else "" for map_instance in metrics_score.mean_ap_values: matching_mode = map_instance.matching_mode.value.lower().replace(" ", "_") @@ -452,43 +577,137 @@ def _process_metrics_score(self, metrics_score: MetricsScore) -> Dict[str, float ap_value = ap.ap # Create the metric key - key = f"{label_name}_AP_{matching_mode}_{threshold}" + key = f"{key_prefix}{label_name}_AP_{matching_mode}_{threshold}" metric_dict[key] = ap_value + # Process individual APH values + label_to_aphs = getattr(map_instance, "label_to_aphs", None) + if label_to_aphs: + for label, aphs in label_to_aphs.items(): + label_name = label.value + for aph in aphs: + threshold = aph.matching_threshold + aph_value = getattr(aph, "aph", None) + if aph_value is None: + aph_value = getattr(aph, "ap", None) + if aph_value is None: + continue + key = f"{key_prefix}{label_name}_APH_{matching_mode}_{threshold}" + metric_dict[key] = aph_value + # Add mAP and mAPH values - map_key = f"mAP_{matching_mode}" - maph_key = f"mAPH_{matching_mode}" + map_key = f"{key_prefix}mAP_{matching_mode}" + maph_key = f"{key_prefix}mAPH_{matching_mode}" metric_dict[map_key] = map_instance.map metric_dict[maph_key] = map_instance.maph return metric_dict + @staticmethod + def _extract_matching_modes(metrics: Mapping[str, float]) -> List[str]: + """Extract matching modes from metrics dict keys (e.g., 'mAP_center_distance_bev' -> 'center_distance_bev'). + + Supports both prefixed and non-prefixed formats: + - Non-prefixed: "mAP_center_distance_bev" + - Prefixed: "bev_center_0.0-50.0_mAP_center_distance_bev" + """ + # Matches either "mAP_" or "_mAP_" + pat = re.compile(r"(?:^|_)mAP_(.+)$") + modes: List[str] = [] + for k in metrics.keys(): + m = pat.search(k) + if m: + modes.append(m.group(1)) + # Remove duplicates while preserving order + return list(dict.fromkeys(modes)) + + def get_thresholds_for_mode( + self, mode: str, metrics: Optional[Mapping[str, float]] = None + ) -> Optional[List[float]]: + """Return thresholds for a matching mode from config or inferred from metric keys.""" + cfg = self._evaluation_cfg_dict + threshold_key = f"{mode}_thresholds" + thresholds = cfg.get(threshold_key) + if thresholds is not None: + return [float(x) for x in thresholds] + + if not metrics: + return None + + pattern = re.compile(rf"_AP(H)?_{re.escape(mode)}_([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)$") + found: List[float] = [] + for k in metrics.keys(): + m = pattern.search(k) + if m: + try: + found.append(float(m.group(2))) + except Exception: + pass + return sorted(set(found)) if found else None + def get_summary(self) -> DetectionSummary: - """Get a summary of the evaluation including mAP and per-class metrics.""" + """Get a summary of the evaluation including mAP and per-class metrics for all matching modes.""" metrics = self.compute_metrics() - # Extract primary metrics (first mAP value found) - primary_map = None - primary_maph = None - per_class_ap = {} - - for key, value in metrics.items(): - if key.startswith("mAP_") and primary_map is None: - primary_map = value - elif key.startswith("mAPH_") and primary_maph is None: - primary_maph = value - elif "_AP_" in key and not key.startswith("mAP"): - # Extract class name from key - parts = key.split("_AP_") - if len(parts) == 2: - class_name = parts[0] - if class_name not in per_class_ap: - per_class_ap[class_name] = value + modes = self._extract_matching_modes(metrics) + if not modes: + return DetectionSummary( + mAP_by_mode={}, + mAPH_by_mode={}, + 
per_class_ap_by_mode={}, + num_frames=self._frame_count, + detailed_metrics=metrics, + ) + + # Collect mAP/mAPH and per-class AP for each matching mode + # Handle both prefixed (multi-evaluator) and non-prefixed metrics + mAP_by_mode: Dict[str, float] = {} + mAPH_by_mode: Dict[str, float] = {} + per_class_ap_by_mode: Dict[str, Dict[str, float]] = {} + + for mode in modes: + map_values = [] + maph_values = [] + + # Use regex to match both prefixed and non-prefixed formats + map_pattern = re.compile(rf"(?:^|_)mAP_{re.escape(mode)}$") + maph_pattern = re.compile(rf"(?:^|_)mAPH_{re.escape(mode)}$") + + for key, value in metrics.items(): + if map_pattern.search(key): + map_values.append(float(value)) + if maph_pattern.search(key): + maph_values.append(float(value)) + + if map_values: + mAP_by_mode[mode] = float(np.mean(map_values)) + else: + mAP_by_mode[mode] = 0.0 + + if maph_values: + mAPH_by_mode[mode] = float(np.mean(maph_values)) + + # Collect AP values per class for this mode + # Parse class name from key format: "{prefix}_
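For reference, a hedged sketch of the summary object this method builds under the new by-mode layout (mode name and all numbers are illustrative):

    DetectionSummary(
        mAP_by_mode={"center_distance_bev": 0.61},
        mAPH_by_mode={"center_distance_bev": 0.57},
        per_class_ap_by_mode={"center_distance_bev": {"car": 0.72, "pedestrian": 0.44}},
        num_frames=100,
        detailed_metrics=metrics,  # the flat dict returned by compute_metrics()
    )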