File size: 16,900 Bytes
10b2a46
 
 
 
 
 
 
 
 
 
 
 
 
7635190
 
 
 
 
10b2a46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1282b4e
10b2a46
fce0934
 
10b2a46
fce0934
10b2a46
d77a3a5
 
 
fce0934
 
 
d77a3a5
 
 
 
 
 
fce0934
d77a3a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fce0934
 
d77a3a5
 
 
 
 
 
 
 
 
 
 
 
10b2a46
fce0934
7635190
fce0934
 
 
 
 
7635190
fce0934
15ffe7c
 
 
 
 
 
 
 
 
 
 
 
10b2a46
 
 
 
 
 
 
 
 
 
 
 
6c5cfef
10b2a46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c5cfef
 
 
10b2a46
6c5cfef
 
 
 
10b2a46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15ffe7c
10b2a46
91d8fc0
15ffe7c
 
 
 
 
 
 
 
 
 
 
 
91d8fc0
 
10b2a46
 
 
 
 
15ffe7c
10b2a46
6c5cfef
 
 
 
 
 
 
 
 
10b2a46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c5cfef
 
 
 
 
 
10b2a46
 
 
 
fce0934
 
 
 
 
 
10b2a46
 
 
 
 
 
 
91d8fc0
 
 
10b2a46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b3fa2ef
10b2a46
b3fa2ef
10b2a46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
import gradio as gr
from pydub import AudioSegment
import json
import uuid
import io
import edge_tts
import asyncio
import aiofiles
import pypdf
import os
import time
from typing import List, Dict, Tuple
import openai
import logging

