|
import base64
|
|
import io
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from email.message import Message
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
from typing import List
|
|
import av
|
|
import pandas as pd
|
|
import requests
|
|
import yt_dlp
|
|
from bs4 import BeautifulSoup
|
|
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
|
|
from langchain_core.tools import tool
|
|
from langchain_openai import ChatOpenAI
|
|
from langchain_tavily import TavilyExtract, TavilySearch
|
|
from pydantic import SecretStr
|
|
|
|
# Credentials read once at import time; each falls back to an empty string when unset.
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY", "")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")

# Model identifiers, overridable through the environment variables of the same name.
YOUTUBE_FRAME_ASSESSMENT_MODEL = os.environ.get(
    "YOUTUBE_FRAME_ASSESSMENT_MODEL", "google/gemini-2.5-flash-preview-05-20"
)
YOUTUBE_CONFIRMATION_MODEL = os.environ.get(
    "YOUTUBE_CONFIRMATION_MODEL", "google/gemini-2.5-pro-preview"
)
|
|
|
|
|
|
def _sanitize_filename(name: str | None) -> str | None:
    """Return the final path component of `name`, or None if no usable file name remains."""
    if not name:
        return None
    # Treat "\" as a separator too, so Windows-style paths cannot smuggle directories on POSIX.
    base_name = os.path.basename(name.replace("\\", "/"))
    return None if base_name in ("", ".", "..") else base_name


