|
import asyncio
import io
import json
import logging
import os
import time
import uuid
from typing import Dict, List, Optional, Tuple

import aiofiles
import edge_tts
import gradio as gr
import openai
import pypdf
from pydub import AudioSegment
|
|
|
|
|
# Module-level logger used by every function and class in this file.
logger = logging.getLogger(__name__)

# Configure the root logger once at import time so INFO-level progress
# messages (API requests, TTS failures, ...) are emitted.
logging.basicConfig(level=logging.INFO)
|
|
|
class PodcastGenerator:
    """Turn text into a two-speaker podcast audio file.

    The pipeline is: ask a chat-completion model for a JSON dialogue script,
    synthesise every dialogue line with Edge TTS, then concatenate the clips
    into a single audio file with pydub.
    """

    # OpenAI-compatible endpoint used for script generation.
    # NOTE(review): the original code called an undefined ``client``; the log
    # messages and docstrings name SambaNova, so its OpenAI-compatible base URL
    # is assumed here -- confirm against the actual deployment.
    SAMBANOVA_BASE_URL = "https://api.sambanova.ai/v1"
    MODEL_NAME = "Meta-Llama-3.1-405B-Instruct"

    # The prompt is sent to the model in slices of this many characters.
    CHUNK_SIZE = 500
    # Upper bound on generated tokens per request.
    MAX_TOKENS = 3000

    # Example of the JSON shape the model is asked to produce.
    SCRIPT_EXAMPLE = '{"podcast":[{"speaker":1,"line":"Hello"},{"speaker":2,"line":"Hi there"}]}'

    @staticmethod
    def _extract_json_entries(text: str) -> Optional[List[Dict]]:
        """Parse a model reply into dialogue entries, or return None if it is not usable JSON.

        The reply is tried verbatim first and then as the substring between the
        first ``{`` and the last ``}``, which tolerates Markdown code fences or
        chatter around the JSON object. Entries that are not dicts or that have
        no non-blank string ``line`` are dropped.
        """
        candidates = [text]
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            candidates.append(text[start:end + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            items = parsed.get("podcast") if isinstance(parsed, dict) else None
            if not isinstance(items, list):
                continue
            entries = []
            for item in items:
                if not isinstance(item, dict):
                    continue
                line = item.get("line")
                if isinstance(line, str) and line.strip():
                    entries.append({"speaker": item.get("speaker", 1), "line": line.strip()})
            return entries
        return None

    @staticmethod
    def _split_into_turns(text: str) -> List[Dict]:
        """Fallback parser: treat each non-blank line as a turn, alternating speakers 1 and 2."""
        entries = []
        current_speaker = 1
        for raw_line in text.split("\n"):
            line = raw_line.strip()
            if line:
                entries.append({"speaker": current_speaker, "line": line})
                current_speaker = 3 - current_speaker  # toggles 1 <-> 2
        return entries

    async def _generate_chunk(self, client, system_prompt: str, chunk: str) -> Tuple[Optional[str], Optional[str]]:
        """Request a script for one prompt chunk.

        Returns:
            ``(text, None)`` on success or ``(None, error_message)`` on failure;
            exceptions from the API client are converted into an error message.
        """
        try:
            logger.info("Sending request to SambaNova API with prompt chunk: %s...", chunk[:100])
            response = await client.chat.completions.create(
                model=self.MODEL_NAME,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Generate a podcast script based on this: {chunk}\nUse this format: {self.SCRIPT_EXAMPLE}"},
                ],
                temperature=1,
                max_tokens=self.MAX_TOKENS,
            )
            logger.info("Received response from API: %s", response)

            # Some OpenAI-compatible backends report failures in an ``error``
            # field of an otherwise successful response.
            error = getattr(response, "error", None)
            if error:
                logger.error("API returned an error: %s", error)
                message = error.get("message", "Unknown error") if isinstance(error, dict) else str(error)
                return None, f"API error: {message}"

            choices = getattr(response, "choices", None)
            generated_text = choices[0].message.content if choices else None
            if not generated_text:
                logger.warning("No content generated from the API")
                return None, "No content generated from the API"

            logger.info("Generated text: %s...", generated_text[:100])
            return generated_text, None
        except Exception as e:  # boundary: report any client/network failure to the caller
            logger.error("Error generating script chunk: %s", e)
            return None, f"Failed to generate podcast script chunk: {e}"

    async def generate_script(self, prompt: str, language: str, api_key: str) -> Dict:
        """Generate a podcast script for ``prompt`` in the requested language.

        The prompt is split into ``CHUNK_SIZE``-character slices; each slice is
        sent to the model separately and the resulting dialogue entries are
        concatenated in order. A reply that is not valid JSON falls back to
        one-turn-per-line parsing with alternating speakers.

        Args:
            prompt: Source text the script is based on.
            language: Target language name, or "Auto Detect".
            api_key: API key for the chat-completion service.

        Returns:
            ``{"podcast": [{"speaker": ..., "line": ...}, ...]}`` on success, or
            ``{"error": message}`` if the input is empty, the key is missing,
            a request fails, or no dialogue could be extracted.
        """
        if not prompt or not prompt.strip():
            return {"error": "No input text was provided."}
        if not api_key:
            return {"error": "No API key was provided."}

        # "Auto Detect" is a UI choice, not a language name, so do not paste it
        # into the instruction verbatim.
        if language == "Auto Detect":
            language_hint = "Use the same language as the source text."
        else:
            language_hint = f"{language} language."
        system_prompt = (
            f"Generate a podcast script with 2 speakers. {language_hint} "
            "Be concise, engaging, and in JSON format."
        )

        try:
            # Async client so the request does not block the event loop.
            client = openai.AsyncOpenAI(api_key=api_key, base_url=self.SAMBANOVA_BASE_URL)
        except Exception as e:
            logger.error("Failed to create API client: %s", e)
            return {"error": f"Failed to create API client: {e}"}

        chunks = [prompt[i:i + self.CHUNK_SIZE] for i in range(0, len(prompt), self.CHUNK_SIZE)]

        podcast: List[Dict] = []
        try:
            for chunk in chunks:
                if not chunk.strip():
                    continue
                generated_text, error = await self._generate_chunk(client, system_prompt, chunk)
                if error is not None:
                    return {"error": error}

                # Each reply is its own JSON document; parse them one by one
                # instead of joining them into a single (invalid) JSON string.
                entries = self._extract_json_entries(generated_text)
                if entries is None:
                    logger.warning("Generated text is not valid JSON or missing 'podcast' key. Attempting to extract dialogue.")
                    entries = self._split_into_turns(generated_text)
                podcast.extend(entries)
        finally:
            await client.close()

        if not podcast:
            return {"error": "The model returned no usable dialogue lines."}
        return {"podcast": podcast}

    async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> Optional[str]:
        """Synthesise one dialogue line to a temporary audio file with Edge TTS.

        Args:
            text: Text to speak.
            speaker: Speaker number; 1 (int or the string "1") selects
                ``speaker1``, anything else selects ``speaker2``.
            speaker1: Voice name for the first speaker.
            speaker2: Voice name for the second speaker.

        Returns:
            Path of the generated temporary file, or None if synthesis failed.
            The caller owns the file and is responsible for deleting it.
        """
        voice = speaker1 if str(speaker).strip() == "1" else speaker2

        # NOTE(review): Edge TTS is believed to emit MP3-encoded audio despite
        # the .wav suffix; pydub/ffmpeg presumably detects the real format when
        # loading -- confirm before relying on the extension.
        temp_filename = f"temp_{uuid.uuid4()}.wav"
        succeeded = False
        try:
            speech = edge_tts.Communicate(text, voice)
            await speech.save(temp_filename)
            succeeded = True
            return temp_filename
        except edge_tts.exceptions.NoAudioReceived:
            logger.error("No audio received for text: '%s...' with voice: %s", text[:50], voice)
            return None
        except Exception as e:
            logger.error("Error generating audio for text: '%s...' with voice: %s. Error: %s", text[:50], voice, e)
            return None
        finally:
            # Only discard the file when synthesis failed or was cancelled; on
            # success the caller still needs it.
            if not succeeded and os.path.exists(temp_filename):
                os.remove(temp_filename)

    async def combine_audio_files(self, audio_files: List[str]) -> str:
        """Concatenate audio clips in order into a single WAV file.

        Args:
            audio_files: Paths of the clips to join. They are deleted afterwards,
                even if loading one of them fails.

        Returns:
            Path of the combined output file.
        """
        combined_audio = AudioSegment.empty()
        try:
            for audio_file in audio_files:
                combined_audio += AudioSegment.from_file(audio_file)
        finally:
            for audio_file in audio_files:
                if os.path.exists(audio_file):
                    os.remove(audio_file)

        output_filename = f"output_{uuid.uuid4()}.wav"
        combined_audio.export(output_filename, format="wav")
        return output_filename

    async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, api_key: str) -> str:
        """Generate the complete podcast audio for ``input_text``.

        Steps: generate the script, synthesise every dialogue line concurrently,
        then merge the clips. Progress and timings are shown via ``gr.Info``.

        Args:
            input_text: Source text for the script.
            language: Language for the podcast.
            speaker1: Voice name for the first speaker.
            speaker2: Voice name for the second speaker.
            api_key: API key for the script-generation service.

        Returns:
            Path of the generated podcast audio file.

        Raises:
            gr.Error: If the script could not be generated or no audio clip
                could be synthesised.
        """
        gr.Info("Generating podcast script...")
        start_time = time.time()
        script_result = await self.generate_script(input_text, language, api_key)
        end_time = time.time()

        # gr.Error is an exception: it must be raised to reach the user.
        if "error" in script_result:
            raise gr.Error(f"Failed to generate podcast script: {script_result['error']}")
        if "podcast" not in script_result:
            raise gr.Error("Generated script does not contain a 'podcast' key.")

        gr.Info(f"Successfully generated podcast script in {(end_time - start_time):.2f} seconds!")

        gr.Info("Generating podcast audio files...")
        start_time = time.time()
        # gather() preserves input order, so clips stay in script order.
        audio_files = await asyncio.gather(
            *[
                self.tts_generate(item['line'], item['speaker'], speaker1, speaker2)
                for item in script_result['podcast']
            ]
        )
        end_time = time.time()

        # Lines whose synthesis failed come back as None and are skipped.
        audio_files = [file for file in audio_files if file is not None]

        if not audio_files:
            raise gr.Error("Failed to generate any audio files. Please check your language and voice settings.")

        gr.Info(f"Successfully generated {len(audio_files)} out of {len(script_result['podcast'])} audio files in {(end_time - start_time):.2f} seconds!")

        return await self.combine_audio_files(audio_files)
