mobenta commited on
Commit
e995928
·
verified ·
1 Parent(s): d3a4b08

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +67 -2
app.py CHANGED
@@ -6,6 +6,7 @@ subprocess.check_call([sys.executable, "-m", "pip", "install", "httpx==0.18.2",
6
 
7
  import gradio as gr
8
  import requests
 
9
  import re
10
  import yt_dlp
11
  import logging
@@ -13,6 +14,24 @@ import logging
13
  # Configure logging for debugging purposes
14
  logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  # Function to search YouTube videos using yt-dlp for better reliability
17
  def youtube_search(query, max_results=50):
18
  ydl_opts = {
@@ -51,6 +70,50 @@ def youtube_search(query, max_results=50):
51
  logging.error(error_message)
52
  return [], error_message
53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  # Function to display the video using the video URL
55
  def show_video(video_url):
56
  # Regular expression to extract the YouTube video ID from the URL
@@ -105,6 +168,8 @@ with gr.Blocks() as demo:
105
  # Define search button behavior
106
  def update_search_results(query):
107
  gallery_items, error_message = youtube_search(query)
 
 
108
  if error_message:
109
  return [], error_message, gr.update(visible=True)
110
  # Display videos even if the title or thumbnail is missing by using placeholders
@@ -118,7 +183,7 @@ with gr.Blocks() as demo:
118
  video_url = f"https://www.youtube.com/watch?v={selected_video_id}"
119
  logging.debug(f"Video selected: {video_url}")
120
  return video_url
121
-
122
  # Play the video when the Play Video button is clicked
123
  def play_video(video_url):
124
  logging.debug(f"Playing video with URL: {video_url}")
@@ -129,4 +194,4 @@ with gr.Blocks() as demo:
129
  play_video_button.click(play_video, inputs=selected_video_link, outputs=video_output)
130
 
131
  # Launch the Gradio interface
132
- demo.launch()
 
6
 
7
  import gradio as gr
8
  import requests
9
+ import os
10
  import re
11
  import yt_dlp
12
  import logging
 
14
  # Configure logging for debugging purposes
15
  logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
16
 
17
+ # Fetch the keys from the environment variable and convert them into a list
18
+ YOUTUBE_API_KEYS = os.getenv("YOUTUBE_API_KEYS")
19
+
20
+ if YOUTUBE_API_KEYS:
21
+ YOUTUBE_API_KEYS = [key.strip() for key in YOUTUBE_API_KEYS.split(",")]
22
+ else:
23
+ raise ValueError("API keys not found. Make sure the secret 'YOUTUBE_API_KEYS' is set.")
24
+
25
+ # Index to keep track of which API key to use
26
+ key_index = 0
27
+
28
+ def get_api_key():
29
+ global key_index
30
+ # Get the current API key and increment the index
31
+ api_key = YOUTUBE_API_KEYS[key_index]
32
+ key_index = (key_index + 1) % len(YOUTUBE_API_KEYS) # Rotate to the next key
33
+ return api_key
34
+
35
  # Function to search YouTube videos using yt-dlp for better reliability
36
  def youtube_search(query, max_results=50):
37
  ydl_opts = {
 
70
  logging.error(error_message)
71
  return [], error_message
72
 
73
+ # Function to search YouTube videos using the API with pagination to get up to 1,000 results
74
+ def youtube_api_search(query, max_results=1000):
75
+ search_url = "https://www.googleapis.com/youtube/v3/search"
76
+ all_results = []
77
+ params = {
78
+ "part": "snippet",
79
+ "q": query,
80
+ "type": "video",
81
+ "maxResults": 50 # YouTube API allows a maximum of 50 per request
82
+ }
83
+
84
+ try:
85
+ while len(all_results) < max_results:
86
+ params["key"] = get_api_key() # Get the current API key
87
+ response = requests.get(search_url, params=params)
88
+
89
+ # If we get a bad response, try the next API key
90
+ if response.status_code == 403 or response.status_code == 429:
91
+ logging.debug(f"Quota exceeded or forbidden for API key. Trying next key...")
92
+ continue
93
+ response.raise_for_status() # Raise an error for other bad responses (4xx or 5xx)
94
+
95
+ results = response.json().get("items", [])
96
+ all_results.extend(results)
97
+
98
+ # If there is no nextPageToken, we've reached the end
99
+ if 'nextPageToken' not in response.json() or len(all_results) >= max_results:
100
+ break
101
+
102
+ # Update params with the nextPageToken to get the next batch of results
103
+ params['pageToken'] = response.json()['nextPageToken']
104
+
105
+ # Create a list of tuples with thumbnail URL and video ID as the caption
106
+ gallery_items = [
107
+ (result["snippet"]["thumbnails"]["medium"]["url"], result["id"]["videoId"]) for result in all_results
108
+ ]
109
+
110
+ return gallery_items
111
+
112
+ except requests.exceptions.RequestException as e:
113
+ # Print the error message to help debug issues
114
+ logging.error(f"Error during YouTube API request: {e}")
115
+ return [], f"Error retrieving video results: {str(e)}"
116
+
117
  # Function to display the video using the video URL
118
  def show_video(video_url):
119
  # Regular expression to extract the YouTube video ID from the URL
 
168
  # Define search button behavior
169
  def update_search_results(query):
170
  gallery_items, error_message = youtube_search(query)
171
+ if not gallery_items: # If yt-dlp search fails or returns empty, fall back to YouTube API
172
+ gallery_items = youtube_api_search(query)
173
  if error_message:
174
  return [], error_message, gr.update(visible=True)
175
  # Display videos even if the title or thumbnail is missing by using placeholders
 
183
  video_url = f"https://www.youtube.com/watch?v={selected_video_id}"
184
  logging.debug(f"Video selected: {video_url}")
185
  return video_url
186
+
187
  # Play the video when the Play Video button is clicked
188
  def play_video(video_url):
189
  logging.debug(f"Playing video with URL: {video_url}")
 
194
  play_video_button.click(play_video, inputs=selected_video_link, outputs=video_output)
195
 
196
  # Launch the Gradio interface
197
+ demo.launch()