Spaces: Build error
import gradio as gr
import ffmpeg
from pathlib import Path
import os
import ast
import json
import base64
import requests
import moviepy.editor as mp
from PIL import Image
import cv2
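
# NOTE (an assumption about the build error): `moviepy.editor` is the moviepy 1.x
# entry point and was removed in moviepy 2.0 (which also renamed `subclip` to
# `subclipped`), so pinning moviepy<2.0 in requirements.txt keeps these imports working.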
| API_URL = "https://api-inference.huggingface.co/models/facebook/wav2vec2-base-960h" | |
| headers = {"Authorization": "Bearer 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"} | |
| #HF_TOKEN = os.environ["HF_TOKEN"] | |
| #headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |

def generate_transcripts(in_video):
    print("********* Inside generate_transcripts() **********")
    print(f"input video is : {in_video}")
    # extract the audio track as mono 16 kHz WAV bytes, entirely in memory
    video_path = Path(in_video)
    audio_memory, _ = (
        ffmpeg.input(video_path)
        .output('-', format="wav", ac=1, ar='16k')
        .overwrite_output()
        .global_args('-loglevel', 'quiet')
        .run(capture_stdout=True)
    )
    # transcribe with wav2vec2 via the Hugging Face hosted Inference API;
    # the response carries both the text and per-character timestamps ("chunks")
    model_response = query_api(audio_memory)
    print(f"model_response is : {model_response}")
    transcription = model_response["text"].lower()
    chunks = model_response["chunks"]
    # flatten the chunks into [char, start, end] triples for easy downstream use
    timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
                  for chunk in chunks]
    # derive word-level timestamps from the character-level ones
    words, words_timestamp = get_word_timestamps(timestamps)
    print(f"Total words in the audio transcript: {len(words)}, word list: {words}")
    print(f"Total word timestamps derived from character timestamps: {len(words_timestamp)}")
    return transcription, words, words_timestamp
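
# The ffmpeg-python pipeline above is roughly equivalent to the CLI call
#   ffmpeg -loglevel quiet -i <video> -f wav -ac 1 -ar 16k -y -
# i.e. decode the audio to mono 16 kHz WAV on stdout, the sample rate
# wav2vec2-base-960h expects.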

def generate_gifs(gif_transcript, words, words_timestamp):
    print("********* Inside generate_gifs() **********")
    # build a word list from the snippet of the transcript pasted in by the user
    gif = gif_transcript.lower()
    giflist = gif.split()
    # the hidden textboxes deliver the lists as strings, so parse them back
    words = ast.literal_eval(words)
    words_timestamp = ast.literal_eval(words_timestamp)
    print(f"words is :{words}")
    print(f"giflist is :{giflist}")
    # index range of the snippet inside the full transcript (first match only;
    # raises IndexError if the snippet is not found verbatim)
    giflist_indxs = list(list(get_gif_word_indexes(words, giflist))[0])
    print(f"giflist_indxs is : {giflist_indxs}")
    # start and end timestamps for the gif clip
    start_seconds, end_seconds = get_gif_timestamps(giflist_indxs, words_timestamp)
    print(f"start_seconds, end_seconds are : ({start_seconds}, {end_seconds})")
    # cut the clip and generate the .gif image
    im, gif_img, gif_vid, vid_cap, gif_video_out = gen_moviepy_gif(start_seconds, end_seconds)
    return gif_video_out

#calling the hosted model
def query_api(audio_bytes: bytes):
    """
    Query the Hugging Face Inference API for the automatic speech recognition
    task, sending the audio along with chunk-length and stride parameters.
    """
    print("********* Inside query_api() **********")
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2]
        },
        "options": {"use_gpu": False}
    }).encode("utf-8")
    response = requests.post(API_URL, headers=headers, data=payload)
    json_response = json.loads(response.content.decode("utf-8"))
    print(f"json_response is :{json_response}")
    return json_response
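
# The hosted API typically responds with {"error": ..., "estimated_time": ...}
# while the model is cold-starting; a small retry wrapper (a sketch, not used
# by the app below) can smooth that over:
def query_api_with_retry(audio_bytes: bytes, max_tries: int = 3):
    import time
    result = None
    for _ in range(max_tries):
        result = query_api(audio_bytes)
        if isinstance(result, dict) and "error" in result:
            time.sleep(result.get("estimated_time", 10))  # wait out the model load
            continue
        break
    return result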

#getting word timestamps from character timestamps
def get_word_timestamps(timestamps):
    words, word = [], []
    letter_timestamp, word_timestamp, words_timestamp = [], [], []
    for idx, entry in enumerate(timestamps):
        word.append(entry[0])
        letter_timestamp.append(entry[1])
        if entry[0] == ' ':
            words.append(''.join(word))
            word_timestamp.append(letter_timestamp[0])
            word_timestamp.append(timestamps[idx - 1][2])
            words_timestamp.append(word_timestamp)
            word, word_timestamp, letter_timestamp = [], [], []
    # flush the final word, which is not followed by a trailing space
    if word:
        words.append(''.join(word))
        words_timestamp.append([letter_timestamp[0], timestamps[-1][2]])
    words = [w.strip() for w in words]
    return words, words_timestamp
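
# A quick sanity check of the char-to-word conversion (hypothetical character
# chunks), run once at import time:
_demo_chunks = [['h', 0.0, 0.1], ['i', 0.1, 0.2], [' ', 0.2, 0.3],
                ['y', 0.3, 0.4], ['o', 0.4, 0.5]]
assert get_word_timestamps(_demo_chunks) == (['hi', 'yo'], [[0.0, 0.2], [0.3, 0.5]])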

#getting index of gif words in main transcript
def get_gif_word_indexes(total_words_list, gif_words_list):
    if not gif_words_list:
        return
    # optimization: only compare full slices when the first word matches
    length_gif_words_list = len(gif_words_list)
    first_gif_word = gif_words_list[0]
    print(f"total_words_list is :{total_words_list}")
    print(f"gif_words_list is :{gif_words_list}")
    for idx, item in enumerate(total_words_list):
        if item == first_gif_word:
            if total_words_list[idx : idx + length_gif_words_list] == gif_words_list:
                yield tuple(range(idx, idx + length_gif_words_list))
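
# A quick sanity check of the phrase search (hypothetical word lists),
# run once at import time:
assert list(get_gif_word_indexes(['do', 'it', 'just', 'do', 'it'], ['do', 'it'])) == [(0, 1), (3, 4)]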

#getting start and end timestamps for gif transcript
def get_gif_timestamps(giflist_indxs, words_timestamp):
    print("******** Inside get_gif_timestamps() **********")
    min_idx = min(giflist_indxs)
    max_idx = max(giflist_indxs)
    # word timestamps covering just the selected snippet
    gif_words_timestamp = words_timestamp[min_idx : max_idx + 1]
    print(f"gif_words_timestamp is :{gif_words_timestamp}")
    # the clip runs from the start of the first word to the end of the last word
    start_seconds, end_seconds = gif_words_timestamp[0][0], gif_words_timestamp[-1][-1]
    print(f"start_seconds, end_seconds are :{start_seconds},{end_seconds}")
    return start_seconds, end_seconds
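
# Sanity check (hypothetical word timestamps): the clip spans from the start
# of the first selected word to the end of the last one.
assert get_gif_timestamps([1, 2], [[0.0, 0.4], [0.5, 0.9], [1.0, 1.4], [1.5, 1.9]]) == (0.5, 1.4)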

#extracting the clip and building and serving a .gif image
def gen_moviepy_gif(start_seconds, end_seconds):
    print("******** Inside gen_moviepy_gif() **********")
    # NOTE: work in progress - the clip path is still hardcoded to the bundled sample
    video_path = "./ShiaLaBeouf.mp4"
    video = mp.VideoFileClip(video_path)
    # cut the clip and write it out as a .gif (moviepy 1.x API)
    final_clip = video.subclip(start_seconds, end_seconds)
    final_clip.write_gif("./gifimage.gif")  # optionally: program='ffmpeg', tempfiles=True, fps=15, fuzz=3
    final_clip.close()
    # round-trip through .mp4 as well, so the UI can experiment with
    # Image-, Video- and OpenCV-based outputs
    gif_img = mp.VideoFileClip("gifimage.gif")
    gif_img.write_videofile("gifimage.mp4")
    gif_vid = mp.VideoFileClip("gifimage.mp4")
    im = Image.open("gifimage.gif")
    vid_cap = cv2.VideoCapture('gifimage.mp4')
    return im, gif_img, gif_vid, vid_cap, "gifimage.gif"
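
# End-to-end smoke test of the pipeline (a sketch; assumes HF_TOKEN is set and
# the sample clip is present - left commented out so it does not run on import):
#   transcription, words, words_timestamp = generate_transcripts("./ShiaLaBeouf.mp4")
#   gif_path = generate_gifs("don't let your dreams be dreams", str(words), str(words_timestamp))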

sample_video = ['./ShiaLaBeouf.mp4']
sample_vid = gr.Video(label='Video file')  # for displaying the example
examples = gr.components.Dataset(components=[sample_vid], samples=[sample_video], type='values')
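
# UI wiring note: values travel between Gradio callbacks only through
# components, so text_words and text_wordstimestamps below are hidden
# Textboxes that ferry the (stringified) word lists from
# generate_transcripts() to generate_gifs(), which parses them back
# with ast.literal_eval().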

demo = gr.Blocks()

with demo:
    gr.Markdown("""This app is still a work in progress..""")
    with gr.Row():
        input_video = gr.Video(label="Upload a Video", visible=True)  # incoming video
        text_transcript = gr.Textbox(label="Transcripts", lines=10, interactive=True)  # generated transcript
        text_words = gr.Textbox(visible=False)
        text_wordstimestamps = gr.Textbox(visible=False)
        text_gif_transcript = gr.Textbox(label="GIF Transcript",
                                         placeholder="Copy-paste part of the transcript here to create a GIF image",
                                         lines=3, interactive=True)
    examples.render()

    def load_examples(video):  # load the sample video into input_video on click
        print("****** inside load_examples() ******")
        print("in_video is : ", video[0])
        return video[0]

    examples.click(load_examples, examples, input_video)

    with gr.Row():
        button_transcript = gr.Button("Generate transcripts")
        button_gifs = gr.Button("Create Gif")

    with gr.Row():
        out_gif = gr.Image()  # the generated .gif

    button_transcript.click(generate_transcripts, input_video, [text_transcript, text_words, text_wordstimestamps])
    button_gifs.click(generate_gifs, [text_gif_transcript, text_words, text_wordstimestamps], out_gif)

demo.launch(debug=True)
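
# To run locally (assumptions: HF_TOKEN exported in the environment and the
# sample clip ShiaLaBeouf.mp4 sitting next to this file): python app.py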