from fastapi import FastAPI, File, UploadFile, HTTPException, Form from fastapi.responses import JSONResponse from pydantic import BaseModel import soundfile as sf import numpy as np import tempfile import os import warnings from pydub import AudioSegment import time warnings.filterwarnings("ignore") app = FastAPI() def convert_mp3_to_wav(mp3_path): sound = AudioSegment.from_mp3(mp3_path) wav_path = mp3_path.replace(".mp3", ".wav") sound.export(wav_path, format="wav") return wav_path def extract_audio_features(audio_file_path): waveform, sample_rate = sf.read(audio_file_path) if waveform.ndim > 1: waveform = waveform.mean(axis=1) energy = np.mean(waveform ** 2) mfccs = np.mean(np.abs(np.fft.fft(waveform)[:13]), axis=0) speech_rate = 4.0 f0 = np.mean(np.abs(np.diff(waveform))) * sample_rate / (2 * np.pi) return f0, energy, speech_rate, mfccs, waveform, sample_rate def analyze_voice_stress(audio_file_path): f0, energy, speech_rate, mfccs, waveform, sample_rate = extract_audio_features(audio_file_path) mean_f0 = f0 mean_energy = energy gender = 'male' if mean_f0 < 165 else 'female' norm_mean_f0 = 110 if gender == 'male' else 220 norm_std_f0 = 20 norm_mean_energy = 0.02 norm_std_energy = 0.005 norm_speech_rate = 4.4 norm_std_speech_rate = 0.5 z_f0 = (mean_f0 - norm_mean_f0) / norm_std_f0 z_energy = (mean_energy - norm_mean_energy) / norm_std_energy z_speech_rate = (speech_rate - norm_speech_rate) / norm_std_speech_rate stress_score = (0.4 * z_f0) + (0.4 * z_speech_rate) + (0.2 * z_energy) stress_level = round(float(1 / (1 + np.exp(-stress_score)) * 100), 2) categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"] category_idx = min(int(stress_level / 20), 4) stress_category = categories[category_idx] return {"stress_level": stress_level, "category": stress_category, "gender": gender} def analyze_text_stress(text: str): stress_keywords = ["anxious", "nervous", "stress", "panic", "tense"] stress_score = sum([1 for word in stress_keywords if word in text.lower()]) stress_level = min(stress_score * 20, 100) categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"] category_idx = min(int(stress_level / 20), 4) stress_category = categories[category_idx] return {"stress_level": stress_level, "category": stress_category} class StressResponse(BaseModel): stress_level: float category: str gender: str = None status: str time: str size: str @app.post("/analyze-stress/", response_model=StressResponse) async def analyze_stress( file: UploadFile = File(None), file_path: str = Form(None), text: str = Form(None) ): if file is None and file_path is None and text is None: raise HTTPException(status_code=400, detail="Either a file, file path, or text input is required.") start_time = time.time() if file or file_path: if file: if not (file.filename.endswith(".wav") or file.filename.endswith(".mp3")): raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.") with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[-1]) as temp_file: temp_file.write(await file.read()) temp_audio_path = temp_file.name file_size = os.path.getsize(temp_audio_path) else: if not (file_path.endswith(".wav") or file_path.endswith(".mp3")): raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.") if not os.path.exists(file_path): raise HTTPException(status_code=400, detail="File path does not exist.") temp_audio_path = file_path file_size = os.path.getsize(file_path) if temp_audio_path.endswith(".mp3"): temp_audio_path = convert_mp3_to_wav(temp_audio_path) try: result = analyze_voice_stress(temp_audio_path) processing_time_ms = int((time.time() - start_time) * 1000) result.update({ "status": "200 (OK)", "time": f"{processing_time_ms} ms", "size": f"{round(file_size / 1024, 2)} KB" }) return JSONResponse(content=result, status_code=200) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) finally: if file: os.remove(temp_audio_path) elif text: result = analyze_text_stress(text) processing_time_ms = int((time.time() - start_time) * 1000) result.update({ "status": "200 (OK)", "time": f"{processing_time_ms} ms", "size": "N/A" }) return JSONResponse(content=result, status_code=200) if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", 7860)) uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)