Spaces:
Runtime error
Runtime error
File size: 3,740 Bytes
1e87f84 9e3c23c 1e87f84 9e3c23c 1e87f84 9e3c23c 1e87f84 9e3c23c 1e87f84 9e3c23c d8653f1 9e3c23c d8653f1 9e3c23c 6b89aad 9e3c23c 6b89aad 9e3c23c 6b89aad 9e3c23c d8653f1 9e3c23c 1e87f84 d8653f1 6b89aad 9e3c23c a8bf6c1 9e3c23c 6b89aad 9e3c23c 6b89aad 9e3c23c 6b89aad 9e3c23c 6b89aad 9e3c23c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
from typing import List, Tuple
import cv2
import numpy as np
import pandas as pd
import torch
from torch import Tensor
from transformers import AutoFeatureExtractor, TimesformerForVideoClassification
from utils.img_container import ImgContainer
def load_model(model_name: str):
    """Load the preprocessing and classification objects for a checkpoint.

    Args:
        model_name: Hugging Face Hub identifier of a TimeSformer video
            classification checkpoint.

    Returns:
        A ``(feature_extractor, model)`` tuple. The model is always loaded
        from ``model_name``. The feature extractor is loaded from
        ``model_name`` as well, except for the "base-finetuned-k400" and
        "base-finetuned-k600" checkpoints, which use the VideoMAE Kinetics
        feature extractor instead.
    """
    # NOTE(review): presumably these two checkpoints do not ship a usable
    # preprocessor config of their own, hence the VideoMAE one - TODO confirm.
    borrows_videomae_extractor = any(
        marker in model_name
        for marker in ("base-finetuned-k400", "base-finetuned-k600")
    )
    extractor_source = (
        "MCG-NJU/videomae-base-finetuned-kinetics"
        if borrows_videomae_extractor
        else model_name
    )

    extractor = AutoFeatureExtractor.from_pretrained(extractor_source)
    classifier = TimesformerForVideoClassification.from_pretrained(model_name)
    return extractor, classifier
def inference():
    """Classify the buffered frames and publish the result on ``img_container``.

    Reads the module-level ``img_container``, ``feature_extractor`` and
    ``model``. Returns immediately (doing nothing) while
    ``img_container.ready`` is falsy.

    Side effects:
        * ``img_container.frame_rate.label`` is set to
          ``"<best label>_<confidence>%"``.
        * ``img_container.rs`` is set to a ``pandas.DataFrame`` with the
          columns ``"Label"`` and ``"Confidence"`` holding the top
          predictions, best first.

    Confidence values are softmax probabilities scaled to the range 0-100,
    so the ``%`` suffix on the label is meaningful.
    """
    if not img_container.ready:
        return

    inputs = feature_extractor(list(img_container.imgs), return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)
    logits: Tensor = outputs.logits

    # The classification head returns unnormalised scores. Normalise them
    # over the class axis and scale to percent before reporting them as
    # confidences. Only the first (and, here, only) clip in the batch is used.
    percentages = torch.softmax(logits, dim=-1)[0] * 100.0

    max_index = int(percentages.argmax(-1))
    predicted_label = model.config.id2label[max_index]
    img_container.frame_rate.label = (
        f"{predicted_label}_{percentages[max_index]:.2f}%"
    )

    TOP_K = 12
    # Never ask for more entries than there are classes; topk would raise.
    top = torch.topk(percentages, k=min(TOP_K, percentages.numel()))

    # tolist() yields plain Python ints/floats, which are the key type used
    # by ``id2label`` and serialise cleanly into the DataFrame.
    results: List[Tuple[str, float]] = [
        (model.config.id2label[index], value)
        for index, value in zip(top.indices.tolist(), top.values.tolist())
    ]
    img_container.rs = pd.DataFrame(results, columns=("Label", "Confidence"))
def get_frames_per_video(model_name: str) -> int:
    """Return the number of frames per clip used for a checkpoint name.

    Args:
        model_name: Checkpoint identifier; matched by substring.

    Returns:
        8 if the name contains "base-finetuned", otherwise 16 if it contains
        "hr-finetuned", otherwise 96.
    """
    # Order matters: the first marker found in the name wins.
    frames_by_marker = (
        ("base-finetuned", 8),
        ("hr-finetuned", 16),
    )
    for marker, frame_count in frames_by_marker:
        if marker in model_name:
            return frame_count
    return 96
# Checkpoint used by this script. Alternatives listed by the author:
#   "facebook/timesformer-base-finetuned-k400"
#   "facebook/timesformer-base-finetuned-k600"
#   "facebook/timesformer-base-finetuned-ssv2"
#   "facebook/timesformer-hr-finetuned-k600"
#   "facebook/timesformer-hr-finetuned-k400"
#   "facebook/timesformer-hr-finetuned-ssv2"
#   "fcakyon/timesformer-large-finetuned-k400"
#   "fcakyon/timesformer-large-finetuned-k600"
model_name = "facebook/timesformer-base-finetuned-k400"

feature_extractor, model = load_model(model_name)
frames_per_video = get_frames_per_video(model_name)
print(f"Frames per video: {frames_per_video}")
img_container = ImgContainer(frames_per_video)

# Only every SKIP_FRAMES-th captured frame is passed to
# img_container.add_frame(); num_skips is the running counter modulo
# SKIP_FRAMES.
SKIP_FRAMES = 4
num_skips = 0

# Open the default capture device (index 0) and create a writer whose frame
# size matches what the device reports.
camera = cv2.VideoCapture(0)
frame_width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
size = (frame_width, frame_height)
video_output = cv2.VideoWriter(
    "activities.mp4", cv2.VideoWriter_fourcc(*"MJPG"), 10, size
)

if not camera.isOpened():
    print("Error reading video file")

try:
    while camera.isOpened():
        ret, frame = camera.read()
        if not ret:
            # No frame could be grabbed (device unplugged, stream ended, ...).
            # Passing the resulting None on would fail in the container or in
            # cv2.imshow, so stop cleanly instead.
            print("Failed to read a frame from the camera; stopping.")
            break

        num_skips = (num_skips + 1) % SKIP_FRAMES

        img_container.img = frame
        img_container.frame_rate.count()

        if num_skips == 0:
            img_container.add_frame(frame)
            # inference()  # disabled by the author; not run from this loop

        # NOTE(review): show_fps presumably returns the frame with an
        # FPS/label/recording overlay drawn on it - confirm in ImgContainer.
        rs = img_container.frame_rate.show_fps(frame, img_container.is_recording)

        cv2.imshow("ActivityTracking", rs)

        if img_container.is_recording:
            video_output.write(rs)

        # Keyboard controls: 'q' quits, 'r' toggles recording.
        k = cv2.waitKey(1)
        if k == ord("q"):
            break
        elif k == ord("r"):
            img_container.toggle_recording()
finally:
    # Always free the device, finalise the output file and close the preview
    # window, even if the loop body raised.
    camera.release()
    video_output.release()
    cv2.destroyAllWindows()
|