import gradio as gr

"""
=====================================================
Optical Flow: Predicting movement with the RAFT model
=====================================================

Optical flow is the task of predicting movement between two images, usually two
consecutive frames of a video. Optical flow models take two images as input and
predict a flow: the flow indicates the displacement of every single pixel in the
first image and maps it to its corresponding pixel in the second image. Flows
are (2, H, W)-dimensional tensors, where the first axis corresponds to the
predicted horizontal and vertical displacements.

The following example illustrates how torchvision can be used to predict flows
using our implementation of the RAFT model. We will also see how to convert the
predicted flows to RGB images for visualization.
"""

import numpy as np
import torch
import torchvision.transforms.functional as F

# The warping utilities below use TF1-style graph APIs (tf.Session,
# tf.placeholder), so TensorFlow is loaded through its compat.v1 shim.
import tensorflow.compat.v1 as tf

tf.disable_v2_behavior()

from PIL import Image
from torchvision.io import read_video, write_jpeg
from torchvision.models.optical_flow import Raft_Large_Weights, raft_large
from torchvision.utils import flow_to_image

import tempfile
from pathlib import Path
from urllib.request import urlretrieve
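
# A tiny, self-contained illustration of the flow convention described in the
# docstring above (added for clarity; not used by the app): for a pixel at
# (x, y) in the first image, flow[:, y, x] = (dx, dy) points to its location
# (x + dx, y + dy) in the second image.
_demo_flow = torch.zeros(2, 4, 4)   # (2, H, W)
_demo_flow[0, 1, 2] = 3.0           # pixel (x=2, y=1) moves 3 px right
_demo_flow[1, 1, 2] = -1.0          # ... and 1 px up
# Its predicted position in the second image is therefore (5, 0).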

def write_flo(flow, filename):
    """
    Write optical flow in Middlebury .flo format.

    :param flow: optical flow map as a (2, H, W) tensor
    :param filename: optical flow file path to be saved
    :return: None

    adapted from https://github.com/liruoteng/OpticalFlowToolkit/
    """
    # The .flo layout is magic number, width, height, then (H, W, 2) float32
    # data, so move the channel axis last before serializing.
    flow = flow.permute(1, 2, 0).cpu().data.numpy().astype(np.float32)
    (height, width) = flow.shape[0:2]
    magic = np.array([202021.25], dtype=np.float32)
    w = np.array([width], dtype=np.int32)
    h = np.array([height], dtype=np.int32)
    with open(filename, 'wb') as f:
        magic.tofile(f)
        w.tofile(f)
        h.tofile(f)
        flow.tofile(f)
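
# For convenience, here is a matching reader so saved .flo files can be loaded
# back for inspection. This is a minimal sketch inferred from the format
# written above (magic float32, width/height int32, then (H, W, 2) float32
# data); it is an addition, not part of the original toolkit code.
def read_flo(filename):
    with open(filename, 'rb') as f:
        magic = np.fromfile(f, np.float32, count=1)
        if magic.size == 0 or magic[0] != 202021.25:
            raise ValueError(f"{filename} is not a valid .flo file")
        width = int(np.fromfile(f, np.int32, count=1)[0])
        height = int(np.fromfile(f, np.int32, count=1)[0])
        data = np.fromfile(f, np.float32, count=2 * width * height)
        return data.reshape(height, width, 2)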

def get_pixel_value(img, x, y):
    """
    Utility function to get pixel values for coordinate
    tensors x and y from a 4D tensor image.
    Input
    -----
    - img: tensor of shape (B, H, W, C)
    - x: tensor of shape (B, H, W)
    - y: tensor of shape (B, H, W)
    Returns
    -------
    - output: tensor of shape (B, H, W, C)
    """
    shape = tf.shape(x)
    batch_size = shape[0]
    height = shape[1]
    width = shape[2]

    # Build a (B, H, W) tensor of batch indices so every (y, x) coordinate
    # is gathered from its own batch element.
    batch_idx = tf.range(0, batch_size)
    batch_idx = tf.reshape(batch_idx, (batch_size, 1, 1))
    b = tf.tile(batch_idx, (1, height, width))

    indices = tf.stack([b, y, x], 3)

    return tf.gather_nd(img, indices)

def tf_warp(img, flow, H, W):
    """Backward-warp img with the given flow, using bilinear sampling."""
    # Build a grid of absolute pixel coordinates, shaped (1, 2, H, W).
    x, y = tf.meshgrid(tf.range(W), tf.range(H))
    x = tf.expand_dims(x, 0)
    x = tf.expand_dims(x, 0)

    y = tf.expand_dims(y, 0)
    y = tf.expand_dims(y, 0)

    x = tf.cast(x, tf.float32)
    y = tf.cast(y, tf.float32)
    grid = tf.concat([x, y], axis=1)

    # Displace the grid by the flow to obtain the source coordinates.
    flows = grid + flow
    max_y = tf.cast(H - 1, tf.int32)
    max_x = tf.cast(W - 1, tf.int32)
    zero = tf.zeros([], dtype=tf.int32)

    x = flows[:, 0, :, :]
    y = flows[:, 1, :, :]

    # Integer corner coordinates around each fractional source coordinate.
    x0 = tf.cast(x, tf.int32)
    x1 = x0 + 1
    y0 = tf.cast(y, tf.int32)
    y1 = y0 + 1

    # Clip to the image boundaries.
    x0 = tf.clip_by_value(x0, zero, max_x)
    x1 = tf.clip_by_value(x1, zero, max_x)
    y0 = tf.clip_by_value(y0, zero, max_y)
    y1 = tf.clip_by_value(y1, zero, max_y)

    # Pixel values at the four corners.
    Ia = get_pixel_value(img, x0, y0)
    Ib = get_pixel_value(img, x0, y1)
    Ic = get_pixel_value(img, x1, y0)
    Id = get_pixel_value(img, x1, y1)

    x0 = tf.cast(x0, tf.float32)
    x1 = tf.cast(x1, tf.float32)
    y0 = tf.cast(y0, tf.float32)
    y1 = tf.cast(y1, tf.float32)

    # Bilinear interpolation weights.
    wa = (x1 - x) * (y1 - y)
    wb = (x1 - x) * (y - y0)
    wc = (x - x0) * (y1 - y)
    wd = (x - x0) * (y - y0)

    wa = tf.expand_dims(wa, axis=3)
    wb = tf.expand_dims(wb, axis=3)
    wc = tf.expand_dims(wc, axis=3)
    wd = tf.expand_dims(wd, axis=3)

    # Weighted sum of the four corner values.
    out = tf.add_n([wa * Ia, wb * Ib, wc * Ic, wd * Id])
    return out
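
# For reference, the same backward warp can be expressed in pure PyTorch with
# torch.nn.functional.grid_sample, avoiding the TensorFlow dependency. This is
# a sketch of an alternative, not the path used by infer() below: it assumes
# img is a (B, C, H, W) float tensor and flow is a (B, 2, H, W) tensor with
# (horizontal, vertical) displacements, matching RAFT's output. The name
# torch_warp is our own.
def torch_warp(img, flow):
    B, _, H, W = img.shape
    # Pixel-coordinate grid, one (x, y) pair per output location.
    ys, xs = torch.meshgrid(
        torch.arange(H, dtype=img.dtype, device=img.device),
        torch.arange(W, dtype=img.dtype, device=img.device),
        indexing="ij",
    )
    grid = torch.stack([xs, ys], dim=0).unsqueeze(0)  # (1, 2, H, W)
    # Source coordinates, normalized to [-1, 1] as grid_sample expects.
    src = grid + flow
    src_x = 2.0 * src[:, 0] / max(W - 1, 1) - 1.0
    src_y = 2.0 * src[:, 1] / max(H - 1, 1) - 1.0
    norm_grid = torch.stack([src_x, src_y], dim=3)  # (B, H, W, 2)
    return torch.nn.functional.grid_sample(
        img, norm_grid, mode="bilinear", align_corners=True
    )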

def infer():
    video_url = "https://download.pytorch.org/tutorial/pexelscom_pavel_danilyuk_basketball_hd.mp4"
    video_path = Path(tempfile.mkdtemp()) / "basketball.mp4"
    _ = urlretrieve(video_url, video_path)

    # read_video returns the frames as a (T, C, H, W) uint8 tensor.
    frames, _, _ = read_video(str(video_path), output_format="TCHW")

    # RAFT takes batches of image pairs; use two consecutive frames.
    img1_batch = torch.stack([frames[100]])
    img2_batch = torch.stack([frames[101]])

    weights = Raft_Large_Weights.DEFAULT
    transforms = weights.transforms()

    def preprocess(img1_batch, img2_batch):
        # RAFT expects dimensions divisible by 8; the weights' transforms
        # then convert to float and rescale to [-1, 1].
        img1_batch = F.resize(img1_batch, size=[520, 960])
        img2_batch = F.resize(img2_batch, size=[520, 960])
        return transforms(img1_batch, img2_batch)

    # Keep an unnormalized, resized copy of the first frame for the warp below.
    img1_resized = F.resize(img1_batch, size=[520, 960])

    img1_batch, img2_batch = preprocess(img1_batch, img2_batch)

    print(f"shape = {img1_batch.shape}, dtype = {img1_batch.dtype}")

    device = "cuda" if torch.cuda.is_available() else "cpu"

    model = raft_large(weights=Raft_Large_Weights.DEFAULT, progress=False).to(device)
    model = model.eval()

    # RAFT is recurrent: it returns one flow estimate per refinement iteration.
    with torch.no_grad():
        list_of_flows = model(img1_batch.to(device), img2_batch.to(device))
    print(f"type = {type(list_of_flows)}")
    print(f"length = {len(list_of_flows)} = number of iterations of the model")

    # The last estimate is the most refined one.
    predicted_flows = list_of_flows[-1]
    print(f"dtype = {predicted_flows.dtype}")
    print(f"shape = {predicted_flows.shape} = (N, 2, H, W)")
    print(f"min = {predicted_flows.min()}, max = {predicted_flows.max()}")

    predicted_flow = predicted_flows[0]
    flow_img = flow_to_image(predicted_flow).to("cpu")

    write_jpeg(flow_img, "predicted_flow.jpg")
    write_flo(predicted_flow, "flofile.flo")

    # Warp the first frame with the predicted flow: the result should roughly
    # reproduce the second frame.
    with tf.Session() as sess:
        a = tf.placeholder(tf.float32, shape=[None, None, None, 3])
        flow_vec = tf.placeholder(tf.float32, shape=[None, 2, None, None])
        output = tf_warp(a, flow_vec, 520, 960)
        sess.run(tf.global_variables_initializer())

        out = sess.run(
            output,
            feed_dict={
                # TensorFlow expects channels-last images: (B, H, W, C).
                a: img1_resized.permute(0, 2, 3, 1).numpy().astype(np.float32),
                flow_vec: predicted_flows.cpu().numpy(),
            },
        )
        out = np.clip(out, 0, 255).astype('uint8')

    im = Image.fromarray(out[0])
    im.save('output.jpg')
    return "done", "predicted_flow.jpg", ["flofile.flo", "output.jpg"]


gr.Interface(fn=infer, inputs=[], outputs=[gr.Textbox(), gr.Image(), gr.Files()]).launch()