from typing import Optional, Tuple, List
from enum import Enum
from datetime import datetime
import asyncio
import json
import re
import os

from config import agent, patients_collection, analysis_collection, alerts_collection, logger
from models import RiskLevel
from utils import (
    structure_medical_response,
    compute_file_content_hash,
    compute_patient_data_hash,
    serialize_patient,
    broadcast_notification
)


class NotificationType(str, Enum):
    RISK_ALERT = "risk_alert"
    SYSTEM = "system"
    MESSAGE = "message"


class NotificationStatus(str, Enum):
    UNREAD = "unread"
    READ = "read"
    ARCHIVED = "archived"


async def create_alert(patient_id: str, risk_data: dict):
    """Create a suicide-risk alert for a patient and broadcast its notification."""
    try:
        alert_doc = {
            "patient_id": patient_id,
            "type": "suicide_risk",
            "level": risk_data["level"],
            "score": risk_data["score"],
            "factors": risk_data["factors"],
            "timestamp": datetime.utcnow(),
            "acknowledged": False,
            "notification": {
                "type": NotificationType.RISK_ALERT,
                "status": NotificationStatus.UNREAD,
                "title": f"Suicide Risk: {risk_data['level'].capitalize()}",
                "message": f"Patient {patient_id} shows {risk_data['level']} risk factors",
                "icon": "⚠️",
                "action_url": f"/patient/{patient_id}/risk-assessment",
                "priority": "high" if risk_data["level"] in ["high", "severe"] else "medium"
            }
        }
        await alerts_collection.insert_one(alert_doc)

        # Broadcast the notification payload to connected WebSocket clients
        await broadcast_notification(alert_doc["notification"])

        logger.warning(f"⚠️ Created suicide risk alert for patient {patient_id}")
        return alert_doc
    except Exception as e:
        logger.error(f"Failed to create alert: {str(e)}")
        raise


async def analyze_patient_report(
    patient_id: Optional[str],
    report_content: str,
    file_type: str,
    file_content: bytes
):
    """Analyze a patient report and create alerts for risks"""
    identifier = patient_id if patient_id else compute_file_content_hash(file_content)
    report_data = {"identifier": identifier, "content": report_content, "file_type": file_type}
    report_hash = compute_patient_data_hash(report_data)

    logger.info(f"🧾 Analyzing report for identifier: {identifier}")

    # Check for existing analysis of this exact report
    existing_analysis = await analysis_collection.find_one(
        {"identifier": identifier, "report_hash": report_hash}
    )
    if existing_analysis:
        logger.info(f"✅ No changes in report data for {identifier}, skipping analysis")
        return existing_analysis

    try:
        # Generate analysis
        prompt = (
            "You are a clinical decision support AI. Analyze the following patient report:\n"
            "1. Summarize the patient's medical history.\n"
            "2. Identify risks or red flags (including mental health and suicide risk).\n"
            "3. Highlight missed diagnoses or treatments.\n"
            "4. Suggest next clinical steps.\n"
            f"\nPatient Report ({file_type}):\n{'-'*40}\n{report_content[:10000]}"
        )
        raw_response = agent.chat(
            message=prompt,
            history=[],
            temperature=0.7,
            max_new_tokens=1024
        )
        structured_response = structure_medical_response(raw_response)

        # Detect suicide risk
        risk_level, risk_score, risk_factors = detect_suicide_risk(raw_response)
        suicide_risk = {
            "level": risk_level.value,
            "score": risk_score,
            "factors": risk_factors
        }

        # Store analysis
        analysis_doc = {
            "identifier": identifier,
            "patient_id": patient_id,
            "timestamp": datetime.utcnow(),
            "summary": structured_response,
            "suicide_risk": suicide_risk,
            "raw": raw_response,
            "report_hash": report_hash,
            "file_type": file_type
        }
        await analysis_collection.update_one(
            {"identifier": identifier, "report_hash": report_hash},
            {"$set": analysis_doc},
            upsert=True
        )

        # Create alert if risk detected
        if patient_id and risk_level in [RiskLevel.MODERATE, RiskLevel.HIGH, RiskLevel.SEVERE]:
            await create_alert(patient_id, suicide_risk)

        logger.info(f"✅ Stored analysis for identifier {identifier}")
        return analysis_doc
    except Exception as e:
        logger.error(f"Error analyzing report for {identifier}: {str(e)}")
        error_alert = {
            "identifier": identifier,
            "type": "system_error",
            "level": "high",
            "message": f"Report analysis failed: {str(e)}",
            "timestamp": datetime.utcnow(),
            "acknowledged": False,
            "notification": {
                "type": NotificationType.SYSTEM,
                "status": NotificationStatus.UNREAD,
                "title": "Report Analysis Error",
                "message": f"Failed to analyze report for {'patient ' + patient_id if patient_id else 'unknown identifier'}",
                "icon": "❌",
                "action_url": "/system/errors",
                "priority": "high"
            }
        }
        await alerts_collection.insert_one(error_alert)
        raise


async def analyze_patient(patient: dict):
    """Analyze complete patient record and create alerts for risks"""
    patient_id = None
    try:
        serialized = serialize_patient(patient)
        patient_id = serialized.get("fhir_id")
        patient_hash = compute_patient_data_hash(serialized)

        logger.info(f"🧾 Analyzing patient: {patient_id}")

        # Check for existing analysis
        existing_analysis = await analysis_collection.find_one({"patient_id": patient_id})
        if existing_analysis and existing_analysis.get("data_hash") == patient_hash:
            logger.info(f"✅ No changes in patient data for {patient_id}, skipping analysis")
            return

        # Generate analysis
        doc = json.dumps(serialized, indent=2)
        message = (
            "You are a clinical decision support AI.\n\n"
            "Given the patient document below:\n"
            "1. Summarize the patient's medical history.\n"
            "2. Identify risks or red flags (including mental health and suicide risk).\n"
            "3. Highlight missed diagnoses or treatments.\n"
            "4. Suggest next clinical steps.\n"
            f"\nPatient Document:\n{'-'*40}\n{doc[:10000]}"
        )
        raw = agent.chat(message=message, history=[], temperature=0.7, max_new_tokens=1024)
        structured = structure_medical_response(raw)

        # Detect suicide risk
        risk_level, risk_score, risk_factors = detect_suicide_risk(raw)
        suicide_risk = {
            "level": risk_level.value,
            "score": risk_score,
            "factors": risk_factors
        }

        # Store analysis
        analysis_doc = {
            "identifier": patient_id,
            "patient_id": patient_id,
            "timestamp": datetime.utcnow(),
            "summary": structured,
            "suicide_risk": suicide_risk,
            "raw": raw,
            "data_hash": patient_hash
        }
        await analysis_collection.update_one(
            {"identifier": patient_id},
            {"$set": analysis_doc},
            upsert=True
        )

        # Create alert if risk detected
        if risk_level in [RiskLevel.MODERATE, RiskLevel.HIGH, RiskLevel.SEVERE]:
            await create_alert(patient_id, suicide_risk)

        logger.info(f"✅ Stored analysis for patient {patient_id}")
    except Exception as e:
        logger.error(f"Error analyzing patient: {str(e)}")
        error_alert = {
            "patient_id": patient_id or "unknown",
            "type": "system_error",
            "level": "high",
            "message": f"Patient analysis failed: {str(e)}",
            "timestamp": datetime.utcnow(),
            "acknowledged": False,
            "notification": {
                "type": NotificationType.SYSTEM,
                "status": NotificationStatus.UNREAD,
                "title": "Analysis Error",
                "message": f"Failed to analyze patient {patient_id or 'unknown'}",
                "icon": "❌",
                "action_url": "/system/errors",
                "priority": "high"
            }
        }
        await alerts_collection.insert_one(error_alert)
        raise


def detect_suicide_risk(text: str) -> Tuple[RiskLevel, float, List[str]]:
    """Detect suicide risk level from text analysis"""
    suicide_keywords = [
        'suicide', 'suicidal', 'kill myself', 'end my life', 'want to die',
        'self-harm', 'self harm', 'hopeless', 'no reason to live', 'plan to die'
    ]
    explicit_mentions = [kw for kw in suicide_keywords if kw in text.lower()]
    if not explicit_mentions:
        return RiskLevel.NONE, 0.0, []

    try:
        # Get AI assessment
        assessment_prompt = (
            "Assess the suicide risk level based on this text. "
            "Consider frequency, specificity, and severity of statements. "
            "Respond with JSON format: {\"risk_level\": \"low/moderate/high/severe\", "
            "\"risk_score\": 0-1, \"factors\": [\"list of risk factors\"]}\n\n"
            f"Text to assess:\n{text}"
        )
        response = agent.chat(
            message=assessment_prompt,
            history=[],
            temperature=0.2,
            max_new_tokens=256
        )

        # Parse the first JSON object found in the response
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            assessment = json.loads(json_match.group())
            return (
                RiskLevel(assessment.get("risk_level", "none").lower()),
                float(assessment.get("risk_score", 0)),
                assessment.get("factors", [])
            )
    except Exception as e:
        logger.error(f"Error in suicide risk assessment: {e}")

    # Fallback heuristic if the AI assessment fails or returns no JSON
    risk_score = min(0.1 * len(explicit_mentions), 0.9)
    if risk_score > 0.7:
        return RiskLevel.HIGH, risk_score, explicit_mentions
    elif risk_score > 0.4:
        return RiskLevel.MODERATE, risk_score, explicit_mentions
    return RiskLevel.LOW, risk_score, explicit_mentions
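

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the service wiring): shows how
# these coroutines might be driven from an async entry point. The patient id,
# report text, and query below are hypothetical placeholders; in the real
# application these functions are invoked from API routes or background jobs.
# ---------------------------------------------------------------------------
async def _example_run():
    # Analyze an uploaded plain-text report for a hypothetical patient id
    content_bytes = b"Patient reports persistent low mood and hopelessness."
    await analyze_patient_report(
        patient_id="example-patient-id",
        report_content=content_bytes.decode("utf-8"),
        file_type="txt",
        file_content=content_bytes,
    )

    # Analyze a full patient record pulled from the patients collection
    patient = await patients_collection.find_one({"fhir_id": "example-patient-id"})
    if patient:
        await analyze_patient(patient)


if __name__ == "__main__":
    asyncio.run(_example_run())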