# Kapamtalk / app.py
# Hugging Face file-viewer header (not part of the program): last commit
# "Update app.py" (fec0be4) by Coco-18, file size 20.8 kB.
# app.py - Main application file (OPTIMIZED FOR HUGGING FACE SPACES)
import os
import sys
import logging
import traceback
import time
import uuid
import threading
from functools import lru_cache
import concurrent.futures
from collections import defaultdict, deque
# Logging setup: one root configuration is enough on Hugging Face Spaces.
# Every record carries a timestamp, the level and the emitting thread id.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - [%(thread)d] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger("speech_api")
# In-memory rate-limiting state: request timestamps per client, guarded by a
# lock because requests are served from several threads.
rate_limit_lock = threading.Lock()
REQUEST_HISTORY = defaultdict(deque)
RATE_LIMIT_WINDOW = 60  # window length, in seconds
MAX_REQUESTS_PER_WINDOW = 15  # kept conservative for Hugging Face Spaces

# Deliberately small worker pool, sized for Hugging Face Spaces.
MAX_WORKERS = 3
worker_pool = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
# Keep every library cache under /tmp.
cache_dirs = {
    "HF_HOME": "/tmp/hf_home",
    "TRANSFORMERS_CACHE": "/tmp/transformers_cache",
    "HUGGINGFACE_HUB_CACHE": "/tmp/huggingface_hub_cache",
    "TORCH_HOME": "/tmp/torch_home",
    "XDG_CACHE_HOME": "/tmp/xdg_cache",
}

# Publish each location through its environment variable, then try to create
# the directory. A failure is only logged, so the remaining entries still run.
for env_var in cache_dirs:
    path = cache_dirs[env_var]
    os.environ[env_var] = path
    try:
        os.makedirs(path, exist_ok=True)
    except Exception as e:
        logger.error(f"❌ Failed to create directory {path}: {str(e)}")
    else:
        logger.info(f"πŸ“ Created cache directory: {path}")
# Heavy third-party and project imports come after the cache environment
# variables have been set. If any of them is missing the process exits with
# status 1 instead of starting a half-working server.
try:
    import librosa
    import glob
    import numpy as np
    import torch
    from pydub import AudioSegment
    import tempfile
    import soundfile as sf
    from flask import Flask, request, jsonify, send_file, g
    from flask_cors import CORS
    from werkzeug.utils import secure_filename

    # Request handlers implemented in the project's own modules.
    from translator import (
        init_models,
        check_model_status,
        handle_asr_request,
        handle_tts_request,
        handle_translation_request,
    )
    from evaluate import (
        handle_evaluation_request,
        handle_upload_reference,
        init_reference_audio,
        calculate_similarity,
    )

    logger.info("βœ… All required libraries imported successfully")
except ImportError as e:
    logger.critical(f"❌ Failed to import necessary libraries: {str(e)}")
    sys.exit(1)
# Select the compute device. On a GPU, also apply conservative memory settings
# suited to Hugging Face Spaces.
if not torch.cuda.is_available():
    logger.info("⚠️ CUDA not available, using CPU")
    device = "cpu"
else:
    logger.info(f"πŸš€ CUDA available: {torch.cuda.get_device_name(0)}")
    device = "cuda"
    # Release cached blocks before the models are loaded.
    torch.cuda.empty_cache()
    # Leave headroom: cap this process at 70% of the GPU memory.
    torch.cuda.set_per_process_memory_fraction(0.7)
    # Let cuDNN benchmark kernels to speed up operations.
    torch.backends.cudnn.benchmark = True
# Audio and storage settings.
SAMPLE_RATE = 16000
OUTPUT_DIR = "/tmp/audio_outputs"
REFERENCE_AUDIO_DIR = "./reference_audios"
MAX_CACHE_SIZE = 50  # per-cache entry limit, kept small for HF Spaces

# One in-memory response cache per endpoint (ASR, TTS, translation).
asr_cache, tts_cache, translation_cache = {}, {}, {}

# Make sure the output directory exists; a failure is logged, not raised.
try:
    os.makedirs(OUTPUT_DIR, exist_ok=True)
except Exception as e:
    logger.error(f"❌ Failed to create output directory: {str(e)}")
else:
    logger.info(f"πŸ“ Created output directory: {OUTPUT_DIR}")
