"""Quran Recitation Comparison API.

A FastAPI service that compares two uploaded audio recitations. Each file is
loaded with librosa, passed through a pretrained Wav2Vec2 model to obtain
frame-wise hidden states, and the two sequences are aligned with dynamic time
warping (DTW). The length-normalized DTW distance is mapped to a similarity
score and a short textual interpretation.
"""
import os

# Disable numba JIT to avoid caching issues.
# This has to happen BEFORE librosa is imported: librosa pulls in numba, and
# numba reads its configuration from the environment when it is imported.
os.environ["NUMBA_DISABLE_JIT"] = "1"

import shutil
import tempfile
import uuid
from contextlib import asynccontextmanager
from typing import Any, Dict, List, Optional

import librosa
import numpy as np
import torch
from fastapi import BackgroundTasks, FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor

# Global variables
MODEL = None
PROCESSOR = None
UPLOAD_DIR = os.path.join(tempfile.gettempdir(), "quran_comparison_uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)


# Response models
class SimilarityResponse(BaseModel):
    """Successful response body of the /compare endpoint."""

    similarity_score: float
    interpretation: str


class ErrorResponse(BaseModel):
    """Simple error payload carrying a single message."""

    error: str


# Initialize model from environment variable
def initialize_model():
    """Load the Wav2Vec2 processor and model into the module-level globals.

    The model name is read from the ``MODEL_NAME`` environment variable
    (falling back to an Arabic XLSR-53 checkpoint) and an optional access
    token from ``HF_TOKEN``. The model is moved to CUDA when available,
    otherwise to the CPU, and switched to evaluation mode.

    Raises:
        Exception: whatever the loading code raised; the error is printed
            and then re-raised unchanged.
    """
    global MODEL, PROCESSOR
    hf_token = os.environ.get("HF_TOKEN", None)
    model_name = os.environ.get(
        "MODEL_NAME", "jonatasgrosman/wav2vec2-large-xlsr-53-arabic"
    )
    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Loading model on device: {device}")
        # Only pass `token` when one is configured.
        auth_kwargs = {"token": hf_token} if hf_token else {}
        PROCESSOR = Wav2Vec2Processor.from_pretrained(model_name, **auth_kwargs)
        MODEL = Wav2Vec2ForCTC.from_pretrained(model_name, **auth_kwargs)
        MODEL = MODEL.to(device)
        MODEL.eval()
        print("Model loaded successfully")
    except Exception as e:
        print(f"Error loading model: {e}")
        raise


# Lifespan event handler to initialize the model at startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the model once before the application starts serving."""
    initialize_model()
    yield


# Create the FastAPI app with the lifespan handler and add CORS middleware
app = FastAPI(
    title="Quran Recitation Comparison API",
    description="API for comparing similarity between Quran recitations using Wav2Vec2 embeddings",
    version="1.0.0",
    lifespan=lifespan,
)

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests. Left unchanged to avoid breaking existing
# clients; restrict the origins or drop credentials if they are not needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)


# Root endpoint
@app.get("/")
async def root():
    """Welcome endpoint."""
    return {"message": "Welcome to the Quran Recitation Comparison API"}


# Load audio file
def load_audio(file_path, target_sr=16000, trim_silence=True, normalize=True):
    """Load and preprocess an audio file.

    Args:
        file_path: path of the audio file to read.
        target_sr: sampling rate the audio is loaded at.
        trim_silence: when true, trim leading/trailing silence (top_db=30).
        normalize: when true, normalize the signal before trimming.

    Returns:
        The preprocessed samples as returned by librosa.

    Raises:
        HTTPException: status 400 when the file cannot be loaded or when no
            samples remain after preprocessing.
    """
    try:
        y, _ = librosa.load(file_path, sr=target_sr)
        if normalize:
            y = librosa.util.normalize(y)
        if trim_silence:
            y, _ = librosa.effects.trim(y, top_db=30)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error loading audio: {e}") from e
    # An empty signal cannot be embedded; report it as a client error here
    # instead of letting it surface later as an opaque 500.
    if y.size == 0:
        raise HTTPException(
            status_code=400,
            detail="Error loading audio: no audio samples found",
        )
    return y


# Get deep embedding
def get_deep_embedding(audio, sr=16000):
    """Extract frame-wise deep embeddings using the pretrained model.

    Args:
        audio: audio samples to embed.
        sr: sampling rate passed to the processor.

    Returns:
        The model's last hidden state with the leading (batch) axis squeezed
        out, as a numpy array (presumably frames x hidden size).

    Raises:
        HTTPException: status 500 when the model is not initialized or the
            forward pass fails.
    """
    if MODEL is None or PROCESSOR is None:
        raise HTTPException(status_code=500, detail="Model not initialized")
    try:
        # Run on whichever device the model was placed on at startup.
        device = next(MODEL.parameters()).device
        input_values = PROCESSOR(
            audio, sampling_rate=sr, return_tensors="pt"
        ).input_values.to(device)
        with torch.no_grad():
            outputs = MODEL(input_values, output_hidden_states=True)
        hidden_states = outputs.hidden_states[-1]
        embedding_seq = hidden_states.squeeze(0).cpu().numpy()
        return embedding_seq
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error extracting embeddings: {e}"
        ) from e


