7 changes: 7 additions & 0 deletions tools/calibration_classification/README.md
@@ -40,6 +40,7 @@ python tools/calibration_classification/create_data_t4dataset.py --config /works
- `-o, --out_dir` (required): Output directory for info files
- `--lidar_channel` (optional): Lidar channel name (default: LIDAR_CONCAT)
- `--target_cameras` (optional): List of target cameras to process (default: all cameras)
- `--filter` (optional): Filter out likely black/corrupted images based on file size

**Examples:**

@@ -49,8 +50,14 @@ python tools/calibration_classification/create_data_t4dataset.py --config /works

# Process only specific cameras
python tools/calibration_classification/create_data_t4dataset.py --config /workspace/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py --version gen2_base --root_path ./data/t4dataset -o ./data/t4dataset/calibration_info/ --target_cameras CAM_FRONT CAM_LEFT CAM_RIGHT

# Filter out black/corrupted images (recommended for datasets with potential black images)
python tools/calibration_classification/create_data_t4dataset.py --config /workspace/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py --version gen2_base --root_path ./data/t4dataset -o ./data/t4dataset/calibration_info/ --filter
```

**Note on `--filter` flag:**
The `--filter` option detects and excludes likely black/corrupted images by analyzing file sizes. For each scene it samples up to 20 images to compute an average file size (files of 10KB or less are excluded from the average so that black images do not skew it), then filters out images smaller than 5% of that average, with a minimum threshold of 10KB. If no valid samples can be found in a scene, a default threshold of 50KB is used.
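
For reference, the heuristic boils down to the sketch below (a standalone paraphrase of the script's internal logic, not a public API; the function and variable names here are illustrative):

```python
import os.path as osp

def black_image_threshold(image_paths, num_samples=20, ratio=0.05):
    """Paraphrase of the per-scene size-threshold heuristic."""
    sizes = []
    for path in image_paths:
        if len(sizes) >= num_samples:
            break
        # Ignore missing files and tiny (<=10KB) files so black images
        # do not drag the average down.
        if osp.isfile(path) and osp.getsize(path) > 10_000:
            sizes.append(osp.getsize(path))
    if not sizes:
        return 50_000  # fallback when nothing could be sampled: 50KB
    # 5% of the average, floored at 10KB to avoid false positives.
    return max(int(sum(sizes) / len(sizes) * ratio), 10_000)

# e.g. 20 sampled frames averaging 2MB give a 100KB cutoff; any frame
# smaller than that is treated as likely black and skipped.
```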

**Output files:**
The script generates three pickle files for train/val/test splits:
- `t4dataset_{version}_infos_train.pkl`
- `t4dataset_{version}_infos_val.pkl`
- `t4dataset_{version}_infos_test.pkl`
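
To sanity-check a generated split, an info file can be opened directly (a minimal sketch; the path is hypothetical and the exact top-level container may vary, so inspect before indexing):

```python
import pickle

# Hypothetical output path; substitute your --out_dir and version.
with open("data/t4dataset/calibration_info/t4dataset_gen2_base_infos_train.pkl", "rb") as f:
    infos = pickle.load(f)

print(type(infos))
# Per-frame entries built by the script carry keys such as
# "frame_id", "image", "lidar_points", "sample_idx", and "scene_id".
first = infos[0] if isinstance(infos, list) else infos
print(first)
```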
141 changes: 130 additions & 11 deletions tools/calibration_classification/create_data_t4dataset.py
@@ -22,6 +22,90 @@
logger = MMLogger.get_instance(name="create_data_t4dataset")


def is_image_likely_black(img_path: str, size_threshold_bytes: int) -> bool:
"""Check if an image is likely black based on file size.

Black images compress extremely well (nearly all zeros), resulting in very small files.
This is much faster than reading and analyzing pixel values.

Args:
img_path (str): Path to the image file.
size_threshold_bytes (int): File size threshold in bytes. Images smaller than this
are considered likely black.

Returns:
bool: True if the image file is smaller than threshold (likely black), False otherwise.
"""
if not osp.isfile(img_path):
logger.warning(f"Image file not found: {img_path}")
return True

file_size = osp.getsize(img_path)
return file_size < size_threshold_bytes


def calculate_size_threshold(
sample_data_json: List[Dict[str, Any]],
root_path: str,
scene_root: str,
target_cameras: List[str],
num_samples: int = 20,
threshold_ratio: float = 0.05,
) -> int:
"""Calculate file size threshold for filtering likely black images.

Samples a number of images to determine average file size, then sets threshold
as a fraction of that average. Black images are typically <5% of normal image size.

Args:
sample_data_json (list): List of sample data dictionaries.
root_path (str): Absolute root path of the dataset.
scene_root (str): Relative scene root path.
target_cameras (list): List of camera channels to sample from.
num_samples (int): Number of images to sample for calculating average size.
threshold_ratio (float): Ratio of average size to use as threshold (default 0.05 = 5%).

Returns:
int: Size threshold in bytes. Images smaller than this are likely black.
"""
image_sizes = []

for sd in sample_data_json:
if len(image_sizes) >= num_samples:
break

filename = sd.get("filename", "")
for cam in target_cameras:
if filename.startswith(f"data/{cam}/"):
img_rel_path = osp.join(scene_root, filename)
img_abs_path = osp.join(root_path, img_rel_path)
if osp.isfile(img_abs_path):
file_size = osp.getsize(img_abs_path)
# Only include non-tiny files in the average calculation
# (to avoid black images skewing the average down)
if file_size > 10000: # > 10KB
image_sizes.append(file_size)
break

if not image_sizes:
# Fallback to a reasonable default (50KB) if no valid samples found
logger.warning("Could not sample images for threshold calculation, using default 50KB threshold")
return 50000

avg_size = sum(image_sizes) / len(image_sizes)
threshold = int(avg_size * threshold_ratio)

# Ensure minimum threshold of 10KB to avoid false positives
threshold = max(threshold, 10000)

logger.info(
f"Calculated size threshold: {threshold / 1000:.1f}KB "
f"(sampled {len(image_sizes)} images, avg size: {avg_size / 1000:.1f}KB)"
)

return threshold


def load_json(path: str) -> Dict[str, Any]:
"""Load a JSON file from the given path.

@@ -197,6 +281,8 @@ def generate_calib_info(
scene_id: Optional[str] = None,
start_sample_idx: int = 0,
target_cameras: Optional[List[str]] = None,
root_path: Optional[str] = None,
filter_black_images: bool = False,
) -> Tuple[List[Dict[str, Any]], int]:
"""
Generate calibration info for each frame (grouped by filename index).
@@ -210,6 +296,9 @@
start_sample_idx (int, optional): Starting index for sample numbering. Defaults to 0.
target_cameras (list, optional): List of target camera channels to process.
If None, processes all available cameras. Defaults to None.
root_path (str, optional): Absolute root path of the dataset for image validation.
Defaults to None.
filter_black_images (bool, optional): Whether to filter out black images. Defaults to False.
Returns:
tuple: (infos, next_sample_idx) where infos is a list of calibration info dictionaries
and next_sample_idx is the next available sample index.
@@ -231,6 +320,11 @@
target_cameras = get_all_channels(sample_data_json)
logger.info(f"Using all available cameras: {target_cameras}")

# Calculate size threshold for black image filtering (only once per scene)
size_threshold = None
if filter_black_images and root_path is not None:
size_threshold = calculate_size_threshold(sample_data_json, root_path, scene_root, target_cameras)

# Group all sample_data by frame index, filtering out invalid samples
frame_groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
invalid_count = 0
@@ -265,6 +359,8 @@
lidar_channel,
scene_id,
sample_idx,
root_path,
size_threshold,
)
if frame_infos:
infos.extend(frame_infos)
@@ -285,6 +381,8 @@ def build_frame_info(
lidar_channel: str,
scene_id: Optional[str] = None,
sample_idx: int = 0,
root_path: Optional[str] = None,
size_threshold: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Build frame info for each target camera. If target_cameras is None, build for all cameras.
Args:
@@ -297,6 +395,10 @@
lidar_channel (str): Name of the lidar channel.
scene_id (str, optional): ID of the scene. Defaults to None.
sample_idx (int, optional): Starting index for sample numbering. Defaults to 0.
root_path (str, optional): Absolute root path of the dataset for image validation.
Defaults to None.
size_threshold (int, optional): File size threshold in bytes for filtering likely black
images. If None, no filtering is performed. Defaults to None.
Returns:
list: List of frame info dictionaries, one for each available camera in the frame.
Each info contains camera data, lidar data, and transformation matrices.
@@ -349,9 +451,22 @@
available_cameras: List[str] = [cam for cam in target_cameras if cam in camera_data]

infos: List[Dict[str, Any]] = []
for i, cam in enumerate(available_cameras):
valid_sample_count = 0
for cam in available_cameras:
cam_info = camera_data[cam]

# Check if image is likely black based on file size (when filtering is enabled)
if size_threshold is not None and root_path is not None:
img_rel_path = cam_info["img_path"]
img_abs_path = osp.join(root_path, img_rel_path)
if is_image_likely_black(img_abs_path, size_threshold):
file_size = osp.getsize(img_abs_path) if osp.isfile(img_abs_path) else 0
logger.warning(
f"Skipping likely black image for camera {cam} in frame {frame_idx}, "
f"scene {scene_id} (size: {file_size / 1000:.1f}KB < threshold {size_threshold / 1000:.1f}KB)"
)
continue

# Calculate lidar2cam transformation matrix
lidar2ego = lidar_data["lidar2ego"]
lidar_pose = lidar_data["lidar_pose"]
@@ -372,10 +487,11 @@
"frame_id": cam,
"image": cam_info,
"lidar_points": lidar_data,
"sample_idx": sample_idx + i,
"sample_idx": sample_idx + valid_sample_count,
"scene_id": scene_id,
}
infos.append(info)
valid_sample_count += 1

return infos

@@ -402,6 +518,7 @@ def parse_args() -> argparse.Namespace:
parser.add_argument(
"--target_cameras", nargs="*", default=None, help="Target cameras to generate info for (default: all cameras)"
)
parser.add_argument("--filter", action="store_true", help="Filter out black images (all pixels are zero)")
return parser.parse_args()


@@ -410,22 +527,20 @@ def get_scene_root_dir_path(root_path: str, dataset_version: str, scene_id: str)
Args:
root_path (str): Root path of the dataset.
dataset_version (str): Version of the dataset.
scene_id (str): ID of the scene (may include version number like "uuid version").
scene_id (str): ID of the scene (format: "uuid/version", e.g., "c4edef0a-1d00-4f0d-9e08-65ba8b0692ff/0").
Returns:
str: Path to the scene root directory. If scene_id contains a version number,
returns the path with the version. Otherwise, looks for version subdirectories
and returns the path to the highest version number subdirectory.
str: Path to the scene root directory.
"""
# Check if scene_id already contains a version number (e.g., "uuid version")
scene_id_parts = scene_id.strip().split()
scene_id = scene_id.strip()
scene_id_parts = scene_id.split("/")

if len(scene_id_parts) == 2 and scene_id_parts[1].isdigit():
# scene_id contains version number, use it directly
base_scene_id = scene_id_parts[0]
version_id = scene_id_parts[1]
scene_root_dir_path = osp.join(root_path, dataset_version, base_scene_id)
return osp.join(scene_root_dir_path, version_id)
else:
raise ValueError(f"Invalid scene_id format: {scene_id}")

raise ValueError(f"Invalid scene_id format: {scene_id}. Expected 'uuid/version'.")


def main() -> None:
@@ -448,6 +563,8 @@ def main() -> None:
logger.info(f"Lidar channel: {args.lidar_channel}")
if args.target_cameras:
logger.info(f"Target cameras: {args.target_cameras}")
if args.filter:
logger.info("Black image filtering is enabled")

abs_root_path = osp.abspath(args.root_path)

@@ -479,6 +596,8 @@
scene_id,
split_sample_idx[split],
args.target_cameras,
abs_root_path,
args.filter,
)
split_infos[split].extend(scene_infos)
except ValueError as e: