|
import gradio as gr |
|
|
|
""" |
|
===================================================== |
|
Optical Flow: Predicting movement with the RAFT model |
|
===================================================== |
|
|
|
Optical flow is the task of predicting movement between two images, usually two |
|
consecutive frames of a video. Optical flow models take two images as input, and |
|
predict a flow: the flow indicates the displacement of every single pixel in the |
|
first image, and maps it to its corresponding pixel in the second image. Flows |
|
are (2, H, W)-dimensional tensors, where the first axis corresponds to the |
|
predicted horizontal and vertical displacements. |
|
|
|
The following example illustrates how torchvision can be used to predict flows |
|
using our implementation of the RAFT model. We will also see how to convert the |
|
predicted flows to RGB images for visualization. |
|
""" |
|
|
|
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers import UniPCMultistepScheduler

import cv2
import numpy as np
import torch
from PIL import Image

import torchvision.transforms.functional as F
import torchvision.transforms as T
from torchvision.io import read_image, write_jpeg, ImageReadMode
from torchvision.models.optical_flow import Raft_Large_Weights, raft_large
from torchvision.utils import flow_to_image
|
|
|
|
|
|
|
# Canny edge detection thresholds
low_threshold = 100
high_threshold = 200
|
|
|
|
|
# Load the ControlNet conditioned on Canny edges and the Stable Diffusion pipeline
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None, torch_dtype=torch.float16
)
pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)

# Reduce GPU memory usage by offloading submodules to CPU when idle, and use
# memory-efficient attention (requires the xformers package)
pipe.enable_model_cpu_offload()
pipe.enable_xformers_memory_efficient_attention()
|
|
|
|
|
# Fixed seed so generations are reproducible across calls
generator = torch.manual_seed(0)
|
|
|
def get_canny_filter(image):
    if not isinstance(image, np.ndarray):
        image = np.array(image)

    # Detect edges, then stack the single channel into a 3-channel RGB image
    image = cv2.Canny(image, low_threshold, high_threshold)
    image = image[:, :, None]
    image = np.concatenate([image, image, image], axis=2)
    canny_image = Image.fromarray(image)
    return canny_image
|
|
|
|
|
def generate_images(prompt, canny_image):
    output = pipe(
        prompt,
        canny_image,
        generator=generator,
        num_images_per_prompt=1,
        num_inference_steps=20,
    )
    # Return the conditioning image followed by the generated image(s)
    all_outputs = [canny_image]
    all_outputs.extend(output.images)
    return all_outputs
|
|
|
|
|
def write_flo(flow, filename):
    """
    Write optical flow in Middlebury .flo format

    :param flow: optical flow map
    :param filename: optical flow file path to be saved
    :return: None

    from https://github.com/liruoteng/OpticalFlowToolkit/
    """
    flow = flow.detach().cpu().numpy()
    # RAFT predicts flow as (2, H, W); the .flo format stores it as (H, W, 2)
    if flow.shape[0] == 2:
        flow = flow.transpose(1, 2, 0)
    flow = flow.astype(np.float32)
    (height, width) = flow.shape[0:2]
    with open(filename, 'wb') as f:
        magic = np.array([202021.25], dtype=np.float32)
        w = np.array([width], dtype=np.int32)
        h = np.array([height], dtype=np.int32)
        magic.tofile(f)
        w.tofile(f)
        h.tofile(f)
        flow.tofile(f)
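
# For completeness, a minimal sketch of reading the .flo format back (not part
# of the original toolkit; assumes a well-formed file written as above):
def read_flo(filename):
    with open(filename, 'rb') as f:
        magic = np.fromfile(f, np.float32, count=1)
        assert magic[0] == 202021.25, "invalid .flo magic number"
        width = int(np.fromfile(f, np.int32, count=1)[0])
        height = int(np.fromfile(f, np.int32, count=1)[0])
        data = np.fromfile(f, np.float32, count=2 * width * height)
    # The payload is row-major, interleaved (u, v) pairs
    return data.reshape(height, width, 2)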
|
|
|
|
|
|
|
def infer(): |
|
|
prompt = "astronaut crew inside a spaceship, artwork by Claude Monet, beautiful details" |
|
|
|
pil2diff_img = Image.open("./frame1.jpg") |
|
canny_image = get_canny_filter(pil2diff_img) |
|
diffused_img = generate_images(prompt, canny_image) |
|
print(f"DIFFUSED IMG: {diffused_img[1]}") |
|
|
|
diffused_img[1].save("diffused_input1.jpg") |
|
|
|
pil2diff_img2 = Image.open("./frame2.jpg") |
|
canny_image2 = get_canny_filter(pil2diff_img2) |
|
|
|
canny_image.save("canny1.jpg") |
|
canny_image2.save("canny2.jpg") |
|
input_frame_1 = read_image(str("diffused_input1.jpg"), ImageReadMode.UNCHANGED) |
|
print(f"FRAME 1: {input_frame_1}") |
|
input_frame_2 = read_image(str("./frame2.jpg"), ImageReadMode.UNCHANGED) |
|
print(f"FRAME 1: {input_frame_2}") |
|
|
|
|
|
|
|
|
|
    # RAFT expects batched inputs of shape (N, 3, H, W)
    img1_batch = torch.stack([input_frame_1])
    img2_batch = torch.stack([input_frame_2])

    print(f"FRAME AFTER stack: {img1_batch}")

    weights = Raft_Large_Weights.DEFAULT
    transforms = weights.transforms()

    def preprocess(img1_batch, img2_batch):
        # RAFT requires image dimensions divisible by 8; the weights'
        # transforms also rescale pixel values as the model expects
        img1_batch = F.resize(img1_batch, size=[520, 960])
        img2_batch = F.resize(img2_batch, size=[520, 960])
        return transforms(img1_batch, img2_batch)

    img1_batch, img2_batch = preprocess(img1_batch, img2_batch)

    print(f"shape = {img1_batch.shape}, dtype = {img1_batch.dtype}")
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
model = raft_large(weights=Raft_Large_Weights.DEFAULT, progress=False).to(device) |
|
model = model.eval() |
|
|
|
list_of_flows = model(img1_batch.to(device), img2_batch.to(device)) |
|
print(f"list_of_flows type = {type(list_of_flows)}") |
|
print(f"list_of_flows length = {len(list_of_flows)} = number of iterations of the model") |
|
|
    # The final iteration holds the most refined flow estimate
    predicted_flows = list_of_flows[-1]
|
print(f"predicted_flows dtype = {predicted_flows.dtype}") |
|
print(f"predicted_flows shape = {predicted_flows.shape} = (N, 2, H, W)") |
|
print(f"predicted_flows min = {predicted_flows.min()}, predicted_flows max = {predicted_flows.max()}") |
|
|
    # Flow for the first (and only) pair in the batch
    predicted_flow = list_of_flows[-1][0]
    print(f"predicted flow dtype = {predicted_flow.dtype}")
    print(f"predicted flow shape = {predicted_flow.shape}")

    # Convert the flow to an RGB image for visualization
    flow_img = flow_to_image(predicted_flow).to("cpu")
    write_jpeg(flow_img, "predicted_flow.jpg")

    write_flo(predicted_flow, "flofile.flo")
|
|
|
|
|
    transform = T.ToPILImage()

    # Resize the raw second frame to match the flow resolution (960x520)
    img = transform(input_frame_2)
    img = img.resize((960, 520))

    frame2pil = np.array(img.convert('RGB'))
    print(f"frame2pil: {frame2pil}")
    print(f"frame2pil shape: {frame2pil.shape}")
    print(f"frame2pil dtype: {frame2pil.dtype}")
    img.save('raw_frame2.jpg')
|
|
|
|
|
    # Flow as an (H, W, 2) numpy array of (dx, dy) displacements
    numpy_array_flow = predicted_flow.permute(1, 2, 0).detach().cpu().numpy()
    print(f"numpy_array_flow: {numpy_array_flow}")
    print(f"numpy_array_flow shape: {numpy_array_flow.shape}")
    print(f"numpy_array_flow dtype: {numpy_array_flow.dtype}")

    # Turn relative displacements into absolute sampling coordinates:
    # pixel (y, x) of the warped image is sampled from (x + dx, y + dy) in frame 2
    h, w = numpy_array_flow.shape[:2]
    numpy_array_flow = numpy_array_flow.copy()
    numpy_array_flow[:, :, 0] += np.arange(w)
    numpy_array_flow[:, :, 1] += np.arange(h)[:, np.newaxis]

    # cv2.remap expects a float32 (CV_32FC2) map
    numpy_array_flow = numpy_array_flow.astype(np.float32)

    # Warp frame 2 back towards the stylized frame 1 using the predicted flow
    res = cv2.remap(frame2pil, numpy_array_flow, None, cv2.INTER_LANCZOS4)
    print(res)
|
print(res) |
|
|
|
    res = Image.fromarray(res)
    res.save('warped.jpg')

    # Blend the warped result with the raw second frame
    blend2 = Image.open('raw_frame2.jpg')
    blend2 = Image.blend(res, blend2, 0.5)
    blend2.save("blended2.jpg")
|
|
|
    # Stylize the blended frame with ControlNet, keeping the same prompt
    pil2diff_blend = Image.open("blended2.jpg")
    canny_image = get_canny_filter(pil2diff_blend)
    diffused_blend = generate_images(prompt, canny_image)
    print(f"DIFFUSED IMG: {diffused_blend[1]}")

    diffused_blend[1].save("diffused_blended_2.jpg")

    return "done", "predicted_flow.jpg", ["flofile.flo"], "diffused_input1.jpg", "diffused_blended_2.jpg", 'warped.jpg', "blended2.jpg"
|
|
gr.Interface(
    fn=infer,
    inputs=[],
    outputs=[
        gr.Textbox(),
        gr.Image(label="flow"),
        gr.Files(),
        gr.Image(label="diffused input"),
        gr.Image(),
        gr.Image(label="warped flow to img2"),
        gr.Image(label="blended result to diffuse"),
    ],
).launch()