import sys import os import pandas as pd import json import gradio as gr from typing import List, Tuple, Dict, Any import hashlib import shutil import re from datetime import datetime import time from collections import defaultdict # Configuration and setup persistent_dir = "/data/hf_cache" os.makedirs(persistent_dir, exist_ok=True) model_cache_dir = os.path.join(persistent_dir, "txagent_models") tool_cache_dir = os.path.join(persistent_dir, "tool_cache") file_cache_dir = os.path.join(persistent_dir, "cache") report_dir = os.path.join(persistent_dir, "reports") for directory in [model_cache_dir, tool_cache_dir, file_cache_dir, report_dir]: os.makedirs(directory, exist_ok=True) os.environ["HF_HOME"] = model_cache_dir os.environ["TRANSFORMERS_CACHE"] = model_cache_dir current_dir = os.path.dirname(os.path.abspath(__file__)) src_path = os.path.abspath(os.path.join(current_dir, "src")) sys.path.insert(0, src_path) from txagent.txagent import TxAgent # Constants MAX_TOKENS = 32768 CHUNK_SIZE = 10000 MAX_NEW_TOKENS = 2048 MAX_BOOKINGS_PER_CHUNK = 5 def file_hash(path: str) -> str: with open(path, "rb") as f: return hashlib.md5(f.read()).hexdigest() def clean_response(text: str) -> str: try: text = text.encode('utf-8', 'surrogatepass').decode('utf-8') except UnicodeError: text = text.encode('utf-8', 'replace').decode('utf-8') text = re.sub(r"\[.*?\]|\bNone\b", "", text, flags=re.DOTALL) text = re.sub(r"\n{3,}", "\n\n", text) text = re.sub(r"[^\n#\-\*\w\s\.,:\(\)]+", "", text) return text.strip() def estimate_tokens(text: str) -> int: return len(text) // 3.5 def process_patient_data(df: pd.DataFrame) -> Dict[str, Any]: data = { 'bookings': defaultdict(list), 'medications': defaultdict(list), 'diagnoses': defaultdict(list), 'tests': defaultdict(list), 'procedures': defaultdict(list), 'doctors': set(), 'timeline': [] } df = df.sort_values('Interview Date') for booking, group in df.groupby('Booking Number'): for _, row in group.iterrows(): entry = { 'booking': booking, 'date': str(row['Interview Date']), 'doctor': str(row['Interviewer']), 'form': str(row['Form Name']), 'item': str(row['Form Item']), 'response': str(row['Item Response']), 'notes': str(row['Description']) } data['bookings'][booking].append(entry) data['timeline'].append(entry) data['doctors'].add(entry['doctor']) form_lower = entry['form'].lower() if 'medication' in form_lower or 'drug' in form_lower: data['medications'][entry['item']].append(entry) elif 'diagnosis' in form_lower or 'condition' in form_lower: data['diagnoses'][entry['item']].append(entry) elif 'test' in form_lower or 'lab' in form_lower or 'result' in form_lower: data['tests'][entry['item']].append(entry) elif 'procedure' in form_lower or 'surgery' in form_lower: data['procedures'][entry['item']].append(entry) return data def generate_analysis_prompt(patient_data: Dict[str, Any], bookings: List[str]) -> str: prompt_lines = [ "**Comprehensive Patient Analysis**", f"Analyzing {len(bookings)} bookings", "", "**Key Analysis Points:**", "- Chronological progression of symptoms", "- Medication changes and interactions", "- Diagnostic consistency across providers", "- Missed diagnostic opportunities", "- Gaps in follow-up", "", "**Patient Timeline:**" ] for entry in patient_data['timeline']: if entry['booking'] in bookings: prompt_lines.append( f"- {entry['date']}: {entry['form']} - {entry['item']} = {entry['response']} (by {entry['doctor']})" ) prompt_lines.extend([ "", "**Medication History:**", *[f"- {med}: " + " → ".join( f"{e['date']}: {e['response']}" for e in entries if e['booking'] in bookings ) for med, entries in patient_data['medications'].items()], "", "**Required Analysis Format:**", "### Diagnostic Patterns", "### Medication Analysis", "### Provider Consistency", "### Missed Opportunities", "### Recommendations" ]) return "\n".join(prompt_lines) def chunk_bookings(patient_data: Dict[str, Any]) -> List[List[str]]: all_bookings = list(patient_data['bookings'].keys()) booking_sizes = [] for booking in all_bookings: entries = patient_data['bookings'][booking] size = sum(estimate_tokens(str(e)) for e in entries) booking_sizes.append((booking, size)) booking_sizes.sort(key=lambda x: x[1], reverse=True) chunks = [[] for _ in range(3)] chunk_sizes = [0, 0, 0] for booking, size in booking_sizes: min_chunk = chunk_sizes.index(min(chunk_sizes)) chunks[min_chunk].append(booking) chunk_sizes[min_chunk] += size return chunks def init_agent(): default_tool_path = os.path.abspath("data/new_tool.json") target_tool_path = os.path.join(tool_cache_dir, "new_tool.json") if not os.path.exists(target_tool_path): shutil.copy(default_tool_path, target_tool_path) agent = TxAgent( model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B", rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B", tool_files_dict={"new_tool": target_tool_path}, force_finish=True, enable_checker=True, step_rag_num=4, seed=100, additional_default_tools=[] ) agent.init_model() return agent def analyze_with_agent(agent, prompt: str) -> str: try: response = "" for result in agent.run_gradio_chat( message=prompt, history=[], temperature=0.2, max_new_tokens=MAX_NEW_TOKENS, max_token=MAX_TOKENS, call_agent=False, conversation=[], ): if isinstance(result, list): for r in result: if hasattr(r, 'content') and r.content: response += clean_response(r.content) + "\n" elif isinstance(result, str): response += clean_response(result) + "\n" elif hasattr(result, 'content'): response += clean_response(result.content) + "\n" return response.strip() except Exception as e: return f"Error in analysis: {str(e)}" def create_ui(agent): with gr.Blocks(theme=gr.themes.Soft(), title="Patient History Analyzer") as demo: gr.Markdown("# 🏥 Patient History Analyzer") with gr.Tabs(): with gr.TabItem("Analysis"): with gr.Row(): with gr.Column(scale=1): file_upload = gr.File( label="Upload Excel File", file_types=[".xlsx"], file_count="single" ) analyze_btn = gr.Button("Analyze", variant="primary") status = gr.Markdown("Ready") with gr.Column(scale=2): output = gr.Markdown() report = gr.File(label="Download Report") with gr.TabItem("Instructions"): gr.Markdown(""" ## How to Use 1. Upload patient history Excel 2. Click Analyze 3. View/download report **Required Columns:** - Booking Number - Interview Date - Interviewer - Form Name - Form Item - Item Response - Description """) def analyze(file): if not file: raise gr.Error("Please upload a file") try: df = pd.read_excel(file.name) patient_data = process_patient_data(df) chunks = chunk_bookings(patient_data) full_report = [] for i, bookings in enumerate(chunks, 1): prompt = generate_analysis_prompt(patient_data, bookings) response = analyze_with_agent(agent, prompt) full_report.append(f"## Chunk {i}\n{response}\n") yield "\n".join(full_report), None # Final summary if len(chunks) > 1: summary_prompt = "Create final summary combining all chunks" summary = analyze_with_agent(agent, summary_prompt) full_report.append(f"## Final Summary\n{summary}\n") report_path = os.path.join(report_dir, f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md") with open(report_path, 'w') as f: f.write("\n".join(full_report)) yield "\n".join(full_report), report_path except Exception as e: raise gr.Error(f"Error: {str(e)}") analyze_btn.click( analyze, inputs=file_upload, outputs=[output, report] ) return demo if __name__ == "__main__": try: agent = init_agent() demo = create_ui(agent) demo.launch( server_name="0.0.0.0", server_port=7860, show_error=True ) except Exception as e: print(f"Error: {str(e)}") sys.exit(1)