# Captured from a Hugging Face Spaces page (status shown at capture: "Sleeping").
import os
import tempfile
import time
import warnings
from typing import Optional

import numpy as np
import soundfile as sf
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pydub import AudioSegment
# Install a blanket "ignore" filter so no warning of any category is shown
# for the lifetime of the process.
warnings.filterwarnings("ignore")

# Application object; the launcher at the bottom of the file refers to it
# through the "app:app" import string.
app = FastAPI()
def _wav_path_for(mp3_path):
    """Return the path the WAV rendition of *mp3_path* is written to."""
    # Swap only the final extension.  A plain ``str.replace(".mp3", ".wav")``
    # would also rewrite ".mp3" inside directory names, and would leave the
    # path unchanged for extensions such as ".MP3".
    root, _ext = os.path.splitext(mp3_path)
    wav_path = root + ".wav"
    if wav_path == mp3_path:
        # Never hand back the input path itself: exporting to it would
        # overwrite the source file.
        wav_path = mp3_path + ".wav"
    return wav_path


def convert_mp3_to_wav(mp3_path):
    """Decode an MP3 file with pydub and write it next to the source as WAV.

    Args:
        mp3_path: Path of the MP3 file to decode.

    Returns:
        Path of the WAV file that was written: *mp3_path* with its final
        extension replaced by ``.wav``.
    """
    sound = AudioSegment.from_mp3(mp3_path)
    wav_path = _wav_path_for(mp3_path)
    sound.export(wav_path, format="wav")
    return wav_path
def extract_audio_features(audio_file_path):
    """Read an audio file and derive coarse statistics from its samples.

    Args:
        audio_file_path: Path handed to ``soundfile.read``.

    Returns:
        A 6-tuple ``(f0, energy, speech_rate, mfccs, waveform, sample_rate)``:

        * ``f0`` - mean absolute sample-to-sample difference scaled by
          ``sample_rate / (2 * pi)``; a rough proxy, not a pitch tracker.
        * ``energy`` - mean of the squared samples.
        * ``speech_rate`` - the constant ``4.0``; it is not measured.
        * ``mfccs`` - mean magnitude of the first 13 FFT bins; despite the
          name, no mel-cepstral analysis is performed.
        * ``waveform`` - the samples, averaged over axis 1 when the data has
          more than one dimension.
        * ``sample_rate`` - the rate reported by ``soundfile.read``.
    """
    samples, rate = sf.read(audio_file_path)

    # Data with more than one dimension (presumably frames x channels -
    # confirm against soundfile) is averaged down to a single 1-D signal.
    if samples.ndim > 1:
        samples = samples.mean(axis=1)

    # Fixed placeholder: nothing in this function measures speaking rate.
    speaking_rate = 4.0

    mean_power = np.mean(samples ** 2)

    leading_bins = np.fft.fft(samples)[:13]
    spectral_summary = np.mean(np.abs(leading_bins), axis=0)

    abs_deltas = np.abs(np.diff(samples))
    pitch_proxy = np.mean(abs_deltas) * rate / (2 * np.pi)

    return pitch_proxy, mean_power, speaking_rate, spectral_summary, samples, rate
def analyze_voice_stress(audio_file_path):
    """Score an audio file on a 0-100 stress scale.

    The features from ``extract_audio_features`` are z-scored against fixed
    reference values, combined with weights 0.4 (f0), 0.4 (speech rate) and
    0.2 (energy), and passed through a logistic function.

    Args:
        audio_file_path: Path of the audio file to analyse.

    Returns:
        A dict with ``"stress_level"`` (float percentage rounded to two
        decimals), ``"category"`` (one of five labels, one per 20-point band)
        and ``"gender"`` (``'male'`` when the f0 value is below 165,
        otherwise ``'female'``).
    """
    # Only the first three features are used; the rest are unpacked so the
    # shape of the helper's return value is still checked.
    pitch, power, rate, _mfccs, _waveform, _sample_rate = extract_audio_features(audio_file_path)

    if pitch < 165:
        gender = 'male'
    else:
        gender = 'female'

    def z_score(value, reference_mean, reference_std):
        return (value - reference_mean) / reference_std

    # Reference (mean, std) pairs; the f0 mean depends on the guessed gender.
    reference_pitch_mean = {'male': 110, 'female': 220}[gender]
    z_pitch = z_score(pitch, reference_pitch_mean, 20)
    z_power = z_score(power, 0.02, 0.005)
    z_rate = z_score(rate, 4.4, 0.5)

    combined = (0.4 * z_pitch) + (0.4 * z_rate) + (0.2 * z_power)

    # Logistic squash to (0, 1), then expressed as a percentage.
    logistic = 1 / (1 + np.exp(-combined))
    stress_level = round(float(logistic * 100), 2)

    labels = ("Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress")
    # A level of exactly 100 would index past the end, hence the clamp to 4.
    band = min(int(stress_level / 20), 4)

    return {"stress_level": stress_level, "category": labels[band], "gender": gender}
