Spaces:
Runtime error
Runtime error
File size: 7,243 Bytes
2df809d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
#!/usr/bin/env python3
"""
Process 3D Ken Burns data by selecting random view types, copying images and depth files,
and computing camera intrinsics from a field-of-view value. The output files are stored in an
organized folder structure.
Usage:
python preprocess_3dkb.py --root /path/to/data_3d_ken_burns \
--out_dir /path/to/processed_3dkb \
[--num_workers 4] [--seed 42]
"""
import os
import json
import random
import shutil
from functools import partial
from pathlib import Path
import argparse
import cv2 # noqa: F401; cv2 is imported to ensure OpenEXR support.
import numpy as np
from PIL import Image
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
# Ensure OpenCV can read OpenEXR files.
os.environ["OPENCV_IO_ENABLE_OPENEXR"] = "1"
def fov_to_intrinsic_matrix(width, height, fov_deg, fov_type="horizontal"):
"""
Converts field of view (FOV) in degrees to a camera intrinsic matrix.
Args:
width (int): Image width in pixels.
height (int): Image height in pixels.
fov_deg (float): Field of view in degrees.
fov_type (str): 'horizontal' or 'vertical'; determines which FOV is used.
Returns:
np.ndarray: A 3x3 camera intrinsic matrix.
Raises:
ValueError: If width or height is non-positive or if fov_deg is not in (0, 180).
"""
if width <= 0 or height <= 0:
raise ValueError("Image width and height must be positive numbers.")
if not (0 < fov_deg < 180):
raise ValueError("FOV must be between 0 and 180 degrees (non-inclusive).")
if fov_type not in ["horizontal", "vertical"]:
raise ValueError("fov_type must be either 'horizontal' or 'vertical'.")
fov_rad = np.deg2rad(fov_deg)
if fov_type == "horizontal":
f_x = width / (2 * np.tan(fov_rad / 2))
aspect_ratio = height / width
f_y = f_x * aspect_ratio
else:
f_y = height / (2 * np.tan(fov_rad / 2))
aspect_ratio = width / height
f_x = f_y * aspect_ratio
c_x = width / 2
c_y = height / 2
K = np.array([[f_x, 0, c_x], [0, f_y, c_y], [0, 0, 1]])
return K
def process_basename(root, seq, basename, view_types, out_dir):
"""
Processes a single basename: selects a random view type, copies the corresponding
image and depth file, and computes the camera intrinsics from the JSON metadata.
Args:
root (str): Root directory of the raw data.
seq (str): Sequence directory name.
basename (str): Basename (common identifier) for the files.
view_types (list): List of view types to choose from (e.g. ['bl', 'br', 'tl', 'tr']).
out_dir (str): Output directory where processed data will be saved.
Returns:
str or None: Returns an error message string on failure; otherwise, returns None.
"""
# Select a random view type.
view_type = random.choice(view_types)
imgname = f"{basename}-{view_type}-image.png"
depthname = f"{basename}-{view_type}-depth.exr"
img_path = os.path.join(root, seq, imgname)
cam_path = os.path.join(root, seq, f"{basename}-meta.json")
depth_path = os.path.join(root, f"{seq}-depth", depthname)
# Prepare output directories.
out_seq_dir = os.path.join(out_dir, seq)
out_rgb_dir = os.path.join(out_seq_dir, "rgb")
out_depth_dir = os.path.join(out_seq_dir, "depth")
out_cam_dir = os.path.join(out_seq_dir, "cam")
# Output file paths.
out_img_path = os.path.join(out_rgb_dir, f"{basename}.png")
out_depth_path = os.path.join(out_depth_dir, f"{basename}.exr")
out_cam_path = os.path.join(out_cam_dir, f"{basename}.npz")
try:
# Load image using PIL and save as PNG.
with Image.open(img_path) as img:
W, H = img.size
img.save(out_img_path, format="PNG")
# Load camera JSON metadata.
with open(cam_path, "r") as f:
cam = json.load(f)
fov = cam["fltFov"]
K = fov_to_intrinsic_matrix(W, H, fov)
# Copy depth file.
shutil.copy(depth_path, out_depth_path)
# Save camera intrinsics.
np.savez(out_cam_path, intrinsics=K)
except Exception as e:
return f"Error processing {seq}/{basename}: {e}"
return None # Success indicator
def main():
parser = argparse.ArgumentParser(
description="Process raw 3D Ken Burns video data and generate processed images, depth maps, and camera intrinsics."
)
parser.add_argument(
"--root", type=str, required=True, help="Root directory of the raw data."
)
parser.add_argument(
"--out_dir",
type=str,
required=True,
help="Output directory for processed data.",
)
parser.add_argument(
"--num_workers",
type=int,
default=None,
help="Number of worker processes to use (default: half of available CPUs).",
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="Random seed for reproducibility (default: 42).",
)
parser.add_argument(
"--view_types",
type=str,
nargs="+",
default=["bl", "br", "tl", "tr"],
help="List of view types to choose from (default: bl br tl tr).",
)
args = parser.parse_args()
# Set the random seed.
random.seed(args.seed)
root = args.root
out_dir = args.out_dir
view_types = args.view_types
# Determine number of worker processes.
num_workers = (
args.num_workers if args.num_workers is not None else (os.cpu_count() or 4) // 2
)
# Collect all sequence directories from root.
seq_dirs = [
d
for d in os.listdir(root)
if os.path.isdir(os.path.join(root, d)) and not d.endswith("-depth")
]
# Pre-create output directory structure.
for seq in seq_dirs:
for subfolder in ["rgb", "depth", "cam"]:
(Path(out_dir) / seq / subfolder).mkdir(parents=True, exist_ok=True)
# Prepare list of tasks.
tasks = []
for seq in seq_dirs:
seq_path = os.path.join(root, seq)
# Assume JSON files contain metadata and have a name ending with "-meta.json".
json_files = [f for f in os.listdir(seq_path) if f.endswith(".json")]
# Remove the trailing "-meta.json" (10 characters) to get the basename.
basenames = sorted([f[:-10] for f in json_files])
for basename in basenames:
tasks.append((seq, basename))
# Define a partial function with fixed root, view_types, and out_dir.
process_func = partial(
process_basename, root, view_types=view_types, out_dir=out_dir
)
# Process tasks in parallel using ProcessPoolExecutor.
with ProcessPoolExecutor(max_workers=num_workers) as executor:
futures = {
executor.submit(process_func, seq, basename): (seq, basename)
for seq, basename in tasks
}
for future in tqdm(
as_completed(futures), total=len(futures), desc="Processing"
):
error = future.result()
if error:
print(error)
if __name__ == "__main__":
main()
|