webscarper / app.py
mobenta's picture
Update app.py
4146723 verified
raw
history blame
8.79 kB
import subprocess
import sys
# Install pinned httpx/httpcore versions via pip before the imports below run.
# This executes unconditionally each time the module is loaded and raises
# subprocess.CalledProcessError if pip exits with a non-zero status.
# NOTE(review): presumably needed so the later gradio import sees compatible
# versions — confirm; pinning these in requirements.txt would avoid a runtime install.
subprocess.check_call([sys.executable, "-m", "pip", "install", "httpx==0.18.2", "httpcore==0.13.6"])
import gradio as gr
import requests
import os
import re
import yt_dlp
import logging
# Configure logging for debugging purposes
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

# Read the comma-separated API keys from the environment and turn them into a
# list. Surrounding whitespace is stripped and blank entries (stray or trailing
# commas) are dropped, so an empty string is never handed out as an API key.
YOUTUBE_API_KEYS = [
    key.strip()
    for key in (os.getenv("YOUTUBE_API_KEYS") or "").split(",")
    if key.strip()
]
if not YOUTUBE_API_KEYS:
    # Covers both an unset variable and a value made only of commas/whitespace.
    raise ValueError("API keys not found. Make sure the secret 'YOUTUBE_API_KEYS' is set.")
# Rotation cursor: position in YOUTUBE_API_KEYS of the key to hand out next
key_index = 0


def get_api_key():
    """Return the API key at the rotation cursor and advance the cursor.

    The cursor wraps back to the first key after the last one, so repeated
    calls cycle through every configured key in order.
    """
    global key_index
    current = key_index
    selected = YOUTUBE_API_KEYS[current]
    # Step forward, wrapping around at the end of the key list
    key_index = (current + 1) % len(YOUTUBE_API_KEYS)
    return selected
# Function to search YouTube videos using yt-dlp for better reliability
def _pick_thumbnail(entry):
    """Return the best available thumbnail URL for one yt-dlp result entry."""
    direct = entry.get('thumbnail')
    if direct:
        return direct
    # yt-dlp also documents a `thumbnails` list of dicts carrying a `url`.
    # NOTE(review): flat search entries appear to populate only this list, and
    # it is assumed to be ordered worst-to-best — confirm with the installed yt-dlp.
    for candidate in reversed(entry.get('thumbnails') or []):
        if isinstance(candidate, dict) and candidate.get('url'):
            return candidate['url']
    return "https://via.placeholder.com/150"


def youtube_search(query, max_results=50):
    """Search YouTube through yt-dlp without downloading anything.

    Args:
        query: Free-text search string.
        max_results: Number of results requested via the ``ytsearchN:`` prefix.

    Returns:
        A ``(gallery_items, error_message)`` tuple. ``gallery_items`` is a list
        of ``(thumbnail_url, video_id, video_title)`` tuples. ``error_message``
        is ``""`` on success; on any failure the list is empty and the message
        describes the error. This function never raises.
    """
    ydl_opts = {
        'quiet': False,  # Set to False to get more detailed output from yt-dlp
        'extract_flat': 'in_playlist',
        'logger': logging.getLogger(),  # Use the logging module to capture yt-dlp logs
        'simulate': True,
        'noplaylist': True,  # To avoid playlist entries
    }
    search_url = f"ytsearch{max_results}:{query}"
    logging.debug(f"Starting YouTube search for query: {query}")
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            result = ydl.extract_info(search_url, download=False)

        # Guard against a missing result and materialise the entries so that
        # len() works even if yt-dlp hands back a lazy iterable.
        entries = list((result or {}).get('entries') or [])
        if not entries:
            logging.warning("No entries found in search result.")
            return [], ""

        logging.debug(f"Number of entries found: {len(entries)}")
        gallery_items = []
        for entry in entries:
            if not entry:
                # Entries that failed to extract can be None; skip them
                # instead of aborting the whole search.
                logging.debug("Skipping empty entry in search result.")
                continue
            video_id = entry.get('id')
            if not video_id:
                logging.debug(f"Missing video ID for entry: {entry}")
                continue
            thumbnail_url = _pick_thumbnail(entry)
            # `or` also covers a title that is present but None.
            video_title = entry.get('title') or "Unknown Title"
            gallery_items.append((thumbnail_url, video_id, video_title))
            logging.debug(f"Added video: ID={video_id}, Thumbnail={thumbnail_url}, Title={video_title}")
        return gallery_items, ""
    except Exception as e:
        # Boundary handler: callers rely on getting a message, not an exception.
        error_message = f"Error during YouTube yt-dlp request: {e}"
        logging.error(error_message)
        return [], error_message