# At the beginning of your script, set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PodcastGenerator:
    def __init__(self):
        pass

    async def generate_script(self, prompt: str, language: str, api_key: str) -> Dict:
        """
        非同步生成基於給定提示和語言的Podcast劇本。

        引數:
            prompt (str): 用於生成Podcast劇本的使用者輸入文字。
            language (str): Podcast指劇本所需的語言。
            api_key (str): 用於訪問 SambaNova API 服務的 API 金鑰。

        返回:
            Dict: 包含以 JSON 格式生成Podcast劇本的字典。

        異常:
            gr.Error: 如果 API 金鑰或速率限制出現問題。

        此方法使用 SambaNova API 根據使用者的輸入生成Podcast劇本。
        它處理語言選擇,使用適當配置設定 AI 模型,並處理生成的響應。
        """
        # Significantly shorten the system prompt
        system_prompt = f"""Generate a podcast script with 2 speakers. {language} language. Be concise, engaging, and in JSON format."""

        example = """{"podcast":[{"speaker":1,"line":"Hello"},{"speaker":2,"line":"Hi there"}]}"""

        async def generate_chunk(chunk: str) -> str:
            try:
                # Calculate the available tokens for generation
                prompt_tokens = len(chunk.split())
                system_tokens = len(system_prompt.split())
                max_tokens = 3000  # Reduced from 4096 to leave more room for the prompt

                logger.info(f"Sending request to SambaNova API with prompt chunk: {chunk[:100]}...")
                response = client.chat.completions.create(
                    model='Meta-Llama-3.1-405B-Instruct',
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": f"Generate a podcast script based on this: {chunk}\nUse this format: {example}"}
                    ],
                    temperature=1,
                    max_tokens=max_tokens
                )
                logger.info(f"Received response from API: {response}")
                
                if hasattr(response, 'error'):
                    logger.error(f"API returned an error: {response.error}")
                    return {"error": f"API error: {response.error.get('message', 'Unknown error')}"}
                
                if response.choices and len(response.choices) > 0:
                    generated_text = response.choices[0].message.content
                    logger.info(f"Generated text: {generated_text[:100]}...")
                    return generated_text
                else:
                    logger.warning("No content generated from the API")
                    return {"error": "No content generated from the API"}

            except Exception as e:
                logger.error(f"Error generating script chunk: {str(e)}")
                return {"error": f"Failed to generate podcast script chunk: {str(e)}"}

        # Split the prompt into smaller chunks
        chunk_size = 500  # Reduced from 1000
        chunks = [prompt[i:i+chunk_size] for i in range(0, len(prompt), chunk_size)]

        # Generate script for each chunk
        generated_chunks = []
        for chunk in chunks:
            result = await generate_chunk(chunk)
            if isinstance(result, dict) and "error" in result:
                return result
            generated_chunks.append(result)

        # Combine generated chunks
        generated_text = " ".join(generated_chunks)

        # Try to parse JSON, if fails then extract dialogue from raw text
        try:
            parsed_json = json.loads(generated_text)
            if "podcast" in parsed_json:
                return parsed_json
            else:
                raise json.JSONDecodeError("Missing 'podcast' key", generated_text, 0)
        except json.JSONDecodeError:
            logger.warning("Generated text is not valid JSON or missing 'podcast' key. Attempting to extract dialogue.")
            lines = generated_text.split('\n')
            podcast = []
            current_speaker = 1
            for line in lines:
                line = line.strip()
                if line:
                    podcast.append({
                        "speaker": current_speaker,
                        "line": line
                    })
                    current_speaker = 3 - current_speaker  # Switch between 1 and 2
            return {"podcast": podcast}

    async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str:
        """
        非同步生成文字轉語音音訊檔案。

        引數:
            text (str): 要轉換爲語音的文字內容。
            speaker (int): 說話者的編號(1 或 2)。
            speaker1 (str): 第一位說話者的語音設定。
            speaker2 (str): 第二位說話者的語音設定。

        返回:
            str: 生成的臨時音訊檔案的檔名,或者 None 如果生成失敗。

        此方法使用 Edge TTS 將文字轉換爲語音,並將結果儲存爲臨時音訊檔案。
        根據指定的說話者編號選擇相應的語音設定。
        """
        # 根據說話者選擇語音
        voice = speaker1 if speaker == 1 else speaker2
        # 建立語音合成對象
        speech = edge_tts.Communicate(text, voice)

        # 生成臨時檔名
        temp_filename = f"temp_{uuid.uuid4()}.wav"
        try:
            # 儲存語音檔案
            await speech.save(temp_filename)
            return temp_filename
        except edge_tts.exceptions.NoAudioReceived:
            logger.error(f"No audio received for text: '{text[:50]}...' with voice: {voice}")
            return None
        except Exception as e:
            logger.error(f"Error generating audio for text: '{text[:50]}...' with voice: {voice}. Error: {str(e)}")
            return None
        finally:
            # 如果檔案存在但生成失敗,刪除臨時檔案
            if os.path.exists(temp_filename):
                os.remove(temp_filename)

    async def combine_audio_files(self, audio_files: List[str]) -> str:
        """
        非同步合併音訊檔案。

        引數:
            audio_files (List[str]): 包含音訊檔案路徑的列表。

        返回:
            str: 合併後的音訊檔案的檔名。
        """
        # 建立空的音訊段
        combined_audio = AudioSegment.empty()
        # 遍歷所有音訊檔案並合併
        for audio_file in audio_files:
            combined_audio += AudioSegment.from_file(audio_file)
            os.remove(audio_file)  # 清理臨時檔案

        # 生成輸出文件名
        output_filename = f"output_{uuid.uuid4()}.wav"
        # 匯出合併後的音訊
        combined_audio.export(output_filename, format="wav")
        return output_filename

    async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, api_key: str) -> str:
        """
        非同步生成Podcast音訊檔案。

        引數:
            input_text (str): 用於生成Podcast指令碼的輸入文字。
            language (str): Podcast使用的語言。
            speaker1 (str): 第一位說話者的語音設定。
            speaker2 (str): 第二位說話者的語音設定。
            api_key (str): 用於訪問 Gemini AI 服務的 API 金鑰。

        返回:
            str: 生成的Podcast音訊檔案的檔名。

        此方法執行以下步驟:
        1. 使用 generate_script 方法生成Podcast劇本。
        2. 使用 tts_generate 方法爲每個對話行生成音訊檔案。
        3. 使用 combine_audio_files 方法將所有音訊檔案合併爲一個完整的Podcast。

        整個過程是非同步的,以提高效率。方法還會記錄並顯示每個步驟的執行時間。
        """
        # 生成Podcast劇本
        gr.Info("Generating podcast script...")
        start_time = time.time()
        script_result = await self.generate_script(input_text, language, api_key)
        end_time = time.time()
        
        if "error" in script_result:
            gr.Error(f"Failed to generate podcast script: {script_result['error']}")
            return None
        
        if "raw_text" in script_result:
            gr.Warning("Generated text is not in the expected JSON format. Attempting to process raw text.")
            # Here you might want to implement a fallback method to process raw text
            # For now, we'll just return None
            return None
        
        if "podcast" not in script_result:
            gr.Error("Generated script does not contain a 'podcast' key.")
            return None
        
        gr.Info(f"Successfully generated podcast script in {(end_time - start_time):.2f} seconds!")

        # 生成Podcast音訊檔案
        gr.Info("Generating podcast audio files...")
        start_time = time.time()
        audio_files = await asyncio.gather(*[self.tts_generate(item['line'], item['speaker'], speaker1, speaker2) for item in script_result['podcast']])
        end_time = time.time()

        # Filter out None values (failed TTS generations)
        audio_files = [file for file in audio_files if file is not None]

        if not audio_files:
            gr.Error("Failed to generate any audio files. Please check your language and voice settings.")
            return None

        gr.Info(f"Successfully generated {len(audio_files)} out of {len(script_result['podcast'])} audio files in {(end_time - start_time):.2f} seconds!")

        # 合併音訊檔案
        combined_audio = await self.combine_audio_files(audio_files)
        return combined_audio