# Create user-specific directories to prevent conflicts
def get_user_output_dir(user_id=None):
    """Create and return a user-specific output directory under OUTPUT_DIR.

    Args:
        user_id: Identifier used as the directory name. When it is None, or
            when nothing safe remains after sanitising it, a random
            8-character id is used instead.

    Returns:
        Path of the directory; it exists when the function returns.

    Raises:
        OSError: If the directory cannot be created.
    """
    # The id can come straight from a request header, so it must never be able
    # to leave OUTPUT_DIR. secure_filename() removes path separators and
    # leading dots, which neutralises values such as "../x" or "/etc/x".
    safe_id = secure_filename(str(user_id)) if user_id is not None else ""
    if not safe_id:
        safe_id = str(uuid.uuid4())[:8]
    user_dir = os.path.join(OUTPUT_DIR, safe_id)
    os.makedirs(user_dir, exist_ok=True)
    return user_dir
# Create the Flask application and enable cross-origin requests.
app = Flask(__name__)
CORS(app)
# Cap uploads at 16 MB, as appropriate for Hugging Face Spaces.
app.config.update(MAX_CONTENT_LENGTH=16 * 1024 * 1024)
# Load the models at import time on the selected device.
init_models(device)
# Rate limit decorator - simple in-memory implementation
def rate_limit(f):
    """Decorate a view so each client is held to a fixed request rate.

    A client is identified by ``request.remote_addr``, falling back to the
    ``X-Forwarded-For`` header and then to ``'unknown'``. At most
    MAX_REQUESTS_PER_WINDOW calls are accepted per RATE_LIMIT_WINDOW seconds.
    Further calls receive a JSON error with status 429 and the wrapped view
    is not invoked.
    """
    from functools import wraps

    @wraps(f)  # keep the view's name so Flask endpoints stay distinct
    def decorated_function(*args, **kwargs):
        client_ip = request.remote_addr or request.headers.get('X-Forwarded-For', 'unknown')
        with rate_limit_lock:
            now = time.time()
            history = REQUEST_HISTORY.get(client_ip)
            if history is None:
                # First request from this client: start a bounded history.
                history = deque(maxlen=MAX_REQUESTS_PER_WINDOW)
                REQUEST_HISTORY[client_ip] = history
            # Forget timestamps that have fallen out of the window.
            while history and now - history[0] > RATE_LIMIT_WINDOW:
                history.popleft()
            if len(history) >= MAX_REQUESTS_PER_WINDOW:
                logger.warning(f"⚠️ Rate limit exceeded for {client_ip}")
                return jsonify({
                    "error": "Rate limit exceeded",
                    "message": "Too many requests, please try again later"
                }), 429
            # Accepted: record this request.
            history.append(now)
        return f(*args, **kwargs)

    return decorated_function
# Caching helpers
def compute_hash(data):
    """Return the hexadecimal MD5 digest of *data*, for use as a cache key.

    MD5 is used only to derive short cache keys, not for security.

    Args:
        data: ``str`` (hashed as UTF-8), a bytes-like object (hashed as is),
            or any other object (hashed through its ``str()`` form).

    Returns:
        A 32-character lowercase hex string.
    """
    import hashlib

    # Hash raw bytes directly. Going through str(bytes) would hash the repr
    # ("b'...'") and build a string several times larger than the payload.
    if isinstance(data, (bytes, bytearray, memoryview)):
        return hashlib.md5(data).hexdigest()
    if not isinstance(data, str):
        data = str(data)
    return hashlib.md5(data.encode('utf-8')).hexdigest()
# Cache decorator for responses
def cache_response(cache_dict, key_fn, max_size=MAX_CACHE_SIZE):
    """Build a decorator that memoises a function's responses in *cache_dict*.

    Args:
        cache_dict: Mutable mapping used as storage; it is modified in place.
        key_fn: Called with the wrapped function's arguments; returns the
            cache key.
        max_size: Maximum number of entries kept. When it is exceeded, the
            entry that was inserted first is evicted.

    Returns:
        A decorator. The decorated function returns the cached response on a
        hit. On a miss it calls the original function and stores the
        response, unless the response is a tuple whose status is not an
        integer below 400.
    """
    from functools import wraps

    def decorator(f):
        # wraps() keeps the function's name; without it every decorated Flask
        # view would be registered under the same endpoint name "wrapper".
        @wraps(f)
        def wrapper(*args, **kwargs):
            key = key_fn(*args, **kwargs)
            if key in cache_dict:
                logger.info(f"βœ… Cache hit for {f.__name__}")
                return cache_dict[key]

            response = f(*args, **kwargs)

            if isinstance(response, tuple):
                # Accept (body, status) and (body, status, headers) alike;
                # any other tuple shape is simply not cached.
                status_code = response[1] if len(response) > 1 else None
                cacheable = isinstance(status_code, int) and status_code < 400
            else:
                cacheable = True

            if cacheable:
                cache_dict[key] = response
                # Dicts keep insertion order, so the first key is the oldest.
                if len(cache_dict) > max_size:
                    cache_dict.pop(next(iter(cache_dict)), None)
            return response

        return wrapper

    return decorator
