"""Clinical Oversight Assistant: extracts text from uploaded medical records
(PDF, CSV, Excel) and flags potential clinical oversights in a Gradio UI."""

import hashlib
import os
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

import gradio as gr
import pandas as pd
import pdfplumber

# Persistent storage (typically the persistent-storage mount on Hugging Face
# Spaces; adjust if running elsewhere).
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)

file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")
for directory in [file_cache_dir, report_dir]:
    os.makedirs(directory, exist_ok=True)

# A PDF page is prioritized for extraction if it mentions any of these terms.
MEDICAL_KEYWORDS = {
    'diagnosis', 'assessment', 'plan', 'results', 'medications',
    'allergies', 'summary', 'impression', 'findings', 'recommendations'
}


def sanitize_utf8(text: str) -> str:
    """Sanitize text to handle UTF-8 encoding issues."""
    return text.encode("utf-8", "ignore").decode("utf-8")
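
# Example: sanitize_utf8("caf\udce9") -> "caf"; lone surrogates from badly
# decoded uploads are silently dropped instead of raising UnicodeEncodeError.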

def file_hash(path: str) -> str:
    """Generate the MD5 hash of a file (used only as a cache key, not for security)."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()
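
# For very large uploads, a chunked read keeps memory flat; a sketch (not
# wired in, the name is illustrative):
#
#   def file_hash_chunked(path: str, chunk_size: int = 1 << 20) -> str:
#       md5 = hashlib.md5()
#       with open(path, "rb") as f:
#           for chunk in iter(lambda: f.read(chunk_size), b""):
#               md5.update(chunk)
#       return md5.hexdigest()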

def extract_priority_pages(file_path: str) -> str:
    """Extract text from PDF pages, prioritizing those with medical keywords."""
    try:
        text_chunks = []
        with pdfplumber.open(file_path) as pdf:
            for i, page in enumerate(pdf.pages):
                page_text = page.extract_text() or ""
                # Always keep the first three pages; after that, keep only
                # pages with a whole-word, case-insensitive keyword match.
                if i < 3 or any(re.search(rf'\b{kw}\b', page_text.lower()) for kw in MEDICAL_KEYWORDS):
                    text_chunks.append(f"=== Page {i + 1} ===\n{page_text.strip()}")
        return "\n\n".join(text_chunks)
    except Exception as e:
        return f"PDF processing error: {str(e)}"


def convert_file_to_text(file_path: str, file_type: str) -> str:
    """Convert supported file types to text, caching results by content hash."""
    try:
        h = file_hash(file_path)
        cache_path = os.path.join(file_cache_dir, f"{h}.txt")
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as f:
                return f.read()

        if file_type == "pdf":
            text = extract_priority_pages(file_path)
        elif file_type == "csv":
            df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str,
                             skip_blank_lines=False, on_bad_lines="skip")
            text = "\n".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        elif file_type in ["xls", "xlsx"]:
            try:
                df = pd.read_excel(file_path, engine="openpyxl", header=None, dtype=str)
            except Exception:
                # openpyxl cannot read legacy .xls files; fall back to xlrd.
                df = pd.read_excel(file_path, engine="xlrd", header=None, dtype=str)
            text = "\n".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        else:
            text = f"Unsupported file type: {file_type}"

        with open(cache_path, "w", encoding="utf-8") as f:
            f.write(text)
        return text
    except Exception as e:
        return f"Error processing {os.path.basename(file_path)}: {str(e)}"


def parse_analysis_response(raw_response: str) -> Dict[str, List[str]]:
    """Parse raw analysis response into structured sections."""
    sections = {
        "Missed Diagnoses": [],
        "Medication Conflicts": [],
        "Incomplete Assessments": [],
        "Urgent Follow-up": []
    }
    current_section = None

    for line in raw_response.split("\n"):
        line = line.strip()
        if not line:
            continue
        if line.startswith("Missed Diagnoses"):
            current_section = "Missed Diagnoses"
        elif line.startswith("Medication Conflicts"):
            current_section = "Medication Conflicts"
        elif line.startswith("Incomplete Assessments"):
            current_section = "Incomplete Assessments"
        elif line.startswith("Urgent Follow-up"):
            current_section = "Urgent Follow-up"
        elif current_section and line.startswith("-"):
            sections[current_section].append(line)

    return sections
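
# Example:
#   parse_analysis_response("Missed Diagnoses:\n- Untreated hypertension")
#   -> {"Missed Diagnoses": ["- Untreated hypertension"],
#       "Medication Conflicts": [], "Incomplete Assessments": [],
#       "Urgent Follow-up": []}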


def analyze_medical_records(extracted_text: str) -> str:
    """Analyze medical records for clinical oversights and return a structured report."""
    # Canned demo findings; `extracted_text` is not consulted yet.
    raw_response = """
Missed Diagnoses:
- Undiagnosed hypertension despite elevated BP readings.
- Family history of diabetes not evaluated for prediabetes risk.

Medication Conflicts:
- Concurrent use of SSRIs and NSAIDs detected, increasing risk of gastrointestinal bleeding.
- Beta-blocker prescribed without assessing asthma history, risking bronchospasm.

Incomplete Assessments:
- No cardiac stress test despite reported chest pain.
- Social history lacks documentation of substance use or living conditions.

Urgent Follow-up:
- Abnormal ECG results require immediate cardiology referral.
- Elevated liver enzymes not addressed, needing hepatology consultation.
"""

    parsed = parse_analysis_response(raw_response)

    # Render each section as Markdown, tracking whether anything was found.
    response = ["### Clinical Oversight Analysis\n"]
    has_findings = False
    for section, items in parsed.items():
        response.append(f"#### {section}")
        if items:
            response.extend(items)
            has_findings = True
        else:
            response.append("- None identified.")
        response.append("")

    response.append("### Summary")
    if has_findings:
        summary = ("The analysis identified potential oversights in diagnosis, "
                   "medication management, assessments, and follow-up needs. "
                   "Immediate action is recommended to address critical findings "
                   "and ensure comprehensive patient care.")
    else:
        summary = ("No significant clinical oversights were identified in the "
                   "provided records. Continue monitoring and ensure complete "
                   "documentation.")
    response.append(summary)

    return "\n".join(response)


def create_ui():
    """Create Gradio UI for clinical oversight analysis."""
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
        chatbot = gr.Chatbot(label="Analysis", height=600, type="messages")
        file_upload = gr.File(file_types=[".pdf", ".csv", ".xls", ".xlsx"], file_count="multiple")
        msg_input = gr.Textbox(placeholder="Ask about potential oversights...", show_label=False)
        send_btn = gr.Button("Analyze", variant="primary")
        download_output = gr.File(label="Download Full Report")

        def analyze(message: str, history: List[dict], files: List):
            """Handle analysis of medical records and stream updates to the UI."""
            history.append({"role": "user", "content": message})
            history.append({"role": "assistant", "content": "⏳ Analyzing records for potential oversights..."})
            yield history, None

            extracted_text = ""
            file_hash_value = ""
            if files:
                # Convert files in parallel, but join the results in upload
                # order; iterating as_completed would scramble the order.
                with ThreadPoolExecutor(max_workers=6) as executor:
                    futures = [executor.submit(convert_file_to_text, f.name,
                                               os.path.splitext(f.name)[1].lstrip(".").lower())
                               for f in files]
                    extracted_text = "\n".join(sanitize_utf8(fut.result()) for fut in futures)
                file_hash_value = file_hash(files[0].name)

            history.pop()  # Drop the "Analyzing..." placeholder.
            try:
                response = analyze_medical_records(extracted_text)
                history.append({"role": "assistant", "content": response})

                report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt") if file_hash_value else None
                if report_path:
                    with open(report_path, "w", encoding="utf-8") as f:
                        f.write(response)
                yield history, report_path if report_path and os.path.exists(report_path) else None
            except Exception as e:
                history.append({"role": "assistant", "content": f"❌ Error occurred: {str(e)}"})
                yield history, None

        send_btn.click(analyze, inputs=[msg_input, gr.State([]), file_upload], outputs=[chatbot, download_output])
        msg_input.submit(analyze, inputs=[msg_input, gr.State([]), file_upload], outputs=[chatbot, download_output])
    return demo
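
# Gradio runs generator callbacks as streaming handlers: each `yield` in
# analyze() pushes an intermediate (chatbot, download) update to the browser.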


if __name__ == "__main__":
    print("🚀 Launching app...")
    try:
        demo = create_ui()
        demo.queue(api_open=False).launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[report_dir],  # Let the browser fetch generated reports.
            share=False
        )
    except Exception as e:
        print(f"Failed to launch app: {str(e)}")