File size: 9,037 Bytes
46a11a0
 
 
 
 
 
 
 
 
 
 
 
 
b05966a
46a11a0
54aea1b
 
 
46a11a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef20d33
46a11a0
b05966a
 
46a11a0
 
 
 
 
 
 
 
 
 
 
ef20d33
 
 
 
 
 
0318876
ef20d33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46a11a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0318876
54aea1b
 
0318876
 
54aea1b
0318876
 
 
 
 
 
54aea1b
 
 
 
 
 
 
 
 
 
 
 
 
0318876
54aea1b
 
 
 
 
 
 
 
 
 
 
 
 
46a11a0
 
 
 
54aea1b
46a11a0
 
 
 
 
 
0318876
46a11a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef20d33
46a11a0
 
 
 
 
 
 
 
 
0318876
46a11a0
 
 
 
 
0318876
46a11a0
 
 
0318876
46a11a0
 
 
0318876
46a11a0
 
 
 
0318876
46a11a0
0318876
 
 
46a11a0
 
0318876
 
46a11a0
ef20d33
46a11a0
 
 
0318876
46a11a0
 
 
 
 
0318876
b05966a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import os
import torch
import librosa
import numpy as np
from typing import List, Dict, Any, Optional
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import tempfile
import uuid
import shutil
from contextlib import asynccontextmanager

# Disable numba JIT to avoid caching issues
# NOTE(review): this assignment runs after `import librosa` above; if numba
# reads the variable when it is first imported, setting it here may be too
# late to have any effect — confirm, and consider moving it above the import.
os.environ["NUMBA_DISABLE_JIT"] = "1"

