Fast_api / app.py
mulasagg's picture
push app
e4b8438
raw
history blame
13.9 kB
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import sys
import os
import shutil
import uuid
# Ensure sibling module fluency is discoverable
#sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from fluency.fluency_api import main as analyze_fluency_main
from tone_modulation.tone_api import main as analyze_tone_main
from vcs.vcs_api import main as analyze_vcs_main
from vers.vers_api import main as analyze_vers_main
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main
from vps.vps_api import main as analyze_vps_main
from ves.ves import calc_voice_engagement_score
from transcribe import transcribe_audio
from filler_count.filler_score import analyze_fillers
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, replace "*" with allowed frontend domains
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
# idk if we can use pydantic model here If we need I can add later
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path for temporary storage of the uploaded file this will be deleted after processing
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
result = analyze_fluency_main(temp_filepath, model_size="base")
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Fluency analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
"""
Endpoint to analyze tone of an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze tone using your custom function
result = analyze_tone_main(temp_filepath)
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Tone analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
"""
Endpoint to analyze voice clarity of an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze voice clarity using your custom function
result = analyze_vcs_main(temp_filepath)
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Voice clarity analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
"""
Endpoint to analyze VERS of an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze VERS using your custom function
result = analyze_vers_main(temp_filepath)
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"VERS analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
"""
Endpoint to analyze voice confidence of an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze voice confidence using your custom function
result = analyze_voice_confidence_main(temp_filepath)
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Voice confidence analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
"""
Endpoint to analyze voice pacing score of an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze voice pacing score using your custom function
result = analyze_vps_main(temp_filepath)
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Voice pacing score analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
"""
Endpoint to analyze voice engagement score of an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze voice engagement score using your custom function
result = calc_voice_engagement_score(temp_filepath)
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Voice engagement score analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
"""
Endpoint to analyze filler words in an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3','.mp4')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Call the analysis function with the file path
result = analyze_fillers(temp_filepath) # Pass the file path, not the UploadFile object
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Filler analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
import time
@app.post('/transcribe/')
async def transcribe(file: UploadFile, language: str = Form(...)):
"""
Endpoint to transcribe an uploaded audio file (.wav or .mp3).
"""
#calculate time to transcribe
start_time = time.time()
if not file.filename.endswith(('.wav', '.mp3','mp4')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav ,mp4 and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Transcribe using your custom function
result = transcribe_audio(temp_filepath, language=language, model_size="base")
end_time = time.time()
transcription_time = end_time - start_time
response = {
"transcription": result,
"transcription_time": transcription_time
}
return JSONResponse(content=response)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
@app.post('/analyze_all/')
async def analyze_all(file: UploadFile, language: str = Form(...)):
"""
Endpoint to analyze all aspects of an uploaded audio file (.wav or .mp3).
"""
if not file.filename.endswith(('.wav', '.mp3')):
raise HTTPException(status_code=400, detail="Invalid file type. Only .wav and .mp3 files are supported.")
# Generate a safe temporary file path
temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}"
temp_dir = "temp_uploads"
temp_filepath = os.path.join(temp_dir, temp_filename)
os.makedirs(temp_dir, exist_ok=True)
try:
# Save uploaded file
with open(temp_filepath, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze all aspects using your custom functions
fluency_result = analyze_fluency_main(temp_filepath, model_size="base")
tone_result = analyze_tone_main(temp_filepath)
vcs_result = analyze_vcs_main(temp_filepath)
vers_result = analyze_vers_main(temp_filepath)
voice_confidence_result = analyze_voice_confidence_main(temp_filepath)
vps_result = analyze_vps_main(temp_filepath)
ves_result = calc_voice_engagement_score(temp_filepath)
filler_count = analyze_fillers(temp_filepath) # Assuming this function returns a dict with filler count
transcript = transcribe_audio(temp_filepath, language, "base") #fix this
# Combine results into a single response
combined_result = {
"fluency": fluency_result,
"tone": tone_result,
"vcs": vcs_result,
"vers": vers_result,
"voice_confidence": voice_confidence_result,
"vps": vps_result,
"ves": ves_result,
"filler_words": filler_count,
"transcript": transcript
}
return JSONResponse(content=combined_result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_filepath):
os.remove(temp_filepath)