ashishja committed on
Commit
2643f0a
·
verified ·
1 Parent(s): b34781c

Upload tools.py

Browse files
Files changed (1) hide show
  1. tools.py +523 -0
tools.py ADDED
@@ -0,0 +1,523 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import base64
2
+ import io
3
+ import json
4
+ import os
5
+ import subprocess
6
+ from email.message import Message
7
+ from io import StringIO
8
+ from pathlib import Path
9
+ from typing import List
10
+ import av
11
+ import pandas as pd
12
+ import requests
13
+ import yt_dlp
14
+ from bs4 import BeautifulSoup
15
+ from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
16
+ from langchain_core.tools import tool
17
+ from langchain_openai import ChatOpenAI
18
+ from langchain_tavily import TavilyExtract, TavilySearch
19
+ from pydantic import SecretStr
20
+
21
# API credentials, read from the environment. Each falls back to an empty
# string when the corresponding variable is not set.
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
# Model identifiers (judging by their names, for the YouTube frame-assessment
# and answer-confirmation steps); each can be overridden via the environment.
YOUTUBE_FRAME_ASSESSMENT_MODEL = os.getenv("YOUTUBE_FRAME_ASSESSMENT_MODEL", "google/gemini-2.5-flash-preview-05-20")
YOUTUBE_CONFIRMATION_MODEL = os.getenv("YOUTUBE_CONFIRMATION_MODEL", "google/gemini-2.5-pro-preview")
25
+
26
+ # Define Tools for the Agent
27
def _sanitize_download_filename(name: str) -> str:
    """Reduce *name* to a bare file name; return "" when nothing usable remains."""
    # Treat backslashes as separators too, so a Windows-style path cannot
    # carry directory components past os.path.basename() on POSIX.
    candidate = os.path.basename(name.replace("\\", "/"))
    return "" if candidate in (".", "..") else candidate


