Spaces:
Runtime error
Runtime error
import pathlib | |
import traceback | |
from PIL import Image | |
import numpy as np | |
import os | |
from src import core | |
from src import backbone | |
from src.common_constants import GenerationOptions as go | |
def open_path_as_images(path, maybe_depthvideo=False):
    """Takes the filepath, returns (fps, frames). Every frame is a Pillow Image object.

    The loader is picked from the file extension (case-insensitive):
    ``.gif`` is read with Pillow, ``.mts`` with PyAV, ``.avi`` optionally as a
    16-bit grayscale depth video through imageio_ffmpeg, ``.webm``/``.mp4``/
    ``.avi`` with moviepy, and anything else is opened as a single still image.

    Args:
        path: Path of the file to read.
        maybe_depthvideo: If true, an ``.avi`` file is first probed as a
            ``gray16le`` depth video; when the probe does not report that
            pixel format the generic video reader is used instead.

    Returns:
        Tuple ``(fps, frames)``. A still image yields ``(1, [image])``.

    Raises:
        Exception: If the extension is not a known video/GIF type and Pillow
            cannot open the file either.
    """
    suffix = pathlib.Path(path).suffix
    ext = suffix.lower()

    if ext == '.gif':
        frames = []
        # The context manager releases the file handle; the converted frames
        # are independent copies, so they stay valid afterwards.
        with Image.open(path) as img:
            for i in range(img.n_frames):
                img.seek(i)
                frames.append(img.convert('RGB'))
            duration_ms = img.info.get('duration')
        if not duration_ms:
            # Missing or zero frame duration: fall back to 100 ms per frame
            # (10 fps) instead of failing with KeyError/ZeroDivisionError.
            duration_ms = 100
        return 1000 / duration_ms, frames

    if ext == '.mts':
        import av
        container = av.open(path)
        try:
            frames = []
            for packet in container.demux(video=0):
                for frame in packet.decode():
                    # Decoded frame -> RGB NumPy array -> Pillow Image
                    frames.append(Image.fromarray(frame.to_ndarray(format='rgb24')))
            fps = float(container.streams.video[0].average_rate)
        finally:
            # Close the container even when decoding fails midway.
            container.close()
        return fps, frames

    if ext == '.avi' and maybe_depthvideo:
        import imageio_ffmpeg
        # Suppose there are in fact 16 bits per pixel.
        # If this is not the case, this is not a 16-bit depthvideo, so no need to process it this way
        gen = imageio_ffmpeg.read_frames(path, pix_fmt='gray16le', bits_per_pixel=16)
        try:
            video_info = next(gen)  # the first item yielded is the metadata dict
            if video_info['pix_fmt'] == 'gray16le':
                width, height = video_info['size']
                frames = []
                for frame in gen:
                    # Each frame arrives as a flat byte buffer of uint16 samples
                    depth = np.frombuffer(frame, dtype='uint16').reshape(height, width)
                    frames.append(Image.fromarray(depth))
                # TODO: Wrapping frames into Pillow objects is wasteful
                return video_info['fps'], frames
        finally:
            gen.close()
        # Not a 16-bit depth video: fall through to the generic reader below.

    if ext in ('.webm', '.mp4', '.avi'):
        from moviepy.video.io.VideoFileClip import VideoFileClip
        clip = VideoFileClip(path)
        try:
            frames = [Image.fromarray(x) for x in clip.iter_frames()]
            # TODO: Wrapping frames into Pillow objects is wasteful
            fps = clip.fps
        finally:
            # Release the reader held by the clip.
            clip.close()
        return fps, frames

    try:
        return 1, [Image.open(path)]
    except Exception as e:
        raise Exception(f"Probably an unsupported file format: {suffix}") from e
