""" Copyright (c) 2022, salesforce.com, inc. All rights reserved. SPDX-License-Identifier: BSD-3-Clause For full license text, see the LICENSE_Lavis file in the repo root or https://opensource.org/licenses/BSD-3-Clause """ import os from collections import OrderedDict import sys sys.path.append('/ibex/project/c2090/kirolos/MiniGPT4-video-llama3') from minigpt4.datasets.datasets.base_dataset import BaseDataset from PIL import Image import random import json import cv2 import torch import torchvision.transforms as transforms import numpy as np import webvtt import math from moviepy.editor import VideoFileClip from minigpt4.processors.blip_processors import Blip2ImageTrainProcessor,BlipCaptionProcessor import pickle import time from decord import VideoReader, cpu, gpu from tqdm import tqdm import pysrt import chardet import re import whisper from datetime import timedelta # Function to format timestamps for VTT def format_timestamp(seconds): td = timedelta(seconds=seconds) total_seconds = int(td.total_seconds()) milliseconds = int(td.microseconds / 1000) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds:03}" def duration_to_seconds(duration_str): duration_str = duration_str[2:] # Removing 'PT' prefix seconds = 0 if 'H' in duration_str: hours_str = duration_str.split('H')[0] seconds += int(hours_str) * 3600 duration_str = duration_str.split('H')[1] if 'M' in duration_str: minutes_str = duration_str.split('M')[0] seconds += int(minutes_str) * 60 duration_str = duration_str.split('M')[1] if 'S' in duration_str: seconds_str = duration_str.split('S')[0] seconds += int(seconds_str) return seconds def extract_audio(video_path, audio_path): video_clip = VideoFileClip(video_path) audio_clip = video_clip.audio audio_clip.write_audiofile(audio_path, codec="libmp3lame", bitrate="320k") def generate_subtitles(video_path,existed_subtitles,whisper_model): video_id=video_path.split('/')[-1].split('.')[0] subtitle_dir="workspace/misssing_eval_subtitles" audio_dir="workspace/misssing_eval_subtitles/mp3" os.makedirs(subtitle_dir,exist_ok=True) os.makedirs(audio_dir,exist_ok=True) audio_path = f"{audio_dir}/{video_id}"+'.mp3' if existed_subtitles.get(video_id,False): print("subtitle already generated") return f"{subtitle_dir}/{video_id}"+'.vtt' try: extract_audio(video_path,audio_path) print("successfully extracted") subtitle_path=f"{subtitle_dir}/{video_id}"+'.vtt' result = whisper_model.transcribe(audio_path,language="en") # Create VTT file with open(subtitle_path, "w", encoding="utf-8") as vtt_file: vtt_file.write("WEBVTT\n\n") for segment in result['segments']: start = format_timestamp(segment['start']) end = format_timestamp(segment['end']) text = segment['text'] vtt_file.write(f"{start} --> {end}\n{text}\n\n") # remove the audio file os.system(f"rm {audio_path}") print("subtitle successfully generated") return subtitle_path except Exception as e: print("error",video_path ,e) return None def read_subtitles(subtitle_path): # read the subtitle file and detect the encoding try: with open(subtitle_path, 'rb') as f: result = chardet.detect(f.read()) subs = pysrt.open(subtitle_path, encoding=result['encoding']) return subs except: return [] def srt_time_to_seconds(time): return time.hours * 3600 + time.minutes * 60 + time.seconds + time.milliseconds / 1000 class __DisplMixin: def displ_item(self, index): sample, ann = self.__getitem__(index), self.annotation[index] return OrderedDict( { "file": ann["image"], 
"caption": ann["caption"], "image": sample["image"], } ) class CMDVideoDataset(BaseDataset, __DisplMixin): def __init__(self, vis_processor, text_processor, vis_root, ann_paths, subtitles_path,model_name='llama2'): """ vis_root (string): Root directory of images (e.g. coco/images/) ann_root (string): directory to store the annotation file """ super().__init__(vis_processor, text_processor, vis_root, ann_paths) self.instruction_pool = [ 'Describe this video.', 'Provide a concise depiction of this video.', 'Present a description of this video.', 'Summarize this video.', 'Generate video caption:', 'Generate video description:', 'Write a description for the video.', 'Provide a description of what is presented in the video.', 'Describe the content of the video.', 'Can you explain what you see in the video?', 'Could you describe what you perceive in the video?', 'Please provide a depiction of the video.', 'Illustrate what is happening in the video.', ] self.model_name=model_name if self.model_name =='mistral': self.length = 90 self.max_sub_len = 800 else: self.length = 45 self.max_sub_len = 400 self.subtitle_folder = subtitles_path self.videos_has_subtitles={} for sub in os.listdir(self.subtitle_folder): video_id = sub.split('.')[0] self.videos_has_subtitles[video_id] = True self.transform = transforms.Compose([ transforms.ToPILImage(), ]) def __getitem__(self, index): ann = self.annotation[index] video_id = ann["image_id"] answer =ann['caption'] instruction = random.choice(self.instruction_pool) has_subtitles = self.videos_has_subtitles.get(video_id, False) if has_subtitles: subtitle_path = os.path.join(self.subtitle_folder, f'{video_id}.en.vtt') # Load the VTT subtitle file vtt_file = webvtt.read(subtitle_path) video_path = os.path.join(self.vis_root, f'{video_id}.mp4') clip = VideoFileClip(video_path) total_num_frames = int(clip.duration * clip.fps) clip.close() cap = cv2.VideoCapture(video_path) frame_count = 0 sampling_interval = int(total_num_frames / self.length) if sampling_interval == 0: sampling_interval = 1 img_placeholder = "" subtitle_text_in_interval = "" number_of_sub_words=0 images=[] history_subtitles = {} previous_sub = "" while cap.isOpened(): ret, frame = cap.read() if not ret: break # Find the corresponding subtitle for the each frame and combine the interval subtitles into one subtitle if has_subtitles: for subtitle in vtt_file: sub=subtitle.text.replace('\n',' ') if (subtitle.start_in_seconds <= (frame_count / int(clip.fps)) <= subtitle.end_in_seconds): if not history_subtitles.get(sub,False): for word in sub.split(' '): if word not in subtitle_text_in_interval and word not in previous_sub: subtitle_text_in_interval+=word+" " history_subtitles[sub]=True if frame_count % sampling_interval == 0: frame = self.transform(frame[:,:,::-1])# BGR to RGB frame = self.vis_processor(frame) images.append(frame) img_placeholder += '' if has_subtitles and number_of_sub_words{subtitle_text_in_interval}' number_of_sub_words+=len(subtitle_text_in_interval.split(' ')) previous_sub = subtitle_text_in_interval subtitle_text_in_interval = "" frame_count += 1 if len(images) >= self.length: break cap.release() if len(images) ==0: print("Video not found",video_path) if 0 {subtitle_text_in_interval}' number_of_sub_words+=len(subtitle_text_in_interval.split(' ')) previous_sub = subtitle_text_in_interval subtitle_text_in_interval = "" frame_count += 1 if len(images) >= self.length: break cap.release() if len(images) < self.length: last_item = images[-1] while len(images) < self.length: 

# (Placeholder for a second caption-style training dataset: its __getitem__ follows the
#  same recipe as CMDVideoDataset.__getitem__ above, i.e. frame sampling, subtitle
#  interleaving and padding by repeating the last frame, and it returns the video
#  caption as the "answer" field.)


class VideoChatGPTDataset(BaseDataset, __DisplMixin):
    def __init__(self, vis_processor, text_processor, vis_root, ann_paths, subtitles_path, model_name='llama2', add_subtitles=True):
        """
        vis_root (string): Root directory of images (e.g. coco/images/)
        ann_root (string): directory to store the annotation file
        """
        super().__init__(vis_processor, text_processor, vis_root, ann_paths)
        self.img_ids = {}
        n = 0
        self.model_name = model_name
        if self.model_name == 'mistral':
            self.length = 90
            self.max_sub_len = 800
        else:
            self.length = 45
            self.max_sub_len = 400
        self.add_subtitles = add_subtitles
        self.videos_has_subtitles = {}
        if self.add_subtitles:
            self.subtitle_folder = subtitles_path
            for sub in os.listdir(self.subtitle_folder):
                video_id = sub.split('.')[0]
                self.videos_has_subtitles[video_id] = True
        for ann in self.annotation:
            img_id = ann["video_id"]
            if img_id not in self.img_ids.keys():
                self.img_ids[img_id] = n
                n += 1

        # Map each video id to its file extension so videos with mixed containers can be opened.
        self.videos_extension = {}
        for video in os.listdir(self.vis_root):
            self.videos_extension[video.split('.')[0]] = video.split('.')[1]
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
        ])

    def __len__(self):
        return len(self.annotation)

    def __getitem__(self, index):
        ann = self.annotation[index]
        video_id = ann["video_id"]
        answer = ann["a"]
        instruction = ann["q"]
        images = []
        img_placeholder = ""
        has_subtitles = self.videos_has_subtitles.get(video_id, False)
        if self.add_subtitles and has_subtitles:
            subtitle_path = os.path.join(self.subtitle_folder, f'{video_id}.vtt')
            # Load the VTT subtitle file
            vtt_file = webvtt.read(subtitle_path)

        video_path = os.path.join(self.vis_root, f'{video_id}.{self.videos_extension[video_id]}')
        clip = VideoFileClip(video_path)
        total_num_frames = int(clip.duration * clip.fps)
        clip.close()
        cap = cv2.VideoCapture(video_path)
        frame_count = 0
        sampling_interval = int(total_num_frames / self.length)
        if sampling_interval == 0:
            sampling_interval = 1
        img_placeholder = ""
        subtitle_text_in_interval = ""
        history_subtitles = {}
        number_of_sub_words = 0
        previous_sub = ""
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Find the subtitle covering the current frame and merge the interval's
            # subtitle words into one caption, skipping words that were already used.
            if self.add_subtitles and has_subtitles:
                for subtitle in vtt_file:
                    sub = subtitle.text.replace('\n', ' ')
                    if (subtitle.start_in_seconds <= (frame_count / int(clip.fps)) <= subtitle.end_in_seconds):
                        if not history_subtitles.get(sub, False):
                            for word in sub.split(' '):
                                if word not in subtitle_text_in_interval and word not in previous_sub:
                                    subtitle_text_in_interval += word + " "
                        history_subtitles[sub] = True
            if frame_count % sampling_interval == 0:
                frame = self.transform(frame[:, :, ::-1])  # BGR to RGB
                frame = self.vis_processor(frame)
                images.append(frame)
                img_placeholder += '<Img><ImageHere>'
                if self.add_subtitles and has_subtitles and number_of_sub_words < self.max_sub_len:
                    if subtitle_text_in_interval != "":
                        img_placeholder += f'<Cap>{subtitle_text_in_interval}'
                        number_of_sub_words += len(subtitle_text_in_interval.split(' '))
                        previous_sub = subtitle_text_in_interval
                        subtitle_text_in_interval = ""
            frame_count += 1
            if len(images) >= self.length:
                break
        cap.release()
        if len(images) == 0:
            print("Video not found", video_path)
        # Pad short videos by repeating the last sampled frame up to self.length.
        if 0 < len(images) < self.length:
            last_item = images[-1]
            while len(images) < self.length:
                images.append(last_item)
                img_placeholder += '<Img><ImageHere>'
        images = torch.stack(images)
        instruction = img_placeholder + '\n' + instruction
        return {
            "image": images,
            "answer": answer,
            "image_id": video_id,
            "instruction_input": instruction,
            "length": self.length,
        }
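
# For reference, VideoChatGPTDataset only relies on the annotation keys read in
# __getitem__ above ("video_id", "q", "a"). A hypothetical entry could look like the
# dict below; the values are made up purely for illustration.
_EXAMPLE_VIDEOCHATGPT_ANNOTATION = {
    "video_id": "v_12345",  # must match a file name (without extension) in vis_root
    "q": "What is the person in the video doing?",
    "a": "They are assembling a wooden shelf.",
}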

# (Placeholder for the remaining evaluation dataset loaders: validation and benchmark
#  datasets, including one that reads pre-extracted frame folders rather than video
#  files. They reuse the frame-sampling and subtitle-interleaving loop of the datasets
#  above, cap the number of sampled frames at self.length, and print a warning and
#  return None values when a video cannot be read.)
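

# A minimal sketch of how one of these loaders might be instantiated directly, e.g.
# for debugging outside the training pipeline. All paths below are hypothetical
# placeholders, and the processors are only one reasonable choice; in the actual
# pipeline the dataset builders construct these objects from the config files.
if __name__ == "__main__":
    vis_processor = Blip2ImageTrainProcessor(image_size=224)
    text_processor = BlipCaptionProcessor()
    dataset = CMDVideoDataset(
        vis_processor=vis_processor,
        text_processor=text_processor,
        vis_root="datasets/cmd/videos",               # hypothetical video directory
        ann_paths=["datasets/cmd/annotations.json"],  # hypothetical annotation file
        subtitles_path="datasets/cmd/subtitles",      # hypothetical subtitle directory
        model_name='llama2',
    )
    sample = dataset[0]
    print(sample["image"].shape)
    print(sample["instruction_input"][:200])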