Ali2206 committed on
Commit 848a6a1 · verified · 1 Parent(s): 568ac0d

Update analysis.py

Files changed (1):
  analysis.py +145 -59
analysis.py CHANGED
@@ -1,13 +1,31 @@
 from typing import Optional, Tuple, List
+from enum import Enum
 from config import agent, patients_collection, analysis_collection, alerts_collection, logger
 from models import RiskLevel
-from utils import structure_medical_response, compute_file_content_hash, compute_patient_data_hash, serialize_patient
+from utils import (
+    structure_medical_response,
+    compute_file_content_hash,
+    compute_patient_data_hash,
+    serialize_patient,
+    broadcast_notification
+)
 from datetime import datetime
 import asyncio
 import json
 import re
 
+class NotificationType(str, Enum):
+    RISK_ALERT = "risk_alert"
+    SYSTEM = "system"
+    MESSAGE = "message"
+
+class NotificationStatus(str, Enum):
+    UNREAD = "unread"
+    READ = "read"
+    ARCHIVED = "archived"
+
 async def create_alert(patient_id: str, risk_data: dict):
+    """Create a new risk alert with notification metadata"""
     alert_doc = {
         "patient_id": patient_id,
         "type": "suicide_risk",
@@ -16,7 +34,6 @@ async def create_alert(patient_id: str, risk_data: dict):
         "factors": risk_data["factors"],
         "timestamp": datetime.utcnow(),
         "acknowledged": False,
-        # Facebook-like notification fields
         "notification": {
             "type": NotificationType.RISK_ALERT,
             "status": NotificationStatus.UNREAD,
@@ -27,83 +44,125 @@ async def create_alert(patient_id: str, risk_data: dict):
             "priority": "high" if risk_data["level"] in ["high", "severe"] else "medium"
         }
     }
-    await alerts_collection.insert_one(alert_doc)
-
-    # Trigger real-time notification
-    await broadcast_notification(alert_doc["notification"])
 
-    logger.warning(f"⚠️ Created suicide risk alert for patient {patient_id}")
-    return alert_doc
-async def analyze_patient_report(patient_id: Optional[str], report_content: str, file_type: str, file_content: bytes):
+    try:
+        await alerts_collection.insert_one(alert_doc)
+        await broadcast_notification(alert_doc["notification"])
+        logger.warning(f"⚠️ Created suicide risk alert for patient {patient_id}")
+        return alert_doc
+    except Exception as e:
+        logger.error(f"Failed to create alert for patient {patient_id}: {str(e)}")
+        raise
+
+async def analyze_patient_report(
+    patient_id: Optional[str],
+    report_content: str,
+    file_type: str,
+    file_content: bytes
+):
+    """Analyze a patient report and create alerts for risks"""
     identifier = patient_id if patient_id else compute_file_content_hash(file_content)
     report_data = {"identifier": identifier, "content": report_content, "file_type": file_type}
     report_hash = compute_patient_data_hash(report_data)
     logger.info(f"🧾 Analyzing report for identifier: {identifier}")
 
-    existing_analysis = await analysis_collection.find_one({"identifier": identifier, "report_hash": report_hash})
+    # Check for existing analysis
+    existing_analysis = await analysis_collection.find_one(
+        {"identifier": identifier, "report_hash": report_hash}
+    )
     if existing_analysis:
         logger.info(f"✅ No changes in report data for {identifier}, skipping analysis")
         return existing_analysis
 
-    prompt = (
-        "You are a clinical decision support AI. Analyze the following patient report:\n"
-        "1. Summarize the patient's medical history.\n"
-        "2. Identify risks or red flags (including mental health and suicide risk).\n"
-        "3. Highlight missed diagnoses or treatments.\n"
-        "4. Suggest next clinical steps.\n"
-        f"\nPatient Report ({file_type}):\n{'-'*40}\n{report_content[:10000]}"
-    )
-
-    raw_response = agent.chat(
-        message=prompt,
-        history=[],
-        temperature=0.7,
-        max_new_tokens=1024
-    )
-    structured_response = structure_medical_response(raw_response)
-
-    risk_level, risk_score, risk_factors = detect_suicide_risk(raw_response)
-    suicide_risk = {
-        "level": risk_level.value,
-        "score": risk_score,
-        "factors": risk_factors
-    }
-
-    analysis_doc = {
-        "identifier": identifier,
-        "patient_id": patient_id,
-        "timestamp": datetime.utcnow(),
-        "summary": structured_response,
-        "suicide_risk": suicide_risk,
-        "raw": raw_response,
-        "report_hash": report_hash,
-        "file_type": file_type
-    }
-
-    await analysis_collection.update_one(
-        {"identifier": identifier, "report_hash": report_hash},
-        {"$set": analysis_doc},
-        upsert=True
-    )
-
-    if patient_id and risk_level in [RiskLevel.MODERATE, RiskLevel.HIGH, RiskLevel.SEVERE]:
-        await create_alert(patient_id, suicide_risk)
-
-    logger.info(f"✅ Stored analysis for identifier {identifier}")
-    return analysis_doc
+    try:
+        # Generate analysis
+        prompt = (
+            "You are a clinical decision support AI. Analyze the following patient report:\n"
+            "1. Summarize the patient's medical history.\n"
+            "2. Identify risks or red flags (including mental health and suicide risk).\n"
+            "3. Highlight missed diagnoses or treatments.\n"
+            "4. Suggest next clinical steps.\n"
+            f"\nPatient Report ({file_type}):\n{'-'*40}\n{report_content[:10000]}"
+        )
+
+        raw_response = agent.chat(
+            message=prompt,
+            history=[],
+            temperature=0.7,
+            max_new_tokens=1024
+        )
+        structured_response = structure_medical_response(raw_response)
+
+        # Detect suicide risk
+        risk_level, risk_score, risk_factors = detect_suicide_risk(raw_response)
+        suicide_risk = {
+            "level": risk_level.value,
+            "score": risk_score,
+            "factors": risk_factors
+        }
+
+        # Store analysis
+        analysis_doc = {
+            "identifier": identifier,
+            "patient_id": patient_id,
+            "timestamp": datetime.utcnow(),
+            "summary": structured_response,
+            "suicide_risk": suicide_risk,
+            "raw": raw_response,
+            "report_hash": report_hash,
+            "file_type": file_type
+        }
+
+        await analysis_collection.update_one(
+            {"identifier": identifier, "report_hash": report_hash},
+            {"$set": analysis_doc},
+            upsert=True
+        )
+
+        # Create alert if risk detected
+        if patient_id and risk_level in [RiskLevel.MODERATE, RiskLevel.HIGH, RiskLevel.SEVERE]:
+            await create_alert(patient_id, suicide_risk)
+
+        logger.info(f"✅ Stored analysis for identifier {identifier}")
+        return analysis_doc
+
+    except Exception as e:
+        logger.error(f"Error analyzing report for {identifier}: {str(e)}")
+        error_alert = {
+            "identifier": identifier,
+            "type": "system_error",
+            "level": "high",
+            "message": f"Report analysis failed: {str(e)}",
+            "timestamp": datetime.utcnow(),
+            "acknowledged": False,
+            "notification": {
+                "type": NotificationType.SYSTEM,
+                "status": NotificationStatus.UNREAD,
+                "title": "Report Analysis Error",
+                "message": f"Failed to analyze report for {'patient ' + patient_id if patient_id else 'unknown identifier'}",
+                "icon": "❌",
+                "action_url": "/system/errors",
+                "priority": "high"
+            }
+        }
+        await alerts_collection.insert_one(error_alert)
+        raise
 
 async def analyze_patient(patient: dict):
+    """Analyze complete patient record and create alerts for risks"""
     try:
         serialized = serialize_patient(patient)
         patient_id = serialized.get("fhir_id")
         patient_hash = compute_patient_data_hash(serialized)
         logger.info(f"🧾 Analyzing patient: {patient_id}")
 
+        # Check for existing analysis
         existing_analysis = await analysis_collection.find_one({"patient_id": patient_id})
         if existing_analysis and existing_analysis.get("data_hash") == patient_hash:
             logger.info(f"✅ No changes in patient data for {patient_id}, skipping analysis")
             return
 
+        # Generate analysis
         doc = json.dumps(serialized, indent=2)
         message = (
             "You are a clinical decision support AI.\n\n"
@@ -118,6 +177,7 @@ async def analyze_patient(patient: dict):
         raw = agent.chat(message=message, history=[], temperature=0.7, max_new_tokens=1024)
         structured = structure_medical_response(raw)
 
+        # Detect suicide risk
         risk_level, risk_score, risk_factors = detect_suicide_risk(raw)
         suicide_risk = {
             "level": risk_level.value,
@@ -125,6 +185,7 @@ async def analyze_patient(patient: dict):
             "factors": risk_factors
         }
 
+        # Store analysis
         analysis_doc = {
            "identifier": patient_id,
            "patient_id": patient_id,
@@ -141,15 +202,36 @@ async def analyze_patient(patient: dict):
             upsert=True
         )
 
+        # Create alert if risk detected
         if risk_level in [RiskLevel.MODERATE, RiskLevel.HIGH, RiskLevel.SEVERE]:
             await create_alert(patient_id, suicide_risk)
 
         logger.info(f"✅ Stored analysis for patient {patient_id}")
 
     except Exception as e:
-        logger.error(f"Error analyzing patient: {e}")
+        logger.error(f"Error analyzing patient: {str(e)}")
+        error_alert = {
+            "patient_id": patient_id if 'patient_id' in locals() else "unknown",
+            "type": "system_error",
+            "level": "high",
+            "message": f"Patient analysis failed: {str(e)}",
+            "timestamp": datetime.utcnow(),
+            "acknowledged": False,
+            "notification": {
+                "type": NotificationType.SYSTEM,
+                "status": NotificationStatus.UNREAD,
+                "title": "Analysis Error",
+                "message": f"Failed to analyze patient {patient_id if 'patient_id' in locals() else 'unknown'}",
+                "icon": "❌",
+                "action_url": "/system/errors",
+                "priority": "high"
+            }
+        }
+        await alerts_collection.insert_one(error_alert)
+        raise
 
 def detect_suicide_risk(text: str) -> Tuple[RiskLevel, float, List[str]]:
+    """Detect suicide risk level from text analysis"""
     suicide_keywords = [
         'suicide', 'suicidal', 'kill myself', 'end my life',
         'want to die', 'self-harm', 'self harm', 'hopeless',
@@ -159,21 +241,24 @@ def detect_suicide_risk(text: str) -> Tuple[RiskLevel, float, List[str]]:
     if not explicit_mentions:
         return RiskLevel.NONE, 0.0, []
 
-    assessment_prompt = (
-        "Assess the suicide risk level based on this text. "
-        "Consider frequency, specificity, and severity of statements. "
-        "Respond with JSON format: {\"risk_level\": \"low/moderate/high/severe\", "
-        "\"risk_score\": 0-1, \"factors\": [\"list of risk factors\"]}\n\n"
-        f"Text to assess:\n{text}"
-    )
-
     try:
+        # Get AI assessment
+        assessment_prompt = (
+            "Assess the suicide risk level based on this text. "
+            "Consider frequency, specificity, and severity of statements. "
+            "Respond with JSON format: {\"risk_level\": \"low/moderate/high/severe\", "
+            "\"risk_score\": 0-1, \"factors\": [\"list of risk factors\"]}\n\n"
+            f"Text to assess:\n{text}"
+        )
+
         response = agent.chat(
             message=assessment_prompt,
             history=[],
             temperature=0.2,
             max_new_tokens=256
         )
+
+        # Parse response
         json_match = re.search(r'\{.*\}', response, re.DOTALL)
         if json_match:
             assessment = json.loads(json_match.group())
@@ -185,6 +270,7 @@ def detect_suicide_risk(text: str) -> Tuple[RiskLevel, float, List[str]]:
     except Exception as e:
         logger.error(f"Error in suicide risk assessment: {e}")
 
+    # Fallback heuristic if AI assessment fails
    risk_score = min(0.1 * len(explicit_mentions), 0.9)
    if risk_score > 0.7:
        return RiskLevel.HIGH, risk_score, explicit_mentions
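The new `NotificationType` and `NotificationStatus` classes mix `str` into `Enum`, so members compare equal to their raw string values and drop into JSON or MongoDB documents without a custom encoder. A quick standalone illustration, mirroring the classes added above:

```python
import json
from enum import Enum

class NotificationType(str, Enum):
    RISK_ALERT = "risk_alert"
    SYSTEM = "system"
    MESSAGE = "message"

# The str mixin makes members real strings, so equality and JSON both work.
assert NotificationType.RISK_ALERT == "risk_alert"
print(json.dumps({"type": NotificationType.RISK_ALERT}))  # {"type": "risk_alert"}
```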
  "type": "suicide_risk",
 
34
  "factors": risk_data["factors"],
35
  "timestamp": datetime.utcnow(),
36
  "acknowledged": False,
 
37
  "notification": {
38
  "type": NotificationType.RISK_ALERT,
39
  "status": NotificationStatus.UNREAD,
 
44
  "priority": "high" if risk_data["level"] in ["high", "severe"] else "medium"
45
  }
46
  }
 
 
 
 
47
 
48
+ try:
49
+ await alerts_collection.insert_one(alert_doc)
50
+ await broadcast_notification(alert_doc["notification"])
51
+ logger.warning(f"⚠️ Created suicide risk alert for patient {patient_id}")
52
+ return alert_doc
53
+ except Exception as e:
54
+ logger.error(f"Failed to create alert for patient {patient_id}: {str(e)}")
55
+ raise
56
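`broadcast_notification` is now imported from `utils` rather than being an undefined local name; its implementation is not part of this diff. A minimal sketch of what such a helper might look like, assuming a registry of live WebSocket-style sessions (`connected_clients` and `send_json` are hypothetical names, not from this repo):

```python
connected_clients: set = set()  # hypothetical registry of live sessions

async def broadcast_notification(notification: dict) -> None:
    # Fan the notification out to every connected client; prune dead ones.
    dead = set()
    for client in set(connected_clients):
        try:
            await client.send_json(notification)  # hypothetical session API
        except Exception:
            dead.add(client)
    connected_clients.difference_update(dead)
```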
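Both analysis paths skip the model call when a stored hash matches the current input. The hash helpers (`compute_file_content_hash`, `compute_patient_data_hash`) live in `utils` and are not shown in this diff; a minimal sketch, assuming plain SHA-256 digests:

```python
import hashlib
import json

def compute_file_content_hash(file_content: bytes) -> str:
    # Hypothetical sketch; the real utils implementation is not in this diff.
    return hashlib.sha256(file_content).hexdigest()

def compute_patient_data_hash(data: dict) -> str:
    # Sort keys so logically identical documents hash identically;
    # default=str covers non-JSON types such as datetimes.
    canonical = json.dumps(data, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```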
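`detect_suicide_risk` pulls the first `{...}` span out of the model's reply with a greedy `re.DOTALL` pattern before parsing it. A self-contained illustration of that parsing step (the sample `response` string is invented):

```python
import json
import re

response = (
    "Based on the text, my assessment is:\n"
    '{"risk_level": "moderate", "risk_score": 0.45, "factors": ["hopelessness"]}'
)

json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
    assessment = json.loads(json_match.group())
    print(assessment["risk_level"], assessment["risk_score"])  # moderate 0.45
```

Because `.*` is greedy, the match runs from the first `{` to the last `}`, so the parse only succeeds cleanly when the reply contains a single JSON object.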
 
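If the model call or the JSON parse fails, control falls through to the keyword heuristic: 0.1 per matched keyword, capped at 0.9, with scores above 0.7 mapped to `RiskLevel.HIGH` (the hunk is truncated before the lower branches). A worked example with invented inputs:

```python
explicit_mentions = ["hopeless", "want to die", "self harm", "end my life"]
risk_score = min(0.1 * len(explicit_mentions), 0.9)
print(risk_score)  # 0.4, below the 0.7 cutoff, so a level lower than HIGH is returned

explicit_mentions = ["suicide"] * 8
risk_score = min(0.1 * len(explicit_mentions), 0.9)
print(risk_score)  # 0.8, above 0.7, so the function returns RiskLevel.HIGH
```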