def frames_to_video(fps, frames, path, name, colorvids_bitrate=None):
    """Write a sequence of Pillow images to a video file in directory `path`.

    Frames in mode ``'I;16'`` are treated as a depthmap video and stored as
    ``{name}.avi`` with the ``ffv1`` codec in ``gray16le``. Any other frames
    are written with moviepy, trying several container/codec pairs in order
    until one succeeds.

    Args:
        fps: Frame rate of the resulting video.
        frames: Non-empty list of Pillow images; the mode of the first frame
            selects the writer.
        path: Output directory.
        name: Output file name without extension.
        colorvids_bitrate: Optional bitrate in kbit/s for the lossy codecs.
            When given, the lossy codecs are tried before the lossless ones.

    Raises:
        ValueError: If `frames` is empty.
        Exception: If every container/codec combination failed.
    """
    if not frames:
        raise ValueError('Cannot save a video without frames')

    if frames[0].mode == 'I;16':  # depthmap video
        import imageio_ffmpeg
        writer = imageio_ffmpeg.write_frames(
            os.path.join(path, f"{name}.avi"), frames[0].size, 'gray16le', 'gray16le', fps, codec='ffv1',
            macro_block_size=1)
        try:
            writer.send(None)  # prime the generator before sending frames
            for frame in frames:
                writer.send(np.array(frame))
        finally:
            writer.close()
        return

    from moviepy.video.io.ImageSequenceClip import ImageSequenceClip
    clip = ImageSequenceClip([np.asarray(frame) for frame in frames], fps=fps)

    lossless_codecs = ('png', 'rawvideo')
    priority = [('avi', 'png'), ('avi', 'rawvideo'), ('mp4', 'libx264'), ('webm', 'libvpx')]
    if colorvids_bitrate:
        # A bitrate only makes sense for lossy codecs, so try those first.
        priority.reverse()

    for v_format, codec in priority:
        # Only pass a bitrate when one was requested; formatting None would
        # produce the invalid value 'Nonek' for the lossy fallbacks.
        if colorvids_bitrate and codec not in lossless_codecs:
            br = f'{colorvids_bitrate}k'
        else:
            br = None
        try:
            clip.write_videofile(os.path.join(path, f"{name}.{v_format}"), codec=codec, bitrate=br)
            return
        except Exception:
            # Best effort: report the failure and try the next combination.
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            traceback.print_exc()

    raise Exception('Saving the video failed!')
def process_predicitons(predictions, smoothening='none'):
    """Normalize depth predictions with one shared value range for all frames.

    Args:
        predictions: Sequence of NumPy arrays, one per frame.
        smoothening: ``'none'`` scales by the global minimum/maximum of all
            frames. ``'experimental'`` scales by the 0.5th/99.5th percentiles
            of a temporally averaged copy of the frames. Any other value
            returns `predictions` unchanged.

    Returns:
        A list with one scaled array per frame (empty for empty input), or
        `predictions` itself for an unrecognised `smoothening` value.
    """
    def global_scaling(objs, a=None, b=None):
        """Normalizes objs, but uses (a, b) instead of (minimum, maximum) value of objs, if supplied"""
        min_value = a if a is not None else min(obj.min() for obj in objs)
        max_value = b if b is not None else max(obj.max() for obj in objs)
        span = max_value - min_value
        if span == 0:
            # Constant input: avoid 0/0 (NaN) and x/0 (inf); the result is
            # simply the offset from min_value.
            span = 1
        return [(obj - min_value) / span for obj in objs]

    print('Processing generated depthmaps')
    # TODO: Detect cuts and process segments separately
    if smoothening not in ('none', 'experimental'):
        return predictions
    if len(predictions) == 0:
        # Nothing to scale; min()/np.stack would raise on an empty sequence.
        return []
    if smoothening == 'none':
        return global_scaling(predictions)

    # 'experimental': weighted average over a 5-frame window centred on each
    # frame; indices past either end are clamped to the first/last frame.
    weights = (0.10, 0.20, 0.40, 0.20, 0.10)  # Eyeballed it, math person please fix this
    last_index = len(predictions) - 1
    processed = []
    for i in range(len(predictions)):
        f = np.zeros_like(predictions[i])
        for offset, mul in zip(range(-2, 3), weights):
            neighbour = min(max(0, i + offset), last_index)
            f += mul * predictions[neighbour]
        processed.append(f)
    # This could have been deterministic monte carlo... Oh well, this version is faster.
    a, b = np.percentile(np.stack(processed), [0.5, 99.5])
    # NOTE(review): the averaged frames are only used to derive (a, b); the
    # returned frames are the original predictions rescaled, so values outside
    # the percentile range fall outside [0, 1]. Confirm this is intended.
    return global_scaling(predictions, a, b)
