import sys
import os
import re
import hashlib
import shutil
from datetime import datetime
from collections import defaultdict
from typing import List, Dict, Tuple

import pandas as pd
import gradio as gr

# All generated artifacts (reports, model cache) live under the working directory.
WORKING_DIR = os.getcwd()
REPORT_DIR = os.path.join(WORKING_DIR, "reports")
os.makedirs(REPORT_DIR, exist_ok=True)

# Cache Hugging Face downloads locally so restarts do not re-fetch the models.
MODEL_CACHE_DIR = os.path.join(WORKING_DIR, "model_cache")
os.makedirs(MODEL_CACHE_DIR, exist_ok=True)
os.environ["HF_HOME"] = MODEL_CACHE_DIR
os.environ["TRANSFORMERS_CACHE"] = MODEL_CACHE_DIR

# TxAgent is expected to be vendored under ./src.
sys.path.append(os.path.join(WORKING_DIR, "src"))
from txagent.txagent import TxAgent

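
# PatientHistoryAnalyzer parses the patient spreadsheet, builds focused prompts,
# queries TxAgent, and assembles the answers into a downloadable Markdown report.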
class PatientHistoryAnalyzer:
    def __init__(self):
        # max_text_length caps individual spreadsheet fields in clean_text().
        self.max_token_length = 2000
        self.max_text_length = 500
        self.agent = self._initialize_agent()

    def _initialize_agent(self):
        """Initialize the TxAgent with proper configuration."""
        tool_path = os.path.join(WORKING_DIR, "data", "new_tool.json")
        if not os.path.exists(tool_path):
            raise FileNotFoundError(f"Tool file not found at {tool_path}")

        return TxAgent(
            model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B",
            rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
            tool_files_dict={"new_tool": tool_path},
            force_finish=True,
            enable_checker=True,
            step_rag_num=4,
            seed=100,
            additional_default_tools=[],
        )

    def clean_text(self, text: str) -> str:
        """Clean and normalize text fields."""
        if not isinstance(text, str):
            text = str(text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text[:self.max_text_length]

    def process_excel(self, file_path: str) -> Dict[str, List]:
        """Process Excel file into structured patient data."""
        try:
            df = pd.read_excel(file_path)
            # Sort chronologically; coerce to datetime in case the column
            # was stored as text in the spreadsheet.
            df = df.sort_values(
                'Interview Date',
                key=lambda s: pd.to_datetime(s, errors='coerce')
            )

            data = {
                'timeline': [],
                'medications': defaultdict(list),
                'diagnoses': defaultdict(list),
                'tests': defaultdict(list),
                'doctors': set(),
                'all_entries': []
            }

            for _, row in df.iterrows():
                entry = {
                    'date': self.clean_text(row.get('Interview Date', '')),
                    'doctor': self.clean_text(row.get('Interviewer', '')),
                    'form': self.clean_text(row.get('Form Name', '')),
                    'item': self.clean_text(row.get('Form Item', '')),
                    'response': self.clean_text(row.get('Item Response', '')),
                    'notes': self.clean_text(row.get('Description', ''))
                }

                data['timeline'].append(entry)
                data['doctors'].add(entry['doctor'])
                data['all_entries'].append(entry)

                # Bucket entries by form type for the specialised prompts.
                form_lower = entry['form'].lower()
                if 'medication' in form_lower or 'drug' in form_lower:
                    data['medications'][entry['item']].append(entry)
                elif 'diagnosis' in form_lower:
                    data['diagnoses'][entry['item']].append(entry)
                elif 'test' in form_lower or 'lab' in form_lower:
                    data['tests'][entry['item']].append(entry)

            return data

        except Exception as e:
            raise ValueError(f"Error processing Excel file: {str(e)}")

    def generate_analysis_prompt(self, patient_data: Dict) -> List[Dict]:
        """Generate analysis prompts that respect token limits."""
        prompts = []

        # Always analyse the current status (most recent entries).
        current_prompt = self._create_current_status_prompt(patient_data)
        prompts.append({
            'type': 'current_status',
            'content': current_prompt
        })

        # Add a historical analysis only when there is enough history.
        if len(patient_data['all_entries']) > 10:
            history_prompt = self._create_historical_prompt(patient_data)
            prompts.append({
                'type': 'historical',
                'content': history_prompt
            })

        # Add a medication-focused analysis when several medications are tracked.
        if len(patient_data['medications']) > 3:
            meds_prompt = self._create_medication_prompt(patient_data)
            prompts.append({
                'type': 'medications',
                'content': meds_prompt
            })

        return prompts

    def _create_current_status_prompt(self, data: Dict) -> str:
        """Create prompt for current patient status."""
        recent_entries = data['timeline'][-10:]

        prompt_lines = [
            "**Comprehensive Patient Status Analysis**",
            "Focus on RECENT appointments and CURRENT health status.",
            "Analyze for:",
            "- Medication consistency",
            "- Diagnostic agreement between providers",
            "- Recent concerning findings",
            "- Immediate follow-up needs",
            "",
            "**Recent Timeline (last 10 entries):**"
        ]

        for entry in recent_entries:
            prompt_lines.append(
                f"- {entry['date']}: {entry['form']} - {entry['item']} = {entry['response']} (by {entry['doctor']})"
            )

        prompt_lines.extend([
            "",
            "**Current Medications:**",
            *[f"- {med}: {entries[-1]['response']} (last updated {entries[-1]['date']})"
              for med, entries in data['medications'].items()],
            "",
            "**Active Diagnoses:**",
            *[f"- {diag}: {entries[-1]['response']} (last updated {entries[-1]['date']})"
              for diag, entries in data['diagnoses'].items()],
            "",
            "**Required Output Format:**",
            "### Summary of Current Status",
            "### Medication Review",
            "### Diagnostic Consistency",
            "### Urgent Concerns",
            "### Recommended Actions"
        ])

        return "\n".join(prompt_lines)

    def _create_historical_prompt(self, data: Dict) -> str:
        """Create prompt for historical analysis."""
        return "\n".join([
            "**Historical Patient Analysis**",
            "Focus on LONG-TERM PATTERNS and HISTORY.",
            "",
            "**Key Analysis Points:**",
            "- Treatment changes over time",
            "- Recurring symptoms/issues",
            "- Diagnostic evolution",
            "- Medication history",
            "",
            "**Historical Timeline (condensed):**",
            *[f"- {entry['date'][:7]}: {entry['form']} - {entry['response']}"
              for entry in data['all_entries'][:-10]],
            "",
            "**Required Output Format:**",
            "### Historical Patterns",
            "### Treatment Evolution",
            "### Chronic Issues",
            "### Long-term Recommendations"
        ])

    def _create_medication_prompt(self, data: Dict) -> str:
        """Create medication-specific prompt."""
        return "\n".join([
            "**Medication-Specific Analysis**",
            "Focus on MEDICATION HISTORY and POTENTIAL ISSUES.",
            "",
            "**Medication History:**",
            *[f"- {med}: " + ", ".join(
                f"{e['date']}: {e['response']} (by {e['doctor']})"
                for e in entries
            ) for med, entries in data['medications'].items()],
            "",
            "**Analysis Focus:**",
            "- Potential interactions",
            "- Dosage changes",
            "- Prescriber patterns",
            "- Adherence issues",
            "",
            "**Required Output Format:**",
            "### Medication Summary",
            "### Potential Issues",
            "### Prescriber Patterns",
            "### Recommendations"
        ])

    def _call_agent(self, prompt: str) -> str:
        """Call TxAgent with proper error handling."""
        try:
            # Accumulate whatever run_gradio_chat yields (message lists or
            # plain strings) into a single response string.
            response = ""
            for result in self.agent.run_gradio_chat(
                message=prompt,
                history=[],
                temperature=0.2,
                max_new_tokens=1024,
                max_token=2048,
                call_agent=False,
                conversation=[],
            ):
                if isinstance(result, list):
                    for r in result:
                        if hasattr(r, 'content') and r.content:
                            response += r.content + "\n"
                elif isinstance(result, str):
                    response += result + "\n"

            return response.strip()
        except Exception as e:
            return f"Error in model response: {str(e)}"

    def generate_report(self, analysis_results: List[str]) -> Tuple[str, str]:
        """Combine analysis results into final report."""
        report = [
            "# Comprehensive Patient History Analysis",
            f"**Generated on**: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            ""
        ]

        for result in analysis_results:
            report.extend(["", "---", "", result])

        report.extend([
            "",
            "## Overall Clinical Summary",
            "This report combines analyses of:",
            "- Current health status",
            "- Historical patterns",
            "- Medication history",
            "",
            "**Key Takeaways:**",
            "[Generated summary of most critical findings would appear here]"
        ])

        full_report = "\n".join(report)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_filename = f"patient_report_{timestamp}.md"
        report_path = os.path.join(REPORT_DIR, report_filename)

        with open(report_path, 'w') as f:
            f.write(full_report)

        return full_report, report_path

    def analyze(self, file_path: str) -> Tuple[str, str]:
        """Main analysis workflow."""
        try:
            patient_data = self.process_excel(file_path)
            prompts = self.generate_analysis_prompt(patient_data)

            # Run each focused prompt through the agent and collect the answers.
            analysis_results = []
            for prompt in prompts:
                response = self._call_agent(prompt['content'])
                analysis_results.append(response)

            return self.generate_report(analysis_results)

        except Exception as e:
            # Return None for the file output so the download component stays empty.
            return f"Error during analysis: {str(e)}", None

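
# Gradio interface: Excel upload and analyze button on the left; rendered
# analysis and a downloadable report file on the right.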
def create_interface():
    analyzer = PatientHistoryAnalyzer()

    with gr.Blocks(title="Patient History Analyzer", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🏥 Comprehensive Patient History Analysis")

        with gr.Tabs():
            with gr.TabItem("Analysis"):
                with gr.Row():
                    with gr.Column(scale=1):
                        file_input = gr.File(
                            label="Upload Patient Records (Excel)",
                            file_types=[".xlsx"],
                            type="filepath"
                        )
                        analyze_btn = gr.Button("Analyze Full History", variant="primary")

                    with gr.Column(scale=2):
                        output_display = gr.Markdown(
                            label="Analysis Results",
                            elem_id="results"
                        )
                        report_download = gr.File(
                            label="Download Full Report",
                            interactive=False
                        )

            with gr.TabItem("Instructions"):
                gr.Markdown("""
                ## How to Use This Tool

                1. **Upload** your patient's Excel file
                2. **Click Analyze** to process the history
                3. **Review** the comprehensive analysis
                4. **Download** the full report

                ### File Requirements
                Excel file must contain:
                - Booking Number
                - Form Name
                - Form Item
                - Item Response
                - Interview Date
                - Interviewer
                - Description
                """)

        analyze_btn.click(
            fn=analyzer.analyze,
            inputs=file_input,
            outputs=[output_display, report_download],
            api_name="analyze"
        )

    return demo

if __name__ == "__main__":
    try:
        demo = create_interface()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[WORKING_DIR, REPORT_DIR]
        )
    except Exception as e:
        print(f"Error launching application: {str(e)}")
        sys.exit(1)