class TextExtractor:
    @staticmethod
    async def extract_from_pdf(file_path: str) -> str:
        # 從PDF檔案中提取文字
        async with aiofiles.open(file_path, 'rb') as file:
            content = await file.read()
            pdf_reader = pypdf.PdfReader(io.BytesIO(content))
            return "\n\n".join(page.extract_text() for page in pdf_reader.pages if page.extract_text())

    @staticmethod
    async def extract_from_txt(file_path: str) -> str:
        # 從TXT檔案中提取文字
        async with aiofiles.open(file_path, 'r') as file:
            return await file.read()

    @classmethod
    async def extract_text(cls, file_path: str) -> str:
        # 根據檔案型別選擇適當的提取方法
        _, file_extension = os.path.splitext(file_path)
        if file_extension.lower() == '.pdf':
            return await cls.extract_from_pdf(file_path)
        elif file_extension.lower() == '.txt':
            return await cls.extract_from_txt(file_path)
        else:
            raise gr.Error(f"Unsupported file type: {file_extension}")

async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, api_key: str = "") -> str:
    """
    處理輸入並生成Podcast的非同步函式。

    引數:
    input_text (str): 使用者輸入的文字內容。
    input_file: 使用者上傳的檔案(可以是 PDF 或 TXT)。
    language (str): 選擇的語言。
    speaker1 (str): 第一位說話者的語音選擇。
    speaker2 (str): 第二位說話者的語音選擇。
    api_key (str): 用於生成 AI 的 API 金鑰,預設爲空字串。

    返回:
    str: 生成的Podcast音訊檔案路徑。

    此函式協調整個Podcast生成過程,包括文字提取、指令碼生成和音訊合成。
    它處理不同的輸入型別(文字或檔案),並使用指定的語音和語言設定來建立最終的Podcast。
    """
    # 開始生成Podcast
    gr.Info("Starting podcast generation...")
    start_time = time.time()

    # 定義語音名稱對映
    voice_names = {
        "Andrew - English (United States)": "en-US-AndrewMultilingualNeural",
        "Ava - English (United States)": "en-US-AvaMultilingualNeural",
        "Brian - English (United States)": "en-US-BrianMultilingualNeural",
        "Emma - English (United States)": "en-US-EmmaMultilingualNeural",
        "Florian - German (Germany)": "de-DE-FlorianMultilingualNeural",
        "Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural",
        "Remy - French (France)": "fr-FR-RemyMultilingualNeural",
        "Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural"
    }

    # 獲取實際的語音名稱
    speaker1 = voice_names[speaker1]
    speaker2 = voice_names[speaker2]

    # Check if the selected voices are compatible with the chosen language
    if language != "Auto Detect":
        if not (speaker1.startswith(language[:2].lower()) and speaker2.startswith(language[:2].lower())):
            gr.Error(f"Selected voices may not be compatible with the chosen language: {language}")
            return None

    # 如果提供了輸入檔案,則從檔案中提取文字
    if input_file:
        input_text = await TextExtractor.extract_text(input_file.name)

    # Limit input text length
    max_input_length = 3000  # Adjust this value as needed
    if len(input_text) > max_input_length:
        input_text = input_text[:max_input_length]
        gr.Warning(f"Input text was truncated to {max_input_length} characters due to length limitations.")

    # 如果沒有提供API金鑰,則使用環境變數中的金鑰
    if not api_key:
        api_key = os.getenv("Your_API_KEY")

    # 建立PodcastGenerator實例並生成Podcast
    podcast_generator = PodcastGenerator()
    podcast = await podcast_generator.generate_podcast(input_text, language, speaker1, speaker2, api_key)
    
    if podcast is None:
        return None

    # 計算總耗時並顯示資訊
    end_time = time.time()
    gr.Info(f"Successfully generated podcast in {(end_time - start_time):.2f} seconds!")

    return podcast

