Create utils.py
Browse files
utils.py
ADDED
@@ -0,0 +1,64 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import re
|
2 |
+
import hashlib
|
3 |
+
import io
|
4 |
+
import json
|
5 |
+
from datetime import datetime
|
6 |
+
from typing import Dict, List, Tuple
|
7 |
+
from bson import ObjectId
|
8 |
+
import logging
|
9 |
+
from config import logger
|
10 |
+
|
11 |
+
def clean_text_response(text: str) -> str:
    """Normalize whitespace in *text* and drop Markdown emphasis markers.

    Steps, in order:
      1. Any run of newlines separated only by whitespace becomes exactly
         one blank line.
      2. Runs of space characters become a single space.
      3. Every ``**`` and ``__`` marker is removed.
      4. Leading and trailing whitespace is stripped.

    Args:
        text: The raw text to clean.

    Returns:
        The cleaned text.
    """
    squeezed_paragraphs = re.sub(r'\n\s*\n', '\n\n', text)
    cleaned = re.sub(r'[ ]+', ' ', squeezed_paragraphs)
    # Markers are removed after space collapsing, matching the original
    # order of operations.
    for marker in ("**", "__"):
        cleaned = cleaned.replace(marker, "")
    return cleaned.strip()
|
15 |
+
|
16 |
+
def extract_section(text: str, heading: str) -> str:
    """Return the body of the ``<heading>:`` section found in *text*.

    The heading is matched case-insensitively and must be followed by a
    colon and a line break. The section body runs until the next line
    that starts with an uppercase ASCII letter and contains a colon, or
    until the end of *text*.

    Args:
        text: The document to search.
        heading: The literal heading text (without the trailing colon).

    Returns:
        The section body with surrounding whitespace stripped, or ``""``
        when the heading is not found or extraction fails.
    """
    try:
        # Case-insensitivity is scoped to the heading only. Applying
        # re.IGNORECASE to the whole pattern would make the `[A-Z]`
        # terminator match lowercase letters too, so an ordinary body line
        # such as "note: ..." would cut the section short.
        pattern = rf"(?i:{re.escape(heading)}):\s*\n(.*?)(?=\n[A-Z][^\n]*:|\Z)"
        match = re.search(pattern, text, re.DOTALL)
        return match.group(1).strip() if match else ""
    except Exception as e:
        # Best-effort helper: log the failure and fall back to an empty result.
        logger.error(f"Section extraction failed for heading '{heading}': {e}")
        return ""
|
24 |
+
|
25 |
+
def structure_medical_response(text: str) -> Dict:
    """Split a free-text medical analysis into its named sections.

    ``**`` and ``__`` markers are removed from *text* first. Each output
    key is then filled from its primary heading, falling back to an
    alternate heading when the primary one yields nothing.

    Args:
        text: The raw analysis text.

    Returns:
        A dict with the keys ``summary``, ``risks``, ``missed_issues`` and
        ``recommendations``. Each value is the extracted section text, or
        ``""`` when neither heading variant is found.
    """
    search_flags = re.DOTALL | re.IGNORECASE

    def section_body(text: str, heading: str) -> str:
        """Return the text under *heading*, or "" when no layout matches.

        Layouts are tried in order: ``Heading:`` + newline,
        ``**Heading**:`` + newline, ``Heading`` followed by whitespace or
        dashes, and ``Heading`` alone on its own line. A section ends at
        the first blank line or at the end of the text.
        """
        layouts = (
            rf"{re.escape(heading)}:\s*\n(.*?)(?=\n\s*\n|\Z)",
            # The outer function strips "**" before calling, so this layout
            # only matters if that stripping is ever removed.
            rf"\*\*{re.escape(heading)}\*\*:\s*\n(.*?)(?=\n\s*\n|\Z)",
            rf"{re.escape(heading)}[\s\-]+(.*?)(?=\n\s*\n|\Z)",
            rf"\n{re.escape(heading)}\s*\n(.*?)(?=\n\s*\n|\Z)",
        )
        # Lazy generator: later layouts are searched only if earlier ones miss.
        attempts = (re.search(layout, text, search_flags) for layout in layouts)
        hit = next((found for found in attempts if found), None)
        if hit is None:
            return ""
        body = hit.group(1).strip()
        # Drop a leading "-" or "*" bullet marker from every line.
        return re.sub(r'^\s*[\-\*]\s*', '', body, flags=re.MULTILINE)

    plain = text.replace('**', '').replace('__', '')

    # (output key, primary heading, fallback heading)
    heading_table = (
        ("summary",
         "Summary of Patient's Medical History",
         "Summarize the patient's medical history"),
        ("risks",
         "Identify Risks or Red Flags",
         "Risks or Red Flags"),
        ("missed_issues",
         "Missed Diagnoses or Treatments",
         "What the doctor might have missed"),
        ("recommendations",
         "Suggest Next Clinical Steps",
         "Suggested Clinical Actions"),
    )
    return {
        key: section_body(plain, primary) or section_body(plain, fallback)
        for key, primary, fallback in heading_table
    }
|
52 |
+
|
53 |
+
def serialize_patient(patient: dict) -> dict:
    """Return a shallow copy of *patient* with its ``_id`` value as a string.

    The input mapping is not modified. When no ``_id`` key is present the
    copy is returned unchanged.

    Args:
        patient: The patient record (presumably a database document whose
            ``_id`` is not a plain string; confirm against callers).

    Returns:
        A new dict with the same keys, where ``_id`` (if any) is ``str(...)``
        of the original value.
    """
    serialized = patient.copy()
    if "_id" not in serialized:
        return serialized
    raw_id = serialized["_id"]
    serialized["_id"] = str(raw_id)
    return serialized
|
58 |
+
|
59 |
+
def compute_patient_data_hash(data: dict) -> str:
    """Return the SHA-256 hex digest of *data* serialized as JSON.

    Keys are sorted during serialization, so two dicts with the same
    content but different key order produce the same digest.

    Args:
        data: A JSON-serializable mapping.

    Returns:
        The 64-character hexadecimal SHA-256 digest.

    Raises:
        TypeError: If *data* contains a value ``json.dumps`` cannot encode.
    """
    canonical_json = json.dumps(data, sort_keys=True)
    hasher = hashlib.sha256()
    hasher.update(canonical_json.encode())
    return hasher.hexdigest()
|
62 |
+
|
63 |
+
def compute_file_content_hash(file_content: bytes) -> str:
    """Return the SHA-256 hex digest of *file_content*.

    Args:
        file_content: The raw bytes to hash.

    Returns:
        The 64-character hexadecimal SHA-256 digest.
    """
    hasher = hashlib.sha256()
    hasher.update(file_content)
    return hasher.hexdigest()
|