# Global variables
# MODEL and PROCESSOR start as None and are populated by initialize_model().
MODEL = None
PROCESSOR = None
# Scratch directory under the system temp dir where /compare stores uploads;
# it is created at import time if it does not exist yet.
UPLOAD_DIR = os.path.join(tempfile.gettempdir(), "quran_comparison_uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Response models
class SimilarityResponse(BaseModel):
    """Response body declared for the ``/compare`` endpoint."""
    # Numeric score produced by interpret_similarity (100 for identical input).
    similarity_score: float
    # Human-readable sentence that accompanies the score.
    interpretation: str

class ErrorResponse(BaseModel):
    """Schema for an error payload carrying a single message.

    NOTE(review): no endpoint visible in this file references this model.
    """
    # Error message text.
    error: str

# Initialize model from environment variable
def initialize_model():
    """Load the Wav2Vec2 processor and CTC model into the module globals.

    The checkpoint name is read from the ``MODEL_NAME`` environment variable
    (with a built-in default) and an optional access token from ``HF_TOKEN``.
    The model is moved to CUDA when available, otherwise CPU, and switched to
    eval mode.

    Raises:
        Exception: whatever the loading code raised; it is printed first and
            then propagated unchanged.
    """
    global MODEL, PROCESSOR
    access_token = os.environ.get("HF_TOKEN", None)
    checkpoint = os.environ.get("MODEL_NAME", "jonatasgrosman/wav2vec2-large-xlsr-53-arabic")

    # Forward the `token` argument only when a non-empty token is configured.
    auth_kwargs = {"token": access_token} if access_token else {}

    try:
        target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Loading model on device: {target_device}")

        PROCESSOR = Wav2Vec2Processor.from_pretrained(checkpoint, **auth_kwargs)
        MODEL = Wav2Vec2ForCTC.from_pretrained(checkpoint, **auth_kwargs)

        MODEL = MODEL.to(target_device)
        MODEL.eval()
        print("Model loaded successfully")
    except Exception as e:
        print(f"Error loading model: {e}")
        raise

# Lifespan event handler to initialize the model at startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run initialize_model() once, then hand control back to the application.

    Nothing follows the ``yield``, so no teardown work is performed here.
    """
    initialize_model()
    yield

# Create the FastAPI app with the lifespan handler and add CORS middleware
app = FastAPI(
    title="Quran Recitation Comparison API",
    description="API for comparing similarity between Quran recitations using Wav2Vec2 embeddings",
    version="1.0.0",
    lifespan=lifespan
)

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy; confirm credentials are actually required,
# otherwise consider listing explicit origins or disabling credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Root endpoint
@app.get("/")
async def root():
    """Return a fixed greeting message for requests to the API root."""
    greeting = "Welcome to the Quran Recitation Comparison API"
    return {"message": greeting}

# Load audio file
def load_audio(file_path, target_sr=16000, trim_silence=True, normalize=True):
    """Read an audio file and return the preprocessed waveform.

    Args:
        file_path: path of the audio file to read.
        target_sr: sample rate requested from ``librosa.load``.
        trim_silence: when true, pass the signal through
            ``librosa.effects.trim`` with ``top_db=30``.
        normalize: when true, pass the signal through
            ``librosa.util.normalize`` (applied before trimming).

    Returns:
        The waveform array produced by the steps above.

    Raises:
        HTTPException: status 400 if loading or preprocessing fails.
    """
    try:
        waveform, _ = librosa.load(file_path, sr=target_sr)
        if normalize:
            waveform = librosa.util.normalize(waveform)
        if trim_silence:
            waveform = librosa.effects.trim(waveform, top_db=30)[0]
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error loading audio: {e}")
    return waveform

# Get deep embedding
def get_deep_embedding(audio, sr=16000):
    """Run the loaded model on a waveform and return its last hidden states.

    Args:
        audio: waveform to embed, as accepted by the processor.
        sr: sampling rate reported to the processor.

    Returns:
        NumPy array taken from the final entry of the model's hidden states,
        with dimension 0 squeezed out (presumably ``(frames, hidden_dim)``
        for a single input — confirm against the model in use).

    Raises:
        HTTPException: status 500 if the model is not initialized or if
            feature extraction fails.
    """
    if MODEL is None or PROCESSOR is None:
        raise HTTPException(status_code=500, detail="Model not initialized")
    try:
        # Run on whichever device the model's parameters live on.
        model_device = next(MODEL.parameters()).device
        encoded = PROCESSOR(audio, sampling_rate=sr, return_tensors="pt")
        model_input = encoded.input_values.to(model_device)

        with torch.no_grad():
            model_output = MODEL(model_input, output_hidden_states=True)

        final_layer = model_output.hidden_states[-1]
        return final_layer.squeeze(0).cpu().numpy()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error extracting embeddings: {e}")

# Custom DTW implementation to avoid issues with librosa's dtw
def _dtw_frame_costs(frame, frames, metric, frame_norms=None):
    """Return the distance from one frame to every row of ``frames``.

    ``frame`` has shape ``(d,)`` and ``frames`` has shape ``(m, d)``; the
    result has shape ``(m,)``. ``frame_norms`` holds the precomputed row norms
    of ``frames`` and is only needed for the cosine metric.
    """
    if metric == 'cosine':
        denom = np.linalg.norm(frame) * frame_norms
        similarity = np.zeros(len(frames), dtype=float)
        # A zero-norm frame has no direction: keep its similarity at 0
        # (distance 1) instead of dividing by zero and producing NaN.
        np.divide(frames @ frame, denom, out=similarity, where=denom > 0)
        return 1 - similarity

    diff = frames - frame
    if metric == 'euclidean':
        return np.sqrt(np.sum(diff ** 2, axis=1))
    # Any other metric name falls back to the Manhattan (L1) distance.
    return np.sum(np.abs(diff), axis=1)


def custom_dtw(X, Y, metric='euclidean'):
    """
    Custom implementation of DTW.

    Args:
        X: 2D array-like of shape ``(n_features, n_frames_x)``; each column
            is one frame.
        Y: 2D array-like of shape ``(n_features, n_frames_y)``.
        metric: ``'euclidean'`` or ``'cosine'``; any other value selects the
            Manhattan (L1) distance.

    Returns:
        ``(D, wp)`` where ``D`` is the accumulated-cost matrix of shape
        ``(n_frames_x + 1, n_frames_y + 1)`` (row 0 and column 0 are an
        infinite border, except ``D[0, 0] == 0``) and ``wp`` is the warping
        path as a list of 1-based ``(i, j)`` indices into ``D``, running from
        ``(1, 1)`` to ``(n_frames_x, n_frames_y)``.

    Raises:
        ValueError: if an input is not 2D, is empty, or the two inputs have
            different feature dimensions.
    """
    X = np.asarray(X)
    Y = np.asarray(Y)

    # Check inputs are 2D, non-empty and comparable frame by frame
    if X.ndim != 2 or Y.ndim != 2:
        raise ValueError("Input features must be 2D arrays.")
    if X.size == 0 or Y.size == 0:
        raise ValueError("Empty embedding sequence encountered.")
    if X.shape[0] != Y.shape[0]:
        raise ValueError("Input features must share the same feature dimension.")

    # Work with frames as contiguous rows so each distance reduces along the
    # last axis.
    x_frames = np.ascontiguousarray(X.T)
    y_frames = np.ascontiguousarray(Y.T)
    n, m = len(x_frames), len(y_frames)
    y_norms = np.linalg.norm(y_frames, axis=1) if metric == 'cosine' else None

    D = np.zeros((n + 1, m + 1))
    D[0, :] = np.inf
    D[:, 0] = np.inf
    D[0, 0] = 0

    for i in range(1, n + 1):
        # One vectorised distance computation per row instead of one per cell.
        row_cost = _dtw_frame_costs(x_frames[i - 1], y_frames, metric, y_norms)
        for j in range(1, m + 1):
            D[i, j] = row_cost[j - 1] + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])

    # Backtrack from the end cell, always moving to the cheapest valid
    # predecessor (ties resolve in favour of the diagonal, then up, then left).
    i, j = n, m
    wp = [(i, j)]
    while i > 1 or j > 1:
        steps = ((i - 1, j - 1), (i - 1, j), (i, j - 1))
        i, j = min(
            (step for step in steps if step[0] > 0 and step[1] > 0),
            key=lambda step: D[step[0], step[1]],
        )
        wp.append((i, j))

    wp.reverse()
    return D, wp

# Compute DTW distance
def compute_dtw_distance(features1, features2):
    """Return the path-length-normalized DTW cost between two feature arrays.

    Both arrays are aligned with ``custom_dtw`` using the Euclidean metric;
    the accumulated cost in the last cell is divided by the number of steps
    in the warping path.

    Raises:
        HTTPException: status 500 if the alignment fails for any reason.
    """
    try:
        cost_matrix, path = custom_dtw(features1, features2, metric='euclidean')
        total_cost = cost_matrix[-1, -1]
        return total_cost / len(path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error computing DTW distance: {e}")

# Interpret similarity based on the normalized distance
def interpret_similarity(norm_distance):
    """Map a normalized DTW distance to a description and a 0-100 score.

    Args:
        norm_distance: non-negative normalized distance (0 means identical).

    Returns:
        ``(interpretation, score)`` where ``score`` is a float that never
        increases as ``norm_distance`` grows.
    """
    if norm_distance == 0:
        return "The recitations are identical based on the deep embeddings.", 100.0

    # (exclusive upper bound, interpretation, score) in increasing distance.
    bands = (
        (1, "The recitations are extremely similar.", 95.0),
        (5, "The recitations are very similar with minor differences.", 80.0),
        (10, "The recitations show moderate similarity.", 60.0),
        (20, "The recitations show some noticeable differences.", 40.0),
    )
    for upper_bound, interpretation, score in bands:
        if norm_distance < upper_bound:
            return interpretation, score

    # Beyond the last band the score decays as 100 - distance, but it is
    # capped at the previous band's score: without the cap a distance of 20
    # would score 80, higher than closer (more similar) recitations.
    score = float(min(40.0, max(0.0, 100 - norm_distance)))
    return "The recitations are quite different.", score

# Clean up temporary files
def cleanup_temp_files(file_paths):
    """Delete each path in ``file_paths`` on a best-effort basis.

    Paths that no longer exist are skipped silently; any other removal
    failure is printed and does not stop the remaining paths from being
    processed. Nothing is raised to the caller.
    """
    for file_path in file_paths:
        # Attempt the removal directly rather than checking for existence
        # first, so a file that disappears in between is not an error.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            continue
        except Exception as e:
            print(f"Error removing temporary file {file_path}: {e}")

# API endpoint for comparing recitations
@app.post("/compare", response_model=SimilarityResponse)
async def compare_recitations(
    background_tasks: BackgroundTasks,
    file1: UploadFile = File(...),
    file2: UploadFile = File(...)
):
    """Compare two uploaded recitations and return a similarity verdict.

    Both uploads are written to uniquely named files in ``UPLOAD_DIR``,
    loaded, embedded, and aligned with DTW; the normalized distance is then
    mapped to a score and an interpretation.

    Returns:
        Dict with ``similarity_score`` and ``interpretation``.

    Raises:
        HTTPException: re-raised unchanged when a processing step raised one;
            otherwise status 500 for any unexpected error.
    """
    temp_file1 = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.wav")
    temp_file2 = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.wav")
    temp_paths = [temp_file1, temp_file2]
    try:
        # Save uploaded files to temporary locations
        for upload, destination in ((file1, temp_file1), (file2, temp_file2)):
            with open(destination, "wb") as f:
                shutil.copyfileobj(upload.file, f)

        # NOTE(review): the calls below are synchronous and run directly in
        # this coroutine, so the event loop is occupied while they execute.

        # Load audio files
        audio1 = load_audio(temp_file1)
        audio2 = load_audio(temp_file2)

        # Extract embeddings
        embedding1 = get_deep_embedding(audio1)
        embedding2 = get_deep_embedding(audio2)

        # Compute DTW distance (transpose so each column represents a frame)
        norm_distance = compute_dtw_distance(embedding1.T, embedding2.T)
        interpretation, similarity_score = interpret_similarity(norm_distance)
    except HTTPException:
        # Background tasks are tied to the successful response, so on failure
        # the temporary files are removed right away instead of being queued.
        cleanup_temp_files(temp_paths)
        raise
    except Exception as e:
        cleanup_temp_files(temp_paths)
        print(f"Unexpected error in /compare: {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred during comparison.") from e

    # Success: defer file removal until after the response has been sent.
    background_tasks.add_task(cleanup_temp_files, temp_paths)
    return {"similarity_score": similarity_score, "interpretation": interpretation}

# Health check endpoint
@app.get("/health")
async def health_check():
    """Report whether both the model and the processor have been loaded.

    Returns a plain OK payload when both globals are set, otherwise a 503
    JSON response describing the missing model.
    """
    model_ready = MODEL is not None and PROCESSOR is not None
    if model_ready:
        return {"status": "ok", "model_loaded": True}
    return JSONResponse(
        status_code=503,
        content={"status": "error", "message": "Model not initialized"},
    )

# Run the FastAPI app
if __name__ == "__main__":
    import uvicorn
    # Listening port comes from the PORT environment variable (default 7860).
    port = int(os.environ.get("PORT", 7860))
    # NOTE(review): the "main:app" import string presumably requires this file
    # to be importable as module `main` — confirm against the deployed filename.
    uvicorn.run("main:app", host="0.0.0.0", port=port, reload=False)