|
|
|
class TextExtractor:
    """Extract plain text from uploaded PDF or TXT files."""

    @staticmethod
    async def extract_from_pdf(file_path: str) -> str:
        """Return the text of every page of a PDF, separated by blank lines.

        Pages for which no text is extracted are skipped.
        """
        async with aiofiles.open(file_path, 'rb') as file:
            content = await file.read()

        pdf_reader = pypdf.PdfReader(io.BytesIO(content))
        # Extract each page once; the extraction is too costly to repeat just
        # for the emptiness check.
        page_texts = (page.extract_text() for page in pdf_reader.pages)
        return "\n\n".join(text for text in page_texts if text)

    @staticmethod
    async def extract_from_txt(file_path: str) -> str:
        """Return the contents of a text file.

        The file is read as UTF-8 first; if that fails it is re-read with the
        platform default encoding, replacing undecodable bytes, so a file in a
        legacy encoding does not abort the whole request.
        """
        try:
            async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
                return await file.read()
        except UnicodeDecodeError:
            async with aiofiles.open(file_path, 'r', errors='replace') as file:
                return await file.read()

    @classmethod
    async def extract_text(cls, file_path: str) -> str:
        """Extract text from ``file_path`` based on its extension (.pdf or .txt).

        Raises:
            gr.Error: If the file extension is not supported.
        """
        _, file_extension = os.path.splitext(file_path)
        extension = file_extension.lower()
        if extension == '.pdf':
            return await cls.extract_from_pdf(file_path)
        if extension == '.txt':
            return await cls.extract_from_txt(file_path)
        raise gr.Error(f"Unsupported file type: {file_extension}")
