# Import configuration first to setup environment
import app_config

import os
import sys
import io
import subprocess
import uuid
import time
import torch
import torchaudio
import tempfile
import logging
from typing import Optional

# Fix PyTorch weights_only issue for XTTS
import torch.serialization
from TTS.tts.configs.xtts_config import XttsConfig
torch.serialization.add_safe_globals([XttsConfig])

# Set environment variables
os.environ["COQUI_TOS_AGREED"] = "1"
os.environ["NUMBA_DISABLE_JIT"] = "1"

# Force CPU usage if specified
if os.environ.get("FORCE_CPU", "false").lower() == "true":
    os.environ["CUDA_VISIBLE_DEVICES"] = ""

from fastapi import BackgroundTasks, FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import FileResponse
from pydantic import BaseModel
import langid
from scipy.io.wavfile import write
from pydub import AudioSegment

from TTS.api import TTS
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.generic_utils import get_user_data_dir

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Limits and audio parameters used by the synthesis pipeline.
MIN_TEXT_LENGTH = 2
MAX_TEXT_LENGTH = 500  # Increased limit for API
OUTPUT_SAMPLE_RATE = 24000
LANG_DETECT_MIN_CHARS = 15
REFERENCE_AUDIO_SUFFIXES = ('.wav', '.mp3', '.m4a')

app = FastAPI(
    title="XTTS C3PO API",
    description="Text-to-Speech API using XTTS-v2 C3PO model",
    version="1.0.0",
)


class TTSRequest(BaseModel):
    """Request body for the JSON text-to-speech endpoint."""

    text: str
    language: str = "en"
    voice_cleanup: bool = False
    no_lang_auto_detect: bool = False