# Custom DTW implementation to avoid issues with librosa's dtw
def custom_dtw(X, Y, metric='euclidean'):
    """
    Custom implementation of DTW.

    X and Y are expected to be 2D arrays of shape (features, frames): every
    column is one frame.

    Args:
        X: first feature sequence.
        Y: second feature sequence.
        metric: 'euclidean' or 'cosine'; any other value selects the
            Manhattan (L1) distance.

    Returns:
        A tuple ``(D, wp)``. ``D`` is the (n+1) x (m+1) accumulated-cost
        matrix whose first row and column are infinite except ``D[0, 0]``.
        ``wp`` is the warping path as a list of 1-based ``(i, j)`` pairs
        running from ``(1, 1)`` to ``(n, m)``.

    Raises:
        ValueError: if an input is not 2D, has no frames, or the two inputs
            have different feature dimensions.
    """
    # float64 keeps the accumulated costs accurate regardless of input dtype.
    X = np.asarray(X, dtype=np.float64)
    Y = np.asarray(Y, dtype=np.float64)

    # Check inputs are 2D and non-empty
    if X.ndim != 2 or Y.ndim != 2:
        raise ValueError("Input features must be 2D arrays.")
    if X.shape[1] == 0 or Y.shape[1] == 0:
        raise ValueError("Empty embedding sequence encountered.")
    if X.shape[0] != Y.shape[0]:
        raise ValueError("Feature dimensions of X and Y must match.")

    n, m = X.shape[1], Y.shape[1]
    D = np.zeros((n + 1, m + 1))
    D[0, :] = np.inf
    D[:, 0] = np.inf
    D[0, 0] = 0

    # Column norms of Y do not depend on the frame of X being processed.
    y_norms = np.linalg.norm(Y, axis=0) if metric == 'cosine' else None

    for i in range(1, n + 1):
        frame = X[:, i - 1:i]  # kept 2D so it broadcasts against Y's columns
        # Cost of this frame of X against every frame of Y, in one shot.
        if metric == 'euclidean':
            costs = np.sqrt(np.sum((Y - frame) ** 2, axis=0))
        elif metric == 'cosine':
            costs = 1 - (frame[:, 0] @ Y) / (np.linalg.norm(frame) * y_norms)
        else:
            costs = np.sum(np.abs(Y - frame), axis=0)
        for j in range(1, m + 1):
            D[i, j] = costs[j - 1] + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])

    # Backtrack from the end, always stepping to the cheapest predecessor
    # that is still inside the matrix (indices stay >= 1).
    i, j = n, m
    wp = [(i, j)]
    while i > 1 or j > 1:
        candidates = [(i - 1, j - 1), (i - 1, j), (i, j - 1)]
        valid_candidates = [(ii, jj) for ii, jj in candidates if ii > 0 and jj > 0]
        i, j = min(valid_candidates, key=lambda x: D[x[0], x[1]])
        wp.append((i, j))
    wp.reverse()
    return D, wp


# Compute DTW distance
def compute_dtw_distance(features1, features2):
    """Compute the DTW distance between two sequences of features.

    Returns:
        The total Euclidean DTW cost divided by the warping-path length.

    Raises:
        HTTPException: status 500 when the DTW computation fails.
    """
    try:
        D, wp = custom_dtw(features1, features2, metric='euclidean')
        distance = D[-1, -1]
        normalized_distance = distance / len(wp)
        return normalized_distance
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error computing DTW distance: {e}"
        ) from e


