import subprocess import sys # Ensure compatible versions of httpx and httpcore are installed subprocess.check_call([sys.executable, "-m", "pip", "install", "httpx==0.18.2", "httpcore==0.13.6"]) import gradio as gr import requests import os import re import yt_dlp import logging # Configure logging for debugging purposes logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') # Fetch the keys from the environment variable and convert them into a list YOUTUBE_API_KEYS = os.getenv("YOUTUBE_API_KEYS") if YOUTUBE_API_KEYS: YOUTUBE_API_KEYS = [key.strip() for key in YOUTUBE_API_KEYS.split(",")] else: raise ValueError("API keys not found. Make sure the secret 'YOUTUBE_API_KEYS' is set.") # Index to keep track of which API key to use key_index = 0 def get_api_key(): global key_index # Get the current API key and increment the index api_key = YOUTUBE_API_KEYS[key_index] key_index = (key_index + 1) % len(YOUTUBE_API_KEYS) # Rotate to the next key return api_key # Function to search YouTube videos using yt-dlp for better reliability def youtube_search(query, max_results=50): ydl_opts = { 'quiet': False, # Set to False to get more detailed output from yt-dlp 'extract_flat': 'in_playlist', 'logger': logging.getLogger(), # Use the logging module to capture yt-dlp logs 'simulate': True, 'noplaylist': True, # To avoid playlist entries } search_url = f"ytsearch{max_results}:{query}" logging.debug(f"Starting YouTube search for query: {query}") try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: result = ydl.extract_info(search_url, download=False) gallery_items = [] if 'entries' in result: logging.debug(f"Number of entries found: {len(result['entries'])}") for entry in result['entries']: video_id = entry.get('id') thumbnail_url = entry.get('thumbnail') if entry.get('thumbnail') else "https://via.placeholder.com/150" video_title = entry.get('title', "Unknown Title") if video_id: gallery_items.append((thumbnail_url, video_id, video_title)) logging.debug(f"Added video: ID={video_id}, Thumbnail={thumbnail_url}, Title={video_title}") else: logging.debug(f"Missing video ID for entry: {entry}") else: logging.warning("No entries found in search result.") return gallery_items, "" except Exception as e: error_message = f"Error during YouTube yt-dlp request: {e}" logging.error(error_message) return [], error_message # Function to search YouTube videos using the API with pagination to get up to 1,000 results def youtube_api_search(query, max_results=1000): search_url = "https://www.googleapis.com/youtube/v3/search" all_results = [] params = { "part": "snippet", "q": query, "type": "video", "maxResults": 50 # YouTube API allows a maximum of 50 per request } try: while len(all_results) < max_results: params["key"] = get_api_key() # Get the current API key response = requests.get(search_url, params=params) # If we get a bad response, try the next API key if response.status_code == 403 or response.status_code == 429: logging.debug(f"Quota exceeded or forbidden for API key. Trying next key...") continue response.raise_for_status() # Raise an error for other bad responses (4xx or 5xx) results = response.json().get("items", []) all_results.extend(results) # If there is no nextPageToken, we've reached the end if 'nextPageToken' not in response.json() or len(all_results) >= max_results: break # Update params with the nextPageToken to get the next batch of results params['pageToken'] = response.json()['nextPageToken'] # Create a list of tuples with thumbnail URL and video ID as the caption gallery_items = [ (result["snippet"]["thumbnails"]["medium"]["url"], result["id"]["videoId"]) for result in all_results ] return gallery_items except requests.exceptions.RequestException as e: # Print the error message to help debug issues logging.error(f"Error during YouTube API request: {e}") return [], f"Error retrieving video results: {str(e)}" # Function to display the video using the video URL def show_video(video_url): # Regular expression to extract the YouTube video ID from the URL video_id = None patterns = [ r"youtube\.com/watch\?v=([^&?\/]+)", r"youtube\.com/embed/([^&?\/]+)", r"youtube\.com/v/([^&?\/]+)", r"youtu\.be/([^&?\/]+)" ] for pattern in patterns: match = re.search(pattern, video_url) if match: video_id = match.group(1) logging.debug(f"Extracted video ID: {video_id}") break # If no video ID is found, return an error message if not video_id: logging.error("Invalid YouTube URL. Please enter a valid YouTube video link.") return "Invalid YouTube URL. Please enter a valid YouTube video link." # Create the embed URL embed_url = f"https://www.youtube.com/embed/{video_id}" logging.debug(f"Embed URL generated: {embed_url}") # Return an iframe with the video html_code = f''' ''' return html_code # Create the Gradio interface with gr.Blocks() as demo: gr.Markdown("## YouTube Video Search, Selection, and Playback") with gr.Row(): with gr.Column(scale=3): search_query_input = gr.Textbox(label="Search YouTube", placeholder="Enter your search query here") search_button = gr.Button("Search") search_output = gr.Gallery(label="Search Results", columns=5, height="1500px") error_output = gr.Textbox(label="Error Message", interactive=False, visible=False) with gr.Column(scale=2): selected_video_link = gr.Textbox(label="Selected Video Link", interactive=False) play_video_button = gr.Button("Play Video") video_output = gr.HTML(label="Video Player") # Define search button behavior def update_search_results(query): gallery_items, error_message = youtube_search(query) if not gallery_items: # If yt-dlp search fails or returns empty, fall back to YouTube API gallery_items = youtube_api_search(query) if error_message: return [], error_message, gr.update(visible=True) # Display videos even if the title or thumbnail is missing by using placeholders gallery_items_display = [(item[0], item[1]) for item in gallery_items] # Show thumbnail and video ID only return gallery_items_display, "", gr.update(visible=False) # Update the selected video link field when a video is clicked in the gallery def on_video_select(evt: gr.SelectData): # Extract the video ID from the event value, which is a dictionary containing details of the selected item selected_video_id = evt.value["caption"] video_url = f"https://www.youtube.com/watch?v={selected_video_id}" logging.debug(f"Video selected: {video_url}") return video_url # Play the video when the Play Video button is clicked def play_video(video_url): logging.debug(f"Playing video with URL: {video_url}") return show_video(video_url) search_button.click(update_search_results, inputs=search_query_input, outputs=[search_output, error_output, error_output]) search_output.select(on_video_select, inputs=None, outputs=selected_video_link) play_video_button.click(play_video, inputs=selected_video_link, outputs=video_output) # Launch the Gradio interface demo.launch()