class XTTSService:
    """Loads the XTTS-v2 C3PO checkpoint and synthesizes speech with it."""

    def __init__(self):
        """Load the model, downloading it first when the config file is missing."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Using device: {self.device}")

        # Use the C3PO model path
        self.model_path = "XTTS-v2_C3PO/"
        self.config_path = "XTTS-v2_C3PO/config.json"

        # Check if model files exist, if not download them
        if not os.path.exists(self.config_path):
            logger.info("C3PO model not found locally, downloading...")
            self._download_c3po_model()

        # Load configuration
        config = XttsConfig()
        config.load_json(self.config_path)

        # Initialize and load model
        self.model = Xtts.init_from_config(config)
        self.model.load_checkpoint(
            config,
            checkpoint_path=os.path.join(self.model_path, "model.pth"),
            vocab_path=os.path.join(self.model_path, "vocab.json"),
            eval=True,
        )

        if self.device == "cuda":
            self.model.cuda()

        self.supported_languages = config.languages
        logger.info(f"XTTS C3PO model loaded successfully. Supported languages: {self.supported_languages}")

        # Set default reference audio (C3PO voice)
        self.default_reference = os.path.join(self.model_path, "reference.wav")
        if not os.path.exists(self.default_reference):
            # Fall back to any audio file in the model directory. The listing
            # is sorted so the same file is picked on every start-up.
            fallback = next(
                (
                    name
                    for name in sorted(os.listdir(self.model_path))
                    if name.endswith(REFERENCE_AUDIO_SUFFIXES)
                ),
                None,
            )
            self.default_reference = (
                os.path.join(self.model_path, fallback) if fallback else None
            )

        if self.default_reference:
            logger.info(f"Default C3PO reference audio: {self.default_reference}")
        else:
            logger.warning("No default reference audio found in C3PO model directory")

    def _download_c3po_model(self):
        """Download the C3PO model from Hugging Face.

        Raises:
            RuntimeError: If ``git clone`` fails or ``git`` cannot be executed.
                This runs during service start-up, outside any HTTP request,
                so an HTTP error type would be meaningless here.
        """
        logger.info("Downloading C3PO model from Hugging Face...")
        try:
            subprocess.run(
                [
                    "git", "clone",
                    "https://huggingface.co/Borcherding/XTTS-v2_C3PO",
                    "XTTS-v2_C3PO",
                ],
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable git binary.
            logger.error(f"Failed to download C3PO model: {e}")
            raise RuntimeError("Failed to download C3PO model") from e
        logger.info("C3PO model downloaded successfully")

    def generate_speech(
        self,
        text: str,
        speaker_wav_path: Optional[str] = None,
        language: str = "en",
        voice_cleanup: bool = False,
        no_lang_auto_detect: bool = False,
    ) -> str:
        """Generate speech and return the path to the output file.

        Args:
            text: Text to synthesize (2 to 500 characters).
            speaker_wav_path: Reference audio path; the default C3PO reference
                is used when ``None``.
            language: Language code; must be one of ``supported_languages``.
            voice_cleanup: Pass the reference audio through ``_cleanup_audio``.
            no_lang_auto_detect: Skip the langid mismatch check.

        Returns:
            Path of a newly written WAV file in the system temp directory.
            The caller owns the file and is responsible for deleting it.

        Raises:
            HTTPException: 400 for invalid input or unusable reference audio,
                500 for any other failure during synthesis.
        """
        try:
            # Use default C3PO voice if no speaker file provided
            if speaker_wav_path is None:
                if self.default_reference is None:
                    raise HTTPException(
                        status_code=400,
                        detail="No reference audio available. Please upload a speaker file.",
                    )
                speaker_wav_path = self.default_reference
                logger.info("Using default C3PO voice")

            # Validate language
            if language not in self.supported_languages:
                raise HTTPException(
                    status_code=400,
                    detail=f"Language '{language}' not supported. Supported: {self.supported_languages}",
                )

            # Language detection for longer texts; a mismatch is only logged.
            if len(text) > LANG_DETECT_MIN_CHARS and not no_lang_auto_detect:
                language_predicted = langid.classify(text)[0].strip()
                if language_predicted == "zh":
                    language_predicted = "zh-cn"
                if language_predicted != language:
                    logger.warning(f"Detected language: {language_predicted}, chosen: {language}")

            # Text length validation
            if len(text) < MIN_TEXT_LENGTH:
                raise HTTPException(status_code=400, detail="Text too short, please provide longer text")
            if len(text) > MAX_TEXT_LENGTH:
                raise HTTPException(
                    status_code=400,
                    detail=f"Text too long, maximum {MAX_TEXT_LENGTH} characters",
                )

            # Voice cleanup if requested
            processed_speaker_wav = speaker_wav_path
            if voice_cleanup:
                processed_speaker_wav = self._cleanup_audio(speaker_wav_path)

            # Generate conditioning latents
            try:
                gpt_cond_latent, speaker_embedding = self.model.get_conditioning_latents(
                    audio_path=processed_speaker_wav,
                    gpt_cond_len=30,
                    gpt_cond_chunk_len=4,
                    max_ref_length=60,
                )
            except Exception as e:
                logger.error(f"Speaker encoding error: {e}")
                raise HTTPException(
                    status_code=400,
                    detail="Error processing reference audio. Please check the audio file.",
                ) from e

            # Generate speech
            logger.info("Generating speech...")
            start_time = time.time()
            out = self.model.inference(
                text,
                language,
                gpt_cond_latent,
                speaker_embedding,
                repetition_penalty=5.0,
                temperature=0.75,
            )
            inference_time = time.time() - start_time
            logger.info(f"Speech generation completed in {inference_time:.2f} seconds")

            # Save output
            output_filename = f"xtts_c3po_output_{uuid.uuid4().hex}.wav"
            output_path = os.path.join(tempfile.gettempdir(), output_filename)
            torchaudio.save(
                output_path,
                torch.tensor(out["wav"]).unsqueeze(0),
                OUTPUT_SAMPLE_RATE,
            )
            return output_path
        except HTTPException:
            # Already carries the right status code and detail for the client.
            raise
        except Exception as e:
            logger.exception(f"Error generating speech: {e}")
            raise HTTPException(
                status_code=500, detail=f"Failed to generate speech: {str(e)}"
            ) from e

    def _cleanup_audio(self, audio_path: str) -> str:
        """Apply audio cleanup filters.

        Placeholder: no filtering is implemented yet, so the original path is
        returned unchanged. A real implementation (for example ffmpeg-based)
        can replace this body and return the path of the cleaned file.
        """
        return audio_path


# Initialize XTTS service
logger.info("Initializing XTTS C3PO service...")
tts_service = XTTSService()


def _remove_quietly(path: Optional[str]) -> None:
    """Delete a temporary file on a best-effort basis, never raising."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # Already gone; nothing to clean up.
    except OSError as e:
        logger.warning(f"Could not remove temporary file {path}: {e}")


async def _save_speaker_upload(speaker_file: Optional[UploadFile]) -> Optional[str]:
    """Validate an uploaded reference clip and store it in a temp file.

    Returns the temp file path, or ``None`` when no file was uploaded.
    Raises a 400 HTTPException when the upload is not declared as audio.
    """
    if speaker_file is None:
        return None

    # content_type may be None when the client sends no Content-Type part header.
    if not (speaker_file.content_type or "").startswith('audio/'):
        raise HTTPException(status_code=400, detail="Speaker file must be an audio file")

    temp_path = os.path.join(tempfile.gettempdir(), f"speaker_{uuid.uuid4().hex}.wav")
    try:
        with open(temp_path, "wb") as buffer:
            buffer.write(await speaker_file.read())
    except Exception:
        # Do not leave a partial file behind if the read or write fails.
        _remove_quietly(temp_path)
        raise
    return temp_path


