Spaces:
Running
Running
import gradio as gr | |
from pydub import AudioSegment | |
import json | |
import uuid | |
import io | |
import edge_tts | |
import asyncio | |
import aiofiles | |
import pypdf | |
import os | |
import time | |
from typing import List, Dict, Tuple | |
import openai | |
import logging | |
# At the beginning of your script, set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class PodcastGenerator: | |
def __init__(self): | |
pass | |
async def generate_script(self, prompt: str, language: str, api_key: str) -> Dict: | |
""" | |
非同步生成基於給定提示和語言的Podcast劇本。 | |
引數: | |
prompt (str): 用於生成Podcast劇本的使用者輸入文字。 | |
language (str): Podcast指劇本所需的語言。 | |
api_key (str): 用於訪問 SambaNova API 服務的 API 金鑰。 | |
返回: | |
Dict: 包含以 JSON 格式生成Podcast劇本的字典。 | |
異常: | |
gr.Error: 如果 API 金鑰或速率限制出現問題。 | |
此方法使用 SambaNova API 根據使用者的輸入生成Podcast劇本。 | |
它處理語言選擇,使用適當配置設定 AI 模型,並處理生成的響應。 | |
""" | |
# Significantly shorten the system prompt | |
system_prompt = f"""Generate a podcast script with 2 speakers. {language} language. Be concise, engaging, and in JSON format.""" | |
example = """{"podcast":[{"speaker":1,"line":"Hello"},{"speaker":2,"line":"Hi there"}]}""" | |
async def generate_chunk(chunk: str) -> str: | |
try: | |
# Calculate the available tokens for generation | |
prompt_tokens = len(chunk.split()) | |
system_tokens = len(system_prompt.split()) | |
max_tokens = 3000 # Reduced from 4096 to leave more room for the prompt | |
logger.info(f"Sending request to SambaNova API with prompt chunk: {chunk[:100]}...") | |
response = client.chat.completions.create( | |
model='Meta-Llama-3.1-405B-Instruct', | |
messages=[ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": f"Generate a podcast script based on this: {chunk}\nUse this format: {example}"} | |
], | |
temperature=1, | |
max_tokens=max_tokens | |
) | |
logger.info(f"Received response from API: {response}") | |
if hasattr(response, 'error'): | |
logger.error(f"API returned an error: {response.error}") | |
return {"error": f"API error: {response.error.get('message', 'Unknown error')}"} | |
if response.choices and len(response.choices) > 0: | |
generated_text = response.choices[0].message.content | |
logger.info(f"Generated text: {generated_text[:100]}...") | |
return generated_text | |
else: | |
logger.warning("No content generated from the API") | |
return {"error": "No content generated from the API"} | |
except Exception as e: | |
logger.error(f"Error generating script chunk: {str(e)}") | |
return {"error": f"Failed to generate podcast script chunk: {str(e)}"} | |
# Split the prompt into smaller chunks | |
chunk_size = 500 # Reduced from 1000 | |
chunks = [prompt[i:i+chunk_size] for i in range(0, len(prompt), chunk_size)] | |
# Generate script for each chunk | |
generated_chunks = [] | |
for chunk in chunks: | |
result = await generate_chunk(chunk) | |
if isinstance(result, dict) and "error" in result: | |
return result | |
generated_chunks.append(result) | |
# Combine generated chunks | |
generated_text = " ".join(generated_chunks) | |
# Try to parse JSON, if fails then extract dialogue from raw text | |
try: | |
parsed_json = json.loads(generated_text) | |
if "podcast" in parsed_json: | |
return parsed_json | |
else: | |
raise json.JSONDecodeError("Missing 'podcast' key", generated_text, 0) | |
except json.JSONDecodeError: | |
logger.warning("Generated text is not valid JSON or missing 'podcast' key. Attempting to extract dialogue.") | |
lines = generated_text.split('\n') | |
podcast = [] | |
current_speaker = 1 | |
for line in lines: | |
line = line.strip() | |
if line: | |
podcast.append({ | |
"speaker": current_speaker, | |
"line": line | |
}) | |
current_speaker = 3 - current_speaker # Switch between 1 and 2 | |
return {"podcast": podcast} | |
async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str: | |
""" | |
非同步生成文字轉語音音訊檔案。 | |
引數: | |
text (str): 要轉換爲語音的文字內容。 | |
speaker (int): 說話者的編號(1 或 2)。 | |
speaker1 (str): 第一位說話者的語音設定。 | |
speaker2 (str): 第二位說話者的語音設定。 | |
返回: | |
str: 生成的臨時音訊檔案的檔名,或者 None 如果生成失敗。 | |
此方法使用 Edge TTS 將文字轉換爲語音,並將結果儲存爲臨時音訊檔案。 | |
根據指定的說話者編號選擇相應的語音設定。 | |
""" | |
# 根據說話者選擇語音 | |
voice = speaker1 if speaker == 1 else speaker2 | |
# 建立語音合成對象 | |
speech = edge_tts.Communicate(text, voice) | |
# 生成臨時檔名 | |
temp_filename = f"temp_{uuid.uuid4()}.wav" | |
try: | |
# 儲存語音檔案 | |
await speech.save(temp_filename) | |
return temp_filename | |
except edge_tts.exceptions.NoAudioReceived: | |
logger.error(f"No audio received for text: '{text[:50]}...' with voice: {voice}") | |
return None | |
except Exception as e: | |
logger.error(f"Error generating audio for text: '{text[:50]}...' with voice: {voice}. Error: {str(e)}") | |
return None | |
finally: | |
# 如果檔案存在但生成失敗,刪除臨時檔案 | |
if os.path.exists(temp_filename): | |
os.remove(temp_filename) | |
async def combine_audio_files(self, audio_files: List[str]) -> str: | |
""" | |
非同步合併音訊檔案。 | |
引數: | |
audio_files (List[str]): 包含音訊檔案路徑的列表。 | |
返回: | |
str: 合併後的音訊檔案的檔名。 | |
""" | |
# 建立空的音訊段 | |
combined_audio = AudioSegment.empty() | |
# 遍歷所有音訊檔案並合併 | |
for audio_file in audio_files: | |
combined_audio += AudioSegment.from_file(audio_file) | |
os.remove(audio_file) # 清理臨時檔案 | |
# 生成輸出文件名 | |
output_filename = f"output_{uuid.uuid4()}.wav" | |
# 匯出合併後的音訊 | |
combined_audio.export(output_filename, format="wav") | |
return output_filename | |
async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, api_key: str) -> str: | |
""" | |
非同步生成Podcast音訊檔案。 | |
引數: | |
input_text (str): 用於生成Podcast指令碼的輸入文字。 | |
language (str): Podcast使用的語言。 | |
speaker1 (str): 第一位說話者的語音設定。 | |
speaker2 (str): 第二位說話者的語音設定。 | |
api_key (str): 用於訪問 Gemini AI 服務的 API 金鑰。 | |
返回: | |
str: 生成的Podcast音訊檔案的檔名。 | |
此方法執行以下步驟: | |
1. 使用 generate_script 方法生成Podcast劇本。 | |
2. 使用 tts_generate 方法爲每個對話行生成音訊檔案。 | |
3. 使用 combine_audio_files 方法將所有音訊檔案合併爲一個完整的Podcast。 | |
整個過程是非同步的,以提高效率。方法還會記錄並顯示每個步驟的執行時間。 | |
""" | |
# 生成Podcast劇本 | |
gr.Info("Generating podcast script...") | |
start_time = time.time() | |
script_result = await self.generate_script(input_text, language, api_key) | |
end_time = time.time() | |
if "error" in script_result: | |
gr.Error(f"Failed to generate podcast script: {script_result['error']}") | |
return None | |
if "raw_text" in script_result: | |
gr.Warning("Generated text is not in the expected JSON format. Attempting to process raw text.") | |
# Here you might want to implement a fallback method to process raw text | |
# For now, we'll just return None | |
return None | |
if "podcast" not in script_result: | |
gr.Error("Generated script does not contain a 'podcast' key.") | |
return None | |
gr.Info(f"Successfully generated podcast script in {(end_time - start_time):.2f} seconds!") | |
# 生成Podcast音訊檔案 | |
gr.Info("Generating podcast audio files...") | |
start_time = time.time() | |
audio_files = await asyncio.gather(*[self.tts_generate(item['line'], item['speaker'], speaker1, speaker2) for item in script_result['podcast']]) | |
end_time = time.time() | |
# Filter out None values (failed TTS generations) | |
audio_files = [file for file in audio_files if file is not None] | |
if not audio_files: | |
gr.Error("Failed to generate any audio files. Please check your language and voice settings.") | |
return None | |
gr.Info(f"Successfully generated {len(audio_files)} out of {len(script_result['podcast'])} audio files in {(end_time - start_time):.2f} seconds!") | |
# 合併音訊檔案 | |
combined_audio = await self.combine_audio_files(audio_files) | |
return combined_audio | |
class TextExtractor: | |
async def extract_from_pdf(file_path: str) -> str: | |
# 從PDF檔案中提取文字 | |
async with aiofiles.open(file_path, 'rb') as file: | |
content = await file.read() | |
pdf_reader = pypdf.PdfReader(io.BytesIO(content)) | |
return "\n\n".join(page.extract_text() for page in pdf_reader.pages if page.extract_text()) | |
async def extract_from_txt(file_path: str) -> str: | |
# 從TXT檔案中提取文字 | |
async with aiofiles.open(file_path, 'r') as file: | |
return await file.read() | |
async def extract_text(cls, file_path: str) -> str: | |
# 根據檔案型別選擇適當的提取方法 | |
_, file_extension = os.path.splitext(file_path) | |
if file_extension.lower() == '.pdf': | |
return await cls.extract_from_pdf(file_path) | |
elif file_extension.lower() == '.txt': | |
return await cls.extract_from_txt(file_path) | |
else: | |
raise gr.Error(f"Unsupported file type: {file_extension}") | |
async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, api_key: str = "") -> str: | |
""" | |
處理輸入並生成Podcast的非同步函式。 | |
引數: | |
input_text (str): 使用者輸入的文字內容。 | |
input_file: 使用者上傳的檔案(可以是 PDF 或 TXT)。 | |
language (str): 選擇的語言。 | |
speaker1 (str): 第一位說話者的語音選擇。 | |
speaker2 (str): 第二位說話者的語音選擇。 | |
api_key (str): 用於生成 AI 的 API 金鑰,預設爲空字串。 | |
返回: | |
str: 生成的Podcast音訊檔案路徑。 | |
此函式協調整個Podcast生成過程,包括文字提取、指令碼生成和音訊合成。 | |
它處理不同的輸入型別(文字或檔案),並使用指定的語音和語言設定來建立最終的Podcast。 | |
""" | |
# 開始生成Podcast | |
gr.Info("Starting podcast generation...") | |
start_time = time.time() | |
# 定義語音名稱對映 | |
voice_names = { | |
"Andrew - English (United States)": "en-US-AndrewMultilingualNeural", | |
"Ava - English (United States)": "en-US-AvaMultilingualNeural", | |
"Brian - English (United States)": "en-US-BrianMultilingualNeural", | |
"Emma - English (United States)": "en-US-EmmaMultilingualNeural", | |
"Florian - German (Germany)": "de-DE-FlorianMultilingualNeural", | |
"Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural", | |
"Remy - French (France)": "fr-FR-RemyMultilingualNeural", | |
"Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural" | |
} | |
# 獲取實際的語音名稱 | |
speaker1 = voice_names[speaker1] | |
speaker2 = voice_names[speaker2] | |
# Check if the selected voices are compatible with the chosen language | |
if language != "Auto Detect": | |
if not (speaker1.startswith(language[:2].lower()) and speaker2.startswith(language[:2].lower())): | |
gr.Error(f"Selected voices may not be compatible with the chosen language: {language}") | |
return None | |
# 如果提供了輸入檔案,則從檔案中提取文字 | |
if input_file: | |
input_text = await TextExtractor.extract_text(input_file.name) | |
# Limit input text length | |
max_input_length = 3000 # Adjust this value as needed | |
if len(input_text) > max_input_length: | |
input_text = input_text[:max_input_length] | |
gr.Warning(f"Input text was truncated to {max_input_length} characters due to length limitations.") | |
# 如果沒有提供API金鑰,則使用環境變數中的金鑰 | |
if not api_key: | |
api_key = os.getenv("Your_API_KEY") | |
# 建立PodcastGenerator實例並生成Podcast | |
podcast_generator = PodcastGenerator() | |
podcast = await podcast_generator.generate_podcast(input_text, language, speaker1, speaker2, api_key) | |
if podcast is None: | |
return None | |
# 計算總耗時並顯示資訊 | |
end_time = time.time() | |
gr.Info(f"Successfully generated podcast in {(end_time - start_time):.2f} seconds!") | |
return podcast | |
# 定義Gradio介面 | |
iface = gr.Interface( | |
fn=process_input, | |
inputs=[ | |
gr.Textbox(label="Input Text"), | |
gr.File(label="Or Upload a PDF or TXT file"), | |
gr.Dropdown(label="Language", choices=[ | |
"Auto Detect", | |
"Chinese Taiwanese", "Afrikaans", "Albanian", "Amharic", "Arabic", "Armenian", "Azerbaijani", | |
"Bahasa Indonesian", "Bangla", "Basque", "Bengali", "Bosnian", "Bulgarian", | |
"Burmese", "Catalan", "Chinese Cantonese", "Chinese Mandarin", "Croatian", "Czech", "Danish", "Dutch", "English", | |
"Estonian", "Filipino", "Finnish", "French", "Galician", "Georgian", | |
"German", "Greek", "Hebrew", "Hindi", "Hungarian", "Icelandic", "Irish", | |
"Italian", "Japanese", "Javanese", "Kannada", "Kazakh", "Khmer", "Korean", | |
"Lao", "Latvian", "Lithuanian", "Macedonian", "Malay", "Malayalam", | |
"Maltese", "Mongolian", "Nepali", "Norwegian Bokmål", "Pashto", "Persian", | |
"Polish", "Portuguese", "Romanian", "Russian", "Serbian", "Sinhala", | |
"Slovak", "Slovene", "Somali", "Spanish", "Sundanese", "Swahili", | |
"Swedish", "Tamil", "Telugu", "Thai", "Turkish", "Ukrainian", "Urdu", | |
"Uzbek", "Vietnamese", "Welsh", "Zulu" | |
], | |
value="Auto Detect"), | |
gr.Dropdown(label="Speaker 1 Voice", choices=[ | |
"Andrew - English (United States)", | |
"Ava - English (United States)", | |
"Brian - English (United States)", | |
"Emma - English (United States)", | |
"Florian - German (Germany)", | |
"Seraphina - German (Germany)", | |
"Remy - French (France)", | |
"Vivienne - French (France)" | |
], | |
value="Andrew - English (United States)"), | |
gr.Dropdown(label="Speaker 2 Voice", choices=[ | |
"Andrew - English (United States)", | |
"Ava - English (United States)", | |
"Brian - English (United States)", | |
"Emma - English (United States)", | |
"Florian - German (Germany)", | |
"Seraphina - German (Germany)", | |
"Remy - French (France)", | |
"Vivienne - French (France)" | |
], | |
value="Ava - English (United States)"), | |
gr.Textbox(label="Your Gemini API Key (Optional) - In case you are getting rate limited"), | |
], | |
outputs=[ | |
gr.Audio(label="Generated Podcast Audio") | |
], | |
title="🎙️ PodcastGen 🎙️", | |
description="Generate a 2-speaker podcast from text input or documents!", | |
allow_flagging="never" | |
) | |
if __name__ == "__main__": | |
iface.launch() | |