import os
import pandas as pd
import pdfplumber
import re
import gradio as gr
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import hashlib

# Persistent directories
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")
for directory in [file_cache_dir, report_dir]:
    os.makedirs(directory, exist_ok=True)

# Medical keywords for PDF extraction
MEDICAL_KEYWORDS = {
    'diagnosis', 'assessment', 'plan', 'results', 'medications',
    'allergies', 'summary', 'impression', 'findings', 'recommendations'
}
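# Pages containing any of these terms (matched as whole words, case-insensitively)
# are extracted in addition to the first three pages; see extract_priority_pages().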

def sanitize_utf8(text: str) -> str:
    """Sanitize text to handle UTF-8 encoding issues."""
    return text.encode("utf-8", "ignore").decode("utf-8")

def file_hash(path: str) -> str:
    """Generate MD5 hash of a file."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()
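
# The content hash is used as the cache key in convert_file_to_text() and as the
# report filename prefix in the UI, so re-uploading an identical file reuses the
# cached extraction.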

def extract_priority_pages(file_path: str) -> str:
    """Extract text from PDF pages, prioritizing those with medical keywords."""
    try:
        text_chunks = []
        with pdfplumber.open(file_path) as pdf:
            for i, page in enumerate(pdf.pages):
                page_text = page.extract_text() or ""
                if i < 3 or any(re.search(rf'\b{kw}\b', page_text.lower()) for kw in MEDICAL_KEYWORDS):
                    text_chunks.append(f"=== Page {i+1} ===\n{page_text.strip()}")
        return "\n\n".join(text_chunks)
    except Exception as e:
        return f"PDF processing error: {str(e)}"

def convert_file_to_text(file_path: str, file_type: str) -> str:
    """Convert supported file types to text, caching results."""
    try:
        h = file_hash(file_path)
        cache_path = os.path.join(file_cache_dir, f"{h}.txt")
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as f:
                return f.read()

        if file_type == "pdf":
            text = extract_priority_pages(file_path)
        elif file_type == "csv":
            df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str,
                             skip_blank_lines=False, on_bad_lines="skip")
            text = "\n".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        elif file_type in ["xls", "xlsx"]:
            try:
                df = pd.read_excel(file_path, engine="openpyxl", header=None, dtype=str)
            except Exception:
                df = pd.read_excel(file_path, engine="xlrd", header=None, dtype=str)
            text = "\n".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        else:
            text = f"Unsupported file type: {file_type}"

        with open(cache_path, "w", encoding="utf-8") as f:
            f.write(text)
        return text
    except Exception as e:
        return f"Error processing {os.path.basename(file_path)}: {str(e)}"

def parse_analysis_response(raw_response: str) -> Dict[str, List[str]]:
    """Parse raw analysis response into structured sections."""
    sections = {
        "Missed Diagnoses": [],
        "Medication Conflicts": [],
        "Incomplete Assessments": [],
        "Urgent Follow-up": []
    }
    current_section = None
    lines = raw_response.split("\n")

    for line in lines:
        line = line.strip()
        if not line:
            continue
        if line.startswith("Missed Diagnoses"):
            current_section = "Missed Diagnoses"
        elif line.startswith("Medication Conflicts"):
            current_section = "Medication Conflicts"
        elif line.startswith("Incomplete Assessments"):
            current_section = "Incomplete Assessments"
        elif line.startswith("Urgent Follow-up"):
            current_section = "Urgent Follow-up"
        elif current_section and line.startswith("-"):
            sections[current_section].append(line)
    
    return sections
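
# Example of the input format parse_analysis_response() expects: section headings
# followed by "-" bullets; any other lines are ignored. For instance,
#
#   Missed Diagnoses:
#   - Undiagnosed hypertension despite elevated BP readings.
#   Medication Conflicts:
#   - None identified.
#
# would parse to:
#   {"Missed Diagnoses": ["- Undiagnosed hypertension despite elevated BP readings."],
#    "Medication Conflicts": ["- None identified."],
#    "Incomplete Assessments": [], "Urgent Follow-up": []}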

def analyze_medical_records(extracted_text: str) -> str:
    """Analyze medical records for clinical oversights and return structured response."""
    # Placeholder for dynamic analysis (replace with an actual model or rule-based logic).
    # The hard-coded example response below stands in for real model output.
    raw_response = """
    Missed Diagnoses:
    - Undiagnosed hypertension despite elevated BP readings.
    - Family history of diabetes not evaluated for prediabetes risk.
    
    Medication Conflicts:
    - Concurrent use of SSRIs and NSAIDs detected, increasing risk of gastrointestinal bleeding.
    - Beta-blocker prescribed without assessing asthma history, risking bronchospasm.
    
    Incomplete Assessments:
    - No cardiac stress test despite reported chest pain.
    - Social history lacks documentation of substance use or living conditions.
    
    Urgent Follow-up:
    - Abnormal ECG results require immediate cardiology referral.
    - Elevated liver enzymes not addressed, needing hepatology consultation.
    """
    
    # Parse the raw response into sections
    parsed = parse_analysis_response(raw_response)
    
    # Format the response
    response = ["### Clinical Oversight Analysis\n"]
    has_findings = False
    for section, items in parsed.items():
        response.append(f"#### {section}")
        if items:
            response.extend(items)
            has_findings = True
        else:
            response.append("- None identified.")
        response.append("")  # Add newline for readability
    
    response.append("### Summary")
    if has_findings:
        summary = "The analysis identified potential oversights in diagnosis, medication management, assessments, and follow-up needs. Immediate action is recommended to address critical findings and ensure comprehensive patient care."
    else:
        summary = "No significant clinical oversights were identified in the provided records. Continue monitoring and ensure complete documentation."
    response.append(summary)
    
    return "\n".join(response)

def create_ui():
    """Create Gradio UI for clinical oversight analysis."""
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
        chatbot = gr.Chatbot(label="Analysis", height=600, type="messages")
        file_upload = gr.File(file_types=[".pdf", ".csv", ".xls", ".xlsx"], file_count="multiple")
        msg_input = gr.Textbox(placeholder="Ask about potential oversights...", show_label=False)
        send_btn = gr.Button("Analyze", variant="primary")
        download_output = gr.File(label="Download Full Report")

        def analyze(message: str, history: List[dict], files: List):
            """Handle analysis of medical records and update UI."""
            history.append({"role": "user", "content": message})
            history.append({"role": "assistant", "content": "⏳ Analyzing records for potential oversights..."})
            yield history, None

            extracted_text = ""
            file_hash_value = ""
            if files:
                with ThreadPoolExecutor(max_workers=6) as executor:
                    # Submit one extraction task per uploaded file.
                    futures = [executor.submit(convert_file_to_text, f.name,
                                               os.path.splitext(f.name)[1].lstrip(".").lower())
                               for f in files]
                    # Collect results in submission order so the combined text follows the upload order.
                    extracted_text = "\n".join(sanitize_utf8(fut.result()) for fut in futures)
                file_hash_value = file_hash(files[0].name)

            # Analyze extracted text
            history.pop()  # Remove "Analyzing..." message
            try:
                response = analyze_medical_records(extracted_text)
                history.append({"role": "assistant", "content": response})

                # Generate report file
                report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt") if file_hash_value else None
                if report_path:
                    with open(report_path, "w", encoding="utf-8") as f:
                        f.write(response)
                yield history, report_path if report_path and os.path.exists(report_path) else None
            except Exception as e:
                history.append({"role": "assistant", "content": f"❌ Error occurred: {str(e)}"})
                yield history, None

        # Pass the chatbot value as the history input so the conversation persists across turns.
        send_btn.click(analyze, inputs=[msg_input, chatbot, file_upload], outputs=[chatbot, download_output])
        msg_input.submit(analyze, inputs=[msg_input, chatbot, file_upload], outputs=[chatbot, download_output])
    return demo

if __name__ == "__main__":
    print("🚀 Launching app...")
    try:
        demo = create_ui()
        demo.queue(api_open=False).launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[report_dir],
            share=False
        )
    except Exception as e:
        print(f"Failed to launch app: {str(e)}")