import os
import pandas as pd
import pdfplumber
import re
import gradio as gr
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import hashlib

# Persistent directories
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)

file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")
for directory in [file_cache_dir, report_dir]:
    os.makedirs(directory, exist_ok=True)


def sanitize_utf8(text: str) -> str:
    """Sanitize text by dropping byte sequences that are not valid UTF-8."""
    return text.encode("utf-8", "ignore").decode("utf-8")


def file_hash(path: str) -> str:
    """Generate the MD5 hash of a file (used as a cache key and report identifier, not for security)."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()


def extract_all_pages(file_path: str) -> str:
    """Extract text from all pages of a PDF."""
    try:
        text_chunks = []
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text() or ""
                text_chunks.append(page_text.strip())
        return "\n".join(text_chunks)
    except Exception:
        return ""


def convert_file_to_text(file_path: str, file_type: str) -> str:
    """Convert supported file types (PDF, CSV, XLS/XLSX) to text, caching results on disk."""
    try:
        h = file_hash(file_path)
        cache_path = os.path.join(file_cache_dir, f"{h}.txt")
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as f:
                return f.read()
        if file_type == "pdf":
            text = extract_all_pages(file_path)
        elif file_type == "csv":
            df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str,
                             skip_blank_lines=True, on_bad_lines="skip")
            text = " ".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        elif file_type in ["xls", "xlsx"]:
            # openpyxl only reads .xlsx; legacy .xls files need the xlrd engine.
            engine = "openpyxl" if file_type == "xlsx" else "xlrd"
            df = pd.read_excel(file_path, engine=engine, header=None, dtype=str)
            text = " ".join(df.fillna("").astype(str).agg(" ".join, axis=1))
        else:
            text = ""
        if text:
            with open(cache_path, "w", encoding="utf-8") as f:
                f.write(text)
        return text
    except Exception:
        return ""


def parse_analysis_response(raw_response: str) -> Dict[str, List[str]]:
    """Parse raw analysis response into structured sections using regex."""
    sections = {
        "Missed Diagnoses": [],
        "Medication Conflicts": [],
        "Incomplete Assessments": [],
        "Urgent Follow-up": []
    }
    current_section = None
    section_pattern = re.compile(r"^(Missed Diagnoses|Medication Conflicts|Incomplete Assessments|Urgent Follow-up):$", re.MULTILINE)
    item_pattern = re.compile(r"^- .+$", re.MULTILINE)
    for line in raw_response.splitlines():
        line = line.strip()
        if not line:
            continue
        if section_pattern.match(line):
            current_section = line[:-1]
        elif current_section and item_pattern.match(line):
            sections[current_section].append(line)
    return sections
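

# Minimal illustration (an added helper for documentation only; it is not called
# anywhere in the app): it shows the plain-text layout parse_analysis_response
# expects -- a section header line ending in ":" followed by "- " bullet lines.
def _example_parse_usage() -> Dict[str, List[str]]:
    sample = (
        "Missed Diagnoses:\n"
        "- Undiagnosed hypertension despite elevated BP readings.\n"
        "Urgent Follow-up:\n"
        "- Abnormal ECG requires cardiology referral.\n"
    )
    # Returns a dict with all four section keys; here "Missed Diagnoses" and
    # "Urgent Follow-up" each contain one bullet and the other two are empty.
    return parse_analysis_response(sample)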


def analyze_medical_records(extracted_text: str) -> str:
    """Analyze medical records and return a structured response."""
    # Split text into chunks so very large inputs can be processed piece by piece
    chunk_size = 10000
    chunks = [extracted_text[i:i + chunk_size] for i in range(0, len(extracted_text), chunk_size)]

    # Placeholder analysis output (replace with model or rule-based logic; see the
    # illustrative _rule_based_chunk_analysis sketch after this function)
    raw_response_template = """
Missed Diagnoses:
- Undiagnosed hypertension despite elevated BP readings.
- Family history of diabetes not evaluated for prediabetes risk.
Medication Conflicts:
- SSRIs and NSAIDs detected, increasing GI bleeding risk.
Incomplete Assessments:
- No cardiac stress test despite chest pain.
Urgent Follow-up:
- Abnormal ECG requires cardiology referral.
"""

    # Aggregate findings across chunks, de-duplicating repeated items
    all_sections = {
        "Missed Diagnoses": set(),
        "Medication Conflicts": set(),
        "Incomplete Assessments": set(),
        "Urgent Follow-up": set()
    }
    for chunk_idx, chunk in enumerate(chunks, 1):
        # Simulate analysis per chunk (replace with real logic)
        raw_response = raw_response_template
        parsed = parse_analysis_response(raw_response)
        for section, items in parsed.items():
            all_sections[section].update(items)

    # Format the final Markdown response
    response = ["### Clinical Oversight Analysis\n"]
    has_findings = False
    for section, items in all_sections.items():
        response.append(f"#### {section}")
        if items:
            response.extend(sorted(items))
            has_findings = True
        else:
            response.append("- None identified.")
        response.append("")
    response.append("### Summary")
    if has_findings:
        summary = ("The analysis identified potential oversights in diagnosis, medication management, "
                   "assessments, and follow-up needs. Immediate action is recommended.")
    else:
        summary = "No significant oversights identified. Continue monitoring."
    response.append(summary)
    return "\n".join(response)


def create_ui():
    """Create Gradio UI for clinical oversight analysis."""

    def analyze(message: str, history: List[dict], files: List):
        """Handle analysis and return results."""
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": "⏳ Analyzing..."})
        yield history, None

        extracted_text = ""
        file_hash_value = ""
        if files:
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(convert_file_to_text, f.name, f.name.split(".")[-1].lower()) for f in files]
                results = [f.result() for f in futures]
                extracted_text = "\n".join(sanitize_utf8(r) for r in results if r)
            file_hash_value = file_hash(files[0].name) if files else ""

        history.pop()  # Remove "Analyzing..."
        report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt") if file_hash_value else None
        try:
            response = analyze_medical_records(extracted_text)
            history.append({"role": "assistant", "content": response})
            if report_path:
                with open(report_path, "w", encoding="utf-8") as f:
                    f.write(response)
            yield history, report_path if report_path and os.path.exists(report_path) else None
        except Exception as e:
            history.append({"role": "assistant", "content": f"❌ Error: {str(e)}"})
            yield history, None

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
        chatbot = gr.Chatbot(label="Analysis", height=600, type="messages")
        file_upload = gr.File(file_types=[".pdf", ".csv", ".xls", ".xlsx"], file_count="multiple")
        msg_input = gr.Textbox(placeholder="Ask about potential oversights...", show_label=False)
        send_btn = gr.Button("Analyze", variant="primary")
        download_output = gr.File(label="Download Report")
        send_btn.click(analyze, inputs=[msg_input, gr.State([]), file_upload], outputs=[chatbot, download_output])
        msg_input.submit(analyze, inputs=[msg_input, gr.State([]), file_upload], outputs=[chatbot, download_output])
    return demo


if __name__ == "__main__":
    print("🚀 Launching app...")
    try:
        demo = create_ui()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[report_dir],
            share=False
        )
    except Exception as e:
        print(f"Failed to launch app: {str(e)}")