"""
Generates slide markdown plus TTS audio and images using Gemini models.

Functions exposed:

    generate_slideshow_with_audio(topic, api_key)
        -> (list_of_slide_markdown, list_of_audio_paths, list_of_image_paths)
"""

import asyncio
import atexit
import datetime
import json
import os
import re
import shutil
import struct
import tempfile
from io import BytesIO
from pathlib import Path
from typing import Dict

from google import genai
from google.genai import types
from PIL import Image

try:
    from deepgram import DeepgramClient

    # SpeakOptions has moved between deepgram-sdk releases; probe the known locations.
    try:
        from deepgram.clients.speak.v1.speak_client import SpeakOptions
    except ImportError:
        try:
            from deepgram.clients.speak.v1 import SpeakOptions
        except ImportError:
            from deepgram.clients.speak import SpeakOptions

    DEEPGRAM_AVAILABLE = True
except ImportError:
    print("Deepgram SDK not available. Install with 'pip install deepgram-sdk'")
    DEEPGRAM_AVAILABLE = False

# Optional fallback TTS provider, used only when Gemini TTS quota is exhausted.
DEEPGRAM_KEY = os.environ.get("DEEPGRAM_KEY")

# One temporary working directory per user session, cleaned up at interpreter exit.
_temp_dirs: Dict[str, str] = {}


def get_temp_dir(session_id: str) -> str:
    """Get or create a temporary directory for a user session."""
    if session_id not in _temp_dirs:
        _temp_dirs[session_id] = tempfile.mkdtemp(prefix=f"slideshow_{session_id}_")
    return _temp_dirs[session_id]


def cleanup_temp_dirs():
    """Clean up all temporary directories on exit."""
    for session_id, temp_dir in _temp_dirs.items():
        if os.path.exists(temp_dir):
            print(f"Cleaning up temporary directory for session {session_id}")
            shutil.rmtree(temp_dir, ignore_errors=True)
    _temp_dirs.clear()


atexit.register(cleanup_temp_dirs)

def _convert_to_wav(audio_data: bytes, mime_type: str) -> bytes:
    """Wrap Gemini's raw PCM audio in a proper WAV (RIFF) container."""
    params = _parse_audio_mime_type(mime_type)
    bits_per_sample = params["bits_per_sample"]
    sample_rate = params["rate"]
    num_channels, data_size = 1, len(audio_data)
    bytes_per_sample = bits_per_sample // 8
    block_align = num_channels * bytes_per_sample
    byte_rate = sample_rate * block_align
    chunk_size = 36 + data_size  # header bytes after the RIFF size field

    # Standard 44-byte PCM WAV header (little-endian).
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF",
        chunk_size,
        b"WAVE",
        b"fmt ",
        16,  # fmt chunk size
        1,   # audio format: PCM
        num_channels,
        sample_rate,
        byte_rate,
        block_align,
        bits_per_sample,
        b"data",
        data_size,
    )
    return header + audio_data
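
# Illustrative sanity check (hypothetical values): two seconds of 16-bit mono
# silence at Gemini's default 24 kHz rate gains exactly the 44-byte header.
#
#     silence = b"\x00\x00" * 24_000 * 2
#     wav = _convert_to_wav(silence, "audio/L16;rate=24000")
#     assert wav[:4] == b"RIFF" and len(wav) == 44 + len(silence)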


def _parse_audio_mime_type(mime_type: str) -> dict[str, int]:
    """Extract sample rate & bit depth from something like "audio/L16;rate=24000"."""
    bits_per_sample, rate = 16, 24_000  # Gemini TTS defaults
    for part in mime_type.split(";"):
        part = part.strip().lower()
        if part.startswith("rate="):
            rate = int(part.split("=", 1)[1])
        elif part.startswith("audio/l"):
            # "audio/L16" means 16-bit linear PCM.
            bits_per_sample = int(part.split("l", 1)[1])
    return {"bits_per_sample": bits_per_sample, "rate": rate}


def _parse_slides_json(response_text: str) -> list[dict]:
    """Parse the JSON response from Gemini and extract slides data."""
    try:
        # The model usually wraps its answer in a ```json fenced block.
        json_match = re.search(r'```json\s*(.+?)\s*```', response_text, re.DOTALL)
        if json_match:
            json_text = json_match.group(1).strip()
        else:
            json_text = response_text.strip()

        # Literal tab characters inside JSON strings are invalid; soften them.
        json_text = json_text.replace('\t', ' ')

        return json.loads(json_text)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}\nAttempting fallback parsing...")
        return _fallback_parse(response_text)
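
# For instance, a fenced response such as
#     ```json
#     [{"slide_content": "## Title\n\n- point", "speaker_notes": "...", "image_prompt": "..."}]
#     ```
# parses to a one-element list of slide dicts.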


def _fallback_parse(text: str) -> list[dict]:
    """Fallback method to extract slides if JSON parsing fails."""
    slides = []

    # Pull the field values out directly; this tolerates truncated or slightly
    # malformed JSON, though not escaped quotes inside the values.
    content_matches = re.findall(r'"slide_content"\s*:\s*"([^"]+)"', text, re.DOTALL)
    notes_matches = re.findall(r'"speaker_notes"\s*:\s*"([^"]+)"', text, re.DOTALL)

    for i in range(min(len(content_matches), len(notes_matches))):
        slides.append({
            "slide_content": content_matches[i].replace('\\n', '\n'),
            "speaker_notes": notes_matches[i],
        })

    return slides if slides else _extract_markdown_slides(text)


def _extract_markdown_slides(markdown: str) -> list[dict]:
    """Extract slides from traditional markdown format (for backwards compatibility)."""
    raw = re.split(r"^##\s+", markdown, flags=re.MULTILINE)
    md_slides = [s for s in raw if s.strip()]

    result = []
    for slide in md_slides:
        # Keep a leading "# " title slide as-is; otherwise restore the "## "
        # that re.split consumed.
        if slide.lstrip().startswith("# "):
            content = slide.lstrip()
        else:
            content = f"## {slide}"

        # Separate a trailing "Speaker Notes:" section from the slide body.
        m = re.search(r"Speaker Notes:\s*(.+)", content, flags=re.I | re.S)
        notes = ""
        if m:
            notes = m.group(1).strip()
            content = content.split("Speaker Notes:")[0].strip()

        result.append({"slide_content": content, "speaker_notes": notes})

    return result
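
# Illustrative legacy input (hypothetical deck):
#
#     ## Why Bees Matter
#     - Pollination
#     Speaker Notes: Bees pollinate a large share of food crops...
#
# yields one dict with the bullet list as "slide_content" and the trailing
# prose as "speaker_notes".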


async def _generate_image(prompt: str, output_path: Path, api_key: str) -> str:
    """Generate an image with Gemini's Imagen model and save it to the given path."""
    client = genai.Client(api_key=api_key)

    try:
        # The SDK call blocks, so run it in a thread to keep the event loop free.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: client.models.generate_images(
                model="models/imagen-3.0-generate-002",
                prompt=prompt,
                config=dict(
                    number_of_images=1,
                    output_mime_type="image/jpeg",
                    person_generation="ALLOW_ADULT",
                    aspect_ratio="16:9",
                ),
            ),
        )

        if not result.generated_images:
            print("No images generated.")
            return ""

        image = Image.open(BytesIO(result.generated_images[0].image.image_bytes))
        output_path.parent.mkdir(parents=True, exist_ok=True)
        image.save(output_path)
        return str(output_path)
    except Exception as e:
        print(f"Error generating image: {e}")
        return ""


def _generate_slideshow_markdown(topic: str, api_key: str) -> str:
    """Ask Gemini for a JSON slide deck following strict formatting rules."""
    client = genai.Client(api_key=api_key)

    model = "gemini-2.5-pro-preview-06-05"

    sys_prompt = f"""
<role>
You are SlideGen, an AI that creates fun and engaging narrated slide decks with visual elements about various topics.
</role>
<instructions>
Create a presentation about '{topic}'.
Include:
- An introduction slide with bullet points about the overview of the presentation topic and the key areas that will be covered
- 3 content slides with bullet points
- A conclusion slide with bullet points summarizing the key points and insights.
For each slide provide:
1. Each title should be a single concise and coherent phrase accompanied by exactly one relevant emoji. (Do NOT use the colon ":" format for titles)
2. 3-4 concise bullet points; go into more detail in the speaker notes.
3. Clear prose speaker notes suitable for narration that is accessible to general audiences
4. A detailed and specific image prompt for an AI image generator that is relevant to the slide's content. Do not include any text in the image.
Respond with a JSON array where each element represents a slide in the following format:
```json
[
  {{
    "slide_content": "## Introduction Slide Title\n\n",
    "speaker_notes": "Speaker notes",
    "image_prompt": "Image prompt"
  }},
  {{
    "slide_content": "## Content Slide Title\n\n",
    "speaker_notes": "Speaker notes",
    "image_prompt": "Image prompt"
  }},
  {{
    "slide_content": "## Content Slide Title\n\n",
    "speaker_notes": "Speaker notes",
    "image_prompt": "Image prompt"
  }},
  {{
    "slide_content": "## Content Slide Title\n\n",
    "speaker_notes": "Speaker notes",
    "image_prompt": "Image prompt"
  }},
  {{
    "slide_content": "## Conclusion Slide Title\n\n",
    "speaker_notes": "Speaker notes",
    "image_prompt": "Image prompt"
  }}
]
```
</instructions>
""".strip()

    response = client.models.generate_content(
        model=model,
        contents=[{"role": "user", "parts": [{"text": sys_prompt}]}],
        config=types.GenerateContentConfig(response_mime_type="text/plain", temperature=0.7),
    )
    return response.text.strip()


async def _generate_tts(narration: str, out_path: Path, api_key: str):
    """Gemini TTS -> WAV, with model fallback and optional Deepgram fallback."""
    client = genai.Client(api_key=api_key)

    # Try the dedicated TTS model first; fall back to the next model on quota errors.
    models_to_try = ["gemini-2.5-flash-preview-tts", "gemini-2.5-pro-preview-06-05"]

    # Truncate/create the output file; audio chunks are appended as they stream in.
    with open(out_path, "wb"):
        pass

    gemini_exhausted = True
    for model in models_to_try:
        try:
            print(f"Attempting TTS with model: {model}")

            stream_instance = client.models.generate_content_stream(
                model=model,
                contents=[{"role": "user", "parts": [{"text": narration}]}],
                config=types.GenerateContentConfig(
                    temperature=1,
                    # The dedicated TTS model needs the AUDIO response modality.
                    response_modalities=["AUDIO"] if "tts" in model else [],
                    speech_config=types.SpeechConfig(
                        voice_config=types.VoiceConfig(
                            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Algenib")
                        )
                    ) if "tts" in model else None,
                ),
            )

            def consume_stream():
                # The stream is a blocking generator; append each audio chunk,
                # wrapping raw PCM in a WAV header when needed.
                for chunk in stream_instance:
                    if (
                        chunk.candidates
                        and chunk.candidates[0].content
                        and chunk.candidates[0].content.parts
                    ):
                        part = chunk.candidates[0].content.parts[0].inline_data
                        if part and part.data:
                            data = (
                                _convert_to_wav(part.data, part.mime_type)
                                if not part.mime_type.endswith("wav")
                                else part.data
                            )
                            with open(out_path, "ab") as f:
                                f.write(data)

            # Run the blocking iteration in a thread so concurrent TTS tasks
            # gathered by the caller don't serialize on the event loop.
            await asyncio.get_running_loop().run_in_executor(None, consume_stream)

            print(f"Successfully generated TTS using model: {model}")
            gemini_exhausted = False
            return

        except Exception as e:
            if getattr(e, "code", None) == 429:
                print(f"Model {model} quota exhausted. Trying next model...")
                continue
            print(f"Error with model {model}: {e}")
            raise

    if gemini_exhausted and DEEPGRAM_AVAILABLE and DEEPGRAM_KEY:
        try:
            print("Attempting TTS with Deepgram...")
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, lambda: _generate_tts_with_deepgram(narration, out_path))
            print("Successfully generated TTS using Deepgram")
            return
        except Exception as e:
            print(f"Error with Deepgram TTS: {e}")

    # Last resort: write a minimal valid-but-empty WAV so playback code doesn't break.
    print("All TTS models quota exhausted. Creating empty audio file.")
    with open(out_path, "wb") as f:
        f.write(_convert_to_wav(b"", "audio/L16;rate=24000"))

def _generate_tts_with_deepgram(narration: str, out_path: Path):
    """Generate TTS using the Deepgram REST API and save it straight to disk."""
    deepgram = DeepgramClient(DEEPGRAM_KEY)
    print("Using Deepgram for TTS generation")

    # Match the Gemini output format: 16-bit linear PCM, 24 kHz, WAV container.
    options = SpeakOptions(
        model="aura-2-thalia-en",
        encoding="linear16",
        container="wav",
        sample_rate=24000,
    )

    try:
        response = deepgram.speak.rest.v("1").save(
            str(out_path),
            {"text": narration},
            options,
        )
        print(f"Successfully generated TTS with Deepgram: {out_path}")
        return response
    except Exception as e:
        print(f"Error generating TTS with Deepgram: {e}")
        raise


async def generate_slideshow_with_audio_async(topic: str, api_key: str, **kwargs):
    """
    Async version of generate_slideshow_with_audio that processes slides concurrently.

    Args:
        topic: The topic to generate a slideshow about
        api_key: Gemini API key
        **kwargs: Optional parameters including session_id

    Returns:
        slides_md : list[str] - markdown for each slide
        audio : list[str] - file paths (one per slide, same order)
        images : list[str|None] - file paths for slide images (one per slide, same order)
    """
    json_response = _generate_slideshow_markdown(topic, api_key)
    slides_data = _parse_slides_json(json_response)

    # Build a per-session working directory and a filesystem-safe topic slug.
    temp_dir = get_temp_dir(str(kwargs.get("session_id", datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))))
    safe_topic = re.sub(r"[^\w\s-]", "", topic)[:30]
    safe_topic = re.sub(r"[-\s]+", "-", safe_topic)
    pres_dir = Path(temp_dir) / safe_topic
    pres_dir.mkdir(parents=True, exist_ok=True)

    slides_md = []
    audio_files = []
    slide_images = [None] * len(slides_data)

    print("\n====== GENERATING SLIDESHOW CONTENT ======")

    # Collect coroutines first, then run TTS and image generation concurrently.
    tts_tasks = []
    image_tasks = []

    for i, slide_info in enumerate(slides_data, start=1):
        slides_md.append(slide_info["slide_content"])

        title_match = re.search(r"##\s+(.+?)(?:\n|$)", slide_info["slide_content"].strip())
        title = title_match.group(1) if title_match else f"Slide {i}"
        print(f"\n--- Processing Slide {i}: {title} ---")

        narration = slide_info.get("speaker_notes", "")
        print("SPEAKER NOTES:")
        print(narration or "No speaker notes provided.")

        wav_path = pres_dir / f"{safe_topic}_slide_{i:02d}.wav"
        audio_files.append(str(wav_path))

        if narration:
            print(f"Scheduling TTS for slide {i} -> {wav_path}")
            tts_tasks.append(_generate_tts(narration, wav_path, api_key))
        else:
            # No narration: write a minimal empty WAV placeholder.
            with open(wav_path, "wb") as f:
                f.write(_convert_to_wav(b"", "audio/L16;rate=24000"))
            print(f"No narration for slide {i}, created empty WAV: {wav_path}")

        image_prompt = slide_info.get("image_prompt", "")
        if image_prompt:
            image_prompt = image_prompt.strip() + " Do not include any text in the image."
        print("IMAGE PROMPT:")
        print(image_prompt or "No image prompt provided.")
        if image_prompt:
            image_path = pres_dir / f"{safe_topic}_slide_{i:02d}_image.jpg"
            print(f"Scheduling image for slide {i} -> {image_path}")
            # Remember the slide index so results can be matched back in order.
            image_tasks.append((i - 1, _generate_image(image_prompt, image_path, api_key)))
        else:
            print(f"No image prompt for slide {i}, skipping image generation.")

        print("-" * 50)

    print("\n====== GENERATING TTS IN PARALLEL ======")
    if tts_tasks:
        await asyncio.gather(*tts_tasks)
    print("====== TTS GENERATION COMPLETE ======")

    print("\n====== GENERATING IMAGES IN PARALLEL ======")
    if image_tasks:
        image_results = await asyncio.gather(*[task for _, task in image_tasks])
        for (idx, _), path in zip(image_tasks, image_results):
            slide_images[idx] = path
    print("====== IMAGE GENERATION COMPLETE ======")

    print("\n====== SLIDESHOW GENERATION COMPLETE ======\n")

    # Pad so all three lists line up one-to-one with the slides.
    num_slides = len(slides_md)
    while len(audio_files) < num_slides:
        audio_files.append(None)
    while len(slide_images) < num_slides:
        slide_images.append(None)

    return slides_md, audio_files, slide_images

def generate_slideshow_with_audio(topic: str, api_key: str, **kwargs):
    """
    Synchronous wrapper for the async slideshow generation function.
    Maintains backward compatibility with existing code.

    Args:
        topic: The topic to generate a slideshow about
        api_key: Gemini API key
        **kwargs: Optional parameters including:
            - session_id: Unique identifier for the user session

    Returns:
        slides_md : list[str] - markdown for each slide
        audio : list[str] - file paths (one per slide, same order)
        images : list[str|None] - file paths for slide images (one per slide, same order)
    """
    return asyncio.run(generate_slideshow_with_audio_async(topic, api_key, **kwargs))


def validate_topic(topic: str, api_key: str) -> bool:
    """Use Gemini Flash Preview to determine if a topic is suitable for a slideshow."""
    client = genai.Client(api_key=api_key)
    system_prompt = f'''
<role>
You are SlideGenInputGuard, an AI assistant that determines if a user input is a suitable topic for a narrated slideshow presentation.
</role>
<instructions>
Evaluate if "{topic}" is a real-world topic, question, or concept suitable for an educational slideshow. It is fine to include topics that are silly and not real-world topics.
If it is a valid topic, respond with exactly: 1
If it is nonsense, gibberish, meaningless, empty, or not a valid topic, respond with exactly: 0
Only respond with a single digit: 1 or 0. No spaces, newlines, or explanations. JUST THE NUMBER 1 OR 0.
</instructions>
<examples>
Input:How does lightning form?
Output:1
Input:The history of horses
Output:1
Input:basketball
Output:1
Input:boobs
Output:1
Input:King Kong
Output:1
Input:Batman
Output:1
Input:Hitler
Output:1
Input:bing bong
Output:0
Input:asdf
Output:0
Input:qwerty
Output:0
Input::)
Output:0
Input:
Output:0
</examples>
'''.strip()

    response = client.models.generate_content(
        model="gemini-2.5-flash-preview-05-20",
        contents=[{"role": "user", "parts": [{"text": system_prompt}]}],
        config=types.GenerateContentConfig(response_mime_type="text/plain", temperature=0),
    )
    return response.text.strip() == "1"
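

# Minimal smoke test, as a sketch: the GEMINI_API_KEY environment variable
# name is this demo's assumption, not something the SDK requires, and the
# calls below spend real API quota, so everything is guarded behind __main__.
if __name__ == "__main__":
    demo_key = os.environ.get("GEMINI_API_KEY", "")
    if not demo_key:
        print("Set GEMINI_API_KEY to run the demo.")
    elif not validate_topic("How does lightning form?", demo_key):
        print("Topic rejected by validate_topic.")
    else:
        slides, audio, images = generate_slideshow_with_audio(
            "How does lightning form?", demo_key, session_id="demo"
        )
        for n, (md, wav, img) in enumerate(zip(slides, audio, images), start=1):
            print(f"Slide {n}: audio={wav} image={img}")
            print(md)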