# Voice / app.py
# mulasagg's picture
# push new
# 27acc7d
from fastapi import FastAPI, UploadFile, File, Form , HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import sys
import os
import shutil
import uuid
# Ensure sibling module fluency is discoverable
#sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from fluency.fluency_api import main as analyze_fluency_main
from tone_modulation.tone_api import main as analyze_tone_main
from vcs.vcs_api import main as analyze_vcs_main
from vers.vers_api import main as analyze_vers_main
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main
from vps.vps_api import main as analyze_vps_main
from ves.ves import calc_voice_engagement_score
from transcribe import transcribe_audio
from filler_count.filler_score import analyze_fillers
from emotion.emo_predict import predict_emotion
# Application instance that every route decorator in this module registers on.
app = FastAPI()

# CORS policy: any origin, method and header is accepted, with credentials enabled.
# In production, replace "*" in allow_origins with the allowed frontend domains.
# NOTE(review): FastAPI's CORS documentation advises against combining wildcard
# values with allow_credentials=True — confirm whether credentials are needed.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
    """Run fluency analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``analyze_fluency_main`` with ``model_size="base"``, and deleted again
    whether or not the analysis succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_fluency_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = analyze_fluency_main(temp_filepath, model_size="base")
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Fluency analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
    """Run tone analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``analyze_tone_main``, and deleted again whether or not the analysis
    succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_tone_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = analyze_tone_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Tone analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
    """Run voice clarity analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``analyze_vcs_main``, and deleted again whether or not the analysis
    succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_vcs_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = analyze_vcs_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Voice clarity analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
    """Run VERS analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``analyze_vers_main``, and deleted again whether or not the analysis
    succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_vers_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = analyze_vers_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"VERS analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
    """Run voice confidence analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``analyze_voice_confidence_main``, and deleted again whether or not the
    analysis succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by
        ``analyze_voice_confidence_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = analyze_voice_confidence_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Voice confidence analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
    """Run voice pacing score analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``analyze_vps_main``, and deleted again whether or not the analysis
    succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_vps_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = analyze_vps_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Voice pacing score analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
    """Compute the voice engagement score of an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``calc_voice_engagement_score``, and deleted again whether or not the
    analysis succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by
        ``calc_voice_engagement_score``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = calc_voice_engagement_score(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Voice engagement score analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
    """Run filler-word analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a random name, its path is
    passed to ``analyze_fillers``, and the file is deleted again whether or
    not the analysis succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).

    Returns:
        JSONResponse wrapping the value returned by ``analyze_fillers``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or analysing the file fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.mp4', '.m4a', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # The analysis function takes the saved file's path, not the UploadFile.
        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = analyze_fillers(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Filler analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
import time
@app.post('/transcribe/')
async def transcribe(file: UploadFile, language: str = Form(...)):
    """Transcribe an uploaded audio file and report how long it took.

    The upload is saved under ``temp_uploads`` with a random name, passed to
    ``transcribe_audio`` with ``model_size="base"``, and deleted again whether
    or not transcription succeeds.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).
        language: Required form field forwarded to ``transcribe_audio``.

    Returns:
        JSONResponse with ``transcription`` (the value returned by
        ``transcribe_audio``) and ``transcription_time`` (seconds elapsed
        since the handler started, including validation and the file save).

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving or transcribing the file fails.
    """
    # Monotonic clock: the elapsed time cannot be skewed by system clock changes.
    start_time = time.perf_counter()

    # Every entry carries its leading dot so that a name merely ending in the
    # letters "mp4" is not accepted as a supported type.
    allowed_extensions = ('.wav', '.mp3', '.mp4', '.m4a', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): this call is not awaited inside an async handler; if it
        # is long-running it will hold the event loop — consider a threadpool.
        result = transcribe_audio(temp_filepath, language=language, model_size="base")

        transcription_time = time.perf_counter() - start_time
        response = {
            "transcription": result,
            "transcription_time": transcription_time,
        }
        return JSONResponse(content=response)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
@app.post('/analyze_all/')
async def analyze_all(file: UploadFile, language: str = Form(...)):
    """Run every analysis on one uploaded audio file and combine the results.

    The upload is saved under ``temp_uploads`` with a random name, passed in
    turn to the fluency, tone, VCS, VERS, voice-confidence, VPS, VES, filler,
    transcription and emotion functions, and deleted again whether or not the
    analyses succeed.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (case-insensitive).
        language: Required form field forwarded to ``transcribe_audio``.

    Returns:
        JSONResponse with one entry per analysis (``fluency``, ``tone``,
        ``vcs``, ``vers``, ``voice_confidence``, ``vps``, ``ves``,
        ``filler_words``, ``transcript``, ``emotion``) plus ``sank_score``,
        the arithmetic mean of the seven numeric scores.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension; 500 when saving the file or any analysis step fails
            (including a missing score key in one of the results).
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename can be absent; treat that as an unsupported type (400)
    # instead of failing on None with an unhandled error.
    filename = file.filename or ""
    if not filename.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4 and .flac files are supported.",
        )

    # Only the extension of the client-supplied name is reused; the rest of
    # the on-disk name is a fresh UUID.
    temp_dir = "temp_uploads"
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Save uploaded file
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): none of these calls are awaited inside an async handler;
        # if they are long-running they will hold the event loop — consider a
        # threadpool.
        fluency_result = analyze_fluency_main(temp_filepath, model_size="base")
        tone_result = analyze_tone_main(temp_filepath)
        vcs_result = analyze_vcs_main(temp_filepath)
        vers_result = analyze_vers_main(temp_filepath)
        voice_confidence_result = analyze_voice_confidence_main(temp_filepath)
        vps_result = analyze_vps_main(temp_filepath)
        ves_result = calc_voice_engagement_score(temp_filepath)
        filler_count = analyze_fillers(temp_filepath)
        # Keyword arguments, matching the /transcribe/ endpoint, so the call
        # does not depend on the positional order of transcribe_audio's parameters.
        transcript = transcribe_audio(temp_filepath, language=language, model_size="base")
        emotion = predict_emotion(temp_filepath)

        # NOTE(review): 'Voice Clarity Sore' looks like a typo for "Score" but
        # must match the key produced by analyze_vcs_main — confirm there
        # before renaming.
        scores = [
            fluency_result['fluency_score'],
            tone_result['speech_dynamism_score'],
            vcs_result['Voice Clarity Sore'],
            vers_result['VERS Score'],
            voice_confidence_result['voice_confidence_score'],
            vps_result['VPS'],
            ves_result['ves'],
        ]
        # Dividing by len(scores) keeps the divisor in step with the list.
        avg_score = sum(scores) / len(scores)

        # Combine results into a single response
        combined_result = {
            "fluency": fluency_result,
            "tone": tone_result,
            "vcs": vcs_result,
            "vers": vers_result,
            "voice_confidence": voice_confidence_result,
            "vps": vps_result,
            "ves": ves_result,
            "filler_words": filler_count,
            "transcript": transcript,
            "emotion": emotion,
            "sank_score": avg_score,
        }
        return JSONResponse(content=combined_result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") from e
    finally:
        # Clean up temporary file; it may not exist if the save itself failed.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass