|
from fastapi import FastAPI, UploadFile, File, Form , HTTPException |
|
from fastapi.responses import JSONResponse |
|
from fastapi.middleware.cors import CORSMiddleware |
|
import sys |
|
import os |
|
import shutil |
|
import uuid |
|
|
|
|
|
|
|
|
|
from fluency.fluency_api import main as analyze_fluency_main |
|
from tone_modulation.tone_api import main as analyze_tone_main |
|
from vcs.vcs_api import main as analyze_vcs_main |
|
from vers.vers_api import main as analyze_vers_main |
|
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main |
|
from vps.vps_api import main as analyze_vps_main |
|
from ves.ves import calc_voice_engagement_score |
|
from transcribe import transcribe_audio |
|
from filler_count.filler_score import analyze_fillers |
|
from emotion.emo_predict import predict_emotion |
|
|
|
# Single application instance; every route in this module registers on it.
app = FastAPI()

# CORS settings gathered in one place and applied in a single call.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard origins with allow_credentials=True -- consider listing the
# explicit front-end origins here; confirm against deployed clients first.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
    """
    Endpoint to analyze the fluency of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``analyze_fluency_main`` with ``model_size="base"``, and the
    temporary file is removed afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_fluency_main``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_fluency_main(temp_filepath, model_size="base")
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Fluency analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
    """
    Endpoint to analyze tone of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``analyze_tone_main``, and the temporary file is removed
    afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_tone_main``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_tone_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Tone analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
    """
    Endpoint to analyze voice clarity of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``analyze_vcs_main``, and the temporary file is removed
    afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_vcs_main``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_vcs_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Voice clarity analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
    """
    Endpoint to analyze VERS of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``analyze_vers_main``, and the temporary file is removed
    afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_vers_main``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_vers_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"VERS analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
    """
    Endpoint to analyze voice confidence of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``analyze_voice_confidence_main``, and the temporary file is
    removed afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by
        ``analyze_voice_confidence_main``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_voice_confidence_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Voice confidence analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
    """
    Endpoint to analyze voice pacing score of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``analyze_vps_main``, and the temporary file is removed
    afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_vps_main``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_vps_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Voice pacing score analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
    """
    Endpoint to analyze voice engagement score of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``calc_voice_engagement_score``, and the temporary file is
    removed afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by
        ``calc_voice_engagement_score``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = calc_voice_engagement_score(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Voice engagement score analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
    """
    Endpoint to analyze filler words in an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``analyze_fillers``, and the temporary file is removed
    afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .mp4, .m4a
            or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_fillers``.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or analyzing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.mp4', '.m4a', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_fillers(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Filler analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
|
|
import time |
|
|
|
|
|
|
|
@app.post('/transcribe/')
async def transcribe(file: UploadFile, language: str = Form(...)):
    """
    Endpoint to transcribe an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``,
    passed to ``transcribe_audio`` with ``model_size="base"``, and the
    temporary file is removed afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .mp4, .m4a
            or .flac (case-insensitive).
        language: Required form field forwarded to ``transcribe_audio``
            as its ``language`` argument.

    Returns:
        JSONResponse with ``transcription`` (the value returned by
        ``transcribe_audio``) and ``transcription_time`` (seconds elapsed
        since the handler started, including saving the upload).

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving or transcribing the file fails.
    """
    # Monotonic clock: unaffected by system clock adjustments mid-request.
    start_time = time.perf_counter()

    # Every entry needs its leading dot; a bare 'mp4' would also accept
    # names such as "notesmp4" that have no real extension.
    allowed_extensions = ('.wav', '.mp3', '.mp4', '.m4a', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = transcribe_audio(temp_filepath, language=language, model_size="base")
        transcription_time = time.perf_counter() - start_time
        response = {
            "transcription": result,
            "transcription_time": transcription_time,
        }
        return JSONResponse(content=response)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not transcription succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
|
|
@app.post('/analyze_all/')
async def analyze_all(file: UploadFile, language: str = Form(...)):
    """
    Endpoint to analyze all aspects of an uploaded audio file.

    The upload is copied to a uniquely named file under ``temp_uploads``;
    every analyzer imported by this module is run on that file in turn,
    and the temporary file is removed afterwards.

    Args:
        file: Uploaded file; the name must end in .wav, .mp3, .m4a, .mp4
            or .flac (case-insensitive).
        language: Required form field forwarded to ``transcribe_audio``
            as its ``language`` argument.

    Returns:
        JSONResponse with one entry per analyzer (``fluency``, ``tone``,
        ``vcs``, ``vers``, ``voice_confidence``, ``vps``, ``ves``,
        ``filler_words``, ``transcript``, ``emotion``) plus ``sank_score``,
        the arithmetic mean of the seven numeric scores.

    Raises:
        HTTPException: 400 for a missing or unsupported file name,
            500 if saving the file, any analyzer, or the score lookup fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The client may omit the filename or send an upper-case extension
    # (e.g. "CLIP.WAV"), so normalize before checking.
    filename = (file.filename or "").lower()
    if not filename.endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Supported extensions: {', '.join(allowed_extensions)}.",
        )

    # A random name keeps concurrent uploads from overwriting each other.
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_dir = "temp_uploads"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        fluency_result = analyze_fluency_main(temp_filepath, model_size="base")
        tone_result = analyze_tone_main(temp_filepath)
        vcs_result = analyze_vcs_main(temp_filepath)
        vers_result = analyze_vers_main(temp_filepath)
        voice_confidence_result = analyze_voice_confidence_main(temp_filepath)
        vps_result = analyze_vps_main(temp_filepath)
        ves_result = calc_voice_engagement_score(temp_filepath)
        filler_count = analyze_fillers(temp_filepath)
        # Keyword arguments, matching the /transcribe/ endpoint, so the call
        # does not depend on transcribe_audio's positional parameter order.
        transcript = transcribe_audio(temp_filepath, language=language, model_size="base")
        emotion = predict_emotion(temp_filepath)

        # NOTE(review): 'Voice Clarity Sore' looks like a typo for "Score",
        # but it must match the key emitted by vcs_api -- confirm there
        # before renaming it here.
        component_scores = (
            fluency_result['fluency_score'],
            tone_result['speech_dynamism_score'],
            vcs_result['Voice Clarity Sore'],
            vers_result['VERS Score'],
            voice_confidence_result['voice_confidence_score'],
            vps_result['VPS'],
            ves_result['ves'],
        )
        avg_score = sum(component_scores) / len(component_scores)

        combined_result = {
            "fluency": fluency_result,
            "tone": tone_result,
            "vcs": vcs_result,
            "vers": vers_result,
            "voice_confidence": voice_confidence_result,
            "vps": vps_result,
            "ves": ves_result,
            "filler_words": filler_count,
            "transcript": transcript,
            "emotion": emotion,
            "sank_score": avg_score,
        }
        return JSONResponse(content=combined_result)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") from e
    finally:
        # Always drop the temporary copy, whether or not analysis succeeded.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|