Spaces:
Sleeping
Sleeping
import os | |
import torch | |
import librosa | |
import numpy as np | |
import tempfile | |
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC | |
from librosa.sequence import dtw | |
from fastapi import FastAPI, UploadFile, File, HTTPException | |
from fastapi.responses import JSONResponse | |
import shutil | |
# Define the QuranRecitationComparer class as provided | |
class QuranRecitationComparer: | |
def __init__(self, model_name="jonatasgrosman/wav2vec2-large-xlsr-53-arabic", auth_token=None): | |
"""Initialize the Quran recitation comparer with a specific Wav2Vec2 model.""" | |
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# Load model and processor once during initialization | |
if auth_token: | |
self.processor = Wav2Vec2Processor.from_pretrained(model_name, token=auth_token) | |
self.model = Wav2Vec2ForCTC.from_pretrained(model_name, token=auth_token) | |
else: | |
self.processor = Wav2Vec2Processor.from_pretrained(model_name) | |
self.model = Wav2Vec2ForCTC.from_pretrained(model_name) | |
self.model = self.model.to(self.device) | |
self.model.eval() | |
# Cache for embeddings to avoid recomputation | |
self.embedding_cache = {} | |
def load_audio(self, file_path, target_sr=16000, trim_silence=True, normalize=True): | |
"""Load and preprocess an audio file.""" | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"Audio file not found: {file_path}") | |
y, sr = librosa.load(file_path, sr=target_sr) | |
if normalize: | |
y = librosa.util.normalize(y) | |
if trim_silence: | |
y, _ = librosa.effects.trim(y, top_db=30) | |
return y | |
def get_deep_embedding(self, audio, sr=16000): | |
"""Extract frame-wise deep embeddings using the pretrained model.""" | |
input_values = self.processor( | |
audio, | |
sampling_rate=sr, | |
return_tensors="pt" | |
).input_values.to(self.device) | |
with torch.no_grad(): | |
outputs = self.model(input_values, output_hidden_states=True) | |
hidden_states = outputs.hidden_states[-1] | |
embedding_seq = hidden_states.squeeze(0).cpu().numpy() | |
return embedding_seq | |
def compute_dtw_distance(self, features1, features2): | |
"""Compute the DTW distance between two sequences of features.""" | |
D, wp = dtw(X=features1, Y=features2, metric='euclidean') | |
distance = D[-1, -1] | |
normalized_distance = distance / len(wp) | |
return normalized_distance | |
def interpret_similarity(self, norm_distance): | |
"""Interpret the normalized distance value.""" | |
if norm_distance == 0: | |
result = "The recitations are identical based on the deep embeddings." | |
score = 100 | |
elif norm_distance < 1: | |
result = "The recitations are extremely similar." | |
score = 95 | |
elif norm_distance < 5: | |
result = "The recitations are very similar with minor differences." | |
score = 80 | |
elif norm_distance < 10: | |
result = "The recitations show moderate similarity." | |
score = 60 | |
elif norm_distance < 20: | |
result = "The recitations show some noticeable differences." | |
score = 40 | |
else: | |
result = "The recitations are quite different." | |
score = max(0, 100 - norm_distance) | |
return result, score | |
def get_embedding_for_file(self, file_path): | |
"""Get embedding for a file, using cache if available.""" | |
if file_path in self.embedding_cache: | |
return self.embedding_cache[file_path] | |
audio = self.load_audio(file_path) | |
embedding = self.get_deep_embedding(audio) | |
# Store in cache for future use | |
self.embedding_cache[file_path] = embedding | |
return embedding | |
def predict(self, file_path1, file_path2): | |
""" | |
Predict the similarity between two audio files. | |
This method can be called repeatedly without reloading the model. | |
""" | |
# Get embeddings (using cache if available) | |
embedding1 = self.get_embedding_for_file(file_path1) | |
embedding2 = self.get_embedding_for_file(file_path2) | |
# Compute DTW distance (transposing so that each column represents a frame) | |
norm_distance = self.compute_dtw_distance(embedding1.T, embedding2.T) | |
# Interpret results | |
interpretation, similarity_score = self.interpret_similarity(norm_distance) | |
print(f"Similarity Score: {similarity_score:.1f}/100") | |
print(f"Interpretation: {interpretation}") | |
return similarity_score, interpretation | |
def clear_cache(self): | |
"""Clear the embedding cache to free memory.""" | |
self.embedding_cache = {} | |
# Create FastAPI application | |
app = FastAPI( | |
title="Quran Recitation Comparison API", | |
description="API for comparing similarity between Quran recitations", | |
version="1.0.0" | |
) | |
# Global instance of the comparer | |
comparer = None | |
async def startup_event(): | |
global comparer | |
# Optionally, set the HF authentication token from an environment variable | |
auth_token = os.getenv("HF_TOKEN", None) | |
comparer = QuranRecitationComparer(auth_token=auth_token) | |
print("Model initialized and ready for predictions.") | |
# Root endpoint | |
async def root(): | |
return {"message": "Welcome to the Quran Recitation Comparison API"} | |
# Compare endpoint that accepts two audio files | |
async def compare_recitations(file1: UploadFile = File(...), file2: UploadFile = File(...)): | |
if comparer is None: | |
raise HTTPException(status_code=503, detail="Model not initialized") | |
try: | |
# Save the uploaded files to temporary files | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp1: | |
tmp1.write(await file1.read()) | |
file_path1 = tmp1.name | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp2: | |
tmp2.write(await file2.read()) | |
file_path2 = tmp2.name | |
# Use the comparer to predict similarity | |
similarity_score, interpretation = comparer.predict(file_path1, file_path2) | |
# Clean up temporary files | |
os.remove(file_path1) | |
os.remove(file_path2) | |
return {"similarity_score": similarity_score, "interpretation": interpretation} | |
except Exception as e: | |
raise HTTPException(status_code=400, detail=str(e)) | |
# Run the application with uvicorn if this module is executed directly. | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=False) | |