# Request tracking middleware
# Process-wide guard so the reference-audio initialisation runs only once.
_reference_init_lock = threading.Lock()
_reference_audio_ready = False


@app.before_request
def before_request():
    """Prepare per-request state before a view runs.

    Sets ``g.request_id``, ``g.start_time`` and ``g.user_output_dir``, and on
    the first request of the process initialises the reference-audio
    directory (which may replace REFERENCE_AUDIO_DIR with a fallback path).
    """
    global REFERENCE_AUDIO_DIR, _reference_audio_ready

    g.request_id = str(uuid.uuid4())[:8]
    g.start_time = time.time()

    # flask.g is created afresh for every request, so a flag stored on it is
    # never seen by a later request. A module-level flag, checked again under
    # a lock, gives the intended run-once behaviour.
    if not _reference_audio_ready:
        with _reference_init_lock:
            if not _reference_audio_ready:
                # This might return an updated path if the original fails
                updated_ref_dir = init_reference_audio(REFERENCE_AUDIO_DIR, OUTPUT_DIR)
                if updated_ref_dir and updated_ref_dir != REFERENCE_AUDIO_DIR:
                    REFERENCE_AUDIO_DIR = updated_ref_dir
                    logger.info(f"πŸ“ Updated reference audio directory to: {REFERENCE_AUDIO_DIR}")
                # Set only after success, so a failed attempt is retried.
                _reference_audio_ready = True
    g.initialized = True

    # Create the user-specific directory. Requests without an X-User-ID
    # header get a fresh random id.
    user_id = request.headers.get('X-User-ID', str(uuid.uuid4())[:8])
    g.user_output_dir = get_user_output_dir(user_id)
    logger.info(f"[{g.request_id}] πŸ”„ {request.method} {request.path} started")
@app.after_request
def after_request(response):
    """Log the request duration and set the Cache-Control header.

    Args:
        response: The response produced for the current request.

    Returns:
        The same response object, with its Cache-Control header set.
    """
    if hasattr(g, 'request_id') and hasattr(g, 'start_time'):
        duration = time.time() - g.start_time
        logger.info(f"[{g.request_id}] βœ… Completed in {duration:.2f}s with status {response.status_code}")

    # Only a successful audio download may be cached for a day. An error from
    # the download route (e.g. 404) must not be pinned in caches.
    if request.endpoint == 'download_audio' and response.status_code < 400:
        response.headers['Cache-Control'] = 'public, max-age=86400'
    else:
        response.headers['Cache-Control'] = 'no-store'  # API responses are never cached
    return response
# Global error handler
@app.errorhandler(Exception)
def handle_exception(e):
    """Turn any exception raised while handling a request into a JSON error.

    Args:
        e: The exception raised by a view or by Flask itself.

    Returns:
        A ``(json_response, status)`` tuple. HTTP errors keep their own
        status code; everything else is reported as 500.
    """
    from werkzeug.exceptions import HTTPException

    # A handler registered for Exception also receives Flask's own HTTP errors
    # (404 unknown URL, 405 wrong method, 413 upload too large, ...). Those
    # are not server faults and must keep their status code.
    if isinstance(e, HTTPException):
        return jsonify({
            "error": e.name,
            "message": e.description
        }), e.code or 500

    logger.error(f"❌ Unhandled exception: {str(e)}")
    # The traceback goes out at DEBUG level, so it stays hidden while the
    # logging level is INFO.
    logger.debug(traceback.format_exc())
    return jsonify({
        "error": "Internal server error",
        "message": str(e)
    }), 500
