Jiangxz01's picture
Upload app.py
fce0934 verified
raw
history blame
16.9 kB
import gradio as gr
from pydub import AudioSegment
import json
import uuid
import io
import edge_tts
import asyncio
import aiofiles
import pypdf
import os
import time
from typing import List, Dict, Tuple
import openai
import logging
# At the beginning of your script, set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PodcastGenerator:
def __init__(self):
pass
async def generate_script(self, prompt: str, language: str, api_key: str) -> Dict:
"""
非同步生成基於給定提示和語言的Podcast劇本。
引數:
prompt (str): 用於生成Podcast劇本的使用者輸入文字。
language (str): Podcast指劇本所需的語言。
api_key (str): 用於訪問 SambaNova API 服務的 API 金鑰。
返回:
Dict: 包含以 JSON 格式生成Podcast劇本的字典。
異常:
gr.Error: 如果 API 金鑰或速率限制出現問題。
此方法使用 SambaNova API 根據使用者的輸入生成Podcast劇本。
它處理語言選擇,使用適當配置設定 AI 模型,並處理生成的響應。
"""
# Significantly shorten the system prompt
system_prompt = f"""Generate a podcast script with 2 speakers. {language} language. Be concise, engaging, and in JSON format."""
example = """{"podcast":[{"speaker":1,"line":"Hello"},{"speaker":2,"line":"Hi there"}]}"""
async def generate_chunk(chunk: str) -> str:
try:
# Calculate the available tokens for generation
prompt_tokens = len(chunk.split())
system_tokens = len(system_prompt.split())
max_tokens = 3000 # Reduced from 4096 to leave more room for the prompt
logger.info(f"Sending request to SambaNova API with prompt chunk: {chunk[:100]}...")
response = client.chat.completions.create(
model='Meta-Llama-3.1-405B-Instruct',
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Generate a podcast script based on this: {chunk}\nUse this format: {example}"}
],
temperature=1,
max_tokens=max_tokens
)
logger.info(f"Received response from API: {response}")
if hasattr(response, 'error'):
logger.error(f"API returned an error: {response.error}")
return {"error": f"API error: {response.error.get('message', 'Unknown error')}"}
if response.choices and len(response.choices) > 0:
generated_text = response.choices[0].message.content
logger.info(f"Generated text: {generated_text[:100]}...")
return generated_text
else:
logger.warning("No content generated from the API")
return {"error": "No content generated from the API"}
except Exception as e:
logger.error(f"Error generating script chunk: {str(e)}")
return {"error": f"Failed to generate podcast script chunk: {str(e)}"}
# Split the prompt into smaller chunks
chunk_size = 500 # Reduced from 1000
chunks = [prompt[i:i+chunk_size] for i in range(0, len(prompt), chunk_size)]
# Generate script for each chunk
generated_chunks = []
for chunk in chunks:
result = await generate_chunk(chunk)
if isinstance(result, dict) and "error" in result:
return result
generated_chunks.append(result)
# Combine generated chunks
generated_text = " ".join(generated_chunks)
# Try to parse JSON, if fails then extract dialogue from raw text
try:
parsed_json = json.loads(generated_text)
if "podcast" in parsed_json:
return parsed_json
else:
raise json.JSONDecodeError("Missing 'podcast' key", generated_text, 0)
except json.JSONDecodeError:
logger.warning("Generated text is not valid JSON or missing 'podcast' key. Attempting to extract dialogue.")
lines = generated_text.split('\n')
podcast = []
current_speaker = 1
for line in lines:
line = line.strip()
if line:
podcast.append({
"speaker": current_speaker,
"line": line
})
current_speaker = 3 - current_speaker # Switch between 1 and 2
return {"podcast": podcast}
async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str:
"""
非同步生成文字轉語音音訊檔案。
引數:
text (str): 要轉換爲語音的文字內容。
speaker (int): 說話者的編號(1 或 2)。
speaker1 (str): 第一位說話者的語音設定。
speaker2 (str): 第二位說話者的語音設定。
返回:
str: 生成的臨時音訊檔案的檔名,或者 None 如果生成失敗。
此方法使用 Edge TTS 將文字轉換爲語音,並將結果儲存爲臨時音訊檔案。
根據指定的說話者編號選擇相應的語音設定。
"""
# 根據說話者選擇語音
voice = speaker1 if speaker == 1 else speaker2
# 建立語音合成對象
speech = edge_tts.Communicate(text, voice)
# 生成臨時檔名
temp_filename = f"temp_{uuid.uuid4()}.wav"
try:
# 儲存語音檔案
await speech.save(temp_filename)
return temp_filename
except edge_tts.exceptions.NoAudioReceived:
logger.error(f"No audio received for text: '{text[:50]}...' with voice: {voice}")
return None
except Exception as e:
logger.error(f"Error generating audio for text: '{text[:50]}...' with voice: {voice}. Error: {str(e)}")
return None
finally:
# 如果檔案存在但生成失敗,刪除臨時檔案
if os.path.exists(temp_filename):
os.remove(temp_filename)
async def combine_audio_files(self, audio_files: List[str]) -> str:
"""
非同步合併音訊檔案。
引數:
audio_files (List[str]): 包含音訊檔案路徑的列表。
返回:
str: 合併後的音訊檔案的檔名。
"""
# 建立空的音訊段
combined_audio = AudioSegment.empty()
# 遍歷所有音訊檔案並合併
for audio_file in audio_files:
combined_audio += AudioSegment.from_file(audio_file)
os.remove(audio_file) # 清理臨時檔案
# 生成輸出文件名
output_filename = f"output_{uuid.uuid4()}.wav"
# 匯出合併後的音訊
combined_audio.export(output_filename, format="wav")
return output_filename
async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, api_key: str) -> str:
"""
非同步生成Podcast音訊檔案。
引數:
input_text (str): 用於生成Podcast指令碼的輸入文字。
language (str): Podcast使用的語言。
speaker1 (str): 第一位說話者的語音設定。
speaker2 (str): 第二位說話者的語音設定。
api_key (str): 用於訪問 Gemini AI 服務的 API 金鑰。
返回:
str: 生成的Podcast音訊檔案的檔名。
此方法執行以下步驟:
1. 使用 generate_script 方法生成Podcast劇本。
2. 使用 tts_generate 方法爲每個對話行生成音訊檔案。
3. 使用 combine_audio_files 方法將所有音訊檔案合併爲一個完整的Podcast。
整個過程是非同步的,以提高效率。方法還會記錄並顯示每個步驟的執行時間。
"""
# 生成Podcast劇本
gr.Info("Generating podcast script...")
start_time = time.time()
script_result = await self.generate_script(input_text, language, api_key)
end_time = time.time()
if "error" in script_result:
gr.Error(f"Failed to generate podcast script: {script_result['error']}")
return None
if "raw_text" in script_result:
gr.Warning("Generated text is not in the expected JSON format. Attempting to process raw text.")
# Here you might want to implement a fallback method to process raw text
# For now, we'll just return None
return None
if "podcast" not in script_result:
gr.Error("Generated script does not contain a 'podcast' key.")
return None
gr.Info(f"Successfully generated podcast script in {(end_time - start_time):.2f} seconds!")
# 生成Podcast音訊檔案
gr.Info("Generating podcast audio files...")
start_time = time.time()
audio_files = await asyncio.gather(*[self.tts_generate(item['line'], item['speaker'], speaker1, speaker2) for item in script_result['podcast']])
end_time = time.time()
# Filter out None values (failed TTS generations)
audio_files = [file for file in audio_files if file is not None]
if not audio_files:
gr.Error("Failed to generate any audio files. Please check your language and voice settings.")
return None
gr.Info(f"Successfully generated {len(audio_files)} out of {len(script_result['podcast'])} audio files in {(end_time - start_time):.2f} seconds!")
# 合併音訊檔案
combined_audio = await self.combine_audio_files(audio_files)
return combined_audio
class TextExtractor:
@staticmethod
async def extract_from_pdf(file_path: str) -> str:
# 從PDF檔案中提取文字
async with aiofiles.open(file_path, 'rb') as file:
content = await file.read()
pdf_reader = pypdf.PdfReader(io.BytesIO(content))
return "\n\n".join(page.extract_text() for page in pdf_reader.pages if page.extract_text())
@staticmethod
async def extract_from_txt(file_path: str) -> str:
# 從TXT檔案中提取文字
async with aiofiles.open(file_path, 'r') as file:
return await file.read()
@classmethod
async def extract_text(cls, file_path: str) -> str:
# 根據檔案型別選擇適當的提取方法
_, file_extension = os.path.splitext(file_path)
if file_extension.lower() == '.pdf':
return await cls.extract_from_pdf(file_path)
elif file_extension.lower() == '.txt':
return await cls.extract_from_txt(file_path)
else:
raise gr.Error(f"Unsupported file type: {file_extension}")
async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, api_key: str = "") -> str:
"""
處理輸入並生成Podcast的非同步函式。
引數:
input_text (str): 使用者輸入的文字內容。
input_file: 使用者上傳的檔案(可以是 PDF 或 TXT)。
language (str): 選擇的語言。
speaker1 (str): 第一位說話者的語音選擇。
speaker2 (str): 第二位說話者的語音選擇。
api_key (str): 用於生成 AI 的 API 金鑰,預設爲空字串。
返回:
str: 生成的Podcast音訊檔案路徑。
此函式協調整個Podcast生成過程,包括文字提取、指令碼生成和音訊合成。
它處理不同的輸入型別(文字或檔案),並使用指定的語音和語言設定來建立最終的Podcast。
"""
# 開始生成Podcast
gr.Info("Starting podcast generation...")
start_time = time.time()
# 定義語音名稱對映
voice_names = {
"Andrew - English (United States)": "en-US-AndrewMultilingualNeural",
"Ava - English (United States)": "en-US-AvaMultilingualNeural",
"Brian - English (United States)": "en-US-BrianMultilingualNeural",
"Emma - English (United States)": "en-US-EmmaMultilingualNeural",
"Florian - German (Germany)": "de-DE-FlorianMultilingualNeural",
"Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural",
"Remy - French (France)": "fr-FR-RemyMultilingualNeural",
"Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural"
}
# 獲取實際的語音名稱
speaker1 = voice_names[speaker1]
speaker2 = voice_names[speaker2]
# Check if the selected voices are compatible with the chosen language
if language != "Auto Detect":
if not (speaker1.startswith(language[:2].lower()) and speaker2.startswith(language[:2].lower())):
gr.Error(f"Selected voices may not be compatible with the chosen language: {language}")
return None
# 如果提供了輸入檔案,則從檔案中提取文字
if input_file:
input_text = await TextExtractor.extract_text(input_file.name)
# Limit input text length
max_input_length = 3000 # Adjust this value as needed
if len(input_text) > max_input_length:
input_text = input_text[:max_input_length]
gr.Warning(f"Input text was truncated to {max_input_length} characters due to length limitations.")
# 如果沒有提供API金鑰,則使用環境變數中的金鑰
if not api_key:
api_key = os.getenv("Your_API_KEY")
# 建立PodcastGenerator實例並生成Podcast
podcast_generator = PodcastGenerator()
podcast = await podcast_generator.generate_podcast(input_text, language, speaker1, speaker2, api_key)
if podcast is None:
return None
# 計算總耗時並顯示資訊
end_time = time.time()
gr.Info(f"Successfully generated podcast in {(end_time - start_time):.2f} seconds!")
return podcast
# 定義Gradio介面
iface = gr.Interface(
fn=process_input,
inputs=[
gr.Textbox(label="Input Text"),
gr.File(label="Or Upload a PDF or TXT file"),
gr.Dropdown(label="Language", choices=[
"Auto Detect",
"Chinese Taiwanese", "Afrikaans", "Albanian", "Amharic", "Arabic", "Armenian", "Azerbaijani",
"Bahasa Indonesian", "Bangla", "Basque", "Bengali", "Bosnian", "Bulgarian",
"Burmese", "Catalan", "Chinese Cantonese", "Chinese Mandarin", "Croatian", "Czech", "Danish", "Dutch", "English",
"Estonian", "Filipino", "Finnish", "French", "Galician", "Georgian",
"German", "Greek", "Hebrew", "Hindi", "Hungarian", "Icelandic", "Irish",
"Italian", "Japanese", "Javanese", "Kannada", "Kazakh", "Khmer", "Korean",
"Lao", "Latvian", "Lithuanian", "Macedonian", "Malay", "Malayalam",
"Maltese", "Mongolian", "Nepali", "Norwegian Bokmål", "Pashto", "Persian",
"Polish", "Portuguese", "Romanian", "Russian", "Serbian", "Sinhala",
"Slovak", "Slovene", "Somali", "Spanish", "Sundanese", "Swahili",
"Swedish", "Tamil", "Telugu", "Thai", "Turkish", "Ukrainian", "Urdu",
"Uzbek", "Vietnamese", "Welsh", "Zulu"
],
value="Auto Detect"),
gr.Dropdown(label="Speaker 1 Voice", choices=[
"Andrew - English (United States)",
"Ava - English (United States)",
"Brian - English (United States)",
"Emma - English (United States)",
"Florian - German (Germany)",
"Seraphina - German (Germany)",
"Remy - French (France)",
"Vivienne - French (France)"
],
value="Andrew - English (United States)"),
gr.Dropdown(label="Speaker 2 Voice", choices=[
"Andrew - English (United States)",
"Ava - English (United States)",
"Brian - English (United States)",
"Emma - English (United States)",
"Florian - German (Germany)",
"Seraphina - German (Germany)",
"Remy - French (France)",
"Vivienne - French (France)"
],
value="Ava - English (United States)"),
gr.Textbox(label="Your Gemini API Key (Optional) - In case you are getting rate limited"),
],
outputs=[
gr.Audio(label="Generated Podcast Audio")
],
title="🎙️ PodcastGen 🎙️",
description="Generate a 2-speaker podcast from text input or documents!",
allow_flagging="never"
)
if __name__ == "__main__":
iface.launch()