"""Gradio front end that runs TxAgent over patient records from an Excel file.

The uploaded workbook is grouped by "Booking Number"; one prompt is built per
booking and sent to the agent. Results are streamed into a chat window and
saved as a plain-text report that can be downloaded.
"""

import hashlib
import json
import os
import re
import shutil
import sys
import time
from datetime import datetime
from typing import Iterator, List, Optional, Tuple

import gradio as gr
import pandas as pd

# Configuration and setup
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)

model_cache_dir = os.path.join(persistent_dir, "txagent_models")
tool_cache_dir = os.path.join(persistent_dir, "tool_cache")
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")

for directory in [model_cache_dir, tool_cache_dir, file_cache_dir, report_dir]:
    os.makedirs(directory, exist_ok=True)

os.environ["HF_HOME"] = model_cache_dir
os.environ["TRANSFORMERS_CACHE"] = model_cache_dir

current_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.abspath(os.path.join(current_dir, "src"))
sys.path.insert(0, src_path)

# Must come after the sys.path tweak above: the package lives under ./src.
from txagent.txagent import TxAgent  # noqa: E402

# Columns that parse_excel_to_prompts reads from the first worksheet.
_REQUIRED_COLUMNS = (
    "Booking Number",
    "Form Name",
    "Form Item",
    "Item Response",
    "Interview Date",
    "Interviewer",
    "Description",
)

# Patterns used by clean_response, compiled once instead of on every call.
_BRACKETED_OR_NONE_RE = re.compile(r"\[.*?\]|\bNone\b", flags=re.DOTALL)
_EXCESS_NEWLINES_RE = re.compile(r"\n{3,}")
_DISALLOWED_CHARS_RE = re.compile(r"[^\n#\-\*\w\s\.,:\(\)]+")

_PROMPT_TEMPLATE = """
Patient Booking Number: {booking}

Instructions:
Analyze the following patient case for missed diagnoses, medication conflicts, incomplete assessments, and any urgent follow-up needed. Summarize under the markdown headings.

Data:
{record_text}

### Missed Diagnoses
- ...

### Medication Conflicts
- ...

### Incomplete Assessments
- ...

### Urgent Follow-up
- ...
"""

_INSTRUCTIONS_MARKDOWN = """
## How to Use This Tool

1. **Upload Excel File**: Select your patient records Excel file
2. **Add Instructions** (Optional): Provide any specific analysis requests
3. **Click Analyze**: The system will process each patient record
4. **Review Results**: Analysis appears in the chat window
5. **Download Report**: Get a full text report of all findings

### Excel File Requirements
Your Excel file must contain these columns:
- Booking Number
- Form Name
- Form Item
- Item Response
- Interview Date
- Interviewer
- Description

### Analysis Includes
- Missed diagnoses
- Medication conflicts
- Incomplete assessments
- Urgent follow-up needs
"""


def file_hash(path: str) -> str:
    """Return the hex MD5 digest of the file at *path*.

    The digest is only used to derive a report file name, not for security.
    The file is read in chunks so large uploads are not held in memory at once.
    """
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def clean_response(text: str) -> str:
    """Normalize *text* for display and for inclusion in prompts.

    Steps, in order:
    1. Replace characters that cannot be encoded as UTF-8 (lone surrogates).
    2. Remove ``[...]`` spans (across newlines) and the standalone word ``None``.
    3. Collapse runs of three or more newlines into two.
    4. Drop every character outside word characters, whitespace and
       ``# - * . , : ( )``.

    Returns the result with leading and trailing whitespace stripped.
    """
    # A strict UTF-8 decode rejects encoded surrogates, so a "surrogatepass"
    # round trip can only succeed when there is nothing to fix. Going straight
    # to "replace" gives the same result in every case.
    text = text.encode("utf-8", "replace").decode("utf-8")
    text = _BRACKETED_OR_NONE_RE.sub("", text)
    text = _EXCESS_NEWLINES_RE.sub("\n\n", text)
    # NOTE(review): this also strips "=" and "/" from records built in
    # parse_excel_to_prompts -- confirm that is intended.
    text = _DISALLOWED_CHARS_RE.sub("", text)
    return text.strip()