# Define routes
@app.route("/", methods=["GET"])
def home():
    """Return a small JSON banner confirming that the API is running."""
    banner = {
        "message": "Speech API is running",
        "status": "active",
        "version": "1.1",
        "environment": "Hugging Face Spaces",
    }
    return jsonify(banner)
@app.route("/health", methods=["GET"])
def health_check():
    """Report model status, device, CUDA memory use and cache sizes as JSON."""
    bytes_per_mb = 1024 * 1024

    report = check_model_status()
    report["api_status"] = "online"
    report["device"] = device

    # CUDA memory figures are only included when a GPU is present.
    if torch.cuda.is_available():
        allocated_mb = torch.cuda.memory_allocated() / bytes_per_mb
        reserved_mb = torch.cuda.memory_reserved() / bytes_per_mb
        report["memory"] = {
            "cuda_allocated_mb": round(allocated_mb, 2),
            "cuda_reserved_mb": round(reserved_mb, 2),
        }

    # Current number of entries in each in-memory response cache.
    report["cache_stats"] = {
        "asr_cache_size": len(asr_cache),
        "tts_cache_size": len(tts_cache),
        "translation_cache_size": len(translation_cache),
    }
    return jsonify(report)
# ASR with optimizations
@app.route("/asr", methods=["POST"])
@rate_limit
def transcribe_audio():
    """Transcribe an uploaded audio file, reusing a cached result if present.

    Results are cached under the hash of the uploaded bytes plus the requested
    language. Only ``(response, 200)`` results are stored.

    Returns:
        Whatever ``handle_asr_request`` returns, or the cached copy of it.
    """
    # Get user-specific output directory
    user_output_dir = getattr(g, 'user_output_dir', OUTPUT_DIR)

    # Stays None when the request has no "audio" part; in that case the
    # result is neither looked up nor stored.
    cache_key = None
    audio_file = request.files.get('audio')
    if audio_file is not None:
        language = request.form.get("language", "english").lower()
        audio_content = audio_file.read()
        audio_file.seek(0)  # rewind so the handler can read the upload again
        cache_key = f"asr_{compute_hash(audio_content)}_{language}"
        cached = asr_cache.get(cache_key)
        if cached is not None:
            logger.info(f"[{g.request_id}] βœ… Using cached ASR result")
            return cached

    # Process the request normally
    result = handle_asr_request(request, user_output_dir, SAMPLE_RATE)

    # Cache successful responses; index instead of unpacking so that a
    # (body, status, headers) tuple does not raise.
    is_success = isinstance(result, tuple) and len(result) > 1 and result[1] == 200
    if cache_key is not None and is_success:
        asr_cache[cache_key] = result
        # Limit cache size by dropping the oldest entry.
        if len(asr_cache) > MAX_CACHE_SIZE:
            asr_cache.pop(next(iter(asr_cache), None), None)
    return result
@app.route("/tts", methods=["POST"])
@rate_limit
def generate_tts():
    """Synthesise speech for the posted text, reusing a cached result if any.

    Results are cached under the hash of the stripped text plus the language.
    Only ``(response, 200)`` results of JSON requests are stored.

    Returns:
        Whatever ``handle_tts_request`` returns, or the cached copy of it.
    """
    # Get user-specific output directory
    user_output_dir = getattr(g, 'user_output_dir', OUTPUT_DIR)

    # Stays None unless the body is a JSON object with string fields. Input
    # validation and error reporting are left to the handler.
    cache_key = None
    if request.is_json:
        data = request.get_json()
        if data and isinstance(data, dict):
            text = data.get("text", "")
            language = data.get("language", "kapampangan")
            if isinstance(text, str) and isinstance(language, str):
                cache_key = f"tts_{compute_hash(text.strip())}_{language.lower()}"
                cached = tts_cache.get(cache_key)
                if cached is not None:
                    # NOTE(review): a cached response presumably refers to a
                    # generated audio file; confirm it cannot outlive that
                    # file once /cleanup has deleted it.
                    logger.info(f"[{g.request_id}] βœ… Using cached TTS result")
                    return cached

    # Process the request normally
    result = handle_tts_request(request, user_output_dir)

    # Cache successful responses; index instead of unpacking so that a
    # (body, status, headers) tuple does not raise.
    is_success = isinstance(result, tuple) and len(result) > 1 and result[1] == 200
    if cache_key is not None and is_success:
        tts_cache[cache_key] = result
        # Limit cache size by dropping the oldest entry.
        if len(tts_cache) > MAX_CACHE_SIZE:
            tts_cache.pop(next(iter(tts_cache), None), None)
    return result
