"""
Copyright (c) 2022, salesforce.com, inc.
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
For full license text, see the LICENSE_Lavis file in the repo root or https://opensource.org/licenses/BSD-3-Clause
"""
import os
import sys
import json
import math
import pickle
import random
import re
import time
from collections import OrderedDict

import chardet
import cv2
import numpy as np
import pysrt
import torch
import torchvision.transforms as transforms
import webvtt
from decord import VideoReader, cpu, gpu
from moviepy.editor import VideoFileClip
from PIL import Image
from tqdm import tqdm

from minigpt4.datasets.datasets.base_dataset import BaseDataset
from minigpt4.processors.blip_processors import Blip2ImageTrainProcessor, BlipCaptionProcessor
def duration_to_seconds(duration_str):
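    """Convert an ISO-8601 style duration string such as 'PT1H2M3S' to total seconds.

    Example: duration_to_seconds('PT1H30M15S') -> 5415
    """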
duration_str = duration_str[2:] # Removing 'PT' prefix
seconds = 0
if 'H' in duration_str:
hours_str = duration_str.split('H')[0]
seconds += int(hours_str) * 3600
duration_str = duration_str.split('H')[1]
if 'M' in duration_str:
minutes_str = duration_str.split('M')[0]
seconds += int(minutes_str) * 60
duration_str = duration_str.split('M')[1]
if 'S' in duration_str:
seconds_str = duration_str.split('S')[0]
seconds += int(seconds_str)
return seconds
def extract_audio(video_path, audio_path):
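    """Extract the audio track of a video and write it to `audio_path` as an mp3 (via moviepy)."""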
video_clip = VideoFileClip(video_path)
audio_clip = video_clip.audio
audio_clip.write_audiofile(audio_path, codec="libmp3lame", bitrate="320k")
def generate_subtitles(video_path,existed_subtitles):
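    """Generate an English .vtt subtitle file for `video_path` using the Whisper CLI.

    Returns the subtitle path (reusing an existing one when `existed_subtitles` says so),
    or None if audio extraction or transcription fails.
    """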
video_id=video_path.split('/')[-1].split('.')[0]
audio_path = f"workspace/misssing_eval_subtitles/mp3/{video_id}"+'.mp3'
if existed_subtitles.get(video_id,False):
print("subtitle already generated")
return f"workspace/misssing_eval_subtitles/{video_id}"+'.vtt'
try:
extract_audio(video_path,audio_path)
print("successfully extracted")
os.system(f"whisper {audio_path} --language English --model large --output_format vtt --output_dir workspace/misssing_eval_subtitles")
# remove the audio file
os.system(f"rm {audio_path}")
print("subtitle successfully generated")
return f"workspace/misssing_eval_subtitles/{video_id}"+'.vtt'
except Exception as e:
print("error",video_path ,e)
return None
def read_subtitles(subtitle_path):
    """Read an .srt subtitle file with pysrt, detecting the file encoding with chardet first."""
try:
with open(subtitle_path, 'rb') as f:
result = chardet.detect(f.read())
subs = pysrt.open(subtitle_path, encoding=result['encoding'])
return subs
    except Exception:
        # Fall back to an empty list when the subtitle file is missing or cannot be decoded
        return []
def srt_time_to_seconds(time):
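    """Convert a pysrt timestamp (hours/minutes/seconds/milliseconds) to seconds as a float."""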
return time.hours * 3600 + time.minutes * 60 + time.seconds + time.milliseconds / 1000
class __DisplMixin:
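    """Mixin providing a small helper to inspect a processed sample next to its raw annotation."""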
def displ_item(self, index):
sample, ann = self.__getitem__(index), self.annotation[index]
return OrderedDict(
{
"file": ann["image"],
"caption": ann["caption"],
"image": sample["image"],
}
)
class CMDVideoDataset(BaseDataset, __DisplMixin):
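    """Training dataset over pre-extracted CMD clip frames; samples up to `self.length` frames per
    clip and optionally interleaves per-timestep captions loaded from `cc_path`."""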
def __init__(self, vis_processor, text_processor, vis_root, ann_paths, cc_path):
"""
vis_root (string): Root directory of images (e.g. coco/images/)
ann_root (string): directory to store the annotation file
"""
super().__init__(vis_processor, text_processor, vis_root, ann_paths)
self.instruction_pool = [
'Describe this video.',
'Provide a concise depiction of this video.',
'Present a description of this video.',
'Summarize this video.',
'Generate video caption:',
'Generate video description:',
'Write a description for the video.',
'Provide a description of what is presented in the video.',
'Describe the content of the video.',
'Can you explain what you see in the video?',
'Could you describe what you perceive in the video?',
'Please provide a depiction of the video.',
'Illustrate what is happening in the video.',
]
self.img_ids = {}
n = 0
self.length = 90
for ann in self.annotation:
img_id = ann["image_id"]
if img_id not in self.img_ids.keys():
self.img_ids[img_id] = n
n += 1
self.cc = json.load(open(cc_path,'r'))
self.image_sep = "<Img>"
self.text_sep = "<Cap>"
def __getitem__(self, index):
ann = self.annotation[index]
video_id = ann["image_id"]
captions = self.cc[video_id] if video_id in self.cc else None
answer = self.text_processor(ann["caption"])
instruction = random.choice(self.instruction_pool)
images = []
img_placeholder = ""
num_of_images=len(os.listdir(os.path.join(self.vis_root, video_id)))
sampling_interval = int(num_of_images / self.length)
if sampling_interval == 0:
sampling_interval = 1
for frame_id in range(0,num_of_images,sampling_interval):
image_path = os.path.join(self.vis_root, video_id, f'frame_{frame_id}.jpg')
image = Image.open(image_path).convert("RGB")
image = self.vis_processor(image)
images.append(image)
img_placeholder += f"{self.image_sep}<ImageHere>"
time_step = str(frame_id * 2)
if captions is not None:
if time_step in captions:
img_placeholder += f"{self.text_sep}{captions[time_step]}"
if len(images) >= self.length:
break
if len(images) < self.length:
last_item = images[-1]
while len(images) < self.length:
images.append(last_item)
images = torch.stack(images)
instruction = f"{img_placeholder}\n{instruction}"
return {
"image": images,
"answer": answer,
"image_id": video_id,
"instruction_input": instruction,
"length": self.length,
}
class WebVidDataset(BaseDataset, __DisplMixin):
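    """Training dataset over WebVid videos; frames are sampled uniformly with OpenCV and, when a
    .vtt file exists under `subtitles_path`, subtitle text is interleaved as <Cap> segments."""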
def __init__(self, vis_processor, text_processor, vis_root, ann_paths,subtitles_path,add_subtitles=False):
"""
vis_root (string): Root directory of images (e.g. coco/images/)
ann_root (string): directory to store the annotation file
"""
super().__init__(vis_processor, text_processor, vis_root, ann_paths)
self.instruction_pool = [
'Describe this video.',
'Provide a concise depiction of this video.',
'Present a description of this video.',
'Summarize this video.',
'Generate video caption:',
'Generate video description:',
'Write a description for the video.',
'Provide a description of what is presented in the video.',
'Describe the content of the video.',
'Can you explain what you see in the video?',
'Could you describe what you perceive in the video?',
'Please provide a depiction of the video.',
'Illustrate what is happening in the video.',
]
self.img_ids = {}
n = 0
self.length = 90
self.max_sub_len = 800
self.add_subtitles = add_subtitles
self.videos_has_subtitles = {}
if self.add_subtitles:
self.subtitle_folder = os.path.join(subtitles_path)
for sub in os.listdir(self.subtitle_folder):
video_id = sub.split('.')[0]
self.videos_has_subtitles[video_id] = True
for ann in self.annotation:
img_id = ann["videoid"]
if img_id not in self.img_ids.keys():
self.img_ids[img_id] = n
n += 1
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
def __getitem__(self, index):
ann = self.annotation[index]
video_id = ann["videoid"]
images = []
caption = ann["name"].split('-')[-1].split(':')[-1]
# caption = self.text_processor(caption)
video_path = os.path.join(self.vis_root, ann['page_dir'], f'{video_id}.mp4')
has_subtitles = self.videos_has_subtitles.get(video_id, False)
if self.add_subtitles and has_subtitles:
subtitle_path = os.path.join(self.subtitle_folder, f'{video_id}.vtt')
# Load the VTT subtitle file
vtt_file = webvtt.read(subtitle_path)
        # Use moviepy only to get the total frame count, then read frames with OpenCV
        clip = VideoFileClip(video_path)
        total_num_frames = int(clip.duration * clip.fps)
        clip.close()
        cap = cv2.VideoCapture(video_path)
images = []
frame_count = 0
sampling_interval = int(total_num_frames /self.length)
if sampling_interval == 0:
sampling_interval = 1
img_placeholder = ""
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
            # Gather the subtitle lines that overlap this frame's timestamp and merge them
            # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles and has_subtitles:
for subtitle in vtt_file:
sub=subtitle.text.replace('\n',' ')
if (subtitle.start_in_seconds <= (frame_count / int(clip.fps)) <= subtitle.end_in_seconds) and sub not in subtitle_text_in_interval:
if not history_subtitles.get(sub,False):
subtitle_text_in_interval+=sub+" "
history_subtitles[sub]=True
break
if frame_count % sampling_interval == 0:
frame = self.transform(frame[:,:,::-1])
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and has_subtitles and subtitle_text_in_interval != "" and number_of_sub_words<self.max_sub_len:
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
frame_count += 1
if len(images) >= self.length:
break
cap.release()
if len(images) < self.length:
last_item = images[-1]
while len(images) < self.length:
images.append(last_item)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images)
instruction = random.choice(self.instruction_pool)
instruction = img_placeholder + '\n' + instruction
return {
"image": images,
"answer": caption,
"image_id": video_id,
"instruction_input": instruction,
"length": self.length,
}
class VideoChatGPTDataset(BaseDataset, __DisplMixin):
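    """Instruction-tuning dataset over Video-ChatGPT style (video, question, answer) annotations,
    with optional .vtt subtitles interleaved between the sampled frames."""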
def __init__(self, vis_processor, text_processor, vis_root, ann_paths,add_subtitles=True,llm_name="llama2"):
"""
vis_root (string): Root directory of images (e.g. coco/images/)
ann_root (string): directory to store the annotation file
"""
super().__init__(vis_processor, text_processor, vis_root, ann_paths)
self.img_ids = {}
n=0
self.length = 90
self.max_sub_len = 800
self.add_subtitles = add_subtitles
self.videos_has_subtitles = {}
if self.add_subtitles:
self.subtitle_folder = os.path.join(self.vis_root,'subtitles')
for sub in os.listdir(self.subtitle_folder):
video_id = sub.split('.')[0]
self.videos_has_subtitles[video_id] = True
for ann in self.annotation:
img_id = ann["video_id"]
if img_id not in self.img_ids.keys():
self.img_ids[img_id] = n
n+= 1
self.videos_extension={}
for video in os.listdir(os.path.join(self.vis_root,'videos')):
self.videos_extension[video.split('.')[0]]=video.split('.')[1]
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
def __len__(self):
return len(self.annotation)
def __getitem__(self, index):
ann = self.annotation[index]
video_id = ann["video_id"]
answer=ann["a"]
instruction=ann["q"]
images=[]
img_placeholder = ""
has_subtitles = self.videos_has_subtitles.get(video_id, False)
if self.add_subtitles and has_subtitles:
subtitle_path = os.path.join(self.subtitle_folder, f'{video_id}.vtt')
# Load the VTT subtitle file
vtt_file = webvtt.read(subtitle_path)
video_path = os.path.join(self.vis_root,'videos',f'{video_id}.{self.videos_extension[video_id]}')
clip = VideoFileClip(video_path)
total_num_frames = int(clip.duration * clip.fps)
clip.close()
cap = cv2.VideoCapture(video_path)
frame_count = 0
sampling_interval = int(total_num_frames / self.length)
if sampling_interval == 0:
sampling_interval = 1
img_placeholder = ""
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
            # Gather the subtitle lines that overlap this frame's timestamp and merge them
            # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles and has_subtitles:
for subtitle in vtt_file:
sub=subtitle.text.replace('\n',' ')
if (subtitle.start_in_seconds <= (frame_count / int(clip.fps)) <= subtitle.end_in_seconds) and sub not in subtitle_text_in_interval:
if not history_subtitles.get(sub,False):
subtitle_text_in_interval+=sub+" "
history_subtitles[sub]=True
break
if frame_count % sampling_interval == 0:
frame = self.transform(frame[:,:,::-1])# BGR to RGB
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and has_subtitles and number_of_sub_words<self.max_sub_len:
if subtitle_text_in_interval != "":
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
frame_count += 1
if len(images) >= self.length:
break
cap.release()
if len(images) ==0:
print("Video not found",video_path)
if 0 <len(images) < self.length:
last_item = images[-1]
while len(images) < self.length:
images.append(last_item)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images)
instruction = img_placeholder + '\n' + instruction
return{
"image": images,
"answer": answer,
"image_id": video_id,
"instruction_input": instruction,
"length": self.length,
}
class CMDEvalDataset(torch.utils.data.Dataset):
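    """Evaluation dataset over pre-extracted CMD frames; optionally interleaves the per-timestep
    captions from `cc_path` with the frames."""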
def __init__(self, vis_processor, root_path, ann_path, length, fix=False, add_captions=False,cc_path="datasets/training_datasets/video_text_data/cmd/caption.json"):
self.root_path = root_path
self.vis_processor = vis_processor
self.length = length
with open(ann_path,'r') as f:
self.annotation=json.load(f)
self.fix = fix
if fix:
filtered_annotation = []
for ann in self.annotation:
if ann['length']>=self.length:
filtered_annotation.append(ann)
self.annotation = filtered_annotation
self.add_caption = add_captions
self.cc = json.load(open(cc_path,'r'))
self.image_sep = "<Img>"
self.text_sep = "<Cap>"
def __len__(self):
return len(self.annotation)
def __getitem__(self, idx):
ann = self.annotation[idx]
video_id = ann["image_id"]
images = []
subtitles=[]
length = min(self.length, ann['length'])
caption = ann["caption"]
instruction = "Write a detailed description for the video."
interleave = ""
captions = self.cc[video_id] if video_id in self.cc else None
for frame_id in range(length):
image_path = os.path.join(self.root_path, video_id, f'frame_{frame_id}.jpg')
image = Image.open(image_path).convert("RGB")
image = self.vis_processor(image).half().cuda()
images.append(image)
interleave += f"{self.image_sep}<ImageHere>"
time_step = str(frame_id* 2)
if captions is not None and self.add_caption:
caption_found=captions.get(time_step,False)
if caption_found:
interleave += f"{self.text_sep}{captions[time_step]}"
subtitles.append(captions[time_step])
if 0 < len(images) < self.length:
last_item = images[-1]
while len(images) < self.length:
images.append(last_item)
interleave += f"{self.image_sep}<ImageHere>"
instruction = f"{interleave}\n{instruction}"
images = torch.stack(images)
return images, instruction, caption, self.length,video_id
class WebVidEvalDataset(torch.utils.data.Dataset):
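    """Evaluation dataset over WebVid videos; optionally interleaves .vtt subtitles with the
    uniformly sampled frames."""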
def __init__(self, vis_processor, root_path, ann_path, length, fix=False,add_captions=False):
self.root_path = root_path
self.vis_processor = vis_processor
        self.length = length
        self.max_sub_len = 800  # word budget for interleaved subtitles (same limit as the training datasets)
with open(ann_path,'r') as f:
self.annotation=json.load(f)
self.fix = fix
if fix:
filtered_annotation = []
for ann in self.annotation:
if duration_to_seconds(ann['duration']) // 2 >= self.length:
filtered_annotation.append(ann)
self.annotation = filtered_annotation
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
self.add_subtitles = add_captions
self.videos_has_subtitles = {}
if self.add_subtitles:
self.subtitle_folder = os.path.join("datasets/video_text_data/webvid/webvid_val_subtitles")
for sub in os.listdir(self.subtitle_folder):
video_id = sub.split('.')[0]
self.videos_has_subtitles[video_id] = True
def __len__(self):
return len(self.annotation)
def __getitem__(self, idx):
ann = self.annotation[idx]
video_id = ann["videoid"]
length = min(self.length, duration_to_seconds(ann['duration']) // 2)
caption = ann["name"]
video_path = os.path.join(self.root_path, ann['page_dir'], f'{video_id}.mp4')
has_subtitles = self.videos_has_subtitles.get(video_id, False)
if self.add_subtitles and has_subtitles:
subtitle_path = os.path.join(self.subtitle_folder, f'{video_id}.vtt')
# Load the VTT subtitle file
vtt_file = webvtt.read(subtitle_path)
        # Use moviepy only to get the total frame count, then read frames with OpenCV
        clip = VideoFileClip(video_path)
        total_num_frames = int(clip.duration * clip.fps)
        clip.close()
        cap = cv2.VideoCapture(video_path)
images = []
frame_count = 0
sampling_interval = int(total_num_frames /self.length)
if sampling_interval == 0:
sampling_interval = 1
img_placeholder = ""
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
            # Gather the subtitle lines that overlap this frame's timestamp and merge them
            # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles and has_subtitles:
for subtitle in vtt_file:
sub=subtitle.text.replace('\n',' ')
if (subtitle.start_in_seconds <= (frame_count / int(cap.get(cv2.CAP_PROP_FPS))) <= subtitle.end_in_seconds) and sub not in subtitle_text_in_interval:
if not history_subtitles.get(sub,False):
subtitle_text_in_interval+=sub+" "
history_subtitles[sub]=True
break
if frame_count % sampling_interval == 0:
frame = self.transform(frame[:,:,::-1])
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and has_subtitles and subtitle_text_in_interval != "" and number_of_sub_words<self.max_sub_len:
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
frame_count += 1
if len(images) >= self.length:
break
cap.release()
instruction = "Write a description for the video."
video_found = True
if len(images) == 0:
images = torch.zeros(length, 3, 224, 224)
for i in range(length):
img_placeholder += '<Img><ImageHere>'
print("Video not found")
video_found = False
        if video_found and len(images) < self.length:
last_item = images[-1]
while len(images) < self.length:
images.append(last_item)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images) if video_found else images
instruction = img_placeholder + '\n' + instruction
return images, instruction, caption, self.length,video_id
class VideoChatGPTEvalDataset(torch.utils.data.Dataset):
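    """Evaluation dataset for open-ended video QA; missing subtitles are generated on the fly
    with Whisper when `add_subtitles` is enabled."""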
def __init__(self, vis_processor, videos_path, ann_path,subtitles_path,annotations_keys,videos_features_path,add_subtitles=True,llm_name="llama2"):
if llm_name=="llama2":
self.length = 45
self.max_sub_len = 400
else:
self.length = 90
self.max_sub_len = 800
self.add_subtitles = add_subtitles
self.vis_processor=vis_processor
self.videos_path=videos_path
self.question_key=annotations_keys[0]
self.answer_key=annotations_keys[1]
self.video_name_key=annotations_keys[2]
self.videos_extension={}
for video in os.listdir(self.videos_path):
self.videos_extension[video.split('.')[0]]=video.split('.')[1]
self.annotation=json.load(open(ann_path,'r'))
self.videos_has_subtitles = {}
if self.add_subtitles:
self.subtitle_folder = subtitles_path
for sub in os.listdir(self.subtitle_folder):
video_id = sub.split('.')[0]
self.videos_has_subtitles[video_id] = True
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
self.videos_features_path=videos_features_path
def __len__(self):
return len(self.annotation)
def __getitem__(self, index):
ann = self.annotation[index]
video_id = ann[self.video_name_key]
answer=ann[self.answer_key]
instruction=ann[self.question_key]
images=[]
img_placeholder = ""
video_path = os.path.join(self.videos_path,f'{video_id}.{self.videos_extension[video_id]}')
cap = cv2.VideoCapture(video_path)
clip = VideoFileClip(video_path)
total_num_frames = int(clip.duration * clip.fps)
clip.close()
frame_count = 0
sampling_interval = int(total_num_frames / self.length)
if sampling_interval == 0:
sampling_interval = 1
subtitle_path=None
if self.add_subtitles :
subtitle_path = generate_subtitles(video_path,self.videos_has_subtitles)
if subtitle_path is not None:
# Load the VTT subtitle file
vtt_file = webvtt.read(subtitle_path)
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
            # Gather the subtitle lines that overlap this frame's timestamp and merge them
            # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles and subtitle_path is not None:
for subtitle in vtt_file:
sub=subtitle.text.replace('\n',' ')
if (subtitle.start_in_seconds <= (frame_count / int(cap.get(cv2.CAP_PROP_FPS))) <= subtitle.end_in_seconds) and sub not in subtitle_text_in_interval:
if not history_subtitles.get(sub,False):
subtitle_text_in_interval+=sub+" "
history_subtitles[sub]=True
break
if frame_count % sampling_interval == 0:
frame = self.transform(frame[:,:,::-1])
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and subtitle_path is not None and number_of_sub_words<self.max_sub_len and subtitle_text_in_interval != "":
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
frame_count += 1
if len(images) >= self.length:
break
cap.release()
if len(images) == 0:
print("Video not found")
print('Video path',video_path)
return None,None,None,None,None
if 0 <len(images) < self.length:
last_image = images[-1]
while len(images) < self.length:
images.append(last_image)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images)
instruction = img_placeholder + '\n' + instruction
return images,instruction,answer,self.length,video_id
class Video_validation_Dataset(torch.utils.data.Dataset):
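    """Validation dataset for video QA; mirrors VideoChatGPTEvalDataset but without the cached
    video-features path."""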
def __init__(self, vis_processor, videos_path, ann_path,subtitles_path,annotations_keys,add_subtitles=True,llm_name="llama2"):
if llm_name=="llama2":
self.length = 45
self.max_sub_len = 400
else:
self.length = 90
self.max_sub_len = 800
self.add_subtitles = add_subtitles
self.vis_processor=vis_processor
self.videos_path=videos_path
self.question_key=annotations_keys[0]
self.answer_key=annotations_keys[1]
self.video_name_key=annotations_keys[2]
self.videos_extension={}
for video in os.listdir(self.videos_path):
self.videos_extension[video.split('.')[0]]=video.split('.')[1]
self.annotation=json.load(open(ann_path,'r'))
self.videos_has_subtitles = {}
if self.add_subtitles:
self.subtitle_folder = subtitles_path
for sub in os.listdir(self.subtitle_folder):
video_id = sub.split('.')[0]
self.videos_has_subtitles[video_id] = True
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
def __len__(self):
return len(self.annotation)
def __getitem__(self, index):
ann = self.annotation[index]
video_id = ann[self.video_name_key]
answer=ann[self.answer_key]
instruction=ann[self.question_key]
video_path = os.path.join(self.videos_path,f'{video_id}.{self.videos_extension[video_id]}')
images=[]
img_placeholder = ""
cap = cv2.VideoCapture(video_path)
clip = VideoFileClip(video_path)
total_num_frames = int(clip.duration * clip.fps)
clip.close()
frame_count = 0
sampling_interval = int(total_num_frames / self.length)
if sampling_interval == 0:
sampling_interval = 1
subtitle_path=None
if self.add_subtitles :
subtitle_path = generate_subtitles(video_path,self.videos_has_subtitles)
if subtitle_path is not None:
# Load the VTT subtitle file
vtt_file = webvtt.read(subtitle_path)
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
            # Gather the subtitle lines that overlap this frame's timestamp and merge them
            # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles and subtitle_path is not None:
for subtitle in vtt_file:
sub=subtitle.text.replace('\n',' ')
if (subtitle.start_in_seconds <= (frame_count / int(cap.get(cv2.CAP_PROP_FPS))) <= subtitle.end_in_seconds) and sub not in subtitle_text_in_interval:
if not history_subtitles.get(sub,False):
subtitle_text_in_interval+=sub+" "
history_subtitles[sub]=True
break
if frame_count % sampling_interval == 0:
frame = self.transform(frame[:,:,::-1])
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and subtitle_path is not None and number_of_sub_words<self.max_sub_len and subtitle_text_in_interval != "":
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
frame_count += 1
if len(images) >= self.length:
break
cap.release()
if len(images) == 0:
print("Video not found")
print('Video path',video_path)
return None,None,None,None,None
if 0 <len(images) < self.length:
last_image = images[-1]
while len(images) < self.length:
images.append(last_image)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images)
instruction = img_placeholder + '\n' + instruction
return images,instruction,answer,self.length,video_id
class VideoChatGPTEval_consistancy(torch.utils.data.Dataset):
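    """Consistency-evaluation dataset: returns the same video with two differently phrased
    questions and a single reference answer."""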
def __init__(self, vis_processor, videos_path, ann_path,subtitles_path,annotations_keys,add_subtitles=True,llm_name="llama2"):
if llm_name=="llama2":
self.length = 45
self.max_sub_len = 400
else:
self.length = 90
self.max_sub_len = 800
self.add_subtitles = add_subtitles
self.vis_processor=vis_processor
self.videos_path=videos_path
self.question1_key=annotations_keys[0][0]
self.question2_key=annotations_keys[0][1]
self.answer_key=annotations_keys[1]
self.video_name_key=annotations_keys[2]
self.videos_extension={}
for video in os.listdir(self.videos_path):
self.videos_extension[video.split('.')[0]]=video.split('.')[1]
self.annotation=json.load(open(ann_path,'r'))
self.videos_has_subtitles = {}
if self.add_subtitles:
self.subtitle_folder = subtitles_path
for sub in os.listdir(self.subtitle_folder):
video_id = sub.split('.')[0]
self.videos_has_subtitles[video_id] = True
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
def __len__(self):
return len(self.annotation)
def __getitem__(self, index):
ann = self.annotation[index]
video_id = ann[self.video_name_key]
answer=ann[self.answer_key]
instruction_1=ann[self.question1_key]
instruction_2=ann[self.question2_key]
video_path = os.path.join(self.videos_path,f'{video_id}.{self.videos_extension[video_id]}')
cap = cv2.VideoCapture(video_path)
clip = VideoFileClip(video_path)
total_num_frames = int(clip.duration * clip.fps)
clip.close()
images = []
frame_count = 0
sampling_interval = int(total_num_frames / self.length)
if sampling_interval == 0:
sampling_interval = 1
subtitle_path=None
if self.add_subtitles :
subtitle_path = generate_subtitles(video_path,self.videos_has_subtitles)
if subtitle_path is not None:
# Load the VTT subtitle file
vtt_file = webvtt.read(subtitle_path)
img_placeholder = ""
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
            # Gather the subtitle lines that overlap this frame's timestamp and merge them
            # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles and subtitle_path is not None:
for subtitle in vtt_file:
sub=subtitle.text.replace('\n',' ')
if (subtitle.start_in_seconds <= (frame_count / int(cap.get(cv2.CAP_PROP_FPS))) <= subtitle.end_in_seconds) and sub not in subtitle_text_in_interval:
if not history_subtitles.get(sub,False):
subtitle_text_in_interval+=sub+" "
history_subtitles[sub]=True
break
if frame_count % sampling_interval == 0:
frame = self.transform(frame[:,:,::-1])
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and subtitle_path is not None and number_of_sub_words<self.max_sub_len and subtitle_text_in_interval != "":
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
frame_count += 1
if len(images) >= self.length:
break
cap.release()
if len(images) == 0:
print("Video not found")
print('Video path',video_path)
return None,None,None,None,None
if 0 <len(images) < self.length:
last_image = images[-1]
while len(images) < self.length:
images.append(last_image)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images)
instruction_1 = img_placeholder + '\n' + instruction_1
instruction_2 = img_placeholder + '\n' + instruction_2
return images,instruction_1,instruction_2,answer,self.length,video_id
class TVQAEVAL(torch.utils.data.Dataset):
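    """Multiple-choice QA over pre-extracted TVQA frames, with optional subtitles loaded from
    the TVQA subtitle json at `subtitles_path`."""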
def __init__(self, vis_processor, videos_path, ann_path,subtitles_path,videos_features_path,add_subtitles=True,llm_name="llama2"):
self.tv_shows_mapping={"Grey's Anatomy":"grey_frames", 'How I Met You Mother':"met_frames", 'Friends':"friends_frames", 'The Big Bang Theory':"bbt_frames", 'House M.D.':"house_frames", 'Castle':"castle_frames"}
        self.fps = 3  # frames are extracted at 3 fps, so frame i corresponds to i / fps seconds
if llm_name=="llama2":
self.length = 45
self.max_sub_len = 400
else:
self.length = 90
self.max_sub_len = 800
self.add_subtitles = add_subtitles
self.vis_processor=vis_processor
self.videos_path=videos_path
with open(ann_path,'r') as f:
self.annotation=json.load(f)
with open(subtitles_path,'r') as f:
self.subtitles_list=json.load(f)
self.subtitles={}
for sub in self.subtitles_list:
self.subtitles[sub["vid_name"]]=sub["sub"]
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
self.videos_features_path=videos_features_path
self.processed_videos={}
self.save_pkl="subtitles" if self.add_subtitles else "no_subtitles"
for video_pkl in os.listdir(videos_features_path):
video_id_sub=video_pkl.split('.')[0]
self.processed_videos[video_id_sub]=True
def __len__(self):
return len(self.annotation)
def __getitem__(self, index):
ann = self.annotation[index]
video_id = ann["vid_name"]
answer=str(ann['answer_idx'])
folder_name=self.tv_shows_mapping[ann["show_name"]]
        instruction=ann["q"]+" \n\n As you watched in this video, choose ONE suitable answer from these multiple choices \n\n"
for i in range(5):
ans=ann[f"a{i}"]
instruction+=f"option {i}: {ans} \n\n"
instruction+="\n Your output should be THE NUMBER OF THE CORRECT ANSWER FROM THE CHOICES FROM 0 TO 4 INCLUSIVE"
images=[]
img_placeholder = ""
video_frames_path = os.path.join(self.videos_path,folder_name,video_id)
total_num_frames=len(os.listdir(video_frames_path))
sampling_interval = round(total_num_frames / self.length)
if sampling_interval == 0:
sampling_interval = 1
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
for i,frame in enumerate(sorted(os.listdir(video_frames_path))):
            # Gather the subtitle lines that overlap this frame's timestamp and merge them
            # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles:
for subtitle in self.subtitles[video_id]:
if (subtitle['start'] <= (i / self.fps) <= subtitle['end']) and subtitle['text'] not in subtitle_text_in_interval:
if not history_subtitles.get(subtitle['text'],False):
subtitle_text_in_interval+=subtitle['text']+" "
history_subtitles[subtitle['text']]=True
break
if i % sampling_interval == 0:
frame = Image.open(os.path.join(video_frames_path,frame)).convert("RGB")
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and number_of_sub_words<self.max_sub_len:
if subtitle_text_in_interval != "":
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
if len(images) >= self.length:
break
if len(images) ==0:
print("Video not found",video_frames_path)
if 0 <len(images) < self.length:
last_item = images[-1]
while len(images) < self.length:
images.append(last_item)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images)
instruction = img_placeholder + '\n' + instruction
return images,instruction,answer,self.length,video_id
class TVQAEVAL_Long(torch.utils.data.Dataset):
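    """Episode-level ("long video") variant of TVQAEVAL: frames and .srt subtitles are read per
    season/episode folder and the processed tensors are cached as pickle files."""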
def __init__(self, vis_processor, videos_path, ann_path,subtitles_path,videos_features_path,add_subtitles=False,llm_name="llama2"):
self.tv_shows_mapping={"Grey's Anatomy":"grey_frames", 'How I Met You Mother':"met_frames", 'Friends':"friends_frames", 'The Big Bang Theory':"bbt_frames", 'House M.D.':"house_frames", 'Castle':"castle_frames"}
        self.fps = 3  # frames are extracted at 3 fps, so frame i corresponds to i / fps seconds
if llm_name=="llama2":
self.length = 45
self.max_sub_len = 400
else:
self.length = 90
self.max_sub_len = 800
self.add_subtitles = add_subtitles
self.vis_processor=vis_processor
self.videos_path=videos_path
self.subtitles_path=subtitles_path
with open(ann_path,'r') as f:
self.annotation=json.load(f)
self.transform = transforms.Compose([
transforms.ToPILImage(),
])
self.videos_features_path=videos_features_path
self.processed_videos={}
self.save_pkl="subtitles" if self.add_subtitles else "no_subtitles"
for video_pkl in os.listdir(videos_features_path):
video_id_sub=video_pkl.split('.')[0]
self.processed_videos[video_id_sub]=True
def extract_season_episode(self,video_name):
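        """Parse an 's<season>e<episode>' tag out of a clip name and return
        ('season_N', 'episode_M'), or (None, None) if the pattern is absent."""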
# Define a regex pattern to match season and episode numbers
pattern = r's(\d+)e(\d+)'
# Use re.search to find the pattern in the video name
match = re.search(pattern, video_name, re.IGNORECASE)
if match:
# Extract season and episode numbers from the matched groups
season_number = int(match.group(1))
episode_number = int(match.group(2))
return f"season_{season_number}", f"episode_{episode_number}"
else:
# Return None if the pattern is not found
return None, None
def __len__(self):
return len(self.annotation)
def __getitem__(self, index):
ann = self.annotation[index]
season_number,episode_number=self.extract_season_episode(ann["vid_name"])
folder_name=self.tv_shows_mapping[ann["show_name"]]
video_id = f"{folder_name}_{season_number}_{episode_number}"
answer=str(ann['answer_idx'])
        instruction=ann["q"]+" \n\n As you watched in this video, choose ONE suitable answer from these multiple choices \n\n"
for i in range(5):
ans=ann[f"a{i}"]
instruction+=f"option {i}: {ans} \n\n"
# instruction+="\n Your output should be THE NUMBER OF THE CORRECT ANSWER FROM THE CHOICES FROM 0 TO 4 INCLUSIVE"
instruction+=f"option 5: Can't answer based on the provided information \n\n"
instruction+="\n Your output should be THE NUMBER OF THE CORRECT ANSWER FROM THE CHOICES FROM 0 TO 5 INCLUSIVE"
images=[]
img_placeholder = ""
if self.processed_videos.get(f"{video_id}_{self.save_pkl}",False):
with open(f"{self.videos_features_path}/{video_id}_{self.save_pkl}.pkl",'rb') as f:
data=pickle.load(f)
images=data['images']
img_placeholder = data['img_placeholder']
else:
video_frames_path = os.path.join(self.videos_path,folder_name,season_number,episode_number)
video_subtitle_path=os.path.join(self.subtitles_path,folder_name,season_number,episode_number+".srt")
video_subtitles=read_subtitles(video_subtitle_path)
total_num_frames=len(os.listdir(video_frames_path))
sampling_interval = round(total_num_frames / self.length)
if sampling_interval == 0:
sampling_interval = 1
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_sub_words=0
number_of_interval_words=0
max_number_of_interval_words=10
for i,frame in enumerate(sorted(os.listdir(video_frames_path))):
                # Gather the subtitle lines that overlap this frame's timestamp and merge them
                # into a single caption chunk to attach to the next sampled frame.
if self.add_subtitles:
for subtitle in video_subtitles:
if (srt_time_to_seconds(subtitle.start) <= (i / self.fps) <= srt_time_to_seconds(subtitle.end)) and subtitle.text not in subtitle_text_in_interval:
if not history_subtitles.get(subtitle.text,False) and number_of_interval_words<max_number_of_interval_words:
subtitle_text_in_interval+=subtitle.text+" "
number_of_interval_words+=len(subtitle.text.split(' '))
history_subtitles[subtitle.text]=True
break
if i % sampling_interval == 0:
frame = Image.open(os.path.join(video_frames_path,frame)).convert("RGB")
frame = self.vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if self.add_subtitles and number_of_sub_words<self.max_sub_len:
if subtitle_text_in_interval != "":
img_placeholder+=f'<Cap>{subtitle_text_in_interval}'
number_of_sub_words+=len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
if len(images) >= self.length:
break
if len(images) ==0:
print("Video not found",video_frames_path)
if 0 <len(images) < self.length:
last_item = images[-1]
while len(images) < self.length:
images.append(last_item)
img_placeholder += '<Img><ImageHere>'
images = torch.stack(images)
with open(f"{self.videos_features_path}/{video_id}_{self.save_pkl}.pkl",'wb') as f:
pickle.dump({"images":images,"img_placeholder":img_placeholder},f)
self.processed_videos[f"{video_id}_{self.save_pkl}"]=True
instruction = img_placeholder + '\n\n' + instruction
return images,instruction,answer,self.length,video_id
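

if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the original pipeline). The paths below are
    # hypothetical placeholders: point them at real extracted CMD frames, an annotation json
    # and the caption json before running. Processor defaults are assumed to be sufficient.
    vis_processor = Blip2ImageTrainProcessor(image_size=224)
    text_processor = BlipCaptionProcessor()
    dataset = CMDVideoDataset(
        vis_processor=vis_processor,
        text_processor=text_processor,
        vis_root="path/to/cmd/frames",               # hypothetical frame root (one folder per video id)
        ann_paths=["path/to/cmd/annotations.json"],  # hypothetical annotation file
        cc_path="path/to/cmd/caption.json",          # hypothetical per-timestep caption file
    )
    sample = dataset[0]
    print(sample["image"].shape, sample["length"])
    print(sample["instruction_input"][:120])
    print(sample["answer"])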