CPS-Test-Mobile / app.py
Ali2206's picture
Update app.py
0a3f912 verified
raw
history blame
22.2 kB
import sys
import os
import pandas as pd
import pdfplumber
import json
import gradio as gr
from typing import List, Dict, Generator, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib
import shutil
import re
import psutil
import subprocess
import logging
import torch
import gc
from diskcache import Cache
from transformers import AutoTokenizer
from pathlib import Path
# ==================== CONFIGURATION ====================
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Directory Setup
BASE_DIR = Path("/data/hf_cache")
DIRECTORIES = {
"models": BASE_DIR / "txagent_models",
"tools": BASE_DIR / "tool_cache",
"cache": BASE_DIR / "cache",
"reports": BASE_DIR / "reports",
"vllm": BASE_DIR / "vllm_cache"
}
for dir_path in DIRECTORIES.values():
dir_path.mkdir(parents=True, exist_ok=True)
# Environment Configuration
os.environ.update({
"HF_HOME": str(DIRECTORIES["models"]),
"TRANSFORMERS_CACHE": str(DIRECTORIES["models"]),
"VLLM_CACHE_DIR": str(DIRECTORIES["vllm"]),
"TOKENIZERS_PARALLELISM": "false",
"CUDA_LAUNCH_BLOCKING": "1"
})
current_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.abspath(os.path.join(current_dir, "src"))
sys.path.insert(0, src_path)
from txagent.txagent import TxAgent
# ==================== CORE COMPONENTS ====================
class FileProcessor:
"""Handles all file processing operations"""
@staticmethod
def extract_pdf_content(file_path: str) -> str:
"""Extract text from PDF with parallel processing"""
try:
with pdfplumber.open(file_path) as pdf:
total_pages = len(pdf.pages)
if not total_pages:
return ""
def process_batch(start: int, end: int) -> List[tuple]:
results = []
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages[start:end]:
page_num = start + pdf.pages.index(page)
text = page.extract_text() or ""
results.append((page_num, f"=== Page {page_num + 1} ===\n{text.strip()}"))
return results
batch_size = min(10, total_pages)
batches = [(i, min(i + batch_size, total_pages)) for i in range(0, total_pages, batch_size)]
text_chunks = [""] * total_pages
with ThreadPoolExecutor(max_workers=min(6, os.cpu_count() or 4)) as executor:
futures = [executor.submit(process_batch, start, end) for start, end in batches]
for future in as_completed(futures):
for page_num, text in future.result():
text_chunks[page_num] = text
return "\n\n".join(filter(None, text_chunks))
except Exception as e:
logger.error(f"PDF extraction failed: {e}")
return f"PDF processing error: {str(e)}"
@staticmethod
def process_tabular_data(file_path: str, file_type: str) -> List[Dict]:
"""Process Excel or CSV files"""
try:
if file_type == "csv":
chunks = pd.read_csv(
file_path,
header=None,
dtype=str,
encoding_errors='replace',
on_bad_lines='skip',
chunksize=10000
)
df = pd.concat(chunks) if chunks else pd.DataFrame()
else: # Excel
try:
df = pd.read_excel(file_path, engine='openpyxl', header=None, dtype=str)
except:
df = pd.read_excel(file_path, engine='xlrd', header=None, dtype=str)
return [{
"filename": os.path.basename(file_path),
"rows": df.where(pd.notnull(df), "").astype(str).values.tolist(),
"type": file_type
}]
except Exception as e:
logger.error(f"{file_type.upper()} processing failed: {e}")
return [{"error": f"{file_type.upper()} processing error: {str(e)}"}]
@classmethod
def handle_upload(cls, file_path: str, file_type: str) -> List[Dict]:
"""Route file processing based on type"""
processor_map = {
"pdf": cls.extract_pdf_content,
"xls": lambda x: cls.process_tabular_data(x, "excel"),
"xlsx": lambda x: cls.process_tabular_data(x, "excel"),
"csv": lambda x: cls.process_tabular_data(x, "csv")
}
if file_type not in processor_map:
return [{"error": f"Unsupported file type: {file_type}"}]
try:
result = processor_map[file_type](file_path)
if file_type == "pdf":
return [{
"filename": os.path.basename(file_path),
"content": result,
"type": "pdf"
}]
return result
except Exception as e:
logger.error(f"File processing failed: {e}")
return [{"error": f"File processing error: {str(e)}"}]
class TextAnalyzer:
"""Handles text processing and analysis"""
def __init__(self):
self.tokenizer = AutoTokenizer.from_pretrained("mims-harvard/TxAgent-T1-Llama-3.1-8B")
self.cache = Cache(DIRECTORIES["cache"], size_limit=10*1024**3)
def chunk_content(self, text: str, max_tokens: int = 1800) -> List[str]:
"""Split text into token-limited chunks"""
tokens = self.tokenizer.encode(text)
return [
self.tokenizer.decode(tokens[i:i+max_tokens])
for i in range(0, len(tokens), max_tokens)
]
def clean_output(self, text: str) -> str:
"""Clean and format model response"""
text = text.encode("utf-8", "ignore").decode("utf-8")
text = re.sub(
r"\[.*?\]|\bNone\b|To analyze the patient record excerpt.*?medications\."
r"|Since the previous attempts.*?\.|I need to.*?medications\."
r"|Retrieving tools.*?\.", "", text, flags=re.DOTALL
)
diagnoses = []
in_section = False
for line in text.splitlines():
line = line.strip()
if not line:
continue
if re.match(r"###\s*Missed Diagnoses", line):
in_section = True
continue
if re.match(r"###\s*(Medication Conflicts|Incomplete Assessments|Urgent Follow-up)", line):
in_section = False
continue
if in_section and re.match(r"-\s*.+", line):
diagnosis = re.sub(r"^\-\s*", "", line).strip()
if diagnosis and not re.match(r"No issues identified", diagnosis, re.IGNORECASE):
diagnoses.append(diagnosis)
return " ".join(diagnoses) if diagnoses else ""
def generate_summary(self, analysis: str) -> str:
"""Create concise clinical summary"""
findings = []
for chunk in analysis.split("--- Analysis for Chunk"):
chunk = chunk.strip()
if not chunk or "No oversights identified" in chunk:
continue
in_section = False
for line in chunk.splitlines():
line = line.strip()
if not line:
continue
if re.match(r"###\s*Missed Diagnoses", line):
in_section = True
continue
if re.match(r"###\s*(Medication Conflicts|Incomplete Assessments|Urgent Follow-up)", line):
in_section = False
continue
if in_section and re.match(r"-\s*.+", line):
finding = re.sub(r"^\-\s*", "", line).strip()
if finding and not re.match(r"No issues identified", finding, re.IGNORECASE):
findings.append(finding)
unique_findings = list(dict.fromkeys(findings))
if not unique_findings:
return "No clinical concerns identified in the provided records."
if len(unique_findings) > 1:
summary = "Potential concerns include: " + ", ".join(unique_findings[:-1])
summary += f", and {unique_findings[-1]}"
else:
summary = "Potential concern identified: " + unique_findings[0]
return summary + ". Recommend urgent clinical review."
class ClinicalAgent:
"""Main application controller"""
def __init__(self):
self.agent = self._init_agent()
self.file_processor = FileProcessor()
self.text_analyzer = TextAnalyzer()
def _init_agent(self) -> Any:
"""Initialize the AI agent"""
logger.info("Initializing clinical agent...")
self._log_system_status("pre-init")
tool_path = DIRECTORIES["tools"] / "new_tool.json"
if not tool_path.exists():
default_tools = Path("data/new_tool.json")
if default_tools.exists():
shutil.copy(default_tools, tool_path)
agent = TxAgent(
model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B",
rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
tool_files_dict={"new_tool": str(tool_path)},
force_finish=True,
enable_checker=False,
step_rag_num=4,
seed=100,
additional_default_tools=[],
)
agent.init_model()
self._log_system_status("post-init")
logger.info("Clinical agent ready")
return agent
def _log_system_status(self, phase: str) -> None:
"""Log system resource utilization"""
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory()
logger.info(f"[{phase}] CPU: {cpu:.1f}% | RAM: {mem.used//(1024**2)}MB/{mem.total//(1024**2)}MB")
gpu_info = subprocess.run(
["nvidia-smi", "--query-gpu=memory.used,memory.total,utilization.gpu",
"--format=csv,nounits,noheader"],
capture_output=True, text=True
)
if gpu_info.returncode == 0:
used, total, util = gpu_info.stdout.strip().split(", ")
logger.info(f"[{phase}] GPU: {used}MB/{total}MB | Util: {util}%")
except Exception as e:
logger.error(f"Resource monitoring failed: {e}")
def process_stream(self, prompt: str, history: List[Dict]) -> Generator[Dict, None, None]:
"""Stream the agent's responses"""
full_response = ""
for chunk in self.agent.run_gradio_chat(prompt, [], 0.2, 512, 2048, False, []):
if not chunk:
continue
if isinstance(chunk, list):
for msg in chunk:
if hasattr(msg, 'content') and msg.content:
cleaned = self.text_analyzer.clean_output(msg.content)
if cleaned:
full_response += cleaned + " "
yield {"role": "assistant", "content": full_response}
elif isinstance(chunk, str) and chunk.strip():
cleaned = self.text_analyzer.clean_output(chunk)
if cleaned:
full_response += cleaned + " "
yield {"role": "assistant", "content": full_response}
def analyze_records(self, message: str, history: List[Dict], files: List) -> Generator[Dict[str, Any], None, None]:
"""Main analysis workflow"""
outputs = {
"chatbot": history.copy(),
"download_output": None,
"final_summary": "",
"progress": {"value": "Initializing...", "visible": True}
}
yield outputs
try:
# Add user message
history.append({"role": "user", "content": message})
outputs["chatbot"] = history
yield outputs
# Process files
extracted = []
file_hash = ""
if files:
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for f in files:
file_type = Path(f.name).suffix[1:].lower()
futures.append(executor.submit(
self.file_processor.handle_upload,
f.name,
file_type
))
for i, future in enumerate(as_completed(futures), 1):
try:
extracted.extend(future.result())
outputs["progress"] = self._format_progress(i, len(files), "Processing files")
yield outputs
except Exception as e:
logger.error(f"File processing failed: {e}")
extracted.append({"error": str(e)})
if files and os.path.exists(files[0].name):
file_hash = hashlib.md5(open(files[0].name, "rb").read()).hexdigest()
history.append({"role": "assistant", "content": "✅ Files processed successfully"})
outputs.update({
"chatbot": history,
"progress": self._format_progress(len(files), len(files), "Files processed")
})
yield outputs
# Analyze content
text_content = "\n".join(json.dumps(item) for item in extracted)
chunks = self.text_analyzer.chunk_content(text_content)
full_analysis = ""
for idx, chunk in enumerate(chunks, 1):
prompt = f"""
Analyze this clinical documentation for potential missed diagnoses. Provide:
1. Specific clinical findings with references (e.g., "Elevated BP (160/95) on page 3")
2. Their clinical significance
3. Urgency of review
Use concise, continuous prose without bullet points. If no concerns, state "No missed diagnoses identified."
Document Excerpt (Part {idx}/{len(chunks)}):
{chunk[:1750]}
"""
history.append({"role": "assistant", "content": ""})
outputs.update({
"chatbot": history,
"progress": self._format_progress(idx, len(chunks), "Analyzing")
})
yield outputs
# Stream analysis
chunk_response = ""
for update in self.process_stream(prompt, history):
history[-1] = update
chunk_response = update["content"]
outputs.update({
"chatbot": history,
"progress": self._format_progress(idx, len(chunks), "Analyzing")
})
yield outputs
full_analysis += f"--- Analysis Part {idx} ---\n{chunk_response}\n"
torch.cuda.empty_cache()
gc.collect()
# Final outputs
summary = self.text_analyzer.generate_summary(full_analysis)
report_path = DIRECTORIES["reports"] / f"{file_hash}_report.txt" if file_hash else None
if report_path:
with open(report_path, "w", encoding="utf-8") as f:
f.write(full_analysis + "\n\nSUMMARY:\n" + summary)
outputs.update({
"download_output": str(report_path) if report_path and report_path.exists() else None,
"final_summary": summary,
"progress": {"visible": False}
})
yield outputs
except Exception as e:
logger.error(f"Analysis failed: {e}")
history.append({"role": "assistant", "content": f"❌ Analysis error: {str(e)}"})
outputs.update({
"chatbot": history,
"final_summary": f"Error: {str(e)}",
"progress": {"visible": False}
})
yield outputs
def _format_progress(self, current: int, total: int, stage: str = "") -> Dict[str, Any]:
"""Format progress update for UI"""
status = f"{stage} - {current}/{total}" if stage else f"{current}/{total}"
return {"value": status, "visible": True, "label": f"Progress: {status}"}
def create_interface(self) -> gr.Blocks:
"""Build the Gradio interface"""
with gr.Blocks(
theme=gr.themes.Soft(
primary_hue="indigo",
secondary_hue="blue",
neutral_hue="slate"
),
title="Clinical Oversight Assistant",
css="""
.summary-panel {
border-left: 4px solid #4f46e5;
padding: 16px;
background: #f8fafc;
border-radius: 8px;
margin-bottom: 16px;
}
.upload-area {
border: 2px dashed #cbd5e1;
border-radius: 8px;
padding: 24px;
margin: 12px 0;
}
.chat-container {
border-radius: 8px;
border: 1px solid #e2e8f0;
}
"""
) as app:
# Header
gr.Markdown("""
<div style='text-align: center; margin-bottom: 24px;'>
<h1 style='color: #4f46e5; margin-bottom: 8px;'>🩺 Clinical Oversight Assistant</h1>
<p style='color: #64748b;'>
AI-powered analysis for identifying potential missed diagnoses in patient records
</p>
</div>
""")
with gr.Row(equal_height=False):
# Main Chat Panel
with gr.Column(scale=3):
gr.Markdown("**Clinical Analysis Conversation**")
chatbot = gr.Chatbot(
label="",
height=650,
show_copy_button=True,
avatar_images=(
"assets/user.png",
"assets/assistant.png"
) if Path("assets/user.png").exists() else None,
bubble_full_width=False,
type="messages",
elem_classes=["chat-container"]
)
# Results Panel
with gr.Column(scale=1):
with gr.Group():
gr.Markdown("**Clinical Summary**")
final_summary = gr.Markdown(
"Analysis results will appear here...",
elem_classes=["summary-panel"]
)
with gr.Group():
gr.Markdown("**Report Export**")
download_output = gr.File(
label="Download Full Analysis",
visible=False,
interactive=False
)
# Input Section
with gr.Row():
file_upload = gr.File(
file_types=[".pdf", ".csv", ".xls", ".xlsx"],
file_count="multiple",
label="Upload Patient Records",
elem_classes=["upload-area"]
)
with gr.Row():
user_input = gr.Textbox(
placeholder="Enter your clinical query or analysis request...",
show_label=False,
container=False,
scale=7,
autofocus=True
)
submit_btn = gr.Button(
"Analyze",
variant="primary",
scale=1,
min_width=120
)
# Hidden progress tracker
progress_tracker = gr.Textbox(
label="Analysis Progress",
visible=False,
interactive=False
)
# Event handlers
submit_btn.click(
self.analyze_records,
inputs=[user_input, chatbot, file_upload],
outputs=[chatbot, download_output, final_summary, progress_tracker],
show_progress="hidden"
)
user_input.submit(
self.analyze_records,
inputs=[user_input, chatbot, file_upload],
outputs=[chatbot, download_output, final_summary, progress_tracker],
show_progress="hidden"
)
app.load(
lambda: [[], None, "", "", None, {"visible": False}],
outputs=[chatbot, download_output, final_summary, user_input, file_upload, progress_tracker],
queue=False
)
return app
# ==================== APPLICATION ENTRY POINT ====================
if __name__ == "__main__":
try:
logger.info("Launching Clinical Oversight Assistant...")
clinical_app = ClinicalAgent()
interface = clinical_app.create_interface()
interface.queue(
api_open=False,
max_size=20
).launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
allowed_paths=[str(DIRECTORIES["reports"])],
share=False
)
except Exception as e:
logger.error(f"Application failed to start: {e}")
raise
finally:
if torch.distributed.is_initialized():
torch.distributed.destroy_process_group()