@app.route("/translate", methods=["POST"])
@rate_limit
def translate_text():
    """Translate the posted text, reusing a cached result if present.

    Results are cached under the hash of the stripped text plus the source
    and target languages. Only ``(response, 200)`` results of JSON requests
    are stored.

    Returns:
        Whatever ``handle_translation_request`` returns, or the cached copy.
    """
    # Stays None unless the body is a JSON object with string fields. Input
    # validation and error reporting are left to the handler.
    cache_key = None
    if request.is_json:
        data = request.get_json()
        if data and isinstance(data, dict):
            text = data.get("text", "")
            source_language = data.get("source_language", "")
            target_language = data.get("target_language", "")
            fields = (text, source_language, target_language)
            if all(isinstance(value, str) for value in fields):
                cache_key = (
                    f"translate_{compute_hash(text.strip())}"
                    f"_{source_language.lower()}_{target_language.lower()}"
                )
                cached = translation_cache.get(cache_key)
                if cached is not None:
                    logger.info(f"[{g.request_id}] βœ… Using cached translation result")
                    return cached

    # Process the request normally
    result = handle_translation_request(request)

    # Cache successful responses; index instead of unpacking so that a
    # (body, status, headers) tuple does not raise.
    is_success = isinstance(result, tuple) and len(result) > 1 and result[1] == 200
    if cache_key is not None and is_success:
        translation_cache[cache_key] = result
        # Limit cache size by dropping the oldest entry.
        if len(translation_cache) > MAX_CACHE_SIZE:
            translation_cache.pop(next(iter(translation_cache), None), None)
    return result
@app.route("/download/<filename>", methods=["GET"])
def download_audio(filename):
    """Send a generated audio file as a WAV attachment.

    The file is looked up in the caller's own output directory, then in
    OUTPUT_DIR itself, then anywhere below OUTPUT_DIR.

    Args:
        filename: Bare file name taken from the URL.

    Returns:
        The file as an ``audio/wav`` attachment, or a JSON error with
        status 404 when the name is invalid or no such file exists.
    """
    # Only a bare file name is acceptable. "." and ".." name directories,
    # and a separator would address something outside the output tree.
    is_bare_name = filename not in ("", ".", "..") and not any(
        sep in filename for sep in ("/", "\\")
    )
    if is_bare_name:
        candidates = []
        if hasattr(g, 'user_output_dir'):
            candidates.append(
                (os.path.join(g.user_output_dir, filename), "πŸ“€ Serving user audio file")
            )
        candidates.append((os.path.join(OUTPUT_DIR, filename), "πŸ“€ Serving audio file"))

        # isfile() rather than exists(): send_file cannot serve a directory.
        for file_path, label in candidates:
            if os.path.isfile(file_path):
                logger.info(f"{label}: {file_path}")
                return send_file(file_path, mimetype="audio/wav", as_attachment=True)

        # Fallback: search every subdirectory. Because user directories live
        # under OUTPUT_DIR, this also finds files created for other users.
        for root, _dirs, files in os.walk(OUTPUT_DIR):
            if filename in files:
                full_path = os.path.join(root, filename)
                logger.info(f"πŸ“€ Serving found audio file: {full_path}")
                return send_file(full_path, mimetype="audio/wav", as_attachment=True)

    logger.warning(f"⚠️ Requested file not found: {filename}")
    return jsonify({"error": "File not found"}), 404
@app.route("/evaluate", methods=["POST"])
@rate_limit
def evaluate_pronunciation():
    """Delegate a pronunciation-evaluation request to ``handle_evaluation_request``.

    The handler receives the current request, the reference-audio directory,
    the caller's output directory (OUTPUT_DIR when none was set for this
    request) and the sample rate.
    """
    user_output_dir = getattr(g, 'user_output_dir', OUTPUT_DIR)
    return handle_evaluation_request(
        request,
        REFERENCE_AUDIO_DIR,
        user_output_dir,
        SAMPLE_RATE,
    )