# Function to search YouTube videos using the API with pagination to get up to 1,000 results
def youtube_api_search(query, max_results=1000):
    """Search YouTube through the Data API v3, following pagination.

    Args:
        query: Free-text search string.
        max_results: Upper bound on the number of videos returned.

    Returns:
        On success, a list of ``(thumbnail_url, video_id, title)`` tuples with
        at most ``max_results`` entries (possibly empty or partial if every API
        key was rejected). If the HTTP request itself fails, a
        ``([], error_message)`` tuple is returned instead; that shape is kept
        for backward compatibility, so callers must handle both.
    """
    if max_results <= 0:
        return []

    search_url = "https://www.googleapis.com/youtube/v3/search"
    all_results = []
    params = {
        "part": "snippet",
        "q": query,
        "type": "video",
        "maxResults": 50  # YouTube API allows a maximum of 50 per request
    }
    rejected_in_a_row = 0  # consecutive keys answered with 403/429
    try:
        while len(all_results) < max_results:
            params["key"] = get_api_key()  # Get the current API key
            # A timeout keeps a stalled connection from hanging the UI forever.
            response = requests.get(search_url, params=params, timeout=10)

            # If we get a bad response, try the next API key
            if response.status_code in (403, 429):
                rejected_in_a_row += 1
                if rejected_in_a_row >= len(YOUTUBE_API_KEYS):
                    # Every key has been tried once without success; stop
                    # instead of rotating through them forever.
                    logging.error("All API keys were rejected (quota exceeded or forbidden).")
                    break
                logging.debug("Quota exceeded or forbidden for API key. Trying next key...")
                continue
            rejected_in_a_row = 0

            response.raise_for_status()  # Raise an error for other bad responses (4xx or 5xx)
            payload = response.json()  # parse once and reuse
            all_results.extend(payload.get("items", []))

            # If there is no nextPageToken, we've reached the end
            next_page_token = payload.get("nextPageToken")
            if not next_page_token:
                break
            # Update params with the nextPageToken to get the next batch of results
            params["pageToken"] = next_page_token
    except requests.exceptions.RequestException as e:
        # Log the error message to help debug issues
        logging.error(f"Error during YouTube API request: {e}")
        return [], f"Error retrieving video results: {str(e)}"

    # Create a list of tuples with thumbnail URL, video ID, and video title.
    # Pages arrive 50 at a time, so trim to the requested bound, and skip any
    # item that does not carry a video ID rather than raising KeyError.
    gallery_items = []
    for result in all_results[:max_results]:
        video_id = (result.get("id") or {}).get("videoId")
        if not video_id:
            continue
        snippet = result.get("snippet") or {}
        gallery_items.append((
            snippet.get("thumbnails", {}).get("medium", {}).get("url", "https://via.placeholder.com/150"),
            video_id,
            snippet.get("title", "No title available"),
        ))
    return gallery_items
# Function to display the video using the video URL
def show_video(video_url):
    """Build an embeddable YouTube player for a video link.

    Args:
        video_url: A YouTube link in ``watch?v=``, ``/embed/``, ``/v/``,
            ``/shorts/`` or ``youtu.be/`` form. Non-string values are treated
            as invalid.

    Returns:
        An HTML string containing an ``<iframe>`` pointing at the embed URL,
        or a plain error message string when no video ID can be extracted.
    """
    invalid_message = "Invalid YouTube URL. Please enter a valid YouTube video link."

    # The captured ID is interpolated into HTML below, so only the characters
    # YouTube IDs use are accepted; quotes, angle brackets, '#', etc. end the match.
    id_group = r"([A-Za-z0-9_-]+)"
    patterns = [
        # `v` may be the first query parameter or follow other parameters.
        r"youtube\.com/watch\?(?:[^#\s]*&)?v=" + id_group,
        r"youtube\.com/(?:embed|v|shorts)/" + id_group,
        r"youtu\.be/" + id_group,
    ]

    video_id = None
    if isinstance(video_url, str):
        for pattern in patterns:
            match = re.search(pattern, video_url)
            if match:
                video_id = match.group(1)
                logging.debug(f"Extracted video ID: {video_id}")
                break

    # If no video ID is found, return an error message
    if not video_id:
        logging.error(invalid_message)
        return invalid_message

    # Create the embed URL
    embed_url = f"https://www.youtube.com/embed/{video_id}"
    logging.debug(f"Embed URL generated: {embed_url}")

    # Return an iframe with the video
    html_code = f'''
    <iframe width="560" height="315" src="{embed_url}"
    frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
    allowfullscreen></iframe>
    '''
    return html_code
