import base64
import io
import json
import os
import subprocess
from email.message import Message
from io import StringIO
from pathlib import Path
from typing import List

import av
import pandas as pd
import requests
import yt_dlp
from bs4 import BeautifulSoup
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilyExtract, TavilySearch
from pydantic import SecretStr

TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
YOUTUBE_FRAME_ASSESSMENT_MODEL = os.getenv("YOUTUBE_FRAME_ASSESSMENT_MODEL", "google/gemini-2.5-flash-preview-05-20")
YOUTUBE_CONFIRMATION_MODEL = os.getenv("YOUTUBE_CONFIRMATION_MODEL", "google/gemini-2.5-flash-preview-05-20")
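
# Expected environment (a minimal sketch; the values below are placeholder
# assumptions, not real keys):
#   TAVILY_API_KEY=tvly-...
#   OPENROUTER_API_KEY=sk-or-...
#   CAPTURE_INTERVAL_SEC=2   # seconds between captured YouTube frames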


@tool(parse_docstring=True)
def download_file_from_url(url: str, filename_override: str | None = None) -> str:
    """
    Downloads a file from a URL to a directory in the cwd. Prefer the filename
    associated with the URL, but it can be overridden if directed to.

    Filename logic:
    1. If `filename_override` is provided, it is used directly.
    2. Otherwise, the filename is extracted from the 'Content-Disposition' HTTP header
       using Python's `email.message.Message` parser. The result is sanitized.
    3. If no filename is provided via override and none can be determined from
       the header, a ValueError is raised.

    Args:
        url: The URL of the file to download.
        filename_override: Optional. If provided, this exact name is used for the downloaded file. Using the name associated with the URL is recommended (but may require identifying the extension).

    Returns:
        The full path to the downloaded file.

    Raises:
        requests.exceptions.RequestException: For HTTP errors (e.g., 404, network issues).
        IOError: If the file cannot be written.
        ValueError: If no filename can be determined (neither provided via override
            nor found in the Content-Disposition header).
    """
    try:
        with requests.Session() as session:
            with session.get(url, stream=True, allow_redirects=True, timeout=30) as response:
                response.raise_for_status()

                final_filename = None
                if filename_override:
                    final_filename = filename_override
                    print(f"Using provided filename: {final_filename}")
                else:
                    content_disposition = response.headers.get('content-disposition')
                    if content_disposition:
                        # The stdlib email parser handles quoting and RFC 2231
                        # encoding in the Content-Disposition header.
                        msg = Message()
                        msg['Content-Disposition'] = content_disposition
                        filename_from_header = msg.get_filename()

                        if filename_from_header:
                            # basename() strips any path components a hostile server might send.
                            final_filename = os.path.basename(filename_from_header)
                            print(f"Using filename from Content-Disposition: {final_filename}")

                if not final_filename:
                    raise ValueError(
                        "No filename could be determined. "
                        "None was provided as an override, and it could not be "
                        "extracted from the Content-Disposition header."
                    )

                temp_dir = Path.cwd() / "temp_downloads"
                temp_dir.mkdir(parents=True, exist_ok=True)
                local_filepath = os.path.join(temp_dir, final_filename)

                # Stream the body to disk in chunks so large files never sit in memory.
                with open(local_filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

                return f"File downloaded successfully. Local File Path: {local_filepath}"

    except requests.exceptions.RequestException as e:
        print(f"Error during download from {url}: {e}")
        raise
    except IOError as e:
        print(f"Error writing file: {e}")
        raise
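
# Example usage (a hedged sketch; the URL is a placeholder and assumes the
# server sends a Content-Disposition filename):
#   msg = download_file_from_url.invoke({"url": "https://example.com/report.pdf"})
#   # -> "File downloaded successfully. Local File Path: .../temp_downloads/report.pdf"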


@tool(parse_docstring=True)
def basic_web_search(query: str, search_domains: list[str] | None = None) -> str:
    """
    Perform a web search using Tavily. Useful for retrieving relevant URLs and content summaries based on a search query.
    The content returned by this tool is limited. For more detailed content extraction, use the `extract_url_content` tool.
    To limit the search to specific domains, pass a list of domains (e.g., ['wikipedia.org', 'example.com']).

    Args:
        query (str): The search query to perform.
        search_domains (None | list[str]): Optional. A list of domains (e.g., ['wikipedia.org', 'example.com']) to restrict the search to. If None, searches across all domains.

    Returns:
        str: A JSON-formatted string containing the search results, including titles, content snippets, and URLs.
    """
    search_tool = TavilySearch(
        api_key=SecretStr(TAVILY_API_KEY),
        max_results=5,
        include_raw_content=False,
        include_domains=search_domains
    )

    results = search_tool.invoke({"query": query})

    no_results_msg = "No results found. If the query is too specific, try a more general search term."

    if results and isinstance(results, dict) and len(results["results"]) > 0:
        return_dict = {"results": []}
        for result in results["results"]:
            # Keep only reasonably relevant hits; Tavily relevance scores range from 0 to 1.
            if "title" in result and "content" in result and result.get("score", 0) > 0.25:
                return_dict["results"].append({
                    "title": result["title"],
                    "url": result["url"],
                    "content": result["content"],
                })
        if len(return_dict["results"]) == 0:
            return no_results_msg
        return json.dumps(return_dict, indent=2)
    else:
        return no_results_msg


@tool(parse_docstring=True)
def extract_url_content(url_list: list[str]) -> str:
    """
    Extracts the content from URLs using Tavily's extract tool.
    This tool is useful for retrieving content from web pages.
    It will most likely be used after a web search, to extract content from the URLs returned by the search.

    Args:
        url_list (list[str]): The URLs to extract content from.

    Returns:
        str: The extracted content or an error message if extraction fails.
    """
    extract_tool = TavilyExtract(api_key=SecretStr(TAVILY_API_KEY))
    extract_results = extract_tool.invoke({'urls': url_list})

    if extract_results and 'results' in extract_results and len(extract_results['results']) > 0:
        # Drop image metadata to keep the payload small; pop() avoids a
        # KeyError when a result has no 'images' key.
        for page_content in extract_results['results']:
            page_content.pop('images', None)

        return json.dumps(extract_results['results'], indent=2)
    else:
        return f"No content could be extracted from the provided URLs: {url_list}"


def bs_html_parser(url):
    """Fetch a URL and return a parsed BeautifulSoup tree, or None on a non-200 response."""
    response = requests.get(url, timeout=30)

    if response.status_code == 200:
        return BeautifulSoup(response.text, "html.parser")
    else:
        return None


def get_table_title(table_tag):
    """
    Extracts a title for a given table tag.
    It looks for a <caption>, then for the closest preceding <h1>-<h6> tag.
    """
    title = "Untitled Table"

    # Prefer an explicit <caption> if the table has one.
    caption = table_tag.find('caption')
    if caption:
        caption_text = caption.get_text(strip=True)
        if caption_text:
            return caption_text

    # Otherwise fall back to the nearest heading that precedes the table.
    headings = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
    preceding_headings = table_tag.find_all_previous(headings, limit=1)

    if preceding_headings:
        heading_tag = preceding_headings[0]

        # MediaWiki pages wrap the heading text in a span.mw-headline;
        # use it directly when present.
        headline_span = heading_tag.find("span", class_="mw-headline")
        if headline_span:
            title_text = headline_span.get_text(strip=True)
        else:
            # Work on a copy of the heading so decomposing the "[edit]"
            # spans does not mutate the original soup.
            temp_heading_soup = BeautifulSoup(str(heading_tag), 'html.parser')
            temp_heading_tag = temp_heading_soup.find(heading_tag.name)

            if temp_heading_tag:
                for span in temp_heading_tag.find_all("span", class_="mw-editsection"):
                    span.decompose()
                title_text = temp_heading_tag.get_text(strip=True)
            else:
                title_text = heading_tag.get_text(strip=True)

        if title_text:
            title = title_text

    return title
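
# Example (hedged sketch on a toy fragment):
#   frag = BeautifulSoup("<h2>Discography</h2><table class='wikitable'></table>", "html.parser")
#   get_table_title(frag.find("table"))  # -> "Discography"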


@tool(parse_docstring=True)
def wikipedia_reader(url: str) -> str:
    """
    Extracts sections, paragraphs, and tables from a Wikipedia page.

    Args:
        url (str): The URL of the Wikipedia page to extract content from.

    Returns:
        str: A JSON string containing sections, paragraphs, and tables.
    """
    soup = bs_html_parser(url)
    if not soup:
        return ""

    def extract_links(soup_obj):
        # Collect in-page section anchors (e.g., "#History"), skipping citation anchors.
        links = []
        for link in soup_obj.find_all('a', href=True):
            href = link.get('href')
            if href and href.startswith("#") and "#cite_" not in href and len(href) > 1:
                links.append(url + href)
        return links

    links = extract_links(soup)

    def extract_paragraphs(soup_obj):
        # Keep non-trivial paragraphs only; very short strings are usually layout noise.
        paragraphs_text = [p.get_text(strip=True) for p in soup_obj.find_all("p")]
        return [p for p in paragraphs_text if p and len(p) > 10]

    paragraphs = extract_paragraphs(soup)

    def extract_tables(soup_obj):
        tables_with_titles = []
        for table_tag in soup_obj.find_all("table", {"class": "wikitable"}):
            title = get_table_title(table_tag)
            try:
                table_html_str = str(table_tag)
                df_list = pd.read_html(StringIO(table_html_str))
                if df_list:
                    df = df_list[0]
                    tables_with_titles.append({"title": title, "table_data": df.to_dict(orient='records')})
                else:
                    tables_with_titles.append({"title": title, "table_data": None, "error": "pd.read_html returned empty list"})
            except Exception as e:
                tables_with_titles.append({"title": title, "table_data": None, "error": str(e)})
        return tables_with_titles

    tables = extract_tables(soup)

    return_dict = {
        "sections": links,
        "paragraphs": paragraphs,
        "tables": tables
    }

    return json.dumps(return_dict, indent=2, ensure_ascii=False)
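
# Example usage (hedged sketch):
#   page_json = wikipedia_reader.invoke({"url": "https://en.wikipedia.org/wiki/Mercedes_Sosa"})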


class WhisperTranscriber:
    """Lazy singleton around a Whisper ASR pipeline: the model is loaded once on
    first instantiation, and every later call returns the cached pipeline."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            # Import the heavy dependencies lazily so the module imports fast
            # when transcription is never used.
            import torch
            from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
            from transformers.pipelines import pipeline

            device = "cuda:0" if torch.cuda.is_available() else "cpu"
            torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            model_id = "openai/whisper-large-v3"

            model = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
            )
            model.to(device)

            processor = AutoProcessor.from_pretrained(model_id)
            pipe = pipeline(
                "automatic-speech-recognition",
                model=model,
                tokenizer=processor.tokenizer,
                feature_extractor=processor.feature_extractor,
                torch_dtype=torch_dtype,
                device=device,
            )

            # Note: __new__ deliberately returns the pipeline itself rather than a
            # WhisperTranscriber instance, so WhisperTranscriber() acts as a cached
            # factory for the pipeline.
            cls._instance = pipe
        return cls._instance


@tool(parse_docstring=True)
def transcribe_audio_file(file_path: str) -> str:
    """
    Transcribes an audio file to text using OpenAI's Whisper-large-v3 model, caching the model after the first load.

    Args:
        file_path (str): The path to the audio file to transcribe.

    Returns:
        str: The transcription of the audio file.
    """
    pipe = WhisperTranscriber()
    transcription = pipe(file_path)["text"]
    return transcription.strip() if transcription else "No transcription available."
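
# Example usage (hedged sketch; the path is a placeholder for a previously downloaded file):
#   text = transcribe_audio_file.invoke({"file_path": "temp_downloads/interview.mp3"})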


@tool(parse_docstring=True)
def question_youtube_video(video_url: str, query: str) -> str:
    """
    Answers a question about a YouTube video.
    The video is streamed and one frame is captured every CAPTURE_INTERVAL_SEC seconds, configured via the environment (default: 2).
    Captured frames are sent sequentially to a multimodal model to answer the question about the video.
    The final answer is aggregated from the answers to each frame.
    DOES NOT USE AUDIO! ONLY FRAMES FROM THE VIDEO ARE USED TO ANSWER THE QUESTION.

    Args:
        video_url (str): The URL of the video to capture frames from.
        query (str): The question to answer about the video.

    Returns:
        str: The answer to the question about the video.
    """
    CAPTURE_INTERVAL_SEC = int(os.getenv("CAPTURE_INTERVAL_SEC", "2"))

    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "mp4[ext=mp4]+bestaudio/best",
        "forceurl": True,
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "subtitlesformat": "vtt",
        "subtitleslangs": ['en'],
    }

    # Resolve a direct stream URL (no download) plus title/description metadata.
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(video_url, download=False)
        assert isinstance(info_dict, dict), "Failed to extract video information. Please check the video URL."
        stream_url = info_dict.get("url", None)

    # Fail fast before handing the URL to ffmpeg.
    if stream_url is None:
        raise ValueError("Could not retrieve video stream URL. Please check the video URL and try again.")

    # Remux the stream to Matroska on stdout so PyAV can decode frames as they arrive.
    ffmpeg_cmd = [
        "ffmpeg",
        "-i",
        stream_url,
        "-f",
        "matroska",
        "-",
    ]

    # Discard stderr: the pipe is never read, and an unread PIPE can fill up
    # and deadlock ffmpeg on chatty streams.
    process = subprocess.Popen(
        ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
    )

    container = av.open(process.stdout)
    stream = container.streams.video[0]
    time_base = stream.time_base
    if time_base is None:
        raise ValueError("Could not determine time base for the video stream. Please check the video URL and try again.")
    time_base = float(time_base)

    image_model = ChatOpenAI(
        model=YOUTUBE_FRAME_ASSESSMENT_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",
        verbose=True
    )
    image_model_system_prompt = SystemMessage(
        content="You will be shown a frame from a video along with a question about that video and an answer based on the previous frames in the video. "
                "Your task is to analyze the frame and provide an answer to the question using both the current frame and the previous answer. "
                "If the previous answer is reasonable and the current frame cannot answer the question, return the previous answer. "
                "For example, if the question is about the color of a car and the previous answer is 'red' but the current frame shows no car, you should return 'red'. "
                "If the question is about the greatest number of something in the video, you should return the number counted in the current frame or the previous answer, whichever is greater. "
                "For example, if the current frame has 5 objects but the previous answer is 10 objects, you should return '10'. "
                "Be concise and clear in your answers, and do not repeat the question."
    )

    next_capture_time = 0
    aggregated_answer = ''
    answers_list: List[dict] = []

    for frame in container.decode(stream):
        if frame.pts is None:
            continue

        timestamp = float(frame.pts * time_base)
        if timestamp >= next_capture_time:
            # Encode the current frame as a base64 JPEG for the multimodal model.
            buf = io.BytesIO()
            img = frame.to_image()
            img.save(buf, format="JPEG")
            jpeg_bytes = buf.getvalue()
            frame_base64 = base64.b64encode(jpeg_bytes).decode("utf-8")

            msgs: List[BaseMessage] = [image_model_system_prompt]

            frame_query = query
            if aggregated_answer:
                frame_query += f"\nPrevious Answer: {aggregated_answer}"
                frame_query += ("\nProvide a concise answer based on the previous answer and the current frame. "
                                "If the current frame does not answer the question but there is a previous answer, return the previous answer. "
                                "REMEMBER: This question is not about the current frame! It is about the video as a whole. ALWAYS PAY ATTENTION TO THE PREVIOUS ANSWER!")

            msgs.append(HumanMessage(content=[
                {
                    "type": "text",
                    "text": frame_query
                },
                {
                    "type": "image",
                    "source_type": "base64",
                    "mime_type": "image/jpeg",
                    "data": frame_base64
                }
            ]))

            response = image_model.invoke(msgs)

            assert isinstance(response.content, str), "The model's response should be a string."
            answer = response.content.strip()
            answers_list.append({"timestamp": timestamp, "answer": answer})
            if answer:
                aggregated_answer = answer
            next_capture_time += CAPTURE_INTERVAL_SEC

    # Stop ffmpeg once enough frames have been sampled.
    process.terminate()

    final_answer_model = ChatOpenAI(
        model=YOUTUBE_CONFIRMATION_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",
        verbose=True
    )

    final_answer_system_message = SystemMessage(
        "You are a brilliant assistant who is eager to help and extremely detail-oriented. "
        "A group of individuals have been asked the same question about a video. "
        "None of the individuals have seen the entire video. "
        "Each individual, when asked the question, was provided a frame from the video, as well as the previously reported answer based on the previous frame. "
        "Your job is to report a final answer for the question about the video. "
        "Ideally, the final answer has already been reported correctly by the last individual. "
        "However, this is similar to the game of telephone, where the true answer can become corrupted along the way. "
        "Assess all of the answers. If you can confirm the final answer is correct, simply return it. "
        "If you notice that the final answer is incorrect, then identify the correct answer and report that. "
        "You will also have access to the video title and description, which may help you identify the correct answer. "
        "Be concise and only respond with the correct final answer!"
    )

    answers_list_str = "\n".join([f"Answer {i+1} at {ans['timestamp']:.2f}s: {ans['answer']}" for i, ans in enumerate(answers_list)])

    final_query = (
        f"Video Title: {info_dict.get('title', 'No title found')}. "
        f"Video Description: {info_dict.get('description', 'No description found')}. "
        f"Question about video: {query} "
        f"Answers provided by individuals: \n{answers_list_str}\n\n "
        "Provide a concise final answer to the question about the video based on the previous answers. "
        "Include a short explanation of why you chose this answer. "
        "Format the answer like so: "
        "Explanation: <your explanation here>. "
        "Final Answer: <your answer here>. "
    )

    final_msgs = [
        final_answer_system_message,
        HumanMessage(content=[
            {
                "type": "text",
                "text": final_query
            }
        ])
    ]
    final_response = final_answer_model.invoke(final_msgs)
    assert isinstance(final_response.content, str), "The final model's response should be a string."
    final_answer = final_response.content.strip()

    return final_answer
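
# Example usage (hedged sketch; URL and question are placeholders):
#   answer = question_youtube_video.invoke({
#       "video_url": "https://www.youtube.com/watch?v=...",
#       "query": "What is the highest number of birds visible at once?",
#   })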