@app.route("/check_references", methods=["GET"])
def check_references():
    """Summarise which reference patterns have WAV files on disk.

    Returns a JSON object with the reference directory, whether it exists,
    the number of known patterns, how many of them have at least one ``.wav``
    file, and the total number of such files. Only counts are returned, to
    keep the response small.
    """
    ref_patterns = [
        "mayap_a_abak", "mayap_a_ugtu", "mayap_a_gatpanapun", "mayap_a_bengi",
        "komusta_ka", "malaus_ko_pu", "malaus_kayu", "agaganaka_da_ka",
        "pagdulapan_da_ka", "kaluguran_da_ka", "dakal_a_salamat", "panapaya_mu_ku",
        "wa", "ali", "tuknang", "lagwa", "galo", "buri_ke_ini", "tara_na",
        "nokarin_ka_ibat", "nokarin_ka_munta", "atiu_na_ku", "nanung_panayan_mu",
        "mako_na_ka", "muli_ta_na", "nanu_ing_pengan_mu", "mekeni", "mengan_na_ka",
        "munta_ka_karin", "magkanu_ini", "mimingat_ka", "mangan_ta_na", "lakwan_da_ka",
        "nanu_maliari_kung_daptan_keka", "pilan_na_ka_banwa", "saliwan_ke_ini",
        "makananu_munta_king", "adwa", "anam", "apat", "apulu", "atlu", "dalan", "libu", "lima",
        "metung", "pitu", "siyam", "walu", "masala", "madalumdum", "maragul", "marimla",
        "malagu", "marok", "mababa", "malapit", "matuling", "maputi",
        "arung", "asbuk", "balugbug", "bitis", "buntuk", "butit", "gamat", "kuku", "salu", "tud",
        "pisan", "dara", "achi", "apu", "ima", "tatang", "pengari", "koya", "kapatad", "wali",
        "pasbul", "awang", "dagis", "bale", "ulas", "sambra", "sulu", "pitudturan", "luklukan", "ulnan",
    ]

    patterns_with_audio = 0
    wav_total = 0
    for pattern in ref_patterns:
        pattern_dir = os.path.join(REFERENCE_AUDIO_DIR, pattern)
        if not os.path.exists(pattern_dir):
            continue
        wav_count = len(glob.glob(os.path.join(pattern_dir, "*.wav")))
        if wav_count:
            patterns_with_audio += 1
            wav_total += wav_count

    return jsonify({
        "reference_audio_dir": REFERENCE_AUDIO_DIR,
        "directory_exists": os.path.exists(REFERENCE_AUDIO_DIR),
        "total_patterns": len(ref_patterns),
        "existing_patterns": patterns_with_audio,
        "total_files": wav_total,
    })
# Add detailed reference check as a separate endpoint
@app.route("/check_references/detailed", methods=["GET"])
def check_references_detailed():
    """Return per-pattern details about the reference audio on disk.

    Query parameters:
        patterns: Comma-separated pattern names. When absent or empty, the
            first ten built-in patterns are used to keep the response light.

    Returns:
        JSON with the reference directory and, per pattern, whether its
        directory exists, its path and the ``.wav`` files in it. A name that
        is not a plain directory name is reported with ``exists: False`` and
        an ``error`` field, and is never looked up on disk.
    """
    patterns = request.args.get('patterns', '').split(',')
    # If no patterns specified, return the first 10 (avoid heavy response)
    if not patterns or patterns == ['']:
        ref_patterns = ["mayap_a_abak", "mayap_a_ugtu", "mayap_a_gatpanapun", "mayap_a_bengi",
                        "komusta_ka", "malaus_ko_pu", "malaus_kayu", "agaganaka_da_ka",
                        "pagdulapan_da_ka", "kaluguran_da_ka"]
    else:
        ref_patterns = [p.strip() for p in patterns if p.strip()]

    results = {}
    for pattern in ref_patterns:
        # Names come from the query string. Refuse anything that could step
        # out of REFERENCE_AUDIO_DIR, so the endpoint cannot be used to probe
        # or list arbitrary directories.
        if pattern in (".", "..") or any(sep in pattern for sep in ("/", "\\")):
            results[pattern] = {
                "exists": False,
                "path": None,
                "error": "Invalid pattern name"
            }
            continue

        pattern_dir = os.path.join(REFERENCE_AUDIO_DIR, pattern)
        if os.path.exists(pattern_dir):
            wav_files = glob.glob(os.path.join(pattern_dir, "*.wav"))
            results[pattern] = {
                "exists": True,
                "path": pattern_dir,
                "file_count": len(wav_files),
                "files": [os.path.basename(f) for f in wav_files]
            }
        else:
            results[pattern] = {
                "exists": False,
                "path": pattern_dir
            }
    return jsonify({
        "reference_audio_dir": REFERENCE_AUDIO_DIR,
        "patterns": results
    })