def parse_excel_to_prompts(file_path: str) -> List[str]:
    """Build one analysis prompt per booking from the first sheet of a workbook.

    Args:
        file_path: Path to an Excel file whose first sheet contains the columns
            listed in ``_REQUIRED_COLUMNS``. Empty cells are treated as "".

    Returns:
        A list of prompt strings, one per distinct "Booking Number", in the
        order produced by ``DataFrame.groupby``.

    Raises:
        ValueError: If the file cannot be read or a required column is missing.
    """
    try:
        # Context manager so the workbook handle is closed even on failure.
        with pd.ExcelFile(file_path) as xl:
            df = xl.parse(xl.sheet_names[0], header=0).fillna("")

        missing = [col for col in _REQUIRED_COLUMNS if col not in df.columns]
        if missing:
            raise ValueError(f"missing required column(s): {', '.join(missing)}")

        prompts = []
        for booking, group in df.groupby("Booking Number"):
            records = []
            for _, row in group.iterrows():
                record = (
                    f"- {row['Form Name']}: {row['Form Item']} = {row['Item Response']} "
                    f"({row['Interview Date']} by {row['Interviewer']})\n{row['Description']}"
                )
                records.append(clean_response(record))
            prompts.append(
                _PROMPT_TEMPLATE.format(
                    booking=booking,
                    record_text="\n".join(records),
                )
            )
        return prompts
    except Exception as e:
        raise ValueError(f"Error parsing Excel file: {str(e)}") from e


def init_agent():
    """Create and initialize the TxAgent instance used by the UI.

    Copies ``data/new_tool.json`` into the tool cache directory on first run,
    then constructs the agent and loads its models via ``init_model()``.

    Returns:
        The initialized ``TxAgent``.
    """
    # NOTE(review): resolved against the current working directory, not the
    # directory of this file -- confirm the app is always launched from there.
    default_tool_path = os.path.abspath("data/new_tool.json")
    target_tool_path = os.path.join(tool_cache_dir, "new_tool.json")

    if not os.path.exists(target_tool_path):
        shutil.copy(default_tool_path, target_tool_path)

    agent = TxAgent(
        model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B",
        rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
        tool_files_dict={"new_tool": target_tool_path},
        force_finish=True,
        enable_checker=True,
        step_rag_num=4,
        seed=100,
        additional_default_tools=[],
    )
    agent.init_model()
    return agent


def _extract_text(result) -> List[str]:
    """Return the cleaned text pieces carried by one item yielded by the agent.

    A list is scanned for elements with a truthy ``content`` attribute; a plain
    string is used as is. Anything else contributes nothing.
    """
    if isinstance(result, list):
        return [
            clean_response(item.content)
            for item in result
            if getattr(item, "content", None)
        ]
    if isinstance(result, str):
        return [clean_response(result)]
    return []


