# Hammad712's picture
# Update main.py
# 0318876 verified
# raw
# history blame
# 9.04 kB
import os
import torch
import librosa
import numpy as np
from typing import List, Dict, Any, Optional
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import tempfile
import uuid
import shutil
from contextlib import asynccontextmanager
# numba's JIT is switched off to sidestep its caching problems.
# NOTE(review): this assignment runs after `import librosa`; confirm numba
# has not already read its configuration by the time this line executes.
os.environ.update({"NUMBA_DISABLE_JIT": "1"})

# Module-level state: both stay None until initialize_model() fills them in.
MODEL = PROCESSOR = None

# Scratch directory for uploaded audio; created eagerly at import time.
UPLOAD_DIR = os.path.join(tempfile.gettempdir(), "quran_comparison_uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Response models
class SimilarityResponse(BaseModel):
    # Payload returned by the /compare endpoint.
    # (Comments are used instead of a docstring so the generated schema is
    # left untouched.)

    # Numeric similarity score produced by interpret_similarity().
    similarity_score: float
    # Human-readable sentence describing how similar the recitations are.
    interpretation: str
class ErrorResponse(BaseModel):
    # Simple error payload carrying a single message.
    # (Comments are used instead of a docstring so the generated schema is
    # left untouched.)

    # Description of what went wrong.
    error: str
# Initialize model from environment variable
def initialize_model():
    """Load the Wav2Vec2 processor and model into the module globals.

    The checkpoint name is read from the ``MODEL_NAME`` environment variable
    (default ``jonatasgrosman/wav2vec2-large-xlsr-53-arabic``). If
    ``HF_TOKEN`` is set to a non-empty value it is forwarded to
    ``from_pretrained`` as ``token``. The model is moved to CUDA when
    available, otherwise CPU, and switched to eval mode.

    Raises:
        Exception: any failure while loading is printed and re-raised.
    """
    global MODEL, PROCESSOR
    model_name = os.environ.get("MODEL_NAME", "jonatasgrosman/wav2vec2-large-xlsr-53-arabic")
    hf_token = os.environ.get("HF_TOKEN", None)
    # Only pass `token` when one is configured, so the no-token call is
    # exactly `from_pretrained(model_name)`.
    auth_kwargs = {"token": hf_token} if hf_token else {}
    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Loading model on device: {device}")
        PROCESSOR = Wav2Vec2Processor.from_pretrained(model_name, **auth_kwargs)
        MODEL = Wav2Vec2ForCTC.from_pretrained(model_name, **auth_kwargs)
        MODEL = MODEL.to(device)
        MODEL.eval()
        print("Model loaded successfully")
    except Exception as e:
        print(f"Error loading model: {e}")
        raise
# Lifespan event handler to initialize the model at startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan handler: load the model before the app starts serving.

    Everything before the ``yield`` runs at startup; there is no work after
    it, so nothing is torn down at shutdown.
    """
    # Startup phase: populate the MODEL / PROCESSOR globals.
    initialize_model()
    yield
# Create the FastAPI app with the lifespan handler and add CORS middleware
# Application instance; `lifespan` triggers model loading at startup and the
# remaining arguments are descriptive metadata.
app = FastAPI(
    lifespan=lifespan,
    title="Quran Recitation Comparison API",
    description="API for comparing similarity between Quran recitations using Wav2Vec2 embeddings",
    version="1.0.0",
)
# CORS policy: the API is open to every origin, method and header.
# Credentials are deliberately NOT allowed: the CORS middleware documents
# that `allow_credentials=True` must not be combined with wildcard
# origins/methods/headers, and this service has no cookie- or auth-based
# endpoints. To support credentialed requests, replace the wildcards with
# explicit lists before enabling `allow_credentials`.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=False,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)
# Root endpoint
@app.get("/")
async def root():
    """Welcome endpoint."""
    # Static greeting; no model access is needed here.
    greeting = "Welcome to the Quran Recitation Comparison API"
    return {"message": greeting}
# Load audio file
def load_audio(file_path, target_sr=16000, trim_silence=True, normalize=True):
    """Read an audio file and apply the optional preprocessing steps.

    Args:
        file_path: Path handed to ``librosa.load``.
        target_sr: Sampling rate requested from ``librosa.load``.
        trim_silence: When true, run ``librosa.effects.trim`` with
            ``top_db=30`` on the signal.
        normalize: When true, run ``librosa.util.normalize`` on the signal
            (applied before trimming).

    Returns:
        The processed waveform returned by librosa.

    Raises:
        HTTPException: status 400 if any step fails.
    """
    try:
        samples, _rate = librosa.load(file_path, sr=target_sr)
        if normalize:
            samples = librosa.util.normalize(samples)
        if trim_silence:
            # trim() also returns the kept interval, which is not needed.
            samples, _interval = librosa.effects.trim(samples, top_db=30)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error loading audio: {e}")
    return samples
# Get deep embedding
def get_deep_embedding(audio, sr=16000):
    """Run the loaded model on a waveform and return its last hidden states.

    Args:
        audio: Waveform passed to the processor.
        sr: Sampling rate reported to the processor.

    Returns:
        A NumPy array taken from the final hidden-state layer with the
        leading batch axis squeezed out (presumably frames x hidden size --
        confirm against the model).

    Raises:
        HTTPException: status 500 if the model/processor are not loaded or
            if inference fails.
    """
    if PROCESSOR is None or MODEL is None:
        raise HTTPException(status_code=500, detail="Model not initialized")
    try:
        # Run on whichever device the model weights currently live on.
        device = next(MODEL.parameters()).device
        encoded = PROCESSOR(audio, sampling_rate=sr, return_tensors="pt")
        input_values = encoded.input_values.to(device)
        with torch.no_grad():
            outputs = MODEL(input_values, output_hidden_states=True)
        last_layer = outputs.hidden_states[-1]
        return last_layer.squeeze(0).cpu().numpy()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error extracting embeddings: {e}")
# Custom DTW implementation to avoid issues with librosa's dtw
def custom_dtw(X, Y, metric='euclidean'):
    """
    Custom implementation of dynamic time warping (DTW).

    X and Y are expected to be 2D arrays laid out as
    (n_features, n_frames): every column is one frame, and both inputs must
    have the same number of rows.

    Args:
        X: First feature sequence, shape (d, n).
        Y: Second feature sequence, shape (d, m).
        metric: 'euclidean', 'cosine', or anything else for the Manhattan
            (L1) distance between frames.

    Returns:
        (D, wp) where D is the (n+1, m+1) accumulated-cost matrix (row 0 and
        column 0 are an infinite border, D[0, 0] == 0, D[-1, -1] is the total
        alignment cost) and wp is the warping path as a list of 1-based
        (i, j) indices into D running from (1, 1) to (n, m).

    Raises:
        ValueError: if an input is not 2D, is empty, or the feature
            dimensions differ.
    """
    X = np.asarray(X)
    Y = np.asarray(Y)
    # Check inputs are 2D and non-empty
    if X.ndim != 2 or Y.ndim != 2:
        raise ValueError("Input features must be 2D arrays.")
    if X.size == 0 or Y.size == 0:
        raise ValueError("Empty embedding sequence encountered.")
    if X.shape[0] != Y.shape[0]:
        raise ValueError("Input features must have the same feature dimension.")

    n, m = X.shape[1], Y.shape[1]
    D = np.full((n + 1, m + 1), np.inf)
    D[0, 0] = 0

    if metric == 'cosine':
        # Column norms are loop-invariant, so compute them once.
        x_norms = np.linalg.norm(X, axis=0)
        y_norms = np.linalg.norm(Y, axis=0)

    for i in range(1, n + 1):
        frame = X[:, i - 1]
        # Cost of this X frame against every Y frame in one vectorised step
        # (one NumPy call per row instead of one per cell).
        if metric == 'euclidean':
            costs = np.sqrt(np.sum((Y - frame[:, None]) ** 2, axis=0))
        elif metric == 'cosine':
            denom = x_norms[i - 1] * y_norms
            # A zero-norm frame has no direction: treat its similarity as 0
            # (cost 1) rather than dividing by zero and producing NaN.
            sims = np.divide(frame @ Y, denom, out=np.zeros(m, dtype=float), where=denom > 0)
            costs = 1 - sims
        else:
            costs = np.sum(np.abs(Y - frame[:, None]), axis=0)

        prev_row = D[i - 1]
        cur_row = D[i]  # view into D, so writes land in the matrix
        for j in range(1, m + 1):
            cur_row[j] = costs[j - 1] + min(prev_row[j], cur_row[j - 1], prev_row[j - 1])

    # Backtrack from the end, always stepping to the cheapest predecessor
    # (diagonal wins ties because it is listed first).
    i, j = n, m
    wp = [(i, j)]
    while i > 1 or j > 1:
        candidates = [(i - 1, j - 1), (i - 1, j), (i, j - 1)]
        valid_candidates = [(ii, jj) for ii, jj in candidates if ii > 0 and jj > 0]
        i, j = min(valid_candidates, key=lambda cell: D[cell[0], cell[1]])
        wp.append((i, j))
    wp.reverse()
    return D, wp
# Compute DTW distance
def compute_dtw_distance(features1, features2):
    """Return the path-length-normalised DTW cost between two sequences.

    The two inputs are aligned with ``custom_dtw`` using the Euclidean
    metric; the accumulated cost in the last cell is divided by the number
    of steps on the warping path.

    Raises:
        HTTPException: status 500 if the alignment fails for any reason.
    """
    try:
        acc_cost, path = custom_dtw(features1, features2, metric='euclidean')
        total_cost = acc_cost[-1, -1]
        return total_cost / len(path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error computing DTW distance: {e}")
# Interpret similarity based on the normalized distance
def interpret_similarity(norm_distance):
    """Translate a normalized DTW distance into a message and a score.

    The score never increases as the distance grows:

    * 0          -> 100
    * (0, 1)     -> 95
    * [1, 5)     -> 80
    * [5, 10)    -> 60
    * [10, 20)   -> 40
    * >= 20      -> 60 - distance, floored at 0 (so 40 at 20, 0 from 60 on)

    The last band previously used ``100 - distance``, which gave a distance
    of 20 a score of 80 -- higher than the closer "moderate" and
    "noticeable differences" bands.

    Args:
        norm_distance: Normalized distance; smaller means more similar.

    Returns:
        (interpretation, score) where interpretation is a sentence and score
        is a number in [0, 100]. A NaN distance falls through to the last
        band and yields a score of 0.
    """
    if norm_distance == 0:
        return "The recitations are identical based on the deep embeddings.", 100
    if norm_distance < 1:
        return "The recitations are extremely similar.", 95
    if norm_distance < 5:
        return "The recitations are very similar with minor differences.", 80
    if norm_distance < 10:
        return "The recitations show moderate similarity.", 60
    if norm_distance < 20:
        return "The recitations show some noticeable differences.", 40
    # Continue downward from the previous band's 40 instead of jumping up.
    score = float(max(0, 60 - norm_distance))
    return "The recitations are quite different.", score
# Clean up temporary files
def cleanup_temp_files(file_paths):
    """Delete each existing path in ``file_paths`` on a best-effort basis.

    Paths that do not exist are skipped. A failure to remove a file is
    printed and does not stop the remaining paths from being processed.
    """
    for path in file_paths:
        if not os.path.exists(path):
            continue
        try:
            os.remove(path)
        except Exception as e:
            print(f"Error removing temporary file {path}: {e}")
# API endpoint for comparing recitations
@app.post("/compare", response_model=SimilarityResponse)
async def compare_recitations(
    background_tasks: BackgroundTasks,
    file1: UploadFile = File(...),
    file2: UploadFile = File(...)
):
    """Compare two uploaded recitations and report how similar they are.

    Both uploads are written to temporary files, decoded, embedded with the
    loaded model, and aligned with DTW; the normalized distance is then
    mapped to a score and an interpretation.

    Errors raised as HTTPException by the helpers are passed through
    unchanged; any other failure is logged and reported as a generic 500.
    The temporary files are removed before returning, on success and on
    failure alike.
    """
    temp_file1 = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.wav")
    temp_file2 = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.wav")
    try:
        # Save uploaded files to temporary locations
        with open(temp_file1, "wb") as f:
            shutil.copyfileobj(file1.file, f)
        with open(temp_file2, "wb") as f:
            shutil.copyfileobj(file2.file, f)
        # Load audio files
        audio1 = load_audio(temp_file1)
        audio2 = load_audio(temp_file2)
        # Extract embeddings
        embedding1 = get_deep_embedding(audio1)
        embedding2 = get_deep_embedding(audio2)
        # Compute DTW distance (transpose so each column represents a frame)
        norm_distance = compute_dtw_distance(embedding1.T, embedding2.T)
        interpretation, similarity_score = interpret_similarity(norm_distance)
        return {"similarity_score": similarity_score, "interpretation": interpretation}
    except HTTPException:
        raise
    except Exception as e:
        print(f"Unexpected error in /compare: {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred during comparison.")
    finally:
        # Clean up here rather than through `background_tasks`: background
        # tasks are attached to the normal response only, so on an error
        # path they would never run and the temp files would accumulate.
        # `background_tasks` stays in the signature for compatibility.
        cleanup_temp_files([temp_file1, temp_file2])
# Health check endpoint
@app.get("/health")
async def health_check():
    # Reports readiness: 503 until both the model and the processor are
    # loaded, then a plain "ok" payload.
    model_ready = MODEL is not None and PROCESSOR is not None
    if model_ready:
        return {"status": "ok", "model_loaded": True}
    return JSONResponse(status_code=503, content={"status": "error", "message": "Model not initialized"})
# Run the FastAPI app
if __name__ == "__main__":
    # uvicorn is only needed when the module is run directly as a script.
    import uvicorn

    # PORT may be overridden through the environment; 7860 is the default.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("main:app", host="0.0.0.0", port=listen_port, reload=False)