def analyze_text_stress(text: str):
    """Estimate stress from text by counting stress-related keywords.

    Each keyword from a fixed list that occurs anywhere in the lower-cased
    text (substring match, so "stressed" counts for "stress") adds 20 points,
    capped at 100.

    Args:
        text: Free-form text to scan.  ``None`` is treated as empty text.

    Returns:
        A dict with ``"stress_level"`` (int, 0-100 in steps of 20) and
        ``"category"`` (one of five labels, one per 20-point band).
    """
    stress_keywords = ("anxious", "nervous", "stress", "panic", "tense")

    # Lower-case once instead of once per keyword; tolerate a missing text.
    lowered = (text or "").lower()

    stress_score = sum(1 for word in stress_keywords if word in lowered)
    stress_level = min(stress_score * 20, 100)

    categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"]
    # A level of 100 would index past the end, hence the clamp to 4.
    category_idx = min(int(stress_level / 20), 4)

    return {"stress_level": stress_level, "category": categories[category_idx]}
class StressResponse(BaseModel):
    """Shape of the payload produced by the stress analysis handler.

    NOTE(review): nothing in the visible source references this model (for
    example as a ``response_model``) - confirm where it is meant to be used.
    """

    # Stress percentage in the range 0-100.
    stress_level: float
    # Human-readable band, e.g. "Low Stress".
    category: str
    # Only the audio analysis reports a gender; text analysis omits it, so
    # the field has to accept a missing value.
    gender: Optional[str] = None
    # Status text, e.g. "200 (OK)".
    status: str
    # Processing time, e.g. "12 ms".
    time: str
    # Input size, e.g. "3.2 KB", or "N/A" for text input.
    size: str
# NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible on this
# coroutine, so as shown it is never registered on ``app`` - confirm whether
# the decorator was lost.
async def analyze_stress(
    file: UploadFile = File(None),
    file_path: str = Form(None),
    text: str = Form(None)
):
    """Analyse stress from an uploaded audio file, a server-side path, or text.

    Audio input (``file`` or ``file_path``) takes precedence over ``text``.
    MP3 audio is converted to WAV before analysis.

    Args:
        file: Uploaded ``.wav`` or ``.mp3`` file.
        file_path: Path of a ``.wav`` or ``.mp3`` file on the server.
        text: Free-form text to scan for stress keywords.

    Returns:
        ``JSONResponse`` holding the analysis result plus ``"status"``,
        ``"time"`` (processing time in ms) and ``"size"`` (input size in KB,
        or ``"N/A"`` for text).

    Raises:
        HTTPException: 400 when no usable input is supplied, the extension is
            not supported, or ``file_path`` does not exist; 500 when
            conversion or analysis fails.
    """
    missing_input_detail = "Either a file, file path, or text input is required."
    if file is None and file_path is None and text is None:
        raise HTTPException(status_code=400, detail=missing_input_detail)

    start_time = time.time()

    if file or file_path:
        supported_extensions = (".wav", ".mp3")
        # Files created by this request; all of them are deleted afterwards.
        owned_paths = []

        if file:
            # An upload may arrive without a filename; treat that as an
            # unsupported extension instead of crashing on ``None``.
            filename = file.filename or ""
            if not filename.endswith(supported_extensions):
                raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.")
            payload = await file.read()
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[-1]) as temp_file:
                # Register the file before writing so a failed write cannot
                # leave it behind.
                owned_paths.append(temp_file.name)
                try:
                    temp_file.write(payload)
                except Exception:
                    temp_file.close()
                    try:
                        os.remove(temp_file.name)
                    except OSError:
                        pass
                    raise
            temp_audio_path = temp_file.name
            file_size = len(payload)
        else:
            # NOTE(security): ``file_path`` comes straight from the client and
            # lets it make the server read any local .wav/.mp3 path; consider
            # restricting it to an allow-listed directory.
            if not file_path.endswith(supported_extensions):
                raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.")
            if not os.path.exists(file_path):
                raise HTTPException(status_code=400, detail="File path does not exist.")
            temp_audio_path = file_path
            file_size = os.path.getsize(file_path)

        try:
            if temp_audio_path.endswith(".mp3"):
                temp_audio_path = convert_mp3_to_wav(temp_audio_path)
                if file:
                    # The converted copy of an upload is ours to delete too.
                    # For ``file_path`` input the WAV lands in the caller's
                    # directory and is left in place, as before.
                    owned_paths.append(temp_audio_path)
            result = analyze_voice_stress(temp_audio_path)
            processing_time_ms = int((time.time() - start_time) * 1000)
            result.update({
                "status": "200 (OK)",
                "time": f"{processing_time_ms} ms",
                "size": f"{round(file_size / 1024, 2)} KB"
            })
            return JSONResponse(content=result, status_code=200)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            # Best-effort cleanup: a failed delete must not mask the response
            # or the original error.
            for owned_path in owned_paths:
                try:
                    os.remove(owned_path)
                except OSError:
                    pass
    elif text:
        result = analyze_text_stress(text)
        processing_time_ms = int((time.time() - start_time) * 1000)
        result.update({
            "status": "200 (OK)",
            "time": f"{processing_time_ms} ms",
            "size": "N/A"
        })
        return JSONResponse(content=result, status_code=200)

    # Reached when every supplied input is empty (e.g. ``text=""``); report it
    # instead of implicitly returning ``None``.
    raise HTTPException(status_code=400, detail=missing_input_detail)
if __name__ == "__main__":
    # Imported here so uvicorn is only required when the file is run as a script.
    import uvicorn

    # The PORT environment variable overrides the default of 7860.
    listen_port = int(os.environ.get("PORT", 7860))

    # "app:app" is an import string - presumably this file must be importable
    # as module ``app``; confirm the filename.
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port, reload=True)