@app.route("/upload_reference", methods=["POST"])
@rate_limit
def upload_reference_audio():
    """Delegate a reference-audio upload to ``handle_upload_reference``.

    The handler receives the current request, the reference-audio directory
    and the sample rate; its return value is passed through unchanged.
    """
    upload_result = handle_upload_reference(request, REFERENCE_AUDIO_DIR, SAMPLE_RATE)
    return upload_result
# Add a cleanup endpoint
@app.route("/cleanup", methods=["POST"])
def cleanup_files():
    """Clean up old files to free space (important for HF Spaces).

    Allowed for requests from 127.0.0.1, or for requests whose
    ``X-Cleanup-Key`` header matches the ``CLEANUP_KEY`` environment variable.
    Deletes files under OUTPUT_DIR that are older than two hours, removes
    directories left empty, and releases cached CUDA memory.

    Returns:
        JSON with the number of deleted files; ``403`` when unauthorised;
        ``500`` with the error text when the cleanup itself fails.
    """
    import hmac

    try:
        # Only allow from local or with API key
        if request.remote_addr != '127.0.0.1':
            # NOTE(review): the built-in fallback key is public knowledge.
            # Set CLEANUP_KEY in the environment; the fallback is kept only
            # so that existing callers keep working.
            expected_key = os.environ.get('CLEANUP_KEY', 'cleanup-secret')
            supplied_key = request.headers.get('X-Cleanup-Key')
            # compare_digest avoids leaking the key through response timing.
            if supplied_key is None or not hmac.compare_digest(
                supplied_key.encode('utf-8'), expected_key.encode('utf-8')
            ):
                return jsonify({"error": "Unauthorized"}), 403

        # Delete files older than 2 hours
        cutoff_time = time.time() - 7200  # 2 hours in seconds
        deleted_count = 0
        for root, _dirs, files in os.walk(OUTPUT_DIR):
            for file in files:
                try:
                    file_path = os.path.join(root, file)
                    if os.path.getmtime(file_path) < cutoff_time:
                        os.remove(file_path)
                        deleted_count += 1
                except OSError as e:
                    logger.warning(f"⚠️ Failed to delete {file}: {e}")

        # Clear empty directories, deepest first so parents can empty out too.
        for root, dirs, _files in os.walk(OUTPUT_DIR, topdown=False):
            for dir_name in dirs:
                try:
                    dir_path = os.path.join(root, dir_name)
                    if not os.listdir(dir_path):
                        os.rmdir(dir_path)
                except OSError as e:
                    logger.warning(f"⚠️ Failed to remove empty dir {dir_name}: {e}")

        # Cached TTS responses presumably point at generated audio files, so
        # drop them once files have been deleted (assumption to confirm
        # against handle_tts_request; clearing only costs a regeneration).
        if deleted_count:
            tts_cache.clear()

        # Clear torch cache
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return jsonify({
            "message": "Cleanup completed",
            "files_deleted": deleted_count
        })
    except Exception as e:
        logger.error(f"❌ Cleanup error: {str(e)}")
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Initialise the reference audio; the call may hand back a fallback
    # directory when the configured one cannot be used.
    updated_ref_dir = init_reference_audio(REFERENCE_AUDIO_DIR, OUTPUT_DIR)
    if updated_ref_dir and updated_ref_dir != REFERENCE_AUDIO_DIR:
        REFERENCE_AUDIO_DIR = updated_ref_dir
        logger.info(f"πŸ“ Updated reference audio directory to: {REFERENCE_AUDIO_DIR}")

    logger.info("πŸš€ Starting Speech API server optimized for Hugging Face Spaces")

    # Log one line per model so the startup log shows what was loaded.
    status = check_model_status()
    asr_mark = 'βœ…' if status['asr_model'] == 'loaded' else '❌'
    logger.info(f"πŸ“Š System status: ASR model: {asr_mark}")
    for lang, model_status in status['tts_models'].items():
        tts_mark = 'βœ…' if model_status == 'loaded' else '❌'
        logger.info(f"πŸ“Š TTS model {lang}: {tts_mark}")

    # Serve on all interfaces, port 7860, handling requests in threads.
    app.run(host="0.0.0.0", port=7860, debug=False, threaded=True)