CPS-Test-Mobile / app.py
Ali2206's picture
Update app.py
fe67870 verified
raw
history blame
12.5 kB
import sys
import os
import pandas as pd
import pdfplumber
import json
import gradio as gr
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib
import shutil
import re
import psutil
import subprocess
import traceback
import torch
import copy
import time
# Configure environment variables and logging
os.environ["VLLM_LOGGING_LEVEL"] = "DEBUG"
if not torch.cuda.is_available():
print("No GPU detected. Forcing CPU mode by setting CUDA_VISIBLE_DEVICES to an empty string.")
os.environ["CUDA_VISIBLE_DEVICES"] = ""
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)
model_cache_dir = os.path.join(persistent_dir, "txagent_models")
tool_cache_dir = os.path.join(persistent_dir, "tool_cache")
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")
vllm_cache_dir = os.path.join(persistent_dir, "vllm_cache")
for directory in [model_cache_dir, tool_cache_dir, file_cache_dir, report_dir, vllm_cache_dir]:
os.makedirs(directory, exist_ok=True)
os.environ["HF_HOME"] = model_cache_dir
os.environ["TRANSFORMERS_CACHE"] = model_cache_dir
os.environ["VLLM_CACHE_DIR"] = vllm_cache_dir
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
current_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.abspath(os.path.join(current_dir, "src"))
sys.path.insert(0, src_path)
from txagent.txagent import TxAgent
MEDICAL_KEYWORDS = {'diagnosis', 'assessment', 'plan', 'results', 'medications',
'allergies', 'summary', 'impression', 'findings', 'recommendations'}
def sanitize_utf8(text: str) -> str:
return text.encode("utf-8", "ignore").decode("utf-8")
def file_hash(path: str) -> str:
with open(path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
def extract_priority_pages(file_path: str, max_pages: int = 20) -> str:
try:
text_chunks = []
with pdfplumber.open(file_path) as pdf:
# Always extract the first 3 pages
for i, page in enumerate(pdf.pages[:3]):
text = page.extract_text() or ""
text_chunks.append(f"=== Page {i+1} ===\n{text.strip()}")
# For pages 4 to max_pages, add only if medical keywords are found
for i, page in enumerate(pdf.pages[3:max_pages], start=4):
page_text = page.extract_text() or ""
if any(re.search(rf'\b{kw}\b', page_text.lower()) for kw in MEDICAL_KEYWORDS):
text_chunks.append(f"=== Page {i} ===\n{page_text.strip()}")
return "\n\n".join(text_chunks)
except Exception as e:
print("PDF processing error:", str(e))
traceback.print_exc()
return str(e)
def convert_file_to_json(file_path: str, file_type: str) -> str:
try:
h = file_hash(file_path)
cache_path = os.path.join(file_cache_dir, f"{h}.json")
if os.path.exists(cache_path):
with open(cache_path, "r", encoding="utf-8") as f:
return f.read()
if file_type == "pdf":
text = extract_priority_pages(file_path)
result = json.dumps({"filename": os.path.basename(file_path), "content": text, "status": "initial"})
elif file_type == "csv":
df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str,
skip_blank_lines=False, on_bad_lines="skip")
content = df.fillna("").astype(str).values.tolist()
result = json.dumps({"filename": os.path.basename(file_path), "rows": content})
elif file_type in ["xls", "xlsx"]:
try:
df = pd.read_excel(file_path, engine="openpyxl", header=None, dtype=str)
except Exception:
df = pd.read_excel(file_path, engine="xlrd", header=None, dtype=str)
content = df.fillna("").astype(str).values.tolist()
result = json.dumps({"filename": os.path.basename(file_path), "rows": content})
else:
result = json.dumps({"error": f"Unsupported file type: {file_type}"})
with open(cache_path, "w", encoding="utf-8") as f:
f.write(result)
return result
except Exception as e:
print("Error processing", file_path, str(e))
traceback.print_exc()
return json.dumps({"error": str(e)})
def log_system_usage(tag=""):
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory()
print(f"[{tag}] CPU: {cpu}% | RAM: {mem.used // (1024**2)}MB / {mem.total // (1024**2)}MB")
result = subprocess.run(
["nvidia-smi", "--query-gpu=memory.used,memory.total,utilization.gpu", "--format=csv,nounits,noheader"],
capture_output=True, text=True
)
if result.returncode == 0:
used, total, util = result.stdout.strip().split(", ")
print(f"[{tag}] GPU: {used}MB / {total}MB | Utilization: {util}%")
except Exception as e:
print(f"[{tag}] GPU/CPU monitor failed: {e}")
traceback.print_exc()
def init_agent():
try:
print("πŸ” Initializing model...")
log_system_usage("Before Load")
default_tool_path = os.path.abspath("data/new_tool.json")
target_tool_path = os.path.join(tool_cache_dir, "new_tool.json")
if not os.path.exists(target_tool_path):
shutil.copy(default_tool_path, target_tool_path)
agent = TxAgent(
model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B",
rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
tool_files_dict={"new_tool": target_tool_path},
force_finish=True,
enable_checker=True,
step_rag_num=8,
seed=100,
additional_default_tools=[],
)
agent.init_model()
log_system_usage("After Load")
print("βœ… Agent Ready")
return agent
except Exception as e:
print("❌ Error initializing agent:", str(e))
traceback.print_exc()
raise e
def create_ui(agent):
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
# Persistent conversation state to maintain history
conversation_state = gr.State([])
chatbot = gr.Chatbot(label="Analysis", height=600, type="messages")
file_upload = gr.File(file_types=[".pdf", ".csv", ".xls", ".xlsx"], file_count="multiple")
msg_input = gr.Textbox(placeholder="Ask about potential oversights...", show_label=False)
send_btn = gr.Button("Analyze", variant="primary")
download_output = gr.File(label="Download Full Report")
def analyze(message: str, state: list, files: list):
if state is None:
state = []
history = state
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": "⏳ Analyzing records for potential oversights..."})
# Yield the initial update
yield copy.deepcopy(history), None, copy.deepcopy(history)
extracted = ""
file_hash_value = ""
if files:
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(convert_file_to_json, f.name, f.name.split(".")[-1].lower()) for f in files]
results = []
for future in as_completed(futures):
try:
res = future.result()
results.append(sanitize_utf8(res))
except Exception as e:
print("❌ Error in file processing:", str(e))
traceback.print_exc()
extracted = "\n".join(results)
file_hash_value = file_hash(files[0].name)
prompt = f"""Review these medical records and identify EXACTLY what might have been missed:
1. List potential missed diagnoses
2. Flag any medication conflicts
3. Note incomplete assessments
4. Highlight abnormal results needing follow-up
Medical Records:
{extracted[:8000]}
### Potential Oversights:
"""
print("πŸ”Ž Generated prompt:")
print(prompt)
full_response = ""
response_chunks = []
tool_calls_rendered = []
try:
for chunk in agent.run_gradio_chat(
message=prompt,
history=[],
temperature=0.2,
max_new_tokens=2048,
max_token=4096,
call_agent=False,
conversation=[]
):
if chunk is None:
continue
chunk_content = chunk if isinstance(chunk, str) else getattr(chunk, 'content', '')
if not chunk_content:
continue
response_chunks.append(chunk_content)
full_response = "".join(response_chunks)
# Collect and render any tool calls
matches = re.findall(r"\[TOOL_CALLS\]\[(.*?)\]", chunk_content, re.DOTALL)
for m in matches:
tool_calls_rendered.append(f"\nπŸ“¦ Tool Call: [{m.strip()}]")
display_response = re.sub(r"\[TOOL_CALLS\].*?\n*", "", full_response, flags=re.DOTALL)
display_response = display_response.replace('[TxAgent]', '').strip()
display_response += "\n\n" + "\n".join(tool_calls_rendered)
if history and history[-1]["role"] == "assistant":
history[-1]["content"] = display_response
else:
history.append({"role": "assistant", "content": display_response})
# Yield updated conversation state
yield copy.deepcopy(history), None, copy.deepcopy(history)
full_response = re.sub(r"\[TOOL_CALLS\].*?\n*", "", full_response, flags=re.DOTALL).strip()
full_response = full_response.replace('[TxAgent]', '').strip()
report_path = None
if file_hash_value:
report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt")
with open(report_path, "w", encoding="utf-8") as f:
f.write(full_response)
if history and history[-1]["role"] == "assistant":
history[-1]["content"] = full_response
else:
history.append({"role": "assistant", "content": full_response})
yield copy.deepcopy(history), report_path if report_path and os.path.exists(report_path) else None, copy.deepcopy(history)
except Exception as e:
history.append({"role": "assistant", "content": f"❌ An error occurred in analyze: {str(e)}"})
traceback.print_exc()
yield copy.deepcopy(history), None, copy.deepcopy(history)
send_btn.click(analyze, inputs=[msg_input, conversation_state, file_upload],
outputs=[chatbot, download_output, conversation_state])
msg_input.submit(analyze, inputs=[msg_input, conversation_state, file_upload],
outputs=[chatbot, download_output, conversation_state])
return demo
# Global variable to hold the WSGI/ASGI app for container environments.
app = None
if __name__ == "__main__":
try:
print("πŸš€ Launching app...")
agent = init_agent()
demo = create_ui(agent)
# Launch the app with queueing; capture the returned app instance.
launched_app, local_url, share_url = demo.queue(api_open=False).launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
allowed_paths=[report_dir],
share=False,
ssr=False # Disable SSR to improve UI updates
)
# Assign the underlying web app to the global variable for container access.
app = launched_app.app
except Exception as e:
print("❌ Fatal error during launch:", str(e))
traceback.print_exc()