@tool(parse_docstring=True)
def download_file_from_url(url: str, filename_override: str|None = None) -> str:
    """
    Downloads a file from a URL to a directory in the cwd. Prefer to use the filename associated with the URL, but can override if directed to.

    Filename Logic:
    1. If `filename_override` is provided, its final path component is used (any directory part is ignored).
    2. Otherwise, the filename is extracted from the 'Content-Disposition' HTTP header
       using Python's `email.message.Message` parser and reduced to its final path component.
    3. If no usable filename is provided via override and none can be determined from
       the header, a ValueError is raised.

    Args:
        url: The URL of the file to download.
        filename_override: Optional. If provided, this name is used for the downloaded file (directory parts are stripped). Using the name associated with the URL is recommended (but may require identifying the extension).

    Returns:
        A success message that contains the full local path of the downloaded file.

    Raises:
        requests.exceptions.RequestException: For HTTP errors (e.g., 404, network issues).
        IOError: If the file cannot be written.
        ValueError: If no usable filename can be determined (neither provided via override
            nor found in Content-Disposition header).
    """
    # Validate the override before touching the network. Only the base name is kept, so the
    # file always lands inside temp_downloads (no "../" or absolute-path escapes).
    override_name = _sanitize_filename(filename_override) if filename_override else None
    if filename_override and override_name is None:
        raise ValueError(f"filename_override {filename_override!r} does not contain a usable file name.")

    try:
        with requests.Session() as session:
            with session.get(url, stream=True, allow_redirects=True, timeout=30) as response:
                response.raise_for_status()

                if override_name:
                    final_filename = override_name
                    print(f"Using provided filename: {final_filename}")
                else:
                    final_filename = None
                    content_disposition = response.headers.get('content-disposition')
                    if content_disposition:
                        msg = Message()
                        msg['Content-Disposition'] = content_disposition
                        # The header value is server-controlled; strip any directory parts.
                        final_filename = _sanitize_filename(msg.get_filename())
                        if final_filename:
                            print(f"Using filename from Content-Disposition: {final_filename}")

                    if not final_filename:
                        raise ValueError(
                            "No filename could be determined. "
                            "None was provided as an override, and it could not be "
                            "extracted from the Content-Disposition header."
                        )

                download_dir = Path.cwd() / "temp_downloads"
                download_dir.mkdir(parents=True, exist_ok=True)
                local_filepath = download_dir / final_filename

                try:
                    with open(local_filepath, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                except Exception:
                    # Don't leave a truncated file behind that looks like a finished download.
                    local_filepath.unlink(missing_ok=True)
                    raise

        return f"File downloaded successfully. Local File Path: {local_filepath}"

    # RequestException subclasses IOError, so it must be handled first.
    except requests.exceptions.RequestException as e:
        print(f"Error during download from {url}: {e}")
        raise
    except IOError as e:
        print(f"Error writing file: {e}")
        raise
|
|
|
|
|
|
@tool(parse_docstring=True)
def basic_web_search(query: str, search_domains: list[str]|None = None) -> str:
    """
    Perform a web search using Tavily. Useful for retrieving relevant URLs and content summaries based on a search query.

    The content returned by this tool is limited. For more detailed content extraction, use the `extract_url_content` tool.
    If you would like to limit the search to specific domains, pass a list of domains (e.g., ['wikipedia.org', 'example.com']).

    Args:
        query (str): The search query to perform.
        search_domains (None | list[str]): Optional. A list of domains (E.g., ['wikipedia.org', 'example.com']) to restrict the search to. If None, searches across all domains.

    Returns:
        str: a json formatted string containing the search results, including titles, content snippets, and URLs.
    """
    no_results_msg = "No results found. If the query is too specific, try a more general search term."
    # Results whose Tavily relevance score is not above this value are dropped.
    min_score = 0.25

    search_tool = TavilySearch(
        api_key=SecretStr(TAVILY_API_KEY),
        max_results=5,
        include_raw_content=False,
        include_domains=search_domains
    )

    results = search_tool.invoke({"query": query})

    # Tolerate non-dict replies and missing keys instead of raising KeyError.
    raw_results = results.get("results") if isinstance(results, dict) else None
    filtered = [
        {
            "title": result["title"],
            "url": result["url"],
            "content": result["content"],
        }
        for result in raw_results or []
        if "title" in result
        and "content" in result
        and "url" in result
        and (result.get("score") or 0) > min_score
    ]

    if not filtered:
        return no_results_msg
    return json.dumps({"results": filtered}, indent=2)
|
|
|
|
@tool(parse_docstring=True)
def extract_url_content(url_list: list[str]) -> str:
    """
    Extracts the content from URLs using Tavily's extract tool.

    This tool is useful for retrieving content from web pages.
    This tool will most likely be used after a web search to extract content from the URLs returned by the search.

    Args:
        url_list (list[str]): The URLs to extract content from.

    Returns:
        str: The extracted content or an error message if extraction fails.
    """
    extract_tool = TavilyExtract(api_key=SecretStr(TAVILY_API_KEY))
    extract_results = extract_tool.invoke({'urls': url_list})

    # A non-dict reply, a missing 'results' key, or an empty/None list all mean nothing was extracted.
    pages = extract_results.get('results') if isinstance(extract_results, dict) else None
    if not pages:
        return f"No content could be extracted from the provided URLs: {url_list}"

    for page in pages:
        # Drop image lists from the output; pop() tolerates results that lack the key.
        page.pop('images', None)

    return json.dumps(pages, indent=2)
|
|
|
|
|
|
|
|
def bs_html_parser(url: str, timeout: float = 30):
    """
    Fetch `url` and parse the response body as HTML.

    Args:
        url: Page URL to fetch.
        timeout: Seconds `requests` waits for the server before giving up (default 30).
            Without a timeout a stalled server would block the caller indefinitely.

    Returns:
        A `BeautifulSoup` document for an HTTP 200 response, otherwise None.

    Raises:
        requests.exceptions.RequestException: On connection errors or timeouts.
    """
    response = requests.get(url, timeout=timeout)

    if response.status_code == 200:
        return BeautifulSoup(response.text, "html.parser")
    return None
|
|
|
|
def get_table_title(table_tag):
    """
    Return a human-readable title for a BeautifulSoup ``<table>`` tag.

    Lookup order:
    1. the text of the table's ``<caption>``, if it is non-empty;
    2. otherwise the text of the nearest ``<h1>``-``<h6>`` preceding the table, taken
       from a ``span.mw-headline`` descendant when one exists, else from the whole
       heading with any ``span.mw-editsection`` elements removed;
    3. otherwise ``"Untitled Table"`` (also used when the heading text is empty).
    """
    fallback_title = "Untitled Table"

    caption_tag = table_tag.find('caption')
    caption_text = caption_tag.get_text(strip=True) if caption_tag else ""
    if caption_text:
        return caption_text

    nearest = table_tag.find_all_previous(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'], limit=1)
    if not nearest:
        return fallback_title
    heading = nearest[0]

    mw_headline = heading.find("span", class_="mw-headline")
    if mw_headline:
        heading_text = mw_headline.get_text(strip=True)
    else:
        # Strip spans from a re-parsed copy so decompose() leaves the caller's document untouched.
        heading_copy = BeautifulSoup(str(heading), 'html.parser').find(heading.name)
        if heading_copy:
            for edit_span in heading_copy.find_all("span", class_="mw-editsection"):
                edit_span.decompose()
            heading_text = heading_copy.get_text(strip=True)
        else:
            heading_text = heading.get_text(strip=True)

    return heading_text or fallback_title
|
|
|
|
@tool(parse_docstring=True)
def wikipedia_reader(url: str) -> str:
    """
    Extracts sections, paragraphs, and tables from a Wikipedia page.

    Args:
        url (str): The URL of the Wikipedia page to extract content from.

    Returns:
        str: A JSON string containing sections, paragraphs, and tables, or an error message if the page could not be fetched.
    """
    soup = bs_html_parser(url)
    if soup is None:
        return f"Could not retrieve the Wikipedia page: {url}"

    # Section anchors are appended to the page URL. Drop any fragment already on it
    # so "...#A" does not turn into "...#A#B".
    page_url = url.split("#", 1)[0]

    def extract_links(soup_obj):
        """Return unique in-page anchor URLs (citation anchors excluded), in page order."""
        hrefs = (link.get('href') for link in soup_obj.find_all('a', href=True))
        # dict.fromkeys removes repeated links to the same anchor while keeping first-seen order.
        return list(dict.fromkeys(
            page_url + href
            for href in hrefs
            if href and href.startswith("#") and len(href) > 1 and "#cite_" not in href
        ))

    def extract_paragraphs(soup_obj):
        """Return the stripped text of every <p> longer than 10 characters."""
        paragraphs_text = [p.get_text(strip=True) for p in soup_obj.find_all("p")]
        return [p for p in paragraphs_text if p and len(p) > 10]

    def table_records(df):
        """Convert a parsed table into JSON-serializable row dicts."""
        if isinstance(df.columns, pd.MultiIndex):
            # Multi-row headers give tuple column labels, which json.dumps rejects as keys.
            # Join each label's distinct levels into a single string instead.
            df.columns = [
                " / ".join(dict.fromkeys(str(level) for level in column))
                for column in df.columns
            ]
        # Empty cells parse as NaN; map them to None so the output is standard JSON (null).
        df = df.astype(object).where(df.notna(), None)
        return df.to_dict(orient='records')

    def extract_tables(soup_obj):
        """Parse every table.wikitable with pandas, recording a per-table error on failure."""
        tables_with_titles = []
        for table_tag in soup_obj.find_all("table", {"class": "wikitable"}):
            title = get_table_title(table_tag)
            try:
                df_list = pd.read_html(StringIO(str(table_tag)))
                if df_list:
                    tables_with_titles.append({"title": title, "table_data": table_records(df_list[0])})
                else:
                    tables_with_titles.append({"title": title, "table_data": None, "error": "pd.read_html returned empty list"})
            except Exception as e:
                # Best-effort: one unparseable table should not fail the whole page.
                tables_with_titles.append({"title": title, "table_data": None, "error": str(e)})
        return tables_with_titles

    return_dict = {
        "sections": extract_links(soup),
        "paragraphs": extract_paragraphs(soup),
        "tables": extract_tables(soup),
    }

    return json.dumps(return_dict, indent=2, ensure_ascii=False)
|
|
|
|
|
|
|
|
|
|
class WhisperTranscriber:
    """
    Process-wide cache for a Hugging Face Whisper speech-recognition pipeline.

    ``WhisperTranscriber()`` does not return a WhisperTranscriber instance. ``__new__``
    returns the cached ``transformers`` pipeline object itself. The pipeline is built
    on the first call and the same object is returned on every later call.
    """

    # The cached pipeline; None until the first WhisperTranscriber() call.
    _instance = None

    def __new__(cls):
        if cls._instance is not None:
            return cls._instance

        # torch/transformers are imported here, so they load only when a transcriber is first requested.
        import torch
        from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
        from transformers.pipelines import pipeline

        model_id = "openai/whisper-large-v3"

        # float16 on the first CUDA device when available, float32 on CPU otherwise.
        if torch.cuda.is_available():
            device, torch_dtype = "cuda:0", torch.float16
        else:
            device, torch_dtype = "cpu", torch.float32

        speech_model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
        )
        speech_model.to(device)

        processor = AutoProcessor.from_pretrained(model_id)

        cls._instance = pipeline(
            "automatic-speech-recognition",
            model=speech_model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            device=device,
        )
        return cls._instance
|
|
|
|
|
|
@tool(parse_docstring=True)
def transcribe_audio_file(file_path: str) -> str:
    """
    Transcribes an audio file to text using OpenAI's Whisper-large-v3 model, caching the model after the first load.

    Args:
        file_path (str): The path to the audio file to transcribe.

    Returns:
        str: The transcription of the audio file.
    """
    pipe = WhisperTranscriber()
    # Whisper's input window is 30 s. Long-form (>30 s) decoding in transformers requires
    # timestamp prediction, so request it explicitly. The result still carries "text".
    result = pipe(file_path, return_timestamps=True)
    transcription = result["text"]
    return transcription.strip() if transcription else "No transcription available."
|
|
|
|
|
|
def _youtube_stream_info(video_url: str) -> tuple[dict, str]:
    """Resolve `video_url` with yt-dlp into (metadata dict, direct media URL that has a video track)."""
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "mp4[ext=mp4]+bestaudio/best",
        "forceurl": True,
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "subtitlesformat": "vtt",
        "subtitleslangs": ['en'],
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(video_url, download=False)
    # Raise rather than assert: assertions are stripped under `python -O`.
    if not isinstance(info_dict, dict):
        raise ValueError("Failed to extract video information. Please check the video URL.")

    stream_url = info_dict.get("url")
    if stream_url is None:
        # For merged format selections yt-dlp reports per-format URLs under
        # `requested_formats` rather than a top-level `url`; take the one with video.
        for fmt in info_dict.get("requested_formats") or ():
            if fmt.get("url") and fmt.get("vcodec") != "none":
                stream_url = fmt["url"]
                break
    if stream_url is None:
        raise ValueError("Could not retrieve video stream URL. Please check the video URL and try again.")
    return info_dict, stream_url


def _message_text(message: BaseMessage) -> str:
    """Return a chat response's stripped text, joining the text blocks when content is a list."""
    content = message.content
    if isinstance(content, str):
        return content.strip()
    pieces = []
    for block in content:
        if isinstance(block, str):
            pieces.append(block)
        elif isinstance(block, dict) and block.get("type") == "text":
            pieces.append(str(block.get("text", "")))
    return "".join(pieces).strip()


def _frame_to_base64_jpeg(frame) -> str:
    """Encode a decoded PyAV video frame as a base64 JPEG string."""
    buf = io.BytesIO()
    frame.to_image().save(buf, format="JPEG")
    return base64.b64encode(buf.getvalue()).decode("utf-8")


@tool(parse_docstring=True)
def question_youtube_video(video_url: str, query: str) -> str:
    """
    Answers a question about a YouTube video.

    The video is streamed and one frame is captured every x seconds, where x is declared in the environment settings.
    Captured frames are sent sequentially to a multimodal model to answer the question about the video.
    The final answer is aggregated from the answers to each frame.
    DOES NOT USE AUDIO! ONLY FRAMES FROM THE VIDEO ARE USED TO ANSWER THE QUESTION.

    Args:
        video_url (str): The URL of the video to capture frames from.
        query (str): The question to answer about the video.

    Returns:
        str: The answer to the question about the video.
    """
    # Seconds between sampled frames (env CAPTURE_INTERVAL_SEC, default 2; fractions allowed).
    # A value <= 0 samples every decoded frame.
    capture_interval_sec = float(os.getenv("CAPTURE_INTERVAL_SEC", "2"))

    # Resolve (and validate) the stream URL before anything is launched with it.
    info_dict, stream_url = _youtube_stream_info(video_url)

    image_model = ChatOpenAI(
        model=YOUTUBE_FRAME_ASSESSMENT_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",
        verbose=True
    )
    image_model_system_prompt = SystemMessage(
        content=(
            "You will be shown a frame from a video along with a question about that video and an answer based on the previous frames in the video. "
            "Your task is to analyze the frame and provide an answer to the question using both the current frame and the previous answer. "
            "If the previous answer is reasonable and the current frame can not answer the question return the previous answer. "
            "For example, if the question is about the color of a car and the previous answer is 'red' but the current frame shows no car, you should return 'red'. "
            "If the question is about the greatest number of something in the video, you should return the number counted in the current frame or the previous answer, whichever is greater. "
            "For example, if the current frame has 5 objects but the previous answer is 10 objects, you should return '10'. "
            "Be concise and clear in your answers, and do not repeat the question. "
        )
    )
    frame_instructions = (
        "\nProvide a concise answer based on the previous answer and the current frame. "
        "If the current frame does not answer the question but there is a previous answer, return the previous answer. "
        "REMEMBER: This question is not about the current frame! It is about the video as a whole. ALWAYS PAY ATTENTION TO THE PREVIOUS ANSWER!"
    )

    # ffmpeg remuxes the remote stream to Matroska on stdout so PyAV can decode it from a pipe.
    # -nostdin: ffmpeg must not consume our stdin. -an: audio is never used.
    # stderr goes to DEVNULL: an unread PIPE eventually fills and blocks ffmpeg (deadlock).
    ffmpeg_cmd = ["ffmpeg", "-nostdin", "-i", stream_url, "-an", "-f", "matroska", "-"]
    process = subprocess.Popen(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

    answers_list: list[dict] = []
    aggregated_answer = ''
    next_capture_time = 0.0
    try:
        with av.open(process.stdout) as container:
            if not container.streams.video:
                raise ValueError("The stream contains no video track. Please check the video URL and try again.")
            stream = container.streams.video[0]
            if stream.time_base is None:
                raise ValueError("Could not determine time base for the video stream. Please check the video URL and try again.")
            time_base = float(stream.time_base)

            for frame in container.decode(stream):
                if frame.pts is None:
                    continue
                timestamp = float(frame.pts * time_base)
                if timestamp < next_capture_time:
                    continue

                frame_query = query
                if aggregated_answer:
                    frame_query += f"\nPrevious Answer: {aggregated_answer}"
                frame_query += frame_instructions

                msgs: list[BaseMessage] = [
                    image_model_system_prompt,
                    HumanMessage(content=[
                        {
                            "type": "text",
                            "text": frame_query
                        },
                        {
                            "type": "image",
                            "source_type": "base64",
                            "mime_type": "image/jpeg",
                            "data": _frame_to_base64_jpeg(frame)
                        }
                    ]),
                ]

                answer = _message_text(image_model.invoke(msgs))
                answers_list.append({"timestamp": timestamp, "answer": answer})
                if answer:
                    aggregated_answer = answer

                if capture_interval_sec > 0:
                    # Advance on a fixed grid, skipping slots that a timestamp jump passed
                    # over, so a gap does not cause a burst of back-to-back captures.
                    while next_capture_time <= timestamp:
                        next_capture_time += capture_interval_sec
    finally:
        # Always stop and reap ffmpeg, including when decoding or a model call raises.
        # Closing our end of the pipe first makes a blocked ffmpeg write fail instead of hang.
        if process.stdout is not None:
            process.stdout.close()
        if process.poll() is None:
            process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    final_answer_model = ChatOpenAI(
        model=YOUTUBE_CONFIRMATION_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",
        verbose=True
    )

    final_answer_system_message = SystemMessage(
        "You are a brilliant assistant who is eager to help and extremely detailed oriented. "
        "A group of individuals have been asked the same question about a video. "
        "None of the individuals have seen the entire video. "
        "Each individual, when asked the question, was provided a frame from the video, as well as the previously reported answer based on the previous frame. "
        "Your job is to report a final answer for the question about the video. "
        "Ideally, the final answer has already been reported correctly by the last individual. "
        "However, this is similar to the game a telephone, where the true answer can become corrupted along the way. "
        "Assess all of the answers. If you can confirm the final answer is correct, simply return it. "
        "If you notice that the final answer is incorrect, then identify the correct answer and report that. "
        "You will also have access to the video title and description, which may help you identify the correct answer. "
        "Be concise and only respond with the correct final answer!"
    )

    answers_list_str = "\n".join(
        f"Answer {i} at {ans['timestamp']:.2f}s: {ans['answer']}"
        for i, ans in enumerate(answers_list, start=1)
    )

    final_query = (
        f"Video Title: {info_dict.get('title', 'No title found')}. "
        f"Video Description: {info_dict.get('description', 'No description found')}. "
        f"Question about video: {query} "
        f"Answers provided by individuals: \n{answers_list_str}\n\n "
        "Provide a concise final answer to the question about the video based on the previous answers. "
        "Include a short explanation of why you chose this answer. "
        "Format the answer like so: "
        "Explanation: <your explanation here>. "
        "Final Answer: <your answer here>. "
    )

    final_msgs = [
        final_answer_system_message,
        HumanMessage(content=[
            {
                "type": "text",
                "text": final_query
            }
        ])
    ]

    final_response = final_answer_model.invoke(final_msgs)
    return _message_text(final_response)