# 定義Gradio介面
iface = gr.Interface(
    fn=process_input,
    inputs=[
        gr.Textbox(label="Input Text"),
        gr.File(label="Or Upload a PDF or TXT file"),
        gr.Dropdown(label="Language", choices=[
            "Auto Detect",
            "Chinese Taiwanese", "Afrikaans", "Albanian", "Amharic", "Arabic", "Armenian", "Azerbaijani",
            "Bahasa Indonesian", "Bangla", "Basque", "Bengali", "Bosnian", "Bulgarian",
            "Burmese", "Catalan", "Chinese Cantonese", "Chinese Mandarin", "Croatian", "Czech", "Danish", "Dutch", "English",
            "Estonian", "Filipino", "Finnish", "French", "Galician", "Georgian",
            "German", "Greek", "Hebrew", "Hindi", "Hungarian", "Icelandic", "Irish",
            "Italian", "Japanese", "Javanese", "Kannada", "Kazakh", "Khmer", "Korean",
            "Lao", "Latvian", "Lithuanian", "Macedonian", "Malay", "Malayalam",
            "Maltese", "Mongolian", "Nepali", "Norwegian Bokmål", "Pashto", "Persian",
            "Polish", "Portuguese", "Romanian", "Russian", "Serbian", "Sinhala",
            "Slovak", "Slovene", "Somali", "Spanish", "Sundanese", "Swahili",
            "Swedish", "Tamil", "Telugu", "Thai", "Turkish", "Ukrainian", "Urdu",
            "Uzbek", "Vietnamese", "Welsh", "Zulu"
        ],
        value="Auto Detect"),
        gr.Dropdown(label="Speaker 1 Voice", choices=[
            "Andrew - English (United States)",
            "Ava - English (United States)",
            "Brian - English (United States)",
            "Emma - English (United States)",
            "Florian - German (Germany)",
            "Seraphina - German (Germany)",
            "Remy - French (France)",
            "Vivienne - French (France)"
        ],
        value="Andrew - English (United States)"),
        gr.Dropdown(label="Speaker 2 Voice", choices=[
            "Andrew - English (United States)",
            "Ava - English (United States)",
            "Brian - English (United States)",
            "Emma - English (United States)",
            "Florian - German (Germany)",
            "Seraphina - German (Germany)",
            "Remy - French (France)",
            "Vivienne - French (France)"
        ],
        value="Ava - English (United States)"),
        gr.Textbox(label="Your Gemini API Key (Optional) - In case you are getting rate limited"),
    ],
    outputs=[
        gr.Audio(label="Generated Podcast Audio")
    ],
    title="🎙️ PodcastGen 🎙️",
    description="Generate a 2-speaker podcast from text input or documents!",
    allow_flagging="never"
)

if __name__ == "__main__":
    iface.launch()