from typing import Optional, Tuple, List
from datetime import datetime
import asyncio
import json
import re

from config import agent, patients_collection, analysis_collection, alerts_collection, logger
from models import RiskLevel
from utils import (
    structure_medical_response,
    compute_file_content_hash,
    compute_patient_data_hash,
    serialize_patient,
)


async def create_alert(patient_id: str, risk_data: dict):
    """Persist an unacknowledged suicide-risk alert for the given patient."""
    alert_doc = {
        "patient_id": patient_id,
        "type": "suicide_risk",
        "level": risk_data["level"],
        "score": risk_data["score"],
        "factors": risk_data["factors"],
        "timestamp": datetime.utcnow(),
        "acknowledged": False,
    }
    await alerts_collection.insert_one(alert_doc)
    logger.warning(f"⚠️ Created suicide risk alert for patient {patient_id}")


async def analyze_patient_report(patient_id: Optional[str], report_content: str, file_type: str, file_content: bytes):
    """Analyze an uploaded report, caching results by report content hash."""
    # Fall back to a hash of the file content when no patient ID is provided.
    identifier = patient_id if patient_id else compute_file_content_hash(file_content)
    report_data = {"identifier": identifier, "content": report_content, "file_type": file_type}
    report_hash = compute_patient_data_hash(report_data)
    logger.info(f"🧾 Analyzing report for identifier: {identifier}")

    # Skip the LLM call if this exact report has already been analyzed.
    existing_analysis = await analysis_collection.find_one(
        {"identifier": identifier, "report_hash": report_hash}
    )
    if existing_analysis:
        logger.info(f"✅ No changes in report data for {identifier}, skipping analysis")
        return existing_analysis

    prompt = (
        "You are a clinical decision support AI. Analyze the following patient report:\n"
        "1. Summarize the patient's medical history.\n"
        "2. Identify risks or red flags (including mental health and suicide risk).\n"
        "3. Highlight missed diagnoses or treatments.\n"
        "4. Suggest next clinical steps.\n"
        f"\nPatient Report ({file_type}):\n{'-' * 40}\n{report_content[:10000]}"
    )
    raw_response = agent.chat(
        message=prompt,
        history=[],
        temperature=0.7,
        max_new_tokens=1024,
    )
    structured_response = structure_medical_response(raw_response)

    risk_level, risk_score, risk_factors = detect_suicide_risk(raw_response)
    suicide_risk = {
        "level": risk_level.value,
        "score": risk_score,
        "factors": risk_factors,
    }

    analysis_doc = {
        "identifier": identifier,
        "patient_id": patient_id,
        "timestamp": datetime.utcnow(),
        "summary": structured_response,
        "suicide_risk": suicide_risk,
        "raw": raw_response,
        "report_hash": report_hash,
        "file_type": file_type,
    }
    await analysis_collection.update_one(
        {"identifier": identifier, "report_hash": report_hash},
        {"$set": analysis_doc},
        upsert=True,
    )

    # Only raise an alert when the report is tied to a known patient.
    if patient_id and risk_level in [RiskLevel.MODERATE, RiskLevel.HIGH, RiskLevel.SEVERE]:
        await create_alert(patient_id, suicide_risk)

    logger.info(f"✅ Stored analysis for identifier {identifier}")
    return analysis_doc


async def analyze_patient(patient: dict):
    """Analyze a patient document, skipping records whose data hash is unchanged."""
    try:
        serialized = serialize_patient(patient)
        patient_id = serialized.get("fhir_id")
        patient_hash = compute_patient_data_hash(serialized)
        logger.info(f"🧾 Analyzing patient: {patient_id}")

        existing_analysis = await analysis_collection.find_one({"patient_id": patient_id})
        if existing_analysis and existing_analysis.get("data_hash") == patient_hash:
            logger.info(f"✅ No changes in patient data for {patient_id}, skipping analysis")
            return

        doc = json.dumps(serialized, indent=2)
        message = (
            "You are a clinical decision support AI.\n\n"
            "Given the patient document below:\n"
            "1. Summarize the patient's medical history.\n"
            "2. Identify risks or red flags (including mental health and suicide risk).\n"
            "3. Highlight missed diagnoses or treatments.\n"
            "4. Suggest next clinical steps.\n"
            f"\nPatient Document:\n{'-' * 40}\n{doc[:10000]}"
        )
        raw = agent.chat(message=message, history=[], temperature=0.7, max_new_tokens=1024)
        structured = structure_medical_response(raw)

        risk_level, risk_score, risk_factors = detect_suicide_risk(raw)
        suicide_risk = {
            "level": risk_level.value,
            "score": risk_score,
            "factors": risk_factors,
        }

        analysis_doc = {
            "identifier": patient_id,
            "patient_id": patient_id,
            "timestamp": datetime.utcnow(),
            "summary": structured,
            "suicide_risk": suicide_risk,
            "raw": raw,
            "data_hash": patient_hash,
        }
        await analysis_collection.update_one(
            {"identifier": patient_id},
            {"$set": analysis_doc},
            upsert=True,
        )

        if risk_level in [RiskLevel.MODERATE, RiskLevel.HIGH, RiskLevel.SEVERE]:
            await create_alert(patient_id, suicide_risk)

        logger.info(f"✅ Stored analysis for patient {patient_id}")
    except Exception as e:
        logger.error(f"Error analyzing patient: {e}")


def detect_suicide_risk(text: str) -> Tuple[RiskLevel, float, List[str]]:
    """Screen for suicide-risk keywords, then ask the agent for a structured assessment."""
    suicide_keywords = [
        'suicide', 'suicidal', 'kill myself', 'end my life', 'want to die',
        'self-harm', 'self harm', 'hopeless', 'no reason to live', 'plan to die'
    ]
    explicit_mentions = [kw for kw in suicide_keywords if kw in text.lower()]
    if not explicit_mentions:
        return RiskLevel.NONE, 0.0, []

    assessment_prompt = (
        "Assess the suicide risk level based on this text. "
        "Consider frequency, specificity, and severity of statements. "
        "Respond with JSON format: {\"risk_level\": \"low/moderate/high/severe\", "
        "\"risk_score\": 0-1, \"factors\": [\"list of risk factors\"]}\n\n"
        f"Text to assess:\n{text}"
    )
    try:
        response = agent.chat(
            message=assessment_prompt,
            history=[],
            temperature=0.2,
            max_new_tokens=256,
        )
        # Extract the first JSON object from the model's reply.
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            assessment = json.loads(json_match.group())
            return (
                RiskLevel(assessment.get("risk_level", "none").lower()),
                float(assessment.get("risk_score", 0)),
                assessment.get("factors", []),
            )
    except Exception as e:
        logger.error(f"Error in suicide risk assessment: {e}")

    # Fallback heuristic: scale the score with the number of keyword hits, capped at 0.9.
    risk_score = min(0.1 * len(explicit_mentions), 0.9)
    if risk_score > 0.7:
        return RiskLevel.HIGH, risk_score, explicit_mentions
    elif risk_score > 0.4:
        return RiskLevel.MODERATE, risk_score, explicit_mentions
    return RiskLevel.LOW, risk_score, explicit_mentions