# app.py - Main application file (OPTIMIZED FOR HUGGING FACE SPACES) import os import sys import logging import traceback import time import uuid import threading from functools import lru_cache import concurrent.futures from collections import defaultdict, deque # Configure logging - keeping it simple for Hugging Face Spaces logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(thread)d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger("speech_api") # Simple in-memory rate limiting REQUEST_HISTORY = defaultdict(deque) RATE_LIMIT_WINDOW = 60 # seconds MAX_REQUESTS_PER_WINDOW = 15 # More conservative for HF rate_limit_lock = threading.Lock() # Small thread pool suitable for HF Spaces MAX_WORKERS = 3 # Conservative number for HF Spaces worker_pool = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) # Set all cache directories to locations within /tmp cache_dirs = { "HF_HOME": "/tmp/hf_home", "TRANSFORMERS_CACHE": "/tmp/transformers_cache", "HUGGINGFACE_HUB_CACHE": "/tmp/huggingface_hub_cache", "TORCH_HOME": "/tmp/torch_home", "XDG_CACHE_HOME": "/tmp/xdg_cache" } # Set environment variables and create directories for env_var, path in cache_dirs.items(): os.environ[env_var] = path try: os.makedirs(path, exist_ok=True) logger.info(f"📁 Created cache directory: {path}") except Exception as e: logger.error(f"❌ Failed to create directory {path}: {str(e)}") # Now import the rest of the libraries try: import librosa import glob import numpy as np import torch from pydub import AudioSegment import tempfile import soundfile as sf from flask import Flask, request, jsonify, send_file, g from flask_cors import CORS from werkzeug.utils import secure_filename # Import functionality from other modules from translator import ( init_models, check_model_status, handle_asr_request, handle_tts_request, handle_translation_request ) from evaluate import ( handle_evaluation_request, handle_upload_reference, init_reference_audio, calculate_similarity ) logger.info("✅ All required libraries imported successfully") except ImportError as e: logger.critical(f"❌ Failed to import necessary libraries: {str(e)}") sys.exit(1) # Check CUDA availability and optimize memory usage if torch.cuda.is_available(): logger.info(f"🚀 CUDA available: {torch.cuda.get_device_name(0)}") device = "cuda" # Optimize CUDA memory usage for HF Spaces torch.cuda.empty_cache() # Conservative memory settings for HF Spaces torch.cuda.set_per_process_memory_fraction(0.7) # Don't use all GPU memory torch.backends.cudnn.benchmark = True # Speed up operations else: logger.info("⚠️ CUDA not available, using CPU") device = "cpu" # Constants SAMPLE_RATE = 16000 OUTPUT_DIR = "/tmp/audio_outputs" REFERENCE_AUDIO_DIR = "./reference_audios" MAX_CACHE_SIZE = 50 # Smaller cache for HF Spaces # In-memory caches asr_cache = {} tts_cache = {} translation_cache = {} try: os.makedirs(OUTPUT_DIR, exist_ok=True) logger.info(f"📁 Created output directory: {OUTPUT_DIR}") except Exception as e: logger.error(f"❌ Failed to create output directory: {str(e)}") # Create user-specific directories to prevent conflicts def get_user_output_dir(user_id=None): """Create and return a user-specific output directory""" if user_id is None: user_id = str(uuid.uuid4())[:8] user_dir = os.path.join(OUTPUT_DIR, user_id) os.makedirs(user_dir, exist_ok=True) return user_dir # Initialize Flask app app = Flask(__name__) CORS(app) app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload for HF # Load models init_models(device) # Rate limit decorator - simple in-memory implementation def rate_limit(f): from functools import wraps # Import wraps at the top of the decorator @wraps(f) # Add this line to preserve function metadata def decorated_function(*args, **kwargs): client_ip = request.remote_addr or request.headers.get('X-Forwarded-For', 'unknown') with rate_limit_lock: current_time = time.time() # Add current request timestamp if client_ip not in REQUEST_HISTORY: REQUEST_HISTORY[client_ip] = deque(maxlen=MAX_REQUESTS_PER_WINDOW) # Clean old requests (older than window) while REQUEST_HISTORY[client_ip] and current_time - REQUEST_HISTORY[client_ip][0] > RATE_LIMIT_WINDOW: REQUEST_HISTORY[client_ip].popleft() # Check if rate limit is exceeded if len(REQUEST_HISTORY[client_ip]) >= MAX_REQUESTS_PER_WINDOW: logger.warning(f"⚠️ Rate limit exceeded for {client_ip}") return jsonify({ "error": "Rate limit exceeded", "message": "Too many requests, please try again later" }), 429 # Add this request REQUEST_HISTORY[client_ip].append(current_time) return f(*args, **kwargs) return decorated_function # Caching helpers def compute_hash(data): """Compute a hash for caching purposes""" import hashlib if isinstance(data, str): return hashlib.md5(data.encode('utf-8')).hexdigest() return hashlib.md5(str(data).encode('utf-8')).hexdigest() # Cache decorator for responses def cache_response(cache_dict, key_fn, max_size=MAX_CACHE_SIZE): def decorator(f): def wrapper(*args, **kwargs): key = key_fn(*args, **kwargs) # Check cache if key in cache_dict: logger.info(f"✅ Cache hit for {f.__name__}") return cache_dict[key] # Get actual response response = f(*args, **kwargs) # Store in cache if it's a successful response if isinstance(response, tuple): result, status_code = response if status_code < 400: # Only cache successful responses cache_dict[key] = response else: cache_dict[key] = response # Limit cache size if len(cache_dict) > max_size: # Remove random item (simple approach for HF Spaces) cache_dict.pop(next(iter(cache_dict))) return response return wrapper return decorator # Request tracking middleware @app.before_request def before_request(): g.request_id = str(uuid.uuid4())[:8] g.start_time = time.time() # Initialize reference directory if needed if not hasattr(g, 'initialized'): global REFERENCE_AUDIO_DIR # This might return an updated path if the original fails updated_ref_dir = init_reference_audio(REFERENCE_AUDIO_DIR, OUTPUT_DIR) if updated_ref_dir and updated_ref_dir != REFERENCE_AUDIO_DIR: REFERENCE_AUDIO_DIR = updated_ref_dir logger.info(f"📁 Updated reference audio directory to: {REFERENCE_AUDIO_DIR}") g.initialized = True # Create user-specific directory user_id = request.headers.get('X-User-ID', str(uuid.uuid4())[:8]) g.user_output_dir = get_user_output_dir(user_id) logger.info(f"[{g.request_id}] 🔄 {request.method} {request.path} started") @app.after_request def after_request(response): if hasattr(g, 'request_id') and hasattr(g, 'start_time'): duration = time.time() - g.start_time logger.info(f"[{g.request_id}] ✅ Completed in {duration:.2f}s with status {response.status_code}") # Set cache headers if request.endpoint == 'download_audio': response.headers['Cache-Control'] = 'public, max-age=86400' # Cache audio for a day else: response.headers['Cache-Control'] = 'no-store' # No caching for API responses return response # Global error handler @app.errorhandler(Exception) def handle_exception(e): logger.error(f"❌ Unhandled exception: {str(e)}") logger.debug(traceback.format_exc()) return jsonify({ "error": "Internal server error", "message": str(e) }), 500 # Define routes @app.route("/", methods=["GET"]) def home(): return jsonify({ "message": "Speech API is running", "status": "active", "version": "1.1", "environment": "Hugging Face Spaces" }) @app.route("/health", methods=["GET"]) def health_check(): health_status = check_model_status() health_status["api_status"] = "online" health_status["device"] = device # Add memory usage info if torch.cuda.is_available(): health_status["memory"] = { "cuda_allocated_mb": round(torch.cuda.memory_allocated() / (1024 * 1024), 2), "cuda_reserved_mb": round(torch.cuda.memory_reserved() / (1024 * 1024), 2) } # Add cache stats health_status["cache_stats"] = { "asr_cache_size": len(asr_cache), "tts_cache_size": len(tts_cache), "translation_cache_size": len(translation_cache) } return jsonify(health_status) # ASR with optimizations @app.route("/asr", methods=["POST"]) @rate_limit def transcribe_audio(): # Get user-specific output directory user_output_dir = g.user_output_dir if hasattr(g, 'user_output_dir') else OUTPUT_DIR # Check cache first (simple caching logic) if 'audio' in request.files: audio_file = request.files['audio'] language = request.form.get("language", "english").lower() # Create a simple cache key audio_content = audio_file.read() audio_file.seek(0) # Reset file pointer cache_key = f"asr_{compute_hash(audio_content)}_{language}" if cache_key in asr_cache: logger.info(f"[{g.request_id}] ✅ Using cached ASR result") return asr_cache[cache_key] # Process the request normally result = handle_asr_request(request, user_output_dir, SAMPLE_RATE) # Cache successful responses if isinstance(result, tuple): response, status_code = result if status_code == 200: asr_cache[cache_key] = result # Limit cache size if len(asr_cache) > MAX_CACHE_SIZE: asr_cache.pop(next(iter(asr_cache))) return result @app.route("/tts", methods=["POST"]) @rate_limit def generate_tts(): # Get user-specific output directory user_output_dir = g.user_output_dir if hasattr(g, 'user_output_dir') else OUTPUT_DIR # Check cache first if request.is_json: data = request.get_json() if data: text = data.get("text", "").strip() language = data.get("language", "kapampangan").lower() cache_key = f"tts_{compute_hash(text)}_{language}" if cache_key in tts_cache: logger.info(f"[{g.request_id}] ✅ Using cached TTS result") return tts_cache[cache_key] # Process the request normally result = handle_tts_request(request, user_output_dir) # Cache successful responses if isinstance(result, tuple): response, status_code = result if status_code == 200 and request.is_json: tts_cache[cache_key] = result # Limit cache size if len(tts_cache) > MAX_CACHE_SIZE: tts_cache.pop(next(iter(tts_cache))) return result @app.route("/translate", methods=["POST"]) @rate_limit def translate_text(): # Check cache first if request.is_json: data = request.get_json() if data: text = data.get("text", "").strip() source_language = data.get("source_language", "").lower() target_language = data.get("target_language", "").lower() cache_key = f"translate_{compute_hash(text)}_{source_language}_{target_language}" if cache_key in translation_cache: logger.info(f"[{g.request_id}] ✅ Using cached translation result") return translation_cache[cache_key] # Process the request normally result = handle_translation_request(request) # Cache successful responses if isinstance(result, tuple): response, status_code = result if status_code == 200 and request.is_json: translation_cache[cache_key] = result # Limit cache size if len(translation_cache) > MAX_CACHE_SIZE: translation_cache.pop(next(iter(translation_cache))) return result @app.route("/download/", methods=["GET"]) def download_audio(filename): # First try user-specific directory if available if hasattr(g, 'user_output_dir'): file_path = os.path.join(g.user_output_dir, filename) if os.path.exists(file_path): logger.info(f"📤 Serving user audio file: {file_path}") return send_file(file_path, mimetype="audio/wav", as_attachment=True) # Then try main output directory file_path = os.path.join(OUTPUT_DIR, filename) if os.path.exists(file_path): logger.info(f"📤 Serving audio file: {file_path}") return send_file(file_path, mimetype="audio/wav", as_attachment=True) # Check for any subdirectories (simplified approach) for root, dirs, files in os.walk(OUTPUT_DIR): if filename in files: full_path = os.path.join(root, filename) logger.info(f"📤 Serving found audio file: {full_path}") return send_file(full_path, mimetype="audio/wav", as_attachment=True) logger.warning(f"⚠️ Requested file not found: {filename}") return jsonify({"error": "File not found"}), 404 @app.route("/evaluate", methods=["POST"]) @rate_limit def evaluate_pronunciation(): # Get user-specific output directory user_output_dir = g.user_output_dir if hasattr(g, 'user_output_dir') else OUTPUT_DIR return handle_evaluation_request(request, REFERENCE_AUDIO_DIR, user_output_dir, SAMPLE_RATE) @app.route("/check_references", methods=["GET"]) def check_references(): """Optimized endpoint to check if reference files exist""" ref_patterns = ["mayap_a_abak", "mayap_a_ugtu", "mayap_a_gatpanapun", "mayap_a_bengi", "komusta_ka", "malaus_ko_pu", "malaus_kayu", "agaganaka_da_ka", "pagdulapan_da_ka", "kaluguran_da_ka", "dakal_a_salamat", "panapaya_mu_ku", "wa", "ali", "tuknang", "lagwa", "galo", "buri_ke_ini", "tara_na", "nokarin_ka_ibat", "nokarin_ka_munta", "atiu_na_ku", "nanung_panayan_mu", "mako_na_ka", "muli_ta_na", "nanu_ing_pengan_mu", "mekeni", "mengan_na_ka", "munta_ka_karin", "magkanu_ini", "mimingat_ka", "mangan_ta_na", "lakwan_da_ka", "nanu_maliari_kung_daptan_keka", "pilan_na_ka_banwa", "saliwan_ke_ini", "makananu_munta_king","adwa", "anam", "apat", "apulu", "atlu", "dalan", "libu", "lima", "metung", "pitu", "siyam", "walu", "masala", "madalumdum", "maragul", "marimla", "malagu", "marok", "mababa", "malapit", "matuling", "maputi", "arung", "asbuk", "balugbug", "bitis", "buntuk", "butit", "gamat", "kuku", "salu", "tud", "pisan", "dara", "achi", "apu", "ima", "tatang", "pengari", "koya", "kapatad", "wali", "pasbul", "awang", "dagis", "bale", "ulas", "sambra", "sulu", "pitudturan", "luklukan", "ulnan" ] # Get a summary instead of details to reduce response size summary = { "reference_audio_dir": REFERENCE_AUDIO_DIR, "directory_exists": os.path.exists(REFERENCE_AUDIO_DIR), "total_patterns": len(ref_patterns), "existing_patterns": 0, "total_files": 0 } for pattern in ref_patterns: pattern_dir = os.path.join(REFERENCE_AUDIO_DIR, pattern) if os.path.exists(pattern_dir): wav_files = glob.glob(os.path.join(pattern_dir, "*.wav")) if wav_files: summary["existing_patterns"] += 1 summary["total_files"] += len(wav_files) return jsonify(summary) # Add detailed reference check as a separate endpoint @app.route("/check_references/detailed", methods=["GET"]) def check_references_detailed(): """Get detailed information for specific reference patterns""" patterns = request.args.get('patterns', '').split(',') # If no patterns specified, return the first 10 (avoid heavy response) if not patterns or patterns == ['']: ref_patterns = ["mayap_a_abak", "mayap_a_ugtu", "mayap_a_gatpanapun", "mayap_a_bengi", "komusta_ka", "malaus_ko_pu", "malaus_kayu", "agaganaka_da_ka", "pagdulapan_da_ka", "kaluguran_da_ka"] else: ref_patterns = [p.strip() for p in patterns if p.strip()] results = {} for pattern in ref_patterns: pattern_dir = os.path.join(REFERENCE_AUDIO_DIR, pattern) if os.path.exists(pattern_dir): wav_files = glob.glob(os.path.join(pattern_dir, "*.wav")) results[pattern] = { "exists": True, "path": pattern_dir, "file_count": len(wav_files), "files": [os.path.basename(f) for f in wav_files] } else: results[pattern] = { "exists": False, "path": pattern_dir } return jsonify({ "reference_audio_dir": REFERENCE_AUDIO_DIR, "patterns": results }) @app.route("/upload_reference", methods=["POST"]) @rate_limit def upload_reference_audio(): return handle_upload_reference(request, REFERENCE_AUDIO_DIR, SAMPLE_RATE) # Add a cleanup endpoint @app.route("/cleanup", methods=["POST"]) def cleanup_files(): """Clean up old files to free space (important for HF Spaces)""" try: # Only allow from local or with API key if not (request.remote_addr == '127.0.0.1' or request.headers.get('X-Cleanup-Key') == os.environ.get('CLEANUP_KEY', 'cleanup-secret')): return jsonify({"error": "Unauthorized"}), 403 # Delete files older than 2 hours cutoff_time = time.time() - 7200 # 2 hours in seconds deleted_count = 0 for root, dirs, files in os.walk(OUTPUT_DIR): for file in files: try: file_path = os.path.join(root, file) if os.path.getmtime(file_path) < cutoff_time: os.remove(file_path) deleted_count += 1 except Exception as e: logger.warning(f"⚠️ Failed to delete {file}: {e}") # Clear empty directories for root, dirs, files in os.walk(OUTPUT_DIR, topdown=False): for dir_name in dirs: try: dir_path = os.path.join(root, dir_name) if not os.listdir(dir_path): os.rmdir(dir_path) except Exception as e: logger.warning(f"⚠️ Failed to remove empty dir {dir_name}: {e}") # Clear torch cache if torch.cuda.is_available(): torch.cuda.empty_cache() return jsonify({ "message": "Cleanup completed", "files_deleted": deleted_count }) except Exception as e: logger.error(f"❌ Cleanup error: {str(e)}") return jsonify({"error": str(e)}), 500 if __name__ == "__main__": # This might return an updated path if the original fails updated_ref_dir = init_reference_audio(REFERENCE_AUDIO_DIR, OUTPUT_DIR) if updated_ref_dir and updated_ref_dir != REFERENCE_AUDIO_DIR: REFERENCE_AUDIO_DIR = updated_ref_dir logger.info(f"📁 Updated reference audio directory to: {REFERENCE_AUDIO_DIR}") logger.info("🚀 Starting Speech API server optimized for Hugging Face Spaces") # Get the status for logging status = check_model_status() logger.info(f"📊 System status: ASR model: {'✅' if status['asr_model'] == 'loaded' else '❌'}") for lang, model_status in status['tts_models'].items(): logger.info(f"📊 TTS model {lang}: {'✅' if model_status == 'loaded' else '❌'}") # Use threaded=True for better performance app.run(host="0.0.0.0", port=7860, debug=False, threaded=True)