import os
import pandas as pd
import pdfplumber
import re
import gradio as gr
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import hashlib
import multiprocessing
from functools import partial
import logging

# Suppress pdfplumber CropBox warnings
logging.getLogger("pdfplumber").setLevel(logging.ERROR)

# Persistent directories
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")
for directory in [file_cache_dir, report_dir]:
    os.makedirs(directory, exist_ok=True)
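# Note: "/data" is assumed to be writable persistent storage (as on Hugging Face
# Spaces with persistent storage enabled); point persistent_dir elsewhere for local runs.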

def sanitize_utf8(text: str) -> str:
    """Sanitize text to handle UTF-8 encoding issues."""
    return text.encode("utf-8", "ignore").decode("utf-8")

def file_hash(path: str) -> str:
    """Generate MD5 hash of a file."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()
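# Example (hypothetical values): file_hash("records.pdf") -> "9e107d9d372bb6826bd81d3542a419d6".
# The hash serves as both the cache key in convert_file_to_text and the report
# filename prefix in analyze below.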

def extract_page_range(file_path: str, start_page: int, end_page: int) -> str:
    """Extract text from a range of PDF pages."""
    try:
        text_chunks = []
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages[start_page:end_page]:
                page_text = page.extract_text() or ""
                text_chunks.append(page_text.strip())
        return "\n".join(text_chunks)
    except Exception:
        return ""

def extract_all_pages(file_path: str) -> str:
    """Extract text from all pages of a PDF using parallel processing."""
    try:
        with pdfplumber.open(file_path) as pdf:
            total_pages = len(pdf.pages)
        
        if total_pages == 0:
            return ""
        
        # Cap the pool at 4 worker processes, or fewer on machines with fewer cores
        num_processes = min(4, multiprocessing.cpu_count())
        pages_per_process = max(1, total_pages // num_processes)
        
        # Create page ranges for parallel processing
        ranges = [(i * pages_per_process, min((i + 1) * pages_per_process, total_pages))
                  for i in range(num_processes)]
        if ranges[-1][1] != total_pages:
            ranges[-1] = (ranges[-1][0], total_pages)
        
        # Process page ranges in parallel
        with multiprocessing.Pool(processes=num_processes) as pool:
            extract_func = partial(extract_page_range, file_path)
            results = pool.starmap(extract_func, ranges)
        
        return "\n".join(filter(None, results))
    except Exception:
        return ""

def convert_file_to_text(file_path: str, file_type: str) -> str:
    """Convert supported file types to text, caching results."""
    try:
        h = file_hash(file_path)
        cache_path = os.path.join(file_cache_dir, f"{h}.txt")
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as f:
                return f.read()

        if file_type == "pdf":
            text = extract_all_pages(file_path)
        elif file_type == "csv":
            df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str,
                             skip_blank_lines=True, on_bad_lines="skip")
            text = " ".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        elif file_type in ["xls", "xlsx"]:
            # Let pandas pick the engine: openpyxl handles .xlsx, legacy .xls needs xlrd
            df = pd.read_excel(file_path, header=None, dtype=str)
            text = " ".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        else:
            text = ""

        if text:
            # Compress text by removing redundant whitespace
            text = re.sub(r'\s+', ' ', text).strip()
            with open(cache_path, "w", encoding="utf-8") as f:
                f.write(text)
        return text
    except Exception:
        return ""

def parse_analysis_response(raw_response: str) -> Dict[str, List[str]]:
    """Parse raw analysis response into structured sections using regex."""
    sections = {
        "Missed Diagnoses": [],
        "Medication Conflicts": [],
        "Incomplete Assessments": [],
        "Urgent Follow-up": []
    }
    current_section = None
    # Patterns are matched against individual stripped lines, so MULTILINE is unnecessary.
    section_pattern = re.compile(r"^(Missed Diagnoses|Medication Conflicts|Incomplete Assessments|Urgent Follow-up):$")
    item_pattern = re.compile(r"^- .+$")

    for line in raw_response.splitlines():
        line = line.strip()
        if not line:
            continue
        if section_pattern.match(line):
            current_section = line[:-1]
        elif current_section and item_pattern.match(line):
            sections[current_section].append(line)
    
    return sections
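# Illustrative round-trip: a response containing
#   Missed Diagnoses:
#   - Undiagnosed hypertension despite elevated BP readings.
# parses to {"Missed Diagnoses": ["- Undiagnosed hypertension despite elevated BP readings."], ...},
# with the remaining sections left as empty lists. Items keep their "- " prefix.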

def analyze_medical_records(extracted_text: str) -> str:
    """Analyze medical records and return structured response."""
    # Split text into chunks to handle large inputs
    chunk_size = 10000
    chunks = [extracted_text[i:i + chunk_size] for i in range(0, len(extracted_text), chunk_size)]
    
    # Placeholder for analysis (replace with model or rule-based logic)
    raw_response_template = """
    Missed Diagnoses:
    - Undiagnosed hypertension despite elevated BP readings.
    - Family history of diabetes not evaluated for prediabetes risk.
    
    Medication Conflicts:
    - SSRIs and NSAIDs detected, increasing GI bleeding risk.
    
    Incomplete Assessments:
    - No cardiac stress test despite chest pain.
    
    Urgent Follow-up:
    - Abnormal ECG requires cardiology referral.
    """
    
    # Aggregate findings across chunks
    all_sections = {
        "Missed Diagnoses": set(),
        "Medication Conflicts": set(),
        "Incomplete Assessments": set(),
        "Urgent Follow-up": set()
    }
    
    for chunk in chunks:
        # Simulate analysis per chunk (replace with real logic that consumes `chunk`)
        raw_response = raw_response_template
        parsed = parse_analysis_response(raw_response)
        for section, items in parsed.items():
            all_sections[section].update(items)
    
    # Format final response
    response = ["### Clinical Oversight Analysis\n"]
    has_findings = False
    for section, items in all_sections.items():
        response.append(f"#### {section}")
        if items:
            response.extend(sorted(items))
            has_findings = True
        else:
            response.append("- None identified.")
        response.append("")
    
    response.append("### Summary")
    summary = ("The analysis identified potential oversights in diagnosis, medication management, "
               "assessments, and follow-up needs. Immediate action is recommended.") if has_findings else \
              "No significant oversights identified. Continue monitoring."
    response.append(summary)
    
    return "\n".join(response)

def create_ui():
    """Create Gradio UI for clinical oversight analysis."""
    def analyze(message: str, history: List[dict], files: List):
        """Handle analysis and return results."""
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": "⏳ Extracting text from files..."})
        yield history, None

        extracted_text = ""
        file_hash_value = ""
        if files:
            # Gradio may hand back plain path strings or objects with a .name attribute,
            # depending on version; normalize to paths before dispatching workers.
            paths = [f if isinstance(f, str) else f.name for f in files]
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(convert_file_to_text, p, os.path.splitext(p)[1].lstrip(".").lower())
                           for p in paths]
                results = [f.result() for f in futures]
            extracted_text = "\n".join(sanitize_utf8(r) for r in results if r)
            file_hash_value = file_hash(paths[0])

        history.pop()  # Remove "Extracting..."
        history.append({"role": "assistant", "content": "⏳ Analyzing medical records..."})
        yield history, None

        report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt") if file_hash_value else None

        try:
            response = analyze_medical_records(extracted_text)
            history.pop()  # Remove "Analyzing..."
            history.append({"role": "assistant", "content": response})
            if report_path:
                with open(report_path, "w", encoding="utf-8") as f:
                    f.write(response)
            yield history, report_path if report_path and os.path.exists(report_path) else None
        except Exception as e:
            history.pop()  # Remove "Analyzing..."
            history.append({"role": "assistant", "content": f"❌ Error: {str(e)}"})
            yield history, None

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
        chatbot = gr.Chatbot(label="Analysis", height=600, type="messages")
        file_upload = gr.File(file_types=[".pdf", ".csv", ".xls", ".xlsx"], file_count="multiple")
        msg_input = gr.Textbox(placeholder="Ask about potential oversights...", show_label=False)
        send_btn = gr.Button("Analyze", variant="primary")
        download_output = gr.File(label="Download Report")
        chat_state = gr.State([])  # shared history so click and submit update the same conversation

        send_btn.click(analyze, inputs=[msg_input, chat_state, file_upload], outputs=[chatbot, download_output])
        msg_input.submit(analyze, inputs=[msg_input, chat_state, file_upload], outputs=[chatbot, download_output])
    return demo

if __name__ == "__main__":
    print("🚀 Launching app...")
    try:
        demo = create_ui()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[report_dir],
            share=False
        )
    except Exception as e:
        print(f"Failed to launch app: {str(e)}")