def gen_video(video, outpath, inp, custom_depthmap=None, colorvids_bitrate=None, smoothening='none'):
    """Generate output video(s) for every frame of an input video.

    Depth for each frame is either predicted (first pass through
    ``core.core_generation_funnel`` followed by ``process_predicitons``) or
    read from `custom_depthmap`. A second pass through the funnel produces the
    requested outputs, which are grouped by output type and saved as one
    video per type in `outpath`.

    Args:
        video: Object whose ``.name`` is the path of the input video.
        outpath: Directory the resulting videos are written to.
        inp: Generation options dictionary, keyed by lower-cased
            ``GenerationOptions`` names.
        custom_depthmap: Optional object whose ``.name`` is the path of a
            depthmap video to use instead of predicting depth.
        colorvids_bitrate: Optional bitrate forwarded to ``frames_to_video``.
        smoothening: Mode forwarded to ``process_predicitons``.

    Returns:
        An HTML status string, or a plain error message when mesh generation
        is requested.

    Raises:
        AssertionError: If the custom depthmap video and the input video have
            a different number of frames.
    """
    if inp[go.GEN_SIMPLE_MESH.name.lower()] or inp[go.GEN_INPAINTED_MESH.name.lower()]:
        return 'Creating mesh-videos is not supported. Please split video into frames and use batch processing.'

    fps, input_images = open_path_as_images(os.path.abspath(video.name))
    os.makedirs(backbone.get_outpath(), exist_ok=True)

    if custom_depthmap is None:
        print('Generating depthmaps for the video frames')
        needed_keys = [go.COMPUTE_DEVICE, go.MODEL_TYPE, go.BOOST, go.NET_SIZE_MATCH, go.NET_WIDTH, go.NET_HEIGHT]
        needed_keys = [x.name.lower() for x in needed_keys]
        first_pass_inp = {k: v for (k, v) in inp.items() if k in needed_keys}
        # NOTE(review): the two keys below are an enum member and a non-lowercased
        # name, unlike the lower-cased names above - presumably normalized by
        # core_generation_funnel; confirm.
        # We need predictions where frames are not normalized separately.
        first_pass_inp[go.DO_OUTPUT_DEPTH_PREDICTION] = True
        # No need in normalized frames. Properly processed depth video will be created in the second pass
        first_pass_inp[go.DO_OUTPUT_DEPTH.name] = False

        gen_obj = core.core_generation_funnel(None, input_images, None, None, first_pass_inp)
        # Each yielded item is indexed as [1] = output type, [2] = payload.
        input_depths = [x[2] for x in gen_obj]
        input_depths = process_predicitons(input_depths, smoothening)
    else:
        print('Using custom depthmap video')
        _, input_depths = open_path_as_images(os.path.abspath(custom_depthmap.name), maybe_depthvideo=True)
        if len(input_depths) != len(input_images):
            # Raised explicitly so the check survives `python -O`; the type is
            # kept as AssertionError so existing handlers keep working.
            raise AssertionError('Custom depthmap video length does not match input video length')
        if input_depths and input_depths[0].size != input_images[0].size:
            print('Warning! Input video size and depthmap video size are not the same!')

    print('Generating output frames')
    img_results = list(core.core_generation_funnel(None, input_images, input_depths, None, inp))
    # Unique output types in first-seen order, so files are always written
    # (and numbered) in the same order from run to run.
    gens = list(dict.fromkeys(x[1] for x in img_results))

    print('Saving generated frames as video outputs')
    saved = 0
    for gen in gens:
        if gen == 'depth' and custom_depthmap is not None:
            # Well, that would be extra stupid, even if user has picked this option for some reason
            # (forgot to change the default?)
            continue
        imgs = [x[2] for x in img_results if x[1] == gen]
        basename = f'{gen}_video'
        frames_to_video(fps, imgs, outpath, f"depthmap-{backbone.get_next_sequence_number(outpath, basename)}-{basename}",
                        colorvids_bitrate)
        saved += 1
    print('All done. Video(s) saved!')

    # Report what was actually written; a skipped 'depth' output does not count.
    if saved > 1:
        return '<h3>Videos generated</h3>'
    if saved == 1:
        return '<h3>Video generated</h3>'
    return '<h3>Nothing generated - please check the settings and try again</h3>'