def create_ui(agent):
    """Build the Gradio Blocks application around an initialized *agent*.

    Args:
        agent: Object exposing ``run_gradio_chat(...)`` as used in ``analyze``.

    Returns:
        The ``gr.Blocks`` instance, ready for ``queue()`` / ``launch()``.
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="Clinical Oversight Assistant") as demo:
        gr.Markdown("# 🏥 Clinical Oversight Assistant (Excel Optimized)")

        with gr.Tabs():
            with gr.TabItem("Analysis"):
                with gr.Row():
                    # Left column - Inputs
                    with gr.Column(scale=1):
                        file_upload = gr.File(
                            label="Upload Excel File",
                            file_types=[".xlsx"],
                            file_count="single",
                            interactive=True,
                        )
                        msg_input = gr.Textbox(
                            label="Additional Instructions",
                            placeholder="Add any specific analysis requests...",
                            lines=3,
                        )
                        with gr.Row():
                            clear_btn = gr.Button("Clear", variant="secondary")
                            send_btn = gr.Button("Analyze", variant="primary")

                    # Right column - Outputs
                    with gr.Column(scale=2):
                        chatbot = gr.Chatbot(
                            label="Analysis Results",
                            height=600,
                            bubble_full_width=False,
                            show_copy_button=True,
                        )
                        download_output = gr.File(
                            label="Download Full Report",
                            interactive=False,
                        )

            with gr.TabItem("Instructions"):
                gr.Markdown(_INSTRUCTIONS_MARKDOWN)

        def format_message(role: str, content: str) -> Tuple[Optional[str], Optional[str]]:
            """Format messages for the chatbot in (user, bot) format"""
            if role == "user":
                return (content, None)
            return (None, content)

        def analyze(
            message: str,
            chat_history: Optional[List[Tuple[Optional[str], Optional[str]]]],
            file,
        ) -> Iterator[Tuple[List[Tuple[Optional[str], Optional[str]]], Optional[str]]]:
            """Run the agent over every booking in the uploaded file, streaming results.

            Yields ``(chat_history, report_path)`` pairs. ``report_path`` is
            ``None`` until the final yield, which carries the saved report.

            Args:
                message: Optional extra instructions; appended to each prompt
                    when non-empty.
                chat_history: Current chatbot value; ``None`` is treated as empty.
                file: Uploaded file, either a path string or an object with a
                    ``name`` attribute holding the path.

            Raises:
                gr.Error: If no file was uploaded or the analysis fails.
            """
            if not file:
                raise gr.Error("Please upload an Excel file first")

            file_path = file if isinstance(file, str) else file.name
            # Bound before the try block so the except handler can always use it.
            new_history = list(chat_history or [])

            try:
                # Initialize chat history with user message
                new_history.append(format_message("user", message))
                new_history.append(format_message("assistant", "⏳ Processing Excel data..."))
                yield new_history, None

                prompts = parse_excel_to_prompts(file_path)
                if not prompts:
                    new_history[-1] = format_message(
                        "assistant", "⚠️ No patient records found in the uploaded file."
                    )
                    yield new_history, None
                    return

                extra_instructions = (message or "").strip()
                report_sections = []
                # Index of the bubble the current booking streams into. The
                # first booking reuses the "Processing" placeholder; later ones
                # get a fresh bubble so earlier results and errors stay intact.
                slot: Optional[int] = len(new_history) - 1

                # NOTE: idx is the position in the prompt list, not the booking
                # number from the spreadsheet.
                for idx, prompt in enumerate(prompts, 1):
                    if extra_instructions:
                        prompt = f"{prompt}\nAdditional Instructions:\n{extra_instructions}\n"

                    pieces: List[str] = []
                    output = ""
                    try:
                        # NOTE(review): pieces are accumulated across yields; if
                        # run_gradio_chat yields cumulative history each time,
                        # this repeats text -- confirm against TxAgent.
                        for result in agent.run_gradio_chat(
                            message=prompt,
                            history=[],
                            temperature=0.2,
                            max_new_tokens=1024,
                            max_token=4096,
                            call_agent=False,
                            conversation=[],
                        ):
                            pieces.extend(_extract_text(result))
                            if pieces:
                                body = "\n".join(pieces).strip()
                                output = f"--- Booking {idx} ---\n{body}\n"
                                bubble = format_message("assistant", output)
                                if slot is None:
                                    new_history.append(bubble)
                                    slot = len(new_history) - 1
                                else:
                                    new_history[slot] = bubble
                                yield new_history, None
                    except Exception as e:
                        error_msg = f"⚠️ Error processing booking {idx}: {str(e)}"
                        new_history.append(format_message("assistant", error_msg))
                        # Never let the next booking overwrite this error.
                        slot = None
                        yield new_history, None
                        continue

                    if pieces:
                        # The bubble already holds the final text from the last
                        # streamed update; just record it and release the slot.
                        report_sections.append(output)
                        slot = None

                # Save report
                full_output = "".join(section + "\n" for section in report_sections)
                file_hash_value = file_hash(file_path)
                report_path = os.path.join(report_dir, f"{file_hash_value}_report.txt")
                with open(report_path, "w", encoding="utf-8") as f:
                    f.write(full_output)

                yield new_history, report_path if os.path.exists(report_path) else None

            except Exception as e:
                new_history.append(format_message("assistant", f"❌ Error: {str(e)}"))
                yield new_history, None
                raise gr.Error(f"Analysis failed: {str(e)}") from e

        def clear_chat():
            """Reset the chat window and the download slot."""
            return [], None

        # Event handlers
        send_btn.click(
            analyze,
            inputs=[msg_input, chatbot, file_upload],
            outputs=[chatbot, download_output],
            api_name="analyze",
        )

        msg_input.submit(
            analyze,
            inputs=[msg_input, chatbot, file_upload],
            outputs=[chatbot, download_output],
        )

        clear_btn.click(
            clear_chat,
            inputs=[],
            outputs=[chatbot, download_output],
        )

    return demo


if __name__ == "__main__":
    try:
        agent = init_agent()
        demo = create_ui(agent)

        demo.queue(
            api_open=False,
            max_size=20,
        ).launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[report_dir],
            share=False,
        )
    except Exception as e:
        print(f"Failed to launch application: {str(e)}")
        sys.exit(1)