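"""Route handlers for the patient-analysis assistant API.

Defines the Pydantic request models and a create_router() factory that wires
chat, voice input/output, patient analysis, clinical report upload,
notification, and PDF report endpoints to the provided agent, logger, and
MongoDB collections.
"""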
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query, Form, Path
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse
from fastapi.encoders import jsonable_encoder
from typing import Optional, List
from pydantic import BaseModel
from auth import get_current_user
from utils import clean_text_response
from analysis import analyze_patient_report
from voice import recognize_speech, text_to_speech, extract_text_from_pdf
from docx import Document
import re
import io
from datetime import datetime
from bson import ObjectId
import asyncio
from bson.errors import InvalidId
import base64
import os
from pathlib import Path as PathLib
import tempfile
import subprocess


# Define the ChatRequest model with an optional patient_id
class ChatRequest(BaseModel):
    message: str
    history: Optional[List[dict]] = None
    format: Optional[str] = "clean"
    temperature: Optional[float] = 0.7
    max_new_tokens: Optional[int] = 512
    patient_id: Optional[str] = None
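
# Illustrative request body for ChatRequest (the field values below are
# made-up examples, not taken from the original source):
#     {
#         "message": "Summarize this patient's recent symptoms",
#         "history": [{"role": "user", "content": "Hello"}],
#         "format": "clean",
#         "temperature": 0.7,
#         "max_new_tokens": 512,
#         "patient_id": "example-fhir-id"
#     }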


class VoiceOutputRequest(BaseModel):
    text: str
    language: str = "en-US"
    slow: bool = False
    return_format: str = "mp3"


class RiskLevel(BaseModel):
    level: str
    score: float
    factors: Optional[List[str]] = None


def create_router(agent, logger, patients_collection, analysis_collection, users_collection, chats_collection, notifications_collection):
    router = APIRouter()

    async def status(current_user: dict = Depends(get_current_user)):
        """Report service status, version, and available features."""
        logger.info(f"Status endpoint accessed by {current_user['email']}")
        return {
            "status": "running",
            "timestamp": datetime.utcnow().isoformat(),
            "version": "2.6.0",
            "features": ["chat", "voice-input", "voice-output", "patient-analysis", "report-upload", "patient-reports-pdf", "all-patients-reports-pdf"]
        }

    async def get_patient_analysis_results(
        name: Optional[str] = Query(None),
        current_user: dict = Depends(get_current_user)
    ):
        """Return analysis results, optionally filtered by patient name, enriched with patient demographics."""
        logger.info(f"Fetching analysis results by {current_user['email']}")
        try:
            query = {}
            if name:
                name_regex = re.compile(name, re.IGNORECASE)
                matching_patients = await patients_collection.find({"full_name": name_regex}).to_list(length=None)
                patient_ids = [p["fhir_id"] for p in matching_patients if "fhir_id" in p]
                if not patient_ids:
                    return []
                query = {"patient_id": {"$in": patient_ids}}
            analyses = await analysis_collection.find(query).sort("timestamp", -1).to_list(length=100)
            enriched_results = []
            for analysis in analyses:
                patient = await patients_collection.find_one({"fhir_id": analysis.get("patient_id")})
                if not patient:
                    continue  # Skip if patient no longer exists
                # Format the response to match the expected structure
                formatted_analysis = {
                    "_id": str(analysis["_id"]),
                    "patient_id": analysis.get("patient_id"),
                    "full_name": patient.get("full_name", "Unknown"),
                    "timestamp": analysis.get("timestamp"),
                    "suicide_risk": {
                        "level": analysis.get("suicide_risk", {}).get("level", "none"),
                        "score": analysis.get("suicide_risk", {}).get("score", 0.0),
                        "factors": analysis.get("suicide_risk", {}).get("factors", [])
                    },
                    "summary": analysis.get("summary", ""),
                    "recommendations": analysis.get("recommendations", []),
                    # Add patient demographic information for modal display
                    "date_of_birth": patient.get("date_of_birth"),
                    "gender": patient.get("gender"),
                    "city": patient.get("city"),
                    "state": patient.get("state"),
                    "phone": patient.get("phone"),
                    "email": patient.get("email"),
                    "address": patient.get("address"),
                    "zip_code": patient.get("zip_code"),
                    "insurance": patient.get("insurance"),
                    "emergency_contact": patient.get("emergency_contact")
                }
                enriched_results.append(formatted_analysis)
            return enriched_results
        except Exception as e:
            logger.error(f"Error fetching analysis results: {e}")
            raise HTTPException(status_code=500, detail="Failed to retrieve analysis results")

    async def analyze_patients(
        current_user: dict = Depends(get_current_user)
    ):
        """Trigger analysis for all patients"""
        logger.info(f"Triggering analysis for all patients by {current_user['email']}")
        try:
            # Get all patients
            patients = await patients_collection.find({}).to_list(length=None)
            if not patients:
                return {"message": "No patients found to analyze", "analyzed_count": 0}
            analyzed_count = 0
            for patient in patients:
                try:
                    from analysis import analyze_patient
                    await analyze_patient(patient)
                    analyzed_count += 1
                    logger.info(f"✅ Analyzed patient: {patient.get('full_name', 'Unknown')}")
                except Exception as e:
                    logger.error(f"❌ Failed to analyze patient {patient.get('full_name', 'Unknown')}: {e}")
                    continue
            return {
                "message": f"Analysis completed for {analyzed_count} patients",
                "analyzed_count": analyzed_count,
                "total_patients": len(patients)
            }
        except Exception as e:
            logger.error(f"Error triggering analysis: {e}")
            raise HTTPException(status_code=500, detail="Failed to trigger analysis")

    async def analyze_specific_patient(
        patient_id: str = Path(..., description="The ID of the patient to analyze"),
        current_user: dict = Depends(get_current_user)
    ):
        """Trigger analysis for a specific patient"""
        logger.info(f"Triggering analysis for patient {patient_id} by {current_user['email']}")
        try:
            # Get the patient
            patient = await patients_collection.find_one({"fhir_id": patient_id})
            if not patient:
                raise HTTPException(status_code=404, detail="Patient not found")
            # Analyze the patient
            from analysis import analyze_patient
            await analyze_patient(patient)
            return {
                "message": f"Analysis completed for patient {patient.get('full_name', 'Unknown')}",
                "patient_id": patient_id,
                "patient_name": patient.get("full_name", "Unknown")
            }
        except HTTPException:
            # Preserve the 404 instead of converting it into a 500 below
            raise
        except Exception as e:
            logger.error(f"Error analyzing patient {patient_id}: {e}")
            raise HTTPException(status_code=500, detail="Failed to analyze patient")

    async def get_patient_analysis_reports_pdf(
        patient_id: str = Path(..., description="The ID of the patient"),
        current_user: dict = Depends(get_current_user)
    ):
        """Generate a PDF containing all analysis reports for one patient."""
        logger.info(f"Generating PDF analysis reports for patient {patient_id} by {current_user['email']}")
        try:
            # Fetch patient details
            patient = await patients_collection.find_one({"fhir_id": patient_id})
            if not patient:
                raise HTTPException(status_code=404, detail="Patient not found")
            # Fetch all analyses for the patient
            analyses = await analysis_collection.find({"patient_id": patient_id}).sort("timestamp", -1).to_list(length=None)
            if not analyses:
                raise HTTPException(status_code=404, detail="No analysis reports found for this patient")
            # Generate PDF using ReportLab
            try:
                from reportlab.lib.pagesizes import A4
                from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
                from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
                from reportlab.lib.units import inch
                from reportlab.lib import colors
                from io import BytesIO
            except ImportError:
                logger.error("ReportLab not available; PDF generation is disabled")
                raise HTTPException(status_code=500, detail="PDF generation not available")
            logger.info("📄 Generating PDF report using ReportLab")
            buffer = BytesIO()
            doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18)
            story = []
            styles = getSampleStyleSheet()
            # Custom styles
            title_style = ParagraphStyle(
                'CustomTitle',
                parent=styles['Heading1'],
                fontSize=18,
                spaceAfter=30,
                alignment=1,
                textColor=colors.darkblue
            )
            heading_style = ParagraphStyle(
                'CustomHeading',
                parent=styles['Heading2'],
                fontSize=14,
                spaceAfter=12,
                spaceBefore=20,
                textColor=colors.darkblue
            )
            normal_style = styles['Normal']
            # Title
            patient_name = patient.get("full_name", "Unknown")
            story.append(Paragraph("PATIENT ANALYSIS REPORT", title_style))
            story.append(Paragraph(f"Patient: {patient_name}", normal_style))
            story.append(Paragraph(f"Generated on {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}", normal_style))
            story.append(Spacer(1, 20))
            # Analysis Reports
            for idx, analysis in enumerate(analyses, 1):
                timestamp = analysis.get("timestamp", datetime.utcnow()).strftime("%Y-%m-%d %H:%M:%S")
                suicide_risk = analysis.get("suicide_risk", {})
                risk_level = suicide_risk.get("level", "none").capitalize()
                risk_score = suicide_risk.get("score", 0.0)
                risk_factors = suicide_risk.get("factors", [])
                story.append(Paragraph(f"Report {idx} - {timestamp}", heading_style))
                # Risk Assessment Table
                risk_data = [
                    ["Risk Level", risk_level],
                    ["Risk Score", f"{risk_score:.2f}"],
                    ["Risk Factors", ", ".join(risk_factors) if risk_factors else "None"]
                ]
                risk_table = Table(risk_data, colWidths=[2*inch, 4*inch])
                risk_table.setStyle(TableStyle([
                    ('BACKGROUND', (0, 0), (0, -1), colors.lightblue),
                    ('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
                    ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                    ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
                    ('FONTSIZE', (0, 0), (-1, -1), 10),
                    ('BOTTOMPADDING', (0, 0), (-1, -1), 12),
                    ('BACKGROUND', (1, 0), (1, -1), colors.beige),
                    ('GRID', (0, 0), (-1, -1), 1, colors.black)
                ]))
                story.append(risk_table)
                story.append(Spacer(1, 12))
                # Summary and Recommendations
                if analysis.get("summary"):
                    story.append(Paragraph("Summary:", heading_style))
                    story.append(Paragraph(str(analysis["summary"]), normal_style))
                    story.append(Spacer(1, 12))
                if analysis.get("recommendations"):
                    recommendations = analysis["recommendations"]
                    if isinstance(recommendations, list):
                        recommendations = ", ".join(recommendations)
                    story.append(Paragraph("Recommendations:", heading_style))
                    story.append(Paragraph(str(recommendations), normal_style))
                story.append(Spacer(1, 20))
            # Build PDF
            doc.build(story)
            pdf_content = buffer.getvalue()
            buffer.close()
            # Return PDF as response
            return StreamingResponse(
                io.BytesIO(pdf_content),
                media_type="application/pdf",
                headers={"Content-Disposition": f"attachment; filename={patient_name.replace(' ', '_')}_{patient_id}_analysis_reports.pdf"}
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error generating PDF report for patient {patient_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to generate PDF report: {str(e)}")

    async def get_all_patients_analysis_reports_pdf(
        current_user: dict = Depends(get_current_user)
    ):
        """Generate one PDF with analysis reports for every patient that has at least one analysis."""
        logger.info(f"Generating PDF analysis reports for all patients by {current_user['email']}")
        try:
            # Fetch all patients
            patients = await patients_collection.find().to_list(length=None)
            if not patients:
                raise HTTPException(status_code=404, detail="No patients found")
            # Generate PDF using ReportLab
            try:
                from reportlab.lib.pagesizes import A4
                from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
                from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
                from reportlab.lib.units import inch
                from reportlab.lib import colors
                from io import BytesIO
            except ImportError:
                logger.error("ReportLab not available; PDF generation is disabled")
                raise HTTPException(status_code=500, detail="PDF generation not available")
            logger.info("📄 Generating PDF report for all patients using ReportLab")
            buffer = BytesIO()
            doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18)
            story = []
            styles = getSampleStyleSheet()
            # Custom styles
            title_style = ParagraphStyle(
                'CustomTitle',
                parent=styles['Heading1'],
                fontSize=20,
                spaceAfter=30,
                alignment=1,
                textColor=colors.darkblue
            )
            patient_heading_style = ParagraphStyle(
                'PatientHeading',
                parent=styles['Heading2'],
                fontSize=16,
                spaceAfter=15,
                spaceBefore=25,
                textColor=colors.darkgreen
            )
            report_heading_style = ParagraphStyle(
                'ReportHeading',
                parent=styles['Heading3'],
                fontSize=12,
                spaceAfter=10,
                spaceBefore=15,
                textColor=colors.darkblue
            )
            normal_style = styles['Normal']
            # Title
            story.append(Paragraph("ALL PATIENTS ANALYSIS REPORTS", title_style))
            story.append(Paragraph(f"Generated on {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}", normal_style))
            story.append(Spacer(1, 30))
            # Flag to track if any analyses exist
            has_analyses = False
            # Iterate through each patient
            for patient in patients:
                patient_id = patient.get("fhir_id")
                patient_name = patient.get("full_name", "Unknown")
                # Fetch all analyses for the current patient
                analyses = await analysis_collection.find({"patient_id": patient_id}).sort("timestamp", -1).to_list(length=None)
                if not analyses:
                    continue  # Skip patients with no analyses
                has_analyses = True
                # Patient section
                story.append(Paragraph(f"Patient: {patient_name}", patient_heading_style))
                story.append(Spacer(1, 10))
                # Analysis Reports for this patient
                for idx, analysis in enumerate(analyses, 1):
                    timestamp = analysis.get("timestamp", datetime.utcnow()).strftime("%Y-%m-%d %H:%M:%S")
                    suicide_risk = analysis.get("suicide_risk", {})
                    risk_level = suicide_risk.get("level", "none").capitalize()
                    risk_score = suicide_risk.get("score", 0.0)
                    risk_factors = suicide_risk.get("factors", [])
                    story.append(Paragraph(f"Report {idx} - {timestamp}", report_heading_style))
                    # Risk Assessment Table
                    risk_data = [
                        ["Risk Level", risk_level],
                        ["Risk Score", f"{risk_score:.2f}"],
                        ["Risk Factors", ", ".join(risk_factors) if risk_factors else "None"]
                    ]
                    risk_table = Table(risk_data, colWidths=[2*inch, 4*inch])
                    risk_table.setStyle(TableStyle([
                        ('BACKGROUND', (0, 0), (0, -1), colors.lightblue),
                        ('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
                        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                        ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
                        ('FONTSIZE', (0, 0), (-1, -1), 9),
                        ('BOTTOMPADDING', (0, 0), (-1, -1), 8),
                        ('BACKGROUND', (1, 0), (1, -1), colors.beige),
                        ('GRID', (0, 0), (-1, -1), 1, colors.black)
                    ]))
                    story.append(risk_table)
                    story.append(Spacer(1, 10))
                    # Summary and Recommendations
                    if analysis.get("summary"):
                        story.append(Paragraph("Summary:", report_heading_style))
                        story.append(Paragraph(str(analysis["summary"]), normal_style))
                        story.append(Spacer(1, 10))
                    if analysis.get("recommendations"):
                        recommendations = analysis["recommendations"]
                        if isinstance(recommendations, list):
                            recommendations = ", ".join(recommendations)
                        story.append(Paragraph("Recommendations:", report_heading_style))
                        story.append(Paragraph(str(recommendations), normal_style))
                    story.append(Spacer(1, 15))
                # Add page break between patients
                story.append(PageBreak())
            if not has_analyses:
                raise HTTPException(status_code=404, detail="No analysis reports found for any patients")
            # Build PDF
            doc.build(story)
            pdf_content = buffer.getvalue()
            buffer.close()
            # Return PDF as response
            return StreamingResponse(
                io.BytesIO(pdf_content),
                media_type="application/pdf",
                headers={"Content-Disposition": f"attachment; filename=all_patients_analysis_reports_{datetime.utcnow().strftime('%Y%m%d')}.pdf"}
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error generating PDF report for all patients: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to generate PDF report: {str(e)}")

    async def chat_stream_endpoint(
        request: ChatRequest,
        current_user: dict = Depends(get_current_user)
    ):
        """Generate a chat response with the agent's model and stream it back word by word."""
        logger.info(f"Chat stream initiated by {current_user['email']}")

        async def token_stream():
            try:
                conversation = [{"role": "system", "content": agent.chat_prompt}]
                if request.history:
                    conversation.extend(request.history)
                conversation.append({"role": "user", "content": request.message})
                input_ids = agent.tokenizer.apply_chat_template(
                    conversation, add_generation_prompt=True, return_tensors="pt"
                ).to(agent.device)
                output = agent.model.generate(
                    input_ids,
                    do_sample=True,
                    temperature=request.temperature,
                    max_new_tokens=request.max_new_tokens,
                    pad_token_id=agent.tokenizer.eos_token_id,
                    return_dict_in_generate=True
                )
                text = agent.tokenizer.decode(output["sequences"][0][input_ids.shape[1]:], skip_special_tokens=True)
                cleaned_text = clean_text_response(text)
                full_response = ""
                # Store chat session in the chats_collection
                chat_entry = {
                    "user_id": current_user["email"],
                    "patient_id": request.patient_id,
                    "message": request.message,
                    "response": cleaned_text,
                    "chat_type": "chat",
                    "timestamp": datetime.utcnow(),
                    "temperature": request.temperature,
                    "max_new_tokens": request.max_new_tokens
                }
                logger.info(f"Attempting to insert chat entry into chats_collection: {chat_entry}")
                try:
                    result = await chats_collection.insert_one(chat_entry)
                    chat_entry["_id"] = str(result.inserted_id)
                    logger.info(f"Successfully inserted chat entry with ID: {chat_entry['_id']}")
                except Exception as db_error:
                    logger.error(f"Failed to insert chat entry into chats_collection: {str(db_error)}")
                    yield f"⚠️ Error: Failed to store chat in database: {str(db_error)}"
                    return
                for chunk in cleaned_text.split():
                    full_response += chunk + " "
                    yield chunk + " "
                    await asyncio.sleep(0.05)
                # Update chat entry with full response
                try:
                    update_result = await chats_collection.update_one(
                        {"_id": result.inserted_id},
                        {"$set": {"response": full_response.strip()}}
                    )
                    logger.info(f"Updated chat entry {chat_entry['_id']}: matched {update_result.matched_count}, modified {update_result.modified_count}")
                except Exception as update_error:
                    logger.error(f"Failed to update chat entry {chat_entry['_id']}: {str(update_error)}")
                    yield f"⚠️ Warning: Chat streamed successfully, but failed to update in database: {str(update_error)}"
            except Exception as e:
                logger.error(f"Streaming error: {e}")
                yield f"⚠️ Error: {e}"

        return StreamingResponse(token_stream(), media_type="text/plain")

    async def get_chats(
        current_user: dict = Depends(get_current_user)
    ):
        """Return the current user's recent text chats, newest first."""
        logger.info(f"Fetching chats for {current_user['email']}")
        try:
            chats = await chats_collection.find({"user_id": current_user["email"], "chat_type": "chat"}).sort("timestamp", -1).to_list(length=100)
            logger.info(f"Retrieved {len(chats)} chats for {current_user['email']}")
            return [
                {
                    "id": str(chat["_id"]),
                    "title": chat.get("message", "Untitled Chat")[:30],
                    "timestamp": chat["timestamp"].isoformat(),
                    "message": chat["message"],
                    "response": chat["response"]
                }
                for chat in chats
            ]
        except Exception as e:
            logger.error(f"Error fetching chats: {e}")
            raise HTTPException(status_code=500, detail="Failed to retrieve chats")

    async def get_notifications(
        current_user: dict = Depends(get_current_user)
    ):
        """Return the current user's most recent notifications."""
        logger.info(f"Fetching notifications for {current_user['email']}")
        try:
            # Fetch notifications for the current user
            notifications = await notifications_collection.find({"user_id": current_user["email"]}).sort("timestamp", -1).to_list(length=10)
            logger.info(f"Retrieved {len(notifications)} notifications for {current_user['email']}")
            return [
                {
                    "id": str(notification["_id"]),
                    "title": f"Alert for Patient {notification.get('patient_id', 'Unknown')}",
                    "message": notification.get("message", "No message"),
                    "timestamp": notification.get("timestamp", datetime.utcnow()).isoformat(),
                    "severity": notification.get("severity", "info"),
                    "read": notification.get("read", False)
                }
                for notification in notifications
            ]
        except Exception as e:
            logger.error(f"Error fetching notifications: {e}")
            raise HTTPException(status_code=500, detail="Failed to retrieve notifications")

    async def mark_notification_as_read(
        notification_id: str = Path(..., description="The ID of the notification to mark as read"),
        current_user: dict = Depends(get_current_user)
    ):
        """Mark a single notification as read for the current user."""
        logger.info(f"Marking notification {notification_id} as read for {current_user['email']}")
        try:
            result = await notifications_collection.update_one(
                {"_id": ObjectId(notification_id), "user_id": current_user["email"]},
                {"$set": {"read": True}}
            )
            if result.matched_count == 0:
                raise HTTPException(status_code=404, detail="Notification not found or not authorized")
            return {"status": "success", "message": "Notification marked as read"}
        except InvalidId:
            raise HTTPException(status_code=400, detail="Invalid notification ID format")
        except HTTPException:
            # Preserve the 404 instead of converting it into a 500 below
            raise
        except Exception as e:
            logger.error(f"Error marking notification as read: {e}")
            raise HTTPException(status_code=500, detail="Failed to mark notification as read")

    async def mark_all_notifications_as_read(
        current_user: dict = Depends(get_current_user)
    ):
        """Mark all of the current user's unread notifications as read."""
        logger.info(f"Marking all notifications as read for {current_user['email']}")
        try:
            result = await notifications_collection.update_many(
                {"user_id": current_user["email"], "read": False},
                {"$set": {"read": True}}
            )
            if result.matched_count == 0:
                logger.info("No unread notifications to mark as read")
            return {"status": "success", "message": f"Marked {result.modified_count} notifications as read"}
        except Exception as e:
            logger.error(f"Error marking all notifications as read: {e}")
            raise HTTPException(status_code=500, detail="Failed to mark all notifications as read")

    async def transcribe_voice(
        audio: UploadFile = File(...),
        language: str = Query("en-US", description="Language code for speech recognition"),
        current_user: dict = Depends(get_current_user)
    ):
        """Transcribe an uploaded audio file to text."""
        logger.info(f"Voice transcription initiated by {current_user['email']}")
        try:
            audio_data = await audio.read()
            if not audio.filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
                raise HTTPException(status_code=400, detail="Unsupported audio format")
            text = recognize_speech(audio_data, language)
            return {"text": text}
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error in voice transcription: {e}")
            raise HTTPException(status_code=500, detail="Error processing voice input")

    async def synthesize_voice(
        request: VoiceOutputRequest,
        current_user: dict = Depends(get_current_user)
    ):
        """Convert text to speech and return it as an MP3 stream or a base64 string."""
        logger.info(f"Voice synthesis initiated by {current_user['email']}")
        try:
            audio_data = text_to_speech(request.text, request.language, request.slow)
            if request.return_format == "base64":
                return {"audio": base64.b64encode(audio_data).decode('utf-8')}
            else:
                return StreamingResponse(
                    io.BytesIO(audio_data),
                    media_type="audio/mpeg",
                    headers={"Content-Disposition": "attachment; filename=speech.mp3"}
                )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error in voice synthesis: {e}")
            raise HTTPException(status_code=500, detail="Error generating voice output")

    async def voice_chat_endpoint(
        audio: UploadFile = File(...),
        language: str = Query("en-US", description="Language code for speech recognition"),
        temperature: float = Query(0.7, ge=0.1, le=1.0),
        max_new_tokens: int = Query(512, ge=50, le=1024),
        current_user: dict = Depends(get_current_user)
    ):
        """Transcribe an audio question, generate a chat response, and return it as synthesized speech."""
        logger.info(f"Voice chat initiated by {current_user['email']}")
        try:
            audio_data = await audio.read()
            user_message = recognize_speech(audio_data, language)
            chat_response = agent.chat(
                message=user_message,
                history=[],
                temperature=temperature,
                max_new_tokens=max_new_tokens
            )
            response_audio = text_to_speech(chat_response, language.split('-')[0])
            # Store voice chat in the chats_collection
            chat_entry = {
                "user_id": current_user["email"],
                "patient_id": None,
                "message": user_message,
                "response": chat_response,
                "chat_type": "voice_chat",
                "timestamp": datetime.utcnow(),
                "temperature": temperature,
                "max_new_tokens": max_new_tokens
            }
            logger.info(f"Attempting to insert voice chat entry into chats_collection: {chat_entry}")
            try:
                result = await chats_collection.insert_one(chat_entry)
                chat_entry["_id"] = str(result.inserted_id)
                logger.info(f"Successfully inserted voice chat entry with ID: {chat_entry['_id']}")
            except Exception as db_error:
                logger.error(f"Failed to insert voice chat entry into chats_collection: {str(db_error)}")
                raise HTTPException(status_code=500, detail=f"Failed to store voice chat: {str(db_error)}")
            return StreamingResponse(
                io.BytesIO(response_audio),
                media_type="audio/mpeg",
                headers={"Content-Disposition": "attachment; filename=response.mp3"}
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error in voice chat: {e}")
            raise HTTPException(status_code=500, detail="Error processing voice chat")

    async def analyze_clinical_report(
        file: UploadFile = File(...),
        patient_id: Optional[str] = Form(None),
        temperature: float = Form(0.5),
        max_new_tokens: int = Form(1024),
        current_user: dict = Depends(get_current_user)
    ):
        """Extract text from an uploaded clinical report (PDF, TXT, or DOCX), analyze it, and create a notification when suicide risk is detected."""
        logger.info(f"Report analysis initiated by {current_user['email']}")
        try:
            content_type = file.content_type
            allowed_types = [
                'application/pdf',
                'text/plain',
                'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
            ]
            if content_type not in allowed_types:
                raise HTTPException(
                    status_code=400,
                    detail=f"Unsupported file type: {content_type}. Supported types: PDF, TXT, DOCX"
                )
            file_content = await file.read()
            if content_type == 'application/pdf':
                text = extract_text_from_pdf(file_content)
            elif content_type == 'text/plain':
                text = file_content.decode('utf-8')
            elif content_type == 'application/vnd.openxmlformats-officedocument.wordprocessingml.document':
                doc = Document(io.BytesIO(file_content))
                text = "\n".join([para.text for para in doc.paragraphs])
            else:
                raise HTTPException(status_code=400, detail="Unsupported file type")
            text = clean_text_response(text)
            if len(text.strip()) < 50:
                raise HTTPException(
                    status_code=400,
                    detail="Extracted text is too short (minimum 50 characters required)"
                )
            analysis = await analyze_patient_report(
                patient_id=patient_id,
                report_content=text,
                file_type=content_type,
                file_content=file_content
            )
            logger.info(f"Analysis result for patient {patient_id}: {analysis}")
            # Create a notification if suicide risk is detected
            suicide_risk = analysis.get("suicide_risk", {})
            logger.info(f"Suicide risk detected: {suicide_risk}")
            if suicide_risk.get("level") != "none":
                notification = {
                    "user_id": current_user["email"],
                    "message": f"Suicide risk alert for patient {patient_id}: {suicide_risk['level'].upper()} (Score: {suicide_risk['score']})",
                    "patient_id": patient_id,
                    "timestamp": datetime.utcnow(),
                    "severity": "high" if suicide_risk["level"] in ["moderate", "severe"] else "medium",
                    "read": False
                }
                await notifications_collection.insert_one(notification)
                logger.info(f"✅ Created notification for suicide risk alert: {notification}")
            else:
                logger.warning(f"No suicide risk detected for patient {patient_id}, no notification created")
            if "_id" in analysis and isinstance(analysis["_id"], ObjectId):
                analysis["_id"] = str(analysis["_id"])
            if "timestamp" in analysis and isinstance(analysis["timestamp"], datetime):
                analysis["timestamp"] = analysis["timestamp"].isoformat()
            return JSONResponse(content=jsonable_encoder({
                "status": "success",
                "analysis": analysis,
                "patient_id": patient_id,
                "file_type": content_type,
                "file_size": len(file_content)
            }))
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error in report analysis: {str(e)}")
            raise HTTPException(
                status_code=500,
                detail=f"Failed to analyze report: {str(e)}"
            )

    async def delete_patient(
        patient_id: str,
        current_user: dict = Depends(get_current_user)
    ):
        """Delete a patient along with all analyses and chats associated with that patient."""
        logger.info(f"Patient deletion initiated by {current_user['email']} for patient {patient_id}")
        try:
            # Check if the patient exists
            patient = await patients_collection.find_one({"fhir_id": patient_id})
            if not patient:
                raise HTTPException(status_code=404, detail="Patient not found")
            # Check if the current user is authorized (e.g., created_by matches or is admin)
            if patient.get("created_by") != current_user["email"] and not current_user.get("is_admin", False):
                raise HTTPException(status_code=403, detail="Not authorized to delete this patient")
            # Delete all analyses and chats associated with this patient
            await analysis_collection.delete_many({"patient_id": patient_id})
            await chats_collection.delete_many({"patient_id": patient_id})
            logger.info(f"Deleted analyses and chats for patient {patient_id}")
            # Delete the patient
            await patients_collection.delete_one({"fhir_id": patient_id})
            logger.info(f"Patient {patient_id} deleted successfully")
            return {"status": "success", "message": f"Patient {patient_id} and associated analyses/chats deleted"}
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error deleting patient {patient_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to delete patient: {str(e)}")

    return router
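
# Illustrative wiring (a sketch, not part of the original module; the FastAPI
# app, the "/api" prefix, and the argument names are assumptions). The nested
# endpoint coroutines are not attached to `router` in this snippet, so the
# original presumably registers them inside create_router(), e.g. with
# @router.get/@router.post decorators not shown here.
#
#     from fastapi import FastAPI
#
#     app = FastAPI()
#     app.include_router(
#         create_router(agent, logger, patients_collection, analysis_collection,
#                       users_collection, chats_collection, notifications_collection),
#         prefix="/api",
#     )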