# Interpret similarity based on the normalized distance
def interpret_similarity(norm_distance):
    """Map a normalized DTW distance to an interpretation and a score.

    Args:
        norm_distance: length-normalized DTW distance (non-negative).

    Returns:
        A tuple ``(interpretation, score)`` where ``score`` is a float in
        ``[0, 100]`` that never increases as the distance grows.
    """
    if norm_distance == 0:
        result = "The recitations are identical based on the deep embeddings."
        score = 100
    elif norm_distance < 1:
        result = "The recitations are extremely similar."
        score = 95
    elif norm_distance < 5:
        result = "The recitations are very similar with minor differences."
        score = 80
    elif norm_distance < 10:
        result = "The recitations show moderate similarity."
        score = 60
    elif norm_distance < 20:
        result = "The recitations show some noticeable differences."
        score = 40
    else:
        result = "The recitations are quite different."
        # Continue downward from the previous band's 40 so the score never
        # jumps back up: 40 at distance 20, reaching 0 at distance 60.
        score = max(0.0, 60.0 - norm_distance)
    return result, float(score)


# Clean up temporary files
def cleanup_temp_files(file_paths):
    """Best-effort removal of the given files.

    Missing files are ignored; other OS errors are printed and do not stop
    the remaining files from being removed.
    """
    for file_path in file_paths:
        try:
            os.remove(file_path)
        except FileNotFoundError:
            continue
        except OSError as e:
            print(f"Error removing temporary file {file_path}: {e}")


# API endpoint for comparing recitations
@app.post("/compare", response_model=SimilarityResponse)
async def compare_recitations(
    background_tasks: BackgroundTasks,
    file1: UploadFile = File(...),
    file2: UploadFile = File(...)
):
    """Compare two uploaded recitations and return their similarity.

    Both uploads are written to temporary files, loaded, embedded and
    aligned with DTW.

    Raises:
        HTTPException: propagated from the processing helpers, or status 500
            for any unexpected error.
    """
    # NOTE(review): the processing below is CPU-bound and runs inside an
    # async handler, so other requests wait while a comparison is running.
    temp_file1 = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.wav")
    temp_file2 = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.wav")
    temp_files = [temp_file1, temp_file2]
    try:
        # Save uploaded files to temporary locations
        with open(temp_file1, "wb") as f:
            shutil.copyfileobj(file1.file, f)
        with open(temp_file2, "wb") as f:
            shutil.copyfileobj(file2.file, f)

        # Load audio files
        audio1 = load_audio(temp_file1)
        audio2 = load_audio(temp_file2)

        # Extract embeddings
        embedding1 = get_deep_embedding(audio1)
        embedding2 = get_deep_embedding(audio2)

        # Compute DTW distance (transpose so each column represents a frame)
        norm_distance = compute_dtw_distance(embedding1.T, embedding2.T)
        interpretation, similarity_score = interpret_similarity(norm_distance)
    except HTTPException:
        # Background tasks are attached to the successful response only, so
        # on failure the files have to be removed right here.
        cleanup_temp_files(temp_files)
        raise
    except Exception as e:
        cleanup_temp_files(temp_files)
        print(f"Unexpected error in /compare: {e}")
        raise HTTPException(
            status_code=500,
            detail="An unexpected error occurred during comparison.",
        ) from e

    # Success: defer the cleanup until after the response has been sent.
    background_tasks.add_task(cleanup_temp_files, temp_files)
    return {"similarity_score": similarity_score, "interpretation": interpretation}


# Health check endpoint
@app.get("/health")
async def health_check():
    """Report whether the model and processor have been loaded."""
    if MODEL is None or PROCESSOR is None:
        return JSONResponse(
            status_code=503,
            content={"status": "error", "message": "Model not initialized"},
        )
    return {"status": "ok", "model_loaded": True}


# Run the FastAPI app
if __name__ == "__main__":
    import uvicorn

    port = int(os.environ.get("PORT", 7860))
    # Pass the app object rather than the "main:app" import string so the
    # script works whatever the file is called and is not imported twice.
    uvicorn.run(app, host="0.0.0.0", port=port, reload=False)