aig2 / app.py
vitorcalvi's picture
1
2f6587c
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import soundfile as sf
import numpy as np
import tempfile
import os
import warnings
from pydub import AudioSegment
import time
warnings.filterwarnings("ignore")
app = FastAPI()
def convert_mp3_to_wav(mp3_path):
sound = AudioSegment.from_mp3(mp3_path)
wav_path = mp3_path.replace(".mp3", ".wav")
sound.export(wav_path, format="wav")
return wav_path
def extract_audio_features(audio_file_path):
waveform, sample_rate = sf.read(audio_file_path)
if waveform.ndim > 1:
waveform = waveform.mean(axis=1)
energy = np.mean(waveform ** 2)
mfccs = np.mean(np.abs(np.fft.fft(waveform)[:13]), axis=0)
speech_rate = 4.0
f0 = np.mean(np.abs(np.diff(waveform))) * sample_rate / (2 * np.pi)
return f0, energy, speech_rate, mfccs, waveform, sample_rate
def analyze_voice_stress(audio_file_path):
f0, energy, speech_rate, mfccs, waveform, sample_rate = extract_audio_features(audio_file_path)
mean_f0 = f0
mean_energy = energy
gender = 'male' if mean_f0 < 165 else 'female'
norm_mean_f0 = 110 if gender == 'male' else 220
norm_std_f0 = 20
norm_mean_energy = 0.02
norm_std_energy = 0.005
norm_speech_rate = 4.4
norm_std_speech_rate = 0.5
z_f0 = (mean_f0 - norm_mean_f0) / norm_std_f0
z_energy = (mean_energy - norm_mean_energy) / norm_std_energy
z_speech_rate = (speech_rate - norm_speech_rate) / norm_std_speech_rate
stress_score = (0.4 * z_f0) + (0.4 * z_speech_rate) + (0.2 * z_energy)
stress_level = round(float(1 / (1 + np.exp(-stress_score)) * 100), 2)
categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"]
category_idx = min(int(stress_level / 20), 4)
stress_category = categories[category_idx]
return {"stress_level": stress_level, "category": stress_category, "gender": gender}
def analyze_text_stress(text: str):
stress_keywords = ["anxious", "nervous", "stress", "panic", "tense"]
stress_score = sum([1 for word in stress_keywords if word in text.lower()])
stress_level = min(stress_score * 20, 100)
categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"]
category_idx = min(int(stress_level / 20), 4)
stress_category = categories[category_idx]
return {"stress_level": stress_level, "category": stress_category}
class StressResponse(BaseModel):
stress_level: float
category: str
gender: str = None
status: str
time: str
size: str
@app.post("/analyze-stress/", response_model=StressResponse)
async def analyze_stress(
file: UploadFile = File(None),
file_path: str = Form(None),
text: str = Form(None)
):
if file is None and file_path is None and text is None:
raise HTTPException(status_code=400, detail="Either a file, file path, or text input is required.")
start_time = time.time()
if file or file_path:
if file:
if not (file.filename.endswith(".wav") or file.filename.endswith(".mp3")):
raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.")
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[-1]) as temp_file:
temp_file.write(await file.read())
temp_audio_path = temp_file.name
file_size = os.path.getsize(temp_audio_path)
else:
if not (file_path.endswith(".wav") or file_path.endswith(".mp3")):
raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.")
if not os.path.exists(file_path):
raise HTTPException(status_code=400, detail="File path does not exist.")
temp_audio_path = file_path
file_size = os.path.getsize(file_path)
if temp_audio_path.endswith(".mp3"):
temp_audio_path = convert_mp3_to_wav(temp_audio_path)
try:
result = analyze_voice_stress(temp_audio_path)
processing_time_ms = int((time.time() - start_time) * 1000)
result.update({
"status": "200 (OK)",
"time": f"{processing_time_ms} ms",
"size": f"{round(file_size / 1024, 2)} KB"
})
return JSONResponse(content=result, status_code=200)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
if file:
os.remove(temp_audio_path)
elif text:
result = analyze_text_stress(text)
processing_time_ms = int((time.time() - start_time) * 1000)
result.update({
"status": "200 (OK)",
"time": f"{processing_time_ms} ms",
"size": "N/A"
})
return JSONResponse(content=result, status_code=200)
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", 7860))
uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)