Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
102 changes: 102 additions & 0 deletions src/faceforensics_internal/scripts/extract_mask_bounding_boxes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Extract mask bounding box from mask video indicating the manipulated area."""
import json
import multiprocessing as mp
from pathlib import Path

import click
import cv2
from joblib import delayed
from joblib import Parallel
from tqdm import tqdm

from faceforensics_internal.utils import get_mask_bounding_box
from faceforensics_internal.utils import DataType
from faceforensics_internal.utils import FaceForensicsDataStructure


def extract_bounding_boxes_from_video(video_path: Path, target_sub_dir: Path):

output_path = (target_sub_dir / video_path.name).with_suffix(".json")
#if output_path.exists():
# return

# Get video capture
mask_capture = cv2.VideoCapture(str(video_path))

bounding_boxes = {}
frame_count = -1
while mask_capture.isOpened():
frame_count += 1

# Read next frame
ret, mask = mask_capture.read()

if not ret:
mask_capture.release()
break

mask_bounding_box = get_mask_bounding_box(mask)

if mask_bounding_box is not None:
bounding_boxes[f"{frame_count:04d}"] = [list(mask_bounding_box)]
else:
print(video_path, frame_count, "no bounding box")
return

with open(output_path, "w") as f:
json.dump(bounding_boxes, f)


@click.command()
@click.option("--source_dir_root", required=True, type=click.Path(exists=True))
@click.option("--target_dir_root", required=True, type=click.Path(exists=False))
@click.option(
"--methods", "-m", multiple=True, default=FaceForensicsDataStructure.ALL_METHODS
)
def extract_bounding_box_from_masks(
source_dir_root, target_dir_root, methods
):
target_dir_root = Path(target_dir_root)
target_dir_root.mkdir(parents=True, exist_ok=True)

# use FaceForensicsDataStructure to iterate over the correct image folders
source_dir_data_structure = FaceForensicsDataStructure(
source_dir_root,
methods=methods,
compressions=(DataType.masks,),
data_types=(DataType.videos,),
)

# this will be used to iterate the same way as the source dir
# -> create same data structure again
target_dir_data_structure = FaceForensicsDataStructure(
target_dir_root,
methods=methods,
compressions=(DataType.masks,),
data_types=(DataType.bounding_boxes,),
)

# zip source and target structure to iterate over both simultaneously
for source_sub_dir, target_sub_dir in zip(
source_dir_data_structure.get_subdirs(), target_dir_data_structure.get_subdirs()
):

if not source_sub_dir.exists():
continue

target_sub_dir.mkdir(parents=True, exist_ok=True)
print(f"Processing {source_sub_dir.parts[-2]}, {source_sub_dir.parts[-3]}")

# compute mask bounding box for each folder
Parallel(n_jobs=mp.cpu_count())(
delayed(
lambda _video_path: extract_bounding_boxes_from_video(
_video_path, target_sub_dir
)
)(video_path)
for video_path in tqdm(sorted(source_sub_dir.iterdir()))
)


if __name__ == "__main__":
extract_bounding_box_from_masks()
16 changes: 16 additions & 0 deletions src/faceforensics_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from typing import List
from typing import Union

import numpy as np


class StrEnum(Enum):
def __str__(self):
Expand Down Expand Up @@ -104,3 +106,17 @@ def _img_name_to_int(img: Path):
)

cl_logger = logging.getLogger()


def get_mask_bounding_box(mask):
a = np.where(mask != 0)
try:
y1, y2, x1, x2 = np.min(a[0]), np.max(a[0]), np.min(a[1]), np.max(a[1])
top = y1
right = x2
bottom = y2
left = x1
mask_bounding_box = int(top), int(right), int(bottom), int(left)
except ValueError:
return None
return mask_bounding_box