@tool(parse_docstring=True)
def download_file_from_url(url: str, filename_override: str|None = None) -> str:
    """
    Downloads a file from a URL to a directory in the cwd. Prefer to use the filename associated with the URL, but can override if directed to.
    Filename Logic:
    1. If `filename_override` is provided, it is used after being reduced to its final path component.
    2. Otherwise, the filename is extracted from the 'Content-Disposition' HTTP header
       using Python's `email.message.Message` parser. The result is sanitized.
    3. If no filename is provided via override and none can be determined from
       the header, a ValueError is raised.
    Args:
        url: The URL of the file to download.
        filename_override: Optional. If provided, this name is used for the downloaded file (any directory part is dropped). Using the name associated with the URL is recommended (but may require identifying the extension).
    Returns:
        A confirmation message that contains the local path of the downloaded file.
    Raises:
        requests.exceptions.RequestException: For HTTP errors (e.g., 404, network issues).
        IOError: If the file cannot be written.
        ValueError: If no usable filename can be determined (neither provided via override
            nor found in Content-Disposition header).
    """
    try:
        with requests.Session() as session:
            with session.get(url, stream=True, allow_redirects=True, timeout=30) as response:
                response.raise_for_status()

                final_filename = None

                if filename_override:
                    # The override comes from the caller (typically an LLM agent), so it
                    # is sanitised exactly like a server-supplied name to keep the
                    # download inside the temp directory.
                    final_filename = _sanitize_download_filename(filename_override)
                    if not final_filename:
                        raise ValueError(
                            f"filename_override {filename_override!r} does not contain a usable file name."
                        )
                    print(f"Using provided filename: {final_filename}")
                else:
                    content_disposition = response.headers.get('content-disposition')
                    if content_disposition:
                        msg = Message()
                        msg['Content-Disposition'] = content_disposition
                        filename_from_header = msg.get_filename()  # Handles various encodings

                        if filename_from_header:
                            # Keep only the final path component to prevent path traversal
                            final_filename = _sanitize_download_filename(filename_from_header)
                            if final_filename:
                                print(f"Using filename from Content-Disposition: {final_filename}")

                if not final_filename:
                    raise ValueError(
                        "No filename could be determined. "
                        "None was provided as an override, and it could not be "
                        "extracted from the Content-Disposition header."
                    )

                temp_dir = Path.cwd() / "temp_downloads"
                temp_dir.mkdir(parents=True, exist_ok=True)

                local_filepath = os.path.join(temp_dir, final_filename)

                # Stream to disk in chunks so large files are never held in memory.
                with open(local_filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

                return f"File downloaded successfully. Local File Path: {local_filepath}"

    except requests.exceptions.RequestException as e:
        print(f"Error during download from {url}: {e}")
        raise
    except IOError as e:
        print(f"Error writing file: {e}")
        raise
    # ValueError will propagate if raised
99
+
100
@tool(parse_docstring=True)
def basic_web_search(query: str, search_domains: list[str]|None = None) -> str:
    """
    Perform a web search using Tavily. Useful for retrieving relevant URLs and content summaries based on a search query.
    The content returned by this tool is limited. For more detailed content extraction, use the `extract_url_content` tool.
    If you would like to limit the search to specific domains, you can pass a list of domains (['wikipedia.org', 'example.com']).
    Args:
        query (str): The search query to perform.
        search_domains (None | list[str]): Optional. A list of domains (E.g., ['wikipedia.org', 'example.com']) to restrict the search to. If None, searches across all domains.

    Returns:
        str: a json formatted string containing the search results, including titles, content snippets, and URLs.
    """
    no_results_message = "No results found. If the query is too specific, try a more general search term."
    min_score = 0.25  # results scored at or below this are dropped

    search_tool = TavilySearch(
        api_key=SecretStr(TAVILY_API_KEY),
        max_results=5,
        include_raw_content=False,
        include_domains=search_domains
    )

    results = search_tool.invoke({"query": query})

    # The payload may be a non-dict or a dict without a "results" key (for
    # example an error payload); treat both as "nothing found" instead of
    # raising KeyError.
    raw_results = results.get("results") if isinstance(results, dict) else None
    if not raw_results:
        return no_results_message

    kept = []
    for result in raw_results:
        if not isinstance(result, dict) or "title" not in result or "content" not in result:
            continue
        score = result.get("score")
        # A result without a numeric score cannot be judged, so it is kept
        # rather than silently discarded.
        if isinstance(score, (int, float)) and not score > min_score:
            continue
        kept.append({
            "title": result["title"],
            "url": result.get("url", ""),
            "content": result["content"],
        })

    if not kept:
        return no_results_message
    return json.dumps({"results": kept}, indent=2)
141
+
142
@tool(parse_docstring=True)
def extract_url_content(url_list: list[str]) -> str:
    """
    Extracts the content from URLs using Tavily's extract tool.
    This tool is useful for retrieving content from web pages.
    This tool will most likely be used after a web search to extract content from the URLs returned by the search.

    Args:
        url_list (list[str]): The URLs to extract content from.

    Returns:
        str: The extracted content or an error message if extraction fails.
    """
    extract_tool = TavilyExtract(api_key=SecretStr(TAVILY_API_KEY))
    extract_results = extract_tool.invoke({'urls': url_list})

    pages = extract_results.get('results') if isinstance(extract_results, dict) else None
    if not pages:
        return f"No content could be extracted from the provided URLs: {url_list}"

    for page in pages:
        # Image lists are dropped from the output; pop() tolerates pages
        # that carry no "images" key at all.
        if isinstance(page, dict):
            page.pop('images', None)
    return json.dumps(pages, indent=2)
166
+
167
+
168
+
169
def bs_html_parser(url):
    """
    Fetch a page and parse it with BeautifulSoup's built-in "html.parser".

    Args:
        url: Address of the page to download.

    Returns:
        The parsed BeautifulSoup document, or None when the request fails
        (network error, timeout, or any status code other than 200).
    """
    try:
        # A timeout keeps an unresponsive host from blocking the caller forever.
        response = requests.get(url, timeout=30)
    except requests.exceptions.RequestException:
        return None  # Connection-level failures count as a failed request

    if response.status_code != 200:
        return None
    return BeautifulSoup(response.text, "html.parser")
177
+
178
def get_table_title(table_tag):
    """
    Derive a display title for a table element.

    The table's own <caption> wins when it has text. Otherwise the nearest
    heading (<h1>-<h6>) that precedes the table in the document is used,
    with Wikipedia "edit" links stripped. When neither yields any text the
    placeholder "Untitled Table" is returned.
    """
    fallback_title = "Untitled Table"

    # Step 1: a non-empty caption inside the table.
    caption_tag = table_tag.find('caption')
    if caption_tag:
        caption_text = caption_tag.get_text(strip=True)
        if caption_text:
            return caption_text

    # Step 2: the closest preceding heading. find_all_previous walks backwards
    # through the document, so limit=1 yields the nearest match.
    nearest = table_tag.find_all_previous(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'], limit=1)
    if not nearest:
        return fallback_title
    heading = nearest[0]

    headline = heading.find("span", class_="mw-headline")
    if headline:
        # Wikipedia wraps the clean heading text in a dedicated span.
        heading_text = headline.get_text(strip=True)
    else:
        # Work on a re-parsed copy so removing edit links leaves the main
        # document untouched.
        detached = BeautifulSoup(str(heading), 'html.parser').find(heading.name)
        if detached:
            for edit_link in detached.find_all("span", class_="mw-editsection"):
                edit_link.decompose()
            heading_text = detached.get_text(strip=True)
        else:
            # Copy could not be located; fall back to the raw heading text.
            heading_text = heading.get_text(strip=True)

    return heading_text if heading_text else fallback_title
227
+
228
@tool(parse_docstring=True)
def wikipedia_reader(url: str) -> str:
    """
    Extracts sections, paragraphs, and tables from a Wikipedia page.

    Args:
        url (str): The URL of the Wikipedia page to extract content from.

    Returns:
        str: A JSON string containing sections, paragraphs, and tables.
    """
    soup = bs_html_parser(url)
    if not soup:
        return ""  # Return empty if soup creation failed

    # Section anchors are appended to the page address; strip any fragment the
    # caller already supplied so the result never carries two '#' parts.
    base_url = url.split("#", 1)[0]

    def extract_links(soup_obj):
        """Collect in-page section links, skipping citation anchors."""
        section_links = []
        for anchor in soup_obj.find_all('a', href=True):
            href = anchor.get('href')
            if href and href.startswith("#") and "#cite_" not in href and len(href) > 1:
                section_links.append(base_url + href)
        return section_links

    def extract_paragraphs(soup_obj):
        """Return the text of every <p> longer than 10 characters."""
        texts = (p.get_text(strip=True) for p in soup_obj.find_all("p"))
        return [text for text in texts if text and len(text) > 10]

    def table_to_records(table_tag):
        """Convert one <table> into JSON-serialisable row dicts (None if unparsable)."""
        # read_html expects a file-like object, hence the StringIO wrapper.
        frames = pd.read_html(StringIO(str(table_tag)))
        if not frames:
            return None
        df = frames[0]
        if isinstance(df.columns, pd.MultiIndex):
            # Multi-row headers produce tuple column labels, which json.dumps
            # rejects as dict keys. Join the distinct levels into one string.
            df.columns = [
                " - ".join(dict.fromkeys(str(level) for level in column))
                for column in df.columns
            ]
        # Empty cells come back as NaN, which serialises to the non-standard
        # token NaN; convert them to None so the output is valid JSON.
        df = df.astype(object).where(df.notna().to_numpy(), None)
        return df.to_dict(orient='records')

    def extract_tables(soup_obj):
        """Return a list of {"title", "table_data"[, "error"]} dicts for wikitables."""
        tables_with_titles = []
        for table_tag in soup_obj.find_all("table", {"class": "wikitable"}):
            title = get_table_title(table_tag)
            try:
                records = table_to_records(table_tag)
            except Exception as e:
                # Best effort: one malformed table must not abort the whole page.
                tables_with_titles.append({"title": title, "table_data": None, "error": str(e)})
                continue
            if records is None:
                tables_with_titles.append({"title": title, "table_data": None, "error": "pd.read_html returned empty list"})
            else:
                tables_with_titles.append({"title": title, "table_data": records})
        return tables_with_titles

    return_dict = {
        "sections": extract_links(soup),
        "paragraphs": extract_paragraphs(soup),
        "tables": extract_tables(soup),
    }

    return json.dumps(return_dict, indent=2, ensure_ascii=False)  # Return as JSON string
291
+
292
+
293
+ # Singleton class for Whisper model
294
+ # we use this so we don't have to load the model multiple times, just once the first time the tool is used
295
class WhisperTranscriber:
    """
    Lazily built, process-wide Whisper speech-recognition pipeline.

    Calling ``WhisperTranscriber()`` does not return an instance of this
    class: ``__new__`` builds the transformers ASR pipeline on first use,
    stores it on the class, and returns that same pipeline object on every
    call.
    """

    # Holds the pipeline once it has been built; None until first use.
    _instance = None

    def __new__(cls):
        if cls._instance is not None:
            return cls._instance

        # Heavy dependencies are imported only when the model is first needed.
        import torch
        from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
        from transformers.pipelines import pipeline

        # Half precision on GPU, full precision on CPU.
        has_cuda = torch.cuda.is_available()
        if has_cuda:
            device, torch_dtype = "cuda:0", torch.float16
        else:
            device, torch_dtype = "cpu", torch.float32

        model_id = "openai/whisper-large-v3"

        model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_id,
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True,
        )
        model.to(device)

        processor = AutoProcessor.from_pretrained(model_id)

        cls._instance = pipeline(
            "automatic-speech-recognition",
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            device=device,
        )
        return cls._instance
325
+
326
+
327
@tool(parse_docstring=True)
def transcribe_audio_file(file_path: str) -> str:
    """
    Transcribes an audio file to text using OpenAI's Whisper-large-v3 model, caching the model after the first load.
    Args:
        file_path (str): The path to the audio file to transcribe.
    Returns:
        str: The transcription of the audio file.
    """
    pipe = WhisperTranscriber()
    # Whisper handles audio longer than 30 seconds only in long-form mode,
    # which requires timestamp prediction; without this flag the pipeline
    # raises on longer recordings. The full transcript is still under "text".
    result = pipe(file_path, return_timestamps=True)
    transcription = result.get("text") if isinstance(result, dict) else None
    # Strip before testing so a whitespace-only transcript uses the fallback.
    transcription = transcription.strip() if transcription else ""
    return transcription if transcription else "No transcription available."
339
+
340
+
341
def _select_video_stream_url(info_dict: dict) -> str|None:
    """Return a direct media URL that carries video from a yt-dlp info dict, or None."""
    direct_url = info_dict.get("url")
    if direct_url:
        return direct_url
    # A merged "video+audio" selection has no top-level URL; the individual
    # streams are listed under "requested_formats". Only the video is needed.
    for fmt in info_dict.get("requested_formats") or []:
        if fmt.get("url") and fmt.get("vcodec") not in (None, "none"):
            return fmt["url"]
    return None


def _frame_to_base64_jpeg(frame) -> str:
    """Encode a decoded video frame as a base64 JPEG string."""
    buf = io.BytesIO()
    frame.to_image().save(buf, format="JPEG")  # using PIL.Image.save
    return base64.b64encode(buf.getvalue()).decode("utf-8")


@tool(parse_docstring=True)
def question_youtube_video(video_url: str, query: str) -> str:
    """
    Answers a question about a YouTube video.
    The video is streamed and one frame is captured every x seconds, where x is declared in the environment settings.
    Captured frames are sent sequentially to a multimodal model to answer the question about the video.
    The final answer is aggregated from the answers to each frame.
    DOES NOT USE AUDIO! ONLY FRAMES FROM THE VIDEO ARE USED TO ANSWER THE QUESTION.
    Args:
        video_url (str): The URL of the video to capture frames from.
        query (str): The question to answer about the video.
    Returns:
        str: The answer to the question about the video.
    """
    capture_interval_sec = int(os.getenv("CAPTURE_INTERVAL_SEC", 2))  # Default to 2 seconds if not set

    # First, resolve the direct stream URL using yt-dlp (nothing is downloaded).
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "mp4[ext=mp4]+bestaudio/best",
        "forceurl": True,
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "subtitlesformat": "vtt",
        "subtitleslangs": ['en'],
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(video_url, download=False)
    if not isinstance(info_dict, dict):
        raise ValueError("Failed to extract video information. Please check the video URL.")

    # Validate the URL before ffmpeg is started, so a missing URL produces a
    # clear error instead of a failure inside subprocess.
    stream_url = _select_video_stream_url(info_dict)
    if stream_url is None:
        raise ValueError("Could not retrieve video stream URL. Please check the video URL and try again.")

    # Second, the multimodal model that inspects individual frames.
    image_model = ChatOpenAI(
        model=YOUTUBE_FRAME_ASSESSMENT_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",  # Standard OpenRouter API base
        verbose=True  # Optional: for debugging
    )
    image_model_system_prompt = SystemMessage(
        content="You will be shown a frame from a video along with a question about that video and an answer based on the previous frames in the video. "
                "Your task is to analyze the frame and provide an answer to the question using both the current frame and the previous answer. "
                "If the previous answer is reasonable and the current frame can not answer the question return the previous answer. "
                "For example, if the question is about the color of a car and the previous answer is 'red' but the current frame shows no car, you should return 'red'. "
                "If the question is about the greatest number of something in the video, you should return the number counted in the current frame or the previous answer, whichever is greater. "
                "For example, if the current frame has 5 objects but the previous answer is 10 objects, you should return '10'. "
                "Be concise and clear in your answers, and do not repeat the question. "
    )

    # Third, ffmpeg remuxes the stream to matroska on stdout for PyAV to decode.
    ffmpeg_cmd = [
        "ffmpeg",
        "-i",
        stream_url,
        "-f",
        "matroska",  # container format
        "-",
    ]

    # stderr is discarded: an unread stderr pipe can fill up and block ffmpeg.
    process = subprocess.Popen(
        ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
    )

    aggregated_answer = ''
    answers_list: List[dict] = []
    container = None

    try:
        container = av.open(process.stdout)
        stream = container.streams.video[0]
        if stream.time_base is None:
            raise ValueError("Could not determine time base for the video stream. Please check the video URL and try again.")
        time_base = float(stream.time_base)

        # Analyze frames one by one, skipping those that fall before the next capture time.
        next_capture_time = 0

        for frame in container.decode(stream):
            if frame.pts is None:
                continue

            timestamp = float(frame.pts * time_base)
            if timestamp < next_capture_time:
                continue

            frame_base64 = _frame_to_base64_jpeg(frame)

            # Explicitly type the list to hold instances of BaseMessage
            msgs: List[BaseMessage] = [image_model_system_prompt]

            frame_query = query
            if aggregated_answer:
                frame_query += f"\nPrevious Answer: {aggregated_answer}"
                frame_query += (
                    "\nProvide a concise answer based on the previous answer and the current frame. "
                    "If the current frame does not answer the question but there is a previous answer, return the previous answer. "
                    "REMEMBER: This question is not about the current frame! It is about the video as a whole. ALWAYS PAY ATTENTION TO THE PREVIOUS ANSWER!"
                )

            msgs.append(HumanMessage(content=[
                {
                    "type": "text",
                    "text": frame_query
                },
                {
                    "type": "image",
                    "source_type": "base64",
                    "mime_type": "image/jpeg",
                    "data": frame_base64
                }
            ]))

            response = image_model.invoke(msgs)
            if not isinstance(response.content, str):
                raise TypeError("The model's response should be a string.")
            answer = response.content.strip()
            answers_list.append({"timestamp": timestamp, "answer": answer})
            if answer:
                # Carry the latest non-empty answer forward to the next frame.
                aggregated_answer = answer
            next_capture_time += capture_interval_sec
    finally:
        # Always stop ffmpeg and release the decoder, including on errors.
        process.terminate()
        if container is not None:
            container.close()
        if process.stdout is not None:
            process.stdout.close()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    # Finally, a second model reviews the per-frame answers and reports one result.
    final_answer_model = ChatOpenAI(
        model=YOUTUBE_CONFIRMATION_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",  # Standard OpenRouter API base
        verbose=True  # Optional: for debugging
    )

    final_answer_system_message = SystemMessage(
        "You are a brilliant assistant who is eager to help and extremely detailed oriented. "
        "A group of individuals have been asked the same question about a video. "
        "None of the individuals have seen the entire video. "
        "Each individual, when asked the question, was provided a frame from the video, as well as the previously reported answer based on the previous frame. "
        "Your job is to report a final answer for the question about the video. "
        "Ideally, the final answer has already been reported correctly by the last individual. "
        "However, this is similar to the game a telephone, where the true answer can become corrupted along the way. "
        "Assess all of the answers. If you can confirm the final answer is correct, simply return it. "
        "If you notice that the final answer is incorrect, then identify the correct answer and report that. "
        "You will also have access to the video title and description, which may help you identify the correct answer. "
        "Be concise and only respond with the correct final answer!"
    )

    answers_list_str = "\n".join(
        f"Answer {i+1} at {ans['timestamp']:.2f}s: {ans['answer']}"
        for i, ans in enumerate(answers_list)
    )

    final_query = (
        f"Video Title: {info_dict.get('title', 'No title found')}. "
        f"Video Description: {info_dict.get('description', 'No description found')}. "
        f"Question about video: {query} "
        f"Answers provided by individuals: \n{answers_list_str}\n\n "
        "Provide a concise final answer to the question about the video based on the previous answers. "
        "Include a short explanation of why you chose this answer. "
        "Format the answer like so: "
        "Explanation: <your explanation here>. "
        "Final Answer: <your answer here>. "
    )

    final_msgs = [
        final_answer_system_message,
        HumanMessage(content=[
            {
                "type": "text",
                "text": final_query
            }
        ])
    ]
    final_response = final_answer_model.invoke(final_msgs)
    if not isinstance(final_response.content, str):
        raise TypeError("The final model's response should be a string.")

    return final_response.content.strip()