|
|
|
async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, api_key: str = "") -> Optional[str]:
    """Gradio handler: build a podcast from typed text or an uploaded file.

    Args:
        input_text: Text typed by the user.
        input_file: Uploaded PDF/TXT file, or a falsy value if none was
            uploaded. Either a path string or an object exposing the path as
            ``.name`` is accepted. When present it replaces ``input_text``.
        language: Selected language name, or "Auto Detect".
        speaker1: Display label of the first speaker's voice.
        speaker2: Display label of the second speaker's voice.
        api_key: API key for script generation; when empty, the environment
            variable ``Your_API_KEY`` is used instead.

    Returns:
        Path of the generated podcast audio file, or None if the generator
        produced nothing.

    Raises:
        gr.Error: If a voice label is unknown, no input text is available, no
            API key is available, or the uploaded file type is unsupported.
    """
    gr.Info("Starting podcast generation...")
    start_time = time.time()

    # Display label -> Edge TTS voice identifier.
    voice_names = {
        "Andrew - English (United States)": "en-US-AndrewMultilingualNeural",
        "Ava - English (United States)": "en-US-AvaMultilingualNeural",
        "Brian - English (United States)": "en-US-BrianMultilingualNeural",
        "Emma - English (United States)": "en-US-EmmaMultilingualNeural",
        "Florian - German (Germany)": "de-DE-FlorianMultilingualNeural",
        "Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural",
        "Remy - French (France)": "fr-FR-RemyMultilingualNeural",
        "Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural",
    }

    try:
        voice1 = voice_names[speaker1]
        voice2 = voice_names[speaker2]
    except KeyError as err:
        raise gr.Error(f"Unknown voice selection: {err.args[0]}") from None

    # Compare the language name with the language named in each voice label.
    # The previous check compared the first two letters of the English language
    # name with the locale prefix ("German" -> "ge" vs "de-DE"), which rejected
    # valid combinations. Every offered voice is a "Multilingual" voice, so a
    # mismatch is reported as a warning rather than aborting the request.
    if language != "Auto Detect" and not (language in speaker1 and language in speaker2):
        gr.Warning(f"Selected voices may not be compatible with the chosen language: {language}")

    if input_file:
        # Depending on the Gradio version/config the upload arrives either as a
        # path string or as a file wrapper exposing the path as ``.name``.
        file_path = input_file if isinstance(input_file, str) else input_file.name
        input_text = await TextExtractor.extract_text(file_path)

    if not input_text or not input_text.strip():
        raise gr.Error("Please enter some text or upload a PDF or TXT file.")

    max_input_length = 3000
    if len(input_text) > max_input_length:
        input_text = input_text[:max_input_length]
        gr.Warning(f"Input text was truncated to {max_input_length} characters due to length limitations.")

    if not api_key:
        api_key = os.getenv("Your_API_KEY")
    if not api_key:
        raise gr.Error("No API key provided. Enter a key or set the Your_API_KEY environment variable.")

    podcast_generator = PodcastGenerator()
    podcast = await podcast_generator.generate_podcast(input_text, language, voice1, voice2, api_key)

    if podcast is None:
        return None

    end_time = time.time()
    gr.Info(f"Successfully generated podcast in {(end_time - start_time):.2f} seconds!")

    return podcast
