import os
import pandas as pd
import pdfplumber
import re
import gradio as gr
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import hashlib

# Persistent directories (assumes a writable /data mount, e.g. Hugging Face
# Spaces persistent storage; adjust for local runs)
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")
for directory in [file_cache_dir, report_dir]:
    os.makedirs(directory, exist_ok=True)

def sanitize_utf8(text: str) -> str:
    """Sanitize text to handle UTF-8 encoding issues."""
    return text.encode("utf-8", "ignore").decode("utf-8")
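
# Example: invalid sequences (e.g. a lone surrogate) are silently dropped:
#
#   sanitize_utf8("caf\udce9")  # -> "caf"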

def file_hash(path: str) -> str:
    """Return the MD5 hash of a file (used only as a cache key, not for security)."""
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        # Read in blocks so large uploads are not loaded into memory at once.
        for block in iter(lambda: f.read(8192), b""):
            md5.update(block)
    return md5.hexdigest()

def extract_all_pages(file_path: str) -> str:
    """Extract text from all pages of a PDF."""
    try:
        text_chunks = []
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text() or ""
                text_chunks.append(page_text.strip())
        return "\n".join(text_chunks)
    except Exception:
        # Unreadable or corrupt PDFs degrade to an empty string instead of crashing the UI.
        return ""

def convert_file_to_text(file_path: str, file_type: str) -> str:
    """Convert supported file types to text, caching results."""
    try:
        h = file_hash(file_path)
        cache_path = os.path.join(file_cache_dir, f"{h}.txt")
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as f:
                return f.read()

        if file_type == "pdf":
            text = extract_all_pages(file_path)
        elif file_type == "csv":
            df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str,
                             skip_blank_lines=True, on_bad_lines="skip")
            text = " ".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        elif file_type in ["xls", "xlsx"]:
            df = pd.read_excel(file_path, engine="openpyxl", header=None, dtype=str)
            text = " ".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        else:
            text = ""

        if text:
            with open(cache_path, "w", encoding="utf-8") as f:
                f.write(text)
        return text
    except Exception:
        return ""

def parse_analysis_response(raw_response: str) -> Dict[str, List[str]]:
    """Parse raw analysis response into structured sections using regex."""
    sections = {
        "Missed Diagnoses": [],
        "Medication Conflicts": [],
        "Incomplete Assessments": [],
        "Urgent Follow-up": []
    }
    current_section = None
    # Matching is done line-by-line on stripped lines, so MULTILINE is unnecessary.
    section_pattern = re.compile(r"^(Missed Diagnoses|Medication Conflicts|Incomplete Assessments|Urgent Follow-up):$")
    item_pattern = re.compile(r"^- .+$")

    for line in raw_response.splitlines():
        line = line.strip()
        if not line:
            continue
        if section_pattern.match(line):
            current_section = line[:-1]
        elif current_section and item_pattern.match(line):
            sections[current_section].append(line)
    
    return sections
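
# Example (hypothetical input) showing the expected header/item format:
#
#   raw = "Missed Diagnoses:\n- Elevated BP readings not addressed."
#   parse_analysis_response(raw)
#   # -> {"Missed Diagnoses": ["- Elevated BP readings not addressed."],
#   #     "Medication Conflicts": [], "Incomplete Assessments": [],
#   #     "Urgent Follow-up": []}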

def analyze_medical_records(extracted_text: str) -> str:
    """Analyze medical records and return structured response."""
    # Split text into chunks to handle large inputs
    chunk_size = 10000
    chunks = [extracted_text[i:i + chunk_size] for i in range(0, len(extracted_text), chunk_size)]
    
    # Placeholder for analysis (replace with model or rule-based logic)
    raw_response_template = """
    Missed Diagnoses:
    - Undiagnosed hypertension despite elevated BP readings.
    - Family history of diabetes not evaluated for prediabetes risk.
    
    Medication Conflicts:
    - SSRIs and NSAIDs detected, increasing GI bleeding risk.
    
    Incomplete Assessments:
    - No cardiac stress test despite chest pain.
    
    Urgent Follow-up:
    - Abnormal ECG requires cardiology referral.
    """
    
    # Aggregate findings across chunks
    all_sections = {
        "Missed Diagnoses": set(),
        "Medication Conflicts": set(),
        "Incomplete Assessments": set(),
        "Urgent Follow-up": set()
    }
    
    for chunk in chunks:
        # Placeholder: every chunk currently yields the same canned response.
        # Real logic would analyze `chunk` here (see the sketch after this function).
        raw_response = raw_response_template
        parsed = parse_analysis_response(raw_response)
        for section, items in parsed.items():
            all_sections[section].update(items)
    
    # Format final response
    response = ["### Clinical Oversight Analysis\n"]
    has_findings = False
    for section, items in all_sections.items():
        response.append(f"#### {section}")
        if items:
            response.extend(sorted(items))
            has_findings = True
        else:
            response.append("- None identified.")
        response.append("")
    
    response.append("### Summary")
    summary = ("The analysis identified potential oversights in diagnosis, medication management, "
               "assessments, and follow-up needs. Immediate action is recommended.") if has_findings else \
              "No significant oversights identified. Continue monitoring."
    response.append(summary)
    
    return "\n".join(response)

def create_ui():
    """Create Gradio UI for clinical oversight analysis."""
    def analyze(message: str, history: List[dict], files: List):
        """Handle analysis and return results."""
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": "⏳ Analyzing..."})
        yield history, None

        extracted_text = ""
        file_hash_value = ""
        if files:
            # Gradio 3.x uploads expose a `.name` temp path; newer versions may pass plain strings.
            paths = [f.name if hasattr(f, "name") else f for f in files]
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(convert_file_to_text, p,
                                           os.path.splitext(p)[1].lstrip(".").lower())
                           for p in paths]
                results = [fut.result() for fut in futures]
            extracted_text = "\n".join(sanitize_utf8(r) for r in results if r)
            file_hash_value = file_hash(paths[0])

        history.pop()  # Remove "Analyzing..."
        report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt") if file_hash_value else None

        try:
            response = analyze_medical_records(extracted_text)
            history.append({"role": "assistant", "content": response})
            if report_path:
                with open(report_path, "w", encoding="utf-8") as f:
                    f.write(response)
            yield history, report_path if report_path and os.path.exists(report_path) else None
        except Exception as e:
            history.append({"role": "assistant", "content": f"❌ Error: {str(e)}"})
            yield history, None

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
        chatbot = gr.Chatbot(label="Analysis", height=600, type="messages")
        file_upload = gr.File(file_types=[".pdf", ".csv", ".xls", ".xlsx"], file_count="multiple")
        msg_input = gr.Textbox(placeholder="Ask about potential oversights...", show_label=False)
        send_btn = gr.Button("Analyze", variant="primary")
        download_output = gr.File(label="Download Report")

        # One shared State for both triggers (the handler mutates the history
        # list in place, so the conversation accumulates within a session).
        history_state = gr.State([])
        send_btn.click(analyze, inputs=[msg_input, history_state, file_upload],
                       outputs=[chatbot, download_output])
        msg_input.submit(analyze, inputs=[msg_input, history_state, file_upload],
                         outputs=[chatbot, download_output])
    return demo

if __name__ == "__main__":
    print("🚀 Launching app...")
    try:
        demo = create_ui()
        demo.queue()  # generator callbacks stream via the queue (required on older Gradio)
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[report_dir],
            share=False
        )
    except Exception as e:
        print(f"Failed to launch app: {str(e)}")