|
import asyncio |
|
from pathlib import Path |
|
import sys |
|
import time |
|
from typing import List |
|
|
|
|
|
sys.path.append(str(Path(__file__).parent.parent)) |
|
|
|
from utils.youtube_utils import get_transcript, get_playlist_video_ids |
|
from utils.content_generator import ContentGenerator, ContentRequest |
|
|
|
# Playlist whose videos are fetched and processed by this script.
PLAYLIST_URL = "https://www.youtube.com/playlist?list=PLd7-bHaQwnthaNDpZ32TtYONGVk95-fhF"

# Number of videos per batch; the videos of one batch are processed concurrently.
MAX_CONCURRENT = 3

# Seconds to sleep before retrying a video after a rate-limit error.
# NOTE(review): presumably chosen to be just over a one-minute rate-limit
# window -- confirm against the content API's limits.
RETRY_DELAY = 65
|
|
|
async def process_video(video_id: str, generator: ContentGenerator, retry_count: int = 0) -> str:
    """Process a single video and return the formatted result.

    Fetches the transcript for ``video_id``, asks ``generator`` for
    "titles_and_thumbnails" content, and formats the answer as one text
    section. Failures are handled best-effort: they are printed and turned
    into an empty string so that one bad video does not abort a whole batch.

    Args:
        video_id: YouTube video ID to process.
        generator: Content generator whose ``generate_content`` coroutine
            produces the result text.
        retry_count: Number of rate-limit retries already used for this
            video. At most 3 retries are performed in total.

    Returns:
        The formatted section for the video, or ``""`` when no transcript
        is available or processing failed.
    """
    max_retries = 3
    attempt = retry_count
    transcript = None

    while True:
        try:
            print(f"Processing video {video_id}...")

            if transcript is None:
                # get_transcript is a synchronous call; run it in the default
                # executor so the other videos of the batch keep progressing
                # instead of waiting on a stalled event loop.
                loop = asyncio.get_running_loop()
                transcript = await loop.run_in_executor(None, get_transcript, video_id)

            if not transcript:
                print(f"No transcript available for {video_id}")
                return ""

            request = ContentRequest("titles_and_thumbnails")
            result = await generator.generate_content(request, transcript)
            return f"Video ID: {video_id}\n{result}\n{'='*50}\n"

        except Exception as e:
            # Deliberate best-effort boundary: only rate-limit errors are
            # retried, everything else is reported and skipped.
            if "rate_limit_error" not in str(e) or attempt >= max_retries:
                print(f"Error processing {video_id}: {e}")
                return ""
            print(f"Rate limit hit for {video_id}, waiting {RETRY_DELAY}s before retry {attempt + 1}")

        # Sleep outside the handler and loop instead of recursing, so an
        # already-fetched transcript is reused and no frames pile up.
        await asyncio.sleep(RETRY_DELAY)
        attempt += 1
|
|
|
async def process_batch(video_ids: List[str], generator: ContentGenerator) -> List[str]:
    """Process every video of one batch concurrently.

    Args:
        video_ids: IDs of the videos that make up the batch.
        generator: Content generator shared by all videos of the batch.

    Returns:
        One ``process_video`` result per input ID, in the same order as
        ``video_ids``.
    """
    # Create one coroutine per video and let gather drive them together.
    return await asyncio.gather(
        *(process_video(vid, generator) for vid in video_ids)
    )
|
|
|
async def process_playlist() -> None:
    """Process all videos in the playlist in batches and save the results.

    Reads the video IDs of ``PLAYLIST_URL``, processes them in batches of
    ``MAX_CONCURRENT`` with a short pause between batches, drops empty
    results (skipped or failed videos), and writes the remaining sections
    to ``output/playlist-titles-thumbnails.txt``.
    """
    generator = ContentGenerator()
    output_file = Path("output/playlist-titles-thumbnails.txt")
    batch_delay = 5  # seconds to pause between two batches

    print("Getting videos from playlist...")
    video_ids = get_playlist_video_ids(PLAYLIST_URL)
    print(f"Found {len(video_ids)} videos")

    results = []
    batch_starts = range(0, len(video_ids), MAX_CONCURRENT)
    for batch_number, start in enumerate(batch_starts, start=1):
        batch = video_ids[start:start + MAX_CONCURRENT]
        print(f"\nProcessing batch {batch_number}")
        results.extend(await process_batch(batch, generator))

        # Pause only when another batch follows.
        if start + MAX_CONCURRENT < len(video_ids):
            print(f"Waiting {batch_delay}s before next batch...")
            await asyncio.sleep(batch_delay)

    # Skipped and failed videos are reported as empty strings.
    results = [r for r in results if r]
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8: generated titles may contain characters that the
    # platform's default encoding cannot represent, which would otherwise
    # fail only after all videos have been processed.
    output_file.write_text("\n".join(results), encoding="utf-8")
    print(f"\nResults written to {output_file}")
|
|
|
if __name__ == "__main__":
    # Run the playlist job only when executed as a script, not on import.
    playlist_job = process_playlist()
    asyncio.run(playlist_job)