# Create the Gradio interface: search box + results gallery on the left,
# selected link + embedded player on the right.
with gr.Blocks() as demo:
    gr.Markdown("## YouTube Video Search, Selection, and Playback")
    with gr.Row():
        with gr.Column(scale=3):
            search_query_input = gr.Textbox(label="Search YouTube", placeholder="Enter your search query here")
            search_button = gr.Button("Search")
            search_output = gr.Gallery(label="Search Results", columns=5, height="1500px")
            error_output = gr.Textbox(label="Error Message", interactive=False, visible=False)
        with gr.Column(scale=2):
            selected_video_link = gr.Textbox(label="Selected Video Link", interactive=False)
            play_video_button = gr.Button("Play Video")
            video_output = gr.HTML(label="Video Player")

    # Define search button behavior
    def update_search_results(query):
        """Run a search and return ``(gallery_value, error_box_update)``.

        yt-dlp is tried first; if it yields nothing (failure or no hits) the
        YouTube Data API is used as a fallback. An error is only shown when
        neither source produced any result.
        """
        errors = []
        gallery_items, error_message = youtube_search(query)
        if error_message:
            errors.append(error_message)

        if not gallery_items:  # If yt-dlp search fails or returns empty, fall back to YouTube API
            api_result = youtube_api_search(query)
            # youtube_api_search returns a bare list on success but a
            # ([], message) tuple when the request fails; accept both shapes.
            if isinstance(api_result, tuple):
                gallery_items, api_error = api_result
                if api_error:
                    errors.append(api_error)
            else:
                gallery_items = api_result

        if not gallery_items:
            if errors:
                return [], gr.update(value=" | ".join(errors), visible=True)
            return [], gr.update(value="", visible=False)

        # Gallery captions are plain text. The "Title (VIDEO_ID)" format is
        # what on_video_select parses, so keep the two in sync.
        gallery_items_display = [
            (thumbnail_url, f"{video_title} ({video_id})")
            for thumbnail_url, video_id, video_title in gallery_items
        ]
        return gallery_items_display, gr.update(value="", visible=False)

    # Update the selected video link field when a video is clicked in the gallery
    def on_video_select(evt: gr.SelectData):
        """Turn the clicked gallery item into a ``watch?v=`` URL ("" on failure)."""
        selected = evt.value
        # NOTE(review): assumes evt.value is a dict with a "caption" key (as
        # the original handler read it) or the caption string itself —
        # confirm against the installed Gradio version.
        caption = selected.get("caption") if isinstance(selected, dict) else selected
        # The ID is the last parenthesised token; anchoring at the end keeps
        # titles that themselves contain parentheses from confusing the parse.
        match = re.search(r"\(([A-Za-z0-9_-]+)\)\s*$", str(caption or ""))
        if not match:
            logging.error("Could not extract a video ID from the selected gallery item.")
            return ""
        video_url = f"https://www.youtube.com/watch?v={match.group(1)}"
        logging.debug(f"Video selected: {video_url}")
        return video_url

    # Play the video when the Play Video button is clicked
    def play_video(video_url):
        """Return the embed HTML (or an error message) for the selected link."""
        logging.debug(f"Playing video with URL: {video_url}")
        return show_video(video_url)

    # Each output component is listed once; the error box receives its value
    # and visibility together in a single gr.update.
    search_button.click(update_search_results, inputs=search_query_input, outputs=[search_output, error_output])
    search_output.select(on_video_select, inputs=None, outputs=selected_video_link)
    play_video_button.click(play_video, inputs=selected_video_link, outputs=video_output)
# Launch the Gradio interface. This call sits at module level with no
# `__main__` guard, so it runs whenever this file is executed or imported.
demo.launch()