|
import re |
|
import hashlib |
|
import io |
|
import json |
|
from datetime import datetime |
|
from typing import Dict, List, Tuple |
|
from bson import ObjectId |
|
import logging |
|
from config import logger |
|
|
|
def clean_text_response(text: str) -> str:
    """Normalise whitespace in ``text`` and remove Markdown emphasis markers.

    Steps, in order:
      1. Remove every ``**`` and ``__`` marker.
      2. Collapse any run of blank / whitespace-only lines into a single
         blank line.
      3. Collapse runs of spaces into one space.
      4. Strip leading and trailing whitespace.

    Args:
        text: Raw text to clean.

    Returns:
        The cleaned text.
    """
    # Markers are removed *before* whitespace is normalised: deleting a
    # marker that sits between two spaces (or on a line of its own) would
    # otherwise leave behind a double space or a run of blank lines that
    # the normalisation passes never get a chance to collapse.
    text = text.replace("**", "").replace("__", "")
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r'[ ]+', ' ', text)
    return text.strip()
|
|
|
def extract_section(text: str, heading: str) -> str:
    """Return the body of the section introduced by ``<heading>:``.

    The section body starts on the line after ``<heading>:`` and runs until
    the next line that begins with an uppercase ASCII letter and contains a
    colon, or until the end of ``text`` when no such line follows.

    Args:
        text: Text to search.
        heading: Heading label without the trailing colon. It is matched
            literally (regex metacharacters are escaped) and
            case-insensitively.

    Returns:
        The stripped section body, or ``""`` when the heading is not found
        or when extraction raises.
    """
    try:
        # Case-insensitivity is scoped to the heading only. Applying
        # re.IGNORECASE to the whole pattern would make ``[A-Z]`` in the
        # look-ahead match lowercase letters too, so an ordinary content
        # line such as "note: follow up" would wrongly end the section.
        pattern = (
            rf"(?i:{re.escape(heading)}):\s*\n"
            r"(.*?)(?=\n[A-Z][^\n]*:|\Z)"
        )
        match = re.search(pattern, text, re.DOTALL)
        return match.group(1).strip() if match else ""
    except Exception as e:
        # Best-effort helper: report the failure and fall back to "".
        logger.error(f"Section extraction failed for heading '{heading}': {e}")
        return ""
|
|
|
def structure_medical_response(text: str) -> Dict:
    """Split a free-text medical analysis into four labelled sections.

    ``**`` and ``__`` markers are removed from ``text`` first. Each output
    key is then filled by trying two alternative heading phrasings in order
    and keeping the first one that yields non-empty content.

    Args:
        text: The raw analysis text.

    Returns:
        A dict with the keys ``"summary"``, ``"risks"``, ``"missed_issues"``
        and ``"recommendations"``; a value is ``""`` when neither phrasing
        of its heading produced any content.
    """
    # Every layout captures lazily up to the next blank line or end of text.
    body_until_blank = r"(.*?)(?=\n\s*\n|\Z)"
    search_flags = re.DOTALL | re.IGNORECASE

    def extract_improved(source: str, heading: str) -> str:
        """Return the bullet-stripped body under ``heading``, or ``""``."""
        label = re.escape(heading)
        candidates = (
            rf"{label}:\s*\n{body_until_blank}",
            rf"\*\*{label}\*\*:\s*\n{body_until_blank}",
            rf"{label}[\s\-]+{body_until_blank}",
            rf"\n{label}\s*\n{body_until_blank}",
        )
        for candidate in candidates:
            found = re.search(candidate, source, search_flags)
            if found is None:
                continue
            # The first layout that matches wins, even if its body is empty.
            stripped = found.group(1).strip()
            # Drop a leading "-" or "*" bullet from every line of the body.
            return re.sub(r'^\s*[\-\*]\s*', '', stripped, flags=re.MULTILINE)
        return ""

    plain = text.replace('**', '').replace('__', '')

    # Output key -> heading phrasings, tried in the order listed.
    heading_aliases = {
        "summary": (
            "Summary of Patient's Medical History",
            "Summarize the patient's medical history",
        ),
        "risks": (
            "Identify Risks or Red Flags",
            "Risks or Red Flags",
        ),
        "missed_issues": (
            "Missed Diagnoses or Treatments",
            "What the doctor might have missed",
        ),
        "recommendations": (
            "Suggest Next Clinical Steps",
            "Suggested Clinical Actions",
        ),
    }

    structured = {}
    for key, aliases in heading_aliases.items():
        section = ""
        for alias in aliases:
            section = extract_improved(plain, alias)
            if section:
                break
        structured[key] = section
    return structured
|
|
|
def serialize_patient(patient: dict) -> dict:
    """Return a shallow copy of ``patient`` whose ``_id`` value is a string.

    The input mapping is not modified. When the copy has no ``"_id"`` key it
    is returned unchanged.

    Args:
        patient: Patient record.

    Returns:
        The copied record, with ``_id`` passed through ``str()`` if present.
    """
    clone = patient.copy()
    if "_id" not in clone:
        return clone
    clone["_id"] = str(clone["_id"])
    return clone
|
|
|
def compute_patient_data_hash(data: dict) -> str:
    """Return the SHA-256 hex digest of ``data`` serialised as JSON.

    Keys are sorted during serialisation, so two dicts with the same content
    hash identically regardless of key insertion order.

    Args:
        data: A JSON-serialisable dict.

    Returns:
        The hexadecimal SHA-256 digest of the encoded JSON text.
    """
    canonical_json = json.dumps(data, sort_keys=True)
    digest = hashlib.sha256()
    digest.update(canonical_json.encode())
    return digest.hexdigest()
|
|
|
def compute_file_content_hash(file_content: bytes) -> str:
    """Return the SHA-256 hex digest of ``file_content``.

    Args:
        file_content: Raw bytes to hash.

    Returns:
        The hexadecimal SHA-256 digest of the bytes.
    """
    hasher = hashlib.sha256()
    hasher.update(file_content)
    return hasher.hexdigest()