# CPS-Test-Mobile / app.py
import os
import pandas as pd
import pdfplumber
import re
import gradio as gr
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import hashlib
# Persistent directories
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")
for directory in [file_cache_dir, report_dir]:
os.makedirs(directory, exist_ok=True)
# Medical keywords for PDF extraction
MEDICAL_KEYWORDS = {
'diagnosis', 'assessment', 'plan', 'results', 'medications',
'allergies', 'summary', 'impression', 'findings', 'recommendations'
}
def sanitize_utf8(text: str) -> str:
"""Sanitize text to handle UTF-8 encoding issues."""
return text.encode("utf-8", "ignore").decode("utf-8")
def file_hash(path: str) -> str:
"""Generate MD5 hash of a file."""
with open(path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
def extract_priority_pages(file_path: str) -> str:
"""Extract text from PDF pages, prioritizing those with medical keywords."""
try:
text_chunks = []
with pdfplumber.open(file_path) as pdf:
for i, page in enumerate(pdf.pages):
page_text = page.extract_text() or ""
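                # Keep the first three pages unconditionally; later pages are kept only
                # if they mention one of the medical keywords defined above.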
if i < 3 or any(re.search(rf'\b{kw}\b', page_text.lower()) for kw in MEDICAL_KEYWORDS):
text_chunks.append(f"=== Page {i+1} ===\n{page_text.strip()}")
return "\n\n".join(text_chunks)
except Exception as e:
return f"PDF processing error: {str(e)}"
def convert_file_to_text(file_path: str, file_type: str) -> str:
"""Convert supported file types to text, caching results."""
try:
h = file_hash(file_path)
cache_path = os.path.join(file_cache_dir, f"{h}.txt")
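        # Cache hit: reuse the text extracted on a previous run of the same file.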
if os.path.exists(cache_path):
with open(cache_path, "r", encoding="utf-8") as f:
return f.read()
if file_type == "pdf":
text = extract_priority_pages(file_path)
elif file_type == "csv":
df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str,
skip_blank_lines=False, on_bad_lines="skip")
text = "\n".join(df.fillna("").astype(str).agg(" ".join, axis=1))
elif file_type in ["xls", "xlsx"]:
try:
df = pd.read_excel(file_path, engine="openpyxl", header=None, dtype=str)
except Exception:
df = pd.read_excel(file_path, engine="xlrd", header=None, dtype=str)
text = "\n".join(df.fillna("").astype(str).agg(" ".join, axis=1))
else:
text = f"Unsupported file type: {file_type}"
with open(cache_path, "w", encoding="utf-8") as f:
f.write(text)
return text
except Exception as e:
return f"Error processing {os.path.basename(file_path)}: {str(e)}"
def parse_analysis_response(raw_response: str) -> Dict[str, List[str]]:
"""Parse raw analysis response into structured sections."""
sections = {
"Missed Diagnoses": [],
"Medication Conflicts": [],
"Incomplete Assessments": [],
"Urgent Follow-up": []
}
current_section = None
lines = raw_response.split("\n")
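    # A line that begins with a known section header switches the current bucket;
    # "-" bullet lines are collected under the most recent header.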
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith("Missed Diagnoses"):
current_section = "Missed Diagnoses"
elif line.startswith("Medication Conflicts"):
current_section = "Medication Conflicts"
elif line.startswith("Incomplete Assessments"):
current_section = "Incomplete Assessments"
elif line.startswith("Urgent Follow-up"):
current_section = "Urgent Follow-up"
elif current_section and line.startswith("-"):
sections[current_section].append(line)
return sections
def analyze_medical_records(extracted_text: str) -> str:
"""Analyze medical records for clinical oversights and return structured response."""
# Placeholder for dynamic analysis (replace with actual model or rule-based logic)
# Example response to demonstrate flexibility with varying content
raw_response = """
Missed Diagnoses:
- Undiagnosed hypertension despite elevated BP readings.
- Family history of diabetes not evaluated for prediabetes risk.
Medication Conflicts:
- Concurrent use of SSRIs and NSAIDs detected, increasing risk of gastrointestinal bleeding.
- Beta-blocker prescribed without assessing asthma history, risking bronchospasm.
Incomplete Assessments:
- No cardiac stress test despite reported chest pain.
- Social history lacks documentation of substance use or living conditions.
Urgent Follow-up:
- Abnormal ECG results require immediate cardiology referral.
- Elevated liver enzymes not addressed, needing hepatology consultation.
"""
# Parse the raw response into sections
parsed = parse_analysis_response(raw_response)
# Format the response
response = ["### Clinical Oversight Analysis\n"]
has_findings = False
for section, items in parsed.items():
response.append(f"#### {section}")
if items:
response.extend(items)
has_findings = True
else:
response.append("- None identified.")
response.append("") # Add newline for readability
response.append("### Summary")
if has_findings:
summary = "The analysis identified potential oversights in diagnosis, medication management, assessments, and follow-up needs. Immediate action is recommended to address critical findings and ensure comprehensive patient care."
else:
summary = "No significant clinical oversights were identified in the provided records. Continue monitoring and ensure complete documentation."
response.append(summary)
return "\n".join(response)
def create_ui():
"""Create Gradio UI for clinical oversight analysis."""
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
chatbot = gr.Chatbot(label="Analysis", height=600, type="messages")
file_upload = gr.File(file_types=[".pdf", ".csv", ".xls", ".xlsx"], file_count="multiple")
msg_input = gr.Textbox(placeholder="Ask about potential oversights...", show_label=False)
send_btn = gr.Button("Analyze", variant="primary")
download_output = gr.File(label="Download Full Report")
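        # The handler below is a generator: it first yields a progress placeholder,
        # then yields the finished analysis along with a report path when one is written.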
def analyze(message: str, history: List[dict], files: List):
"""Handle analysis of medical records and update UI."""
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": "⏳ Analyzing records for potential oversights..."})
yield history, None
extracted_text = ""
file_hash_value = ""
            if files:
                with ThreadPoolExecutor(max_workers=6) as executor:
                    futures = [executor.submit(convert_file_to_text, f.name, f.name.split(".")[-1].lower()) for f in files]
                    # Join results in submission order so the combined text follows the upload order.
                    extracted_text = "\n".join(sanitize_utf8(future.result()) for future in futures)
                file_hash_value = file_hash(files[0].name)
# Analyze extracted text
history.pop() # Remove "Analyzing..." message
try:
response = analyze_medical_records(extracted_text)
history.append({"role": "assistant", "content": response})
# Generate report file
report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt") if file_hash_value else None
if report_path:
with open(report_path, "w", encoding="utf-8") as f:
f.write(response)
yield history, report_path if report_path and os.path.exists(report_path) else None
except Exception as e:
history.append({"role": "assistant", "content": f"❌ Error occurred: {str(e)}"})
yield history, None
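        # Button click and pressing Enter run the same handler. The chat history comes
        # from an inline gr.State([]) that is never written back, so every request
        # starts from an empty history.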
send_btn.click(analyze, inputs=[msg_input, gr.State([]), file_upload], outputs=[chatbot, download_output])
msg_input.submit(analyze, inputs=[msg_input, gr.State([]), file_upload], outputs=[chatbot, download_output])
return demo
if __name__ == "__main__":
print("🚀 Launching app...")
try:
demo = create_ui()
demo.queue(api_open=False).launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
allowed_paths=[report_dir],
share=False
)
except Exception as e:
print(f"Failed to launch app: {str(e)}")