async def _synthesize(
    text: str,
    language: str,
    voice_cleanup: bool,
    no_lang_auto_detect: bool,
    speaker_file: Optional[UploadFile],
    endpoint_label: str,
) -> str:
    """Run one synthesis request and return the generated WAV path.

    The uploaded speaker clip, if any, is always removed again, whether the
    synthesis succeeds or fails. Unexpected errors are logged with
    ``endpoint_label`` and converted into a 500 HTTPException.
    """
    speaker_temp_path = None
    try:
        speaker_temp_path = await _save_speaker_upload(speaker_file)
        # Uses the C3PO voice when no speaker file was provided.
        return tts_service.generate_speech(
            text,
            speaker_temp_path,
            language,
            voice_cleanup,
            no_lang_auto_detect,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error in {endpoint_label} endpoint: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        _remove_quietly(speaker_temp_path)


def _audio_response(output_path: str, download_name: str) -> FileResponse:
    """Build the WAV download response and schedule removal of the temp file.

    No explicit Content-Disposition header is passed: FileResponse derives
    ``attachment; filename="..."`` from ``filename``, and an explicit header
    would take precedence and drop the download name.
    """
    cleanup = BackgroundTasks()
    # Runs after the response body has been sent, so the file is not leaked.
    cleanup.add_task(_remove_quietly, output_path)
    return FileResponse(
        output_path,
        media_type="audio/wav",
        filename=download_name,
        background=cleanup,
    )


@app.get("/")
async def root():
    """Return a static liveness message."""
    return {"message": "XTTS C3PO API is running", "status": "healthy", "model": "C3PO"}


@app.get("/health")
async def health_check():
    """Report the device, model and languages of the loaded service."""
    return {
        "status": "healthy",
        "device": tts_service.device,
        "model": "XTTS-v2 C3PO",
        "supported_languages": tts_service.supported_languages,
        "default_voice": "C3PO" if tts_service.default_reference else "None",
    }


@app.get("/languages")
async def get_languages():
    """Get list of supported languages"""
    return {"languages": tts_service.supported_languages}


@app.post("/tts")
async def text_to_speech(
    text: str = Form(...),
    language: str = Form("en"),
    voice_cleanup: bool = Form(False),
    no_lang_auto_detect: bool = Form(False),
    speaker_file: UploadFile = File(None)
):
    """
    Convert text to speech using XTTS C3PO voice cloning

    - **text**: The text to convert to speech (max 500 characters)
    - **language**: Language code (default: "en")
    - **voice_cleanup**: Apply audio cleanup to reference voice
    - **no_lang_auto_detect**: Disable automatic language detection
    - **speaker_file**: Reference speaker audio file (optional, uses C3PO voice if not provided)
    """
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    output_path = await _synthesize(
        text, language, voice_cleanup, no_lang_auto_detect, speaker_file, "TTS"
    )

    # Return the generated audio file
    voice_type = "custom" if speaker_file else "c3po"
    return _audio_response(
        output_path, f"xtts_{voice_type}_output_{uuid.uuid4().hex}.wav"
    )


# NOTE(review): this endpoint declares a JSON body model together with a File
# parameter. FastAPI documents that Body fields expected as JSON cannot be
# combined with File/Form parameters in one path operation, so this route may
# not accept the request shape its name suggests - confirm with a client test
# before relying on it. The signature is left unchanged to keep the HTTP API.
@app.post("/tts-json")
async def text_to_speech_json(
    request: TTSRequest,
    speaker_file: UploadFile = File(None)
):
    """
    Convert text to speech using JSON request body

    - **request**: TTSRequest containing text, language, and options
    - **speaker_file**: Reference speaker audio file (optional, uses C3PO voice if not provided)
    """
    if not request.text.strip():
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    output_path = await _synthesize(
        request.text,
        request.language,
        request.voice_cleanup,
        request.no_lang_auto_detect,
        speaker_file,
        "TTS JSON",
    )

    # Return the generated audio file
    voice_type = "custom" if speaker_file else "c3po"
    return _audio_response(
        output_path,
        f"xtts_{voice_type}_{request.language}_{uuid.uuid4().hex}.wav",
    )


@app.post("/tts-c3po")
async def text_to_speech_c3po_only(
    text: str = Form(...),
    language: str = Form("en"),
    no_lang_auto_detect: bool = Form(False)
):
    """
    Convert text to speech using C3PO voice only (no file upload needed)

    - **text**: The text to convert to speech (max 500 characters)
    - **language**: Language code (default: "en")
    - **no_lang_auto_detect**: Disable automatic language detection
    """
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    # Default C3PO voice: no speaker upload and no voice cleanup needed.
    output_path = await _synthesize(
        text, language, False, no_lang_auto_detect, None, "C3PO TTS"
    )

    # Return the generated audio file
    return _audio_response(output_path, f"c3po_voice_{uuid.uuid4().hex}.wav")