|
|
|
|
|
# Voice labels offered for both speakers; they must match the keys of the
# label -> voice-id table used by process_input.
_VOICE_CHOICES = [
    "Andrew - English (United States)",
    "Ava - English (United States)",
    "Brian - English (United States)",
    "Emma - English (United States)",
    "Florian - German (Germany)",
    "Seraphina - German (Germany)",
    "Remy - French (France)",
    "Vivienne - French (France)",
]

# Languages the script can be requested in; "Auto Detect" is the default.
_LANGUAGE_CHOICES = [
    "Auto Detect",
    "Chinese Taiwanese", "Afrikaans", "Albanian", "Amharic", "Arabic", "Armenian", "Azerbaijani",
    "Bahasa Indonesian", "Bangla", "Basque", "Bengali", "Bosnian", "Bulgarian",
    "Burmese", "Catalan", "Chinese Cantonese", "Chinese Mandarin", "Croatian", "Czech", "Danish", "Dutch", "English",
    "Estonian", "Filipino", "Finnish", "French", "Galician", "Georgian",
    "German", "Greek", "Hebrew", "Hindi", "Hungarian", "Icelandic", "Irish",
    "Italian", "Japanese", "Javanese", "Kannada", "Kazakh", "Khmer", "Korean",
    "Lao", "Latvian", "Lithuanian", "Macedonian", "Malay", "Malayalam",
    "Maltese", "Mongolian", "Nepali", "Norwegian Bokmål", "Pashto", "Persian",
    "Polish", "Portuguese", "Romanian", "Russian", "Serbian", "Sinhala",
    "Slovak", "Slovene", "Somali", "Spanish", "Sundanese", "Swahili",
    "Swedish", "Tamil", "Telugu", "Thai", "Turkish", "Ukrainian", "Urdu",
    "Uzbek", "Vietnamese", "Welsh", "Zulu",
]

# Gradio UI: the inputs are passed positionally to process_input, so their
# order must match that function's parameter order.
iface = gr.Interface(
    fn=process_input,
    inputs=[
        gr.Textbox(label="Input Text"),
        # Only the two formats TextExtractor can read are offered.
        gr.File(label="Or Upload a PDF or TXT file", file_types=[".pdf", ".txt"]),
        gr.Dropdown(label="Language", choices=_LANGUAGE_CHOICES, value="Auto Detect"),
        gr.Dropdown(label="Speaker 1 Voice", choices=_VOICE_CHOICES, value="Andrew - English (United States)"),
        gr.Dropdown(label="Speaker 2 Voice", choices=_VOICE_CHOICES, value="Ava - English (United States)"),
        # The script is generated through the SambaNova API, so ask for that
        # key (the label previously said Gemini); mask it like a password.
        gr.Textbox(
            label="Your SambaNova API Key (Optional) - In case you are getting rate limited",
            type="password",
        ),
    ],
    outputs=[
        gr.Audio(label="Generated Podcast Audio"),
    ],
    title="🎙️ PodcastGen 🎙️",
    description="Generate a 2-speaker podcast from text input or documents!",
    allow_flagging="never",
)


if __name__ == "__main__":
    iface.launch()
|
|
|
|
|
|