"""HTTP API exposing speech-analysis endpoints for uploaded audio files.

Every endpoint accepts a multipart file upload, stores it under
``temp_uploads`` for the duration of the request, runs one or more analysis
functions on the stored file and returns their result as JSON. The temporary
file is removed once the request finishes, whether or not analysis succeeded.
"""
import contextlib
import os
import shutil
import sys
import time
import uuid
from typing import Any, Callable

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware

# Ensure sibling module fluency is discoverable
# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from fluency.fluency_api import main as analyze_fluency_main
from tone_modulation.tone_api import main as analyze_tone_main
from vcs.vcs_api import main as analyze_vcs_main
from vers.vers_api import main as analyze_vers_main
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main
from vps.vps_api import main as analyze_vps_main
from ves.ves import calc_voice_engagement_score
from transcribe import transcribe_audio
from filler_count.filler_score import analyze_fillers
from emotion.emo_predict import predict_emotion

# File extensions (lower case, leading dot) accepted by every endpoint.
SUPPORTED_EXTENSIONS = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')

# Directory (relative to the working directory) holding uploads while they
# are being analysed.
TEMP_UPLOAD_DIR = "temp_uploads"

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace "*" with allowed frontend domains
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _require_supported_audio(file: UploadFile) -> str:
    """Validate the upload's file name and return its lower-cased extension.

    The comparison is case-insensitive, so ``CLIP.WAV`` is accepted just like
    ``clip.wav``. An upload without a file name is rejected the same way as
    one with an unsupported extension.

    Raises:
        HTTPException: 400 when the name does not end in a supported extension.
    """
    # UploadFile.filename is optional; treat a missing name as "no extension"
    # so the client gets a 400 rather than an AttributeError-driven 500.
    filename = file.filename or ""
    if not filename.lower().endswith(SUPPORTED_EXTENSIONS):
        raise HTTPException(
            status_code=400,
            detail=(
                "Invalid file type. Supported file types: "
                f"{', '.join(SUPPORTED_EXTENSIONS)}."
            ),
        )
    return os.path.splitext(filename)[1].lower()


def _analyze_upload(
    file: UploadFile,
    analysis: Callable[[str], Any],
    failure_label: str,
) -> JSONResponse:
    """Save ``file`` to a temporary path, run ``analysis`` on it, return JSON.

    Args:
        file: The uploaded file to analyse.
        analysis: Callable taking the temporary file path and returning the
            JSON-serialisable response content.
        failure_label: Prefix for the error detail; any failure is reported as
            ``"<failure_label> failed: <error>"``.

    Raises:
        HTTPException: 400 for an unsupported file type; 500 when saving,
            analysing or serialising the result fails.

    NOTE(review): ``analysis`` runs inline, so a slow analysis occupies the
    event loop for the whole request when called from an ``async def``
    endpoint. Offloading to a worker thread is only safe if the analysis
    functions are thread-safe, which cannot be confirmed from this module.
    """
    suffix = _require_supported_audio(file)
    # The name on disk is a random UUID; only the extension comes from the
    # client, so the uploaded file name never influences the storage path.
    temp_filepath = os.path.join(TEMP_UPLOAD_DIR, f"temp_{uuid.uuid4()}{suffix}")
    os.makedirs(TEMP_UPLOAD_DIR, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        return JSONResponse(content=analysis(temp_filepath))
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"{failure_label} failed: {str(e)}"
        ) from e
    finally:
        # The file may not exist if opening it failed; that is not an error.
        with contextlib.suppress(FileNotFoundError):
            os.remove(temp_filepath)


@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
    """Endpoint to analyze fluency of an uploaded audio file."""
    return _analyze_upload(
        file,
        lambda path: analyze_fluency_main(path, model_size="base"),
        "Fluency analysis",
    )


@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
    """Endpoint to analyze tone of an uploaded audio file."""
    return _analyze_upload(file, analyze_tone_main, "Tone analysis")


@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
    """Endpoint to analyze voice clarity of an uploaded audio file."""
    return _analyze_upload(file, analyze_vcs_main, "Voice clarity analysis")


@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
    """Endpoint to analyze VERS of an uploaded audio file."""
    return _analyze_upload(file, analyze_vers_main, "VERS analysis")


@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
    """Endpoint to analyze voice confidence of an uploaded audio file."""
    return _analyze_upload(
        file, analyze_voice_confidence_main, "Voice confidence analysis"
    )


@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
    """Endpoint to analyze voice pacing score of an uploaded audio file."""
    return _analyze_upload(file, analyze_vps_main, "Voice pacing score analysis")


@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
    """Endpoint to analyze voice engagement score of an uploaded audio file."""
    return _analyze_upload(
        file, calc_voice_engagement_score, "Voice engagement score analysis"
    )


@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
    """Endpoint to analyze filler words in an uploaded audio file."""
    return _analyze_upload(file, analyze_fillers, "Filler analysis")


@app.post('/transcribe/')
async def transcribe(file: UploadFile, language: str = Form(...)):
    """Endpoint to transcribe an uploaded audio file.

    The response contains the transcription and ``transcription_time``, the
    seconds elapsed from the start of request handling (including saving the
    upload) until transcription finished.
    """
    # Monotonic clock: the duration cannot be skewed by system clock changes.
    start_time = time.perf_counter()

    def _transcribe(path: str) -> dict:
        result = transcribe_audio(path, language=language, model_size="base")
        return {
            "transcription": result,
            "transcription_time": time.perf_counter() - start_time,
        }

    return _analyze_upload(file, _transcribe, "Transcription")


@app.post('/analyze_all/')
async def analyze_all(file: UploadFile, language: str = Form(...)):
    """Endpoint to run every analysis on an uploaded audio file.

    Returns each individual result plus ``sank_score``, the unweighted mean
    of the seven headline scores.
    """

    def _analyze_everything(path: str) -> dict:
        fluency_result = analyze_fluency_main(path, model_size="base")
        tone_result = analyze_tone_main(path)
        vcs_result = analyze_vcs_main(path)
        vers_result = analyze_vers_main(path)
        voice_confidence_result = analyze_voice_confidence_main(path)
        vps_result = analyze_vps_main(path)
        ves_result = calc_voice_engagement_score(path)
        filler_count = analyze_fillers(path)
        # Keyword arguments, matching the /transcribe/ endpoint, so the call
        # does not depend on transcribe_audio's positional parameter order.
        transcript = transcribe_audio(path, language=language, model_size="base")
        emotion = predict_emotion(path)

        # NOTE(review): 'Voice Clarity Sore' looks like a typo for "Score",
        # but the key has to match what vcs_api returns — confirm there
        # before renaming it.
        headline_scores = (
            fluency_result['fluency_score'],
            tone_result['speech_dynamism_score'],
            vcs_result['Voice Clarity Sore'],
            vers_result['VERS Score'],
            voice_confidence_result['voice_confidence_score'],
            vps_result['VPS'],
            ves_result['ves'],
        )
        avg_score = sum(headline_scores) / len(headline_scores)

        return {
            "fluency": fluency_result,
            "tone": tone_result,
            "vcs": vcs_result,
            "vers": vers_result,
            "voice_confidence": voice_confidence_result,
            "vps": vps_result,
            "ves": ves_result,
            "filler_words": filler_count,
            "transcript": transcript,
            "emotion": emotion,
            "sank_score": avg_score,
        }

    return _analyze_upload(file, _analyze_everything, "Analysis")