CPS-Test-Mobile / app.py
Ali2206's picture
Update app.py
3af8921 verified
raw
history blame
14.7 kB
import sys
import os
import pandas as pd
import json
import gradio as gr
from typing import List, Tuple, Dict, Any
import hashlib
import shutil
import re
from datetime import datetime
import time
import markdown
from collections import defaultdict
# Configuration and setup
# Every cache and every generated report lives under one persistent root.
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)

# Sub-directories of the persistent root, one per kind of artifact.
model_cache_dir = os.path.join(persistent_dir, "txagent_models")
tool_cache_dir = os.path.join(persistent_dir, "tool_cache")
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")

for directory in (model_cache_dir, tool_cache_dir, file_cache_dir, report_dir):
    os.makedirs(directory, exist_ok=True)

# Point the Hugging Face / Transformers caches at the persistent model
# directory. This is done before TxAgent is imported below.
os.environ.update(
    {
        "HF_HOME": model_cache_dir,
        "TRANSFORMERS_CACHE": model_cache_dir,
    }
)

# Make the bundled ``src`` directory (next to this file) importable so the
# ``txagent`` package can be resolved.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.abspath(os.path.join(current_dir, "src"))
sys.path.insert(0, src_path)

from txagent.txagent import TxAgent
def file_hash(path: str) -> str:
    """Return the hexadecimal MD5 digest of a file's contents.

    The file is read in fixed-size chunks so that large uploads are never
    loaded into memory in one piece. The digest is only used as an
    identifier (it is not a security measure).

    Args:
        path: Path of the file to hash.

    Returns:
        The 32-character lowercase hex MD5 digest.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(path, "rb") as f:
        # iter() with a sentinel stops as soon as read() returns b"" (EOF).
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()
def clean_response(text: str) -> str:
    """Normalize model output text for display.

    Steps, in order:
      1. Round-trip through UTF-8; if that fails, re-encode with the
         ``replace`` error handler so unencodable characters become ``?``.
      2. Drop ``[...]`` spans (which may cross line breaks) and the
         standalone word ``None``.
      3. Collapse runs of three or more newlines into a single blank line.
      4. Remove every character that is not a word character, whitespace,
         or one of ``# - * . , : ( )``.
      5. Strip leading and trailing whitespace.
    """
    try:
        normalized = text.encode('utf-8', 'surrogatepass').decode('utf-8')
    except UnicodeError:
        normalized = text.encode('utf-8', 'replace').decode('utf-8')

    # (pattern, replacement, flags) applied sequentially; order matters.
    substitutions = (
        (r"\[.*?\]|\bNone\b", "", re.DOTALL),
        (r"\n{3,}", "\n\n", 0),
        (r"[^\n#\-\*\w\s\.,:\(\)]+", "", 0),
    )
    for pattern, replacement, flags in substitutions:
        normalized = re.sub(pattern, replacement, normalized, flags=flags)

    return normalized.strip()
def extract_medical_data(df: pd.DataFrame) -> Dict[str, Any]:
    """Group the rows of a records DataFrame by booking number.

    Each row becomes a dict with the keys ``booking``, ``form_name``,
    ``form_item``, ``response``, ``date``, ``interviewer`` and
    ``description``, read from the columns ``Booking Number``, ``Form Name``,
    ``Form Item``, ``Item Response``, ``Interview Date``, ``Interviewer`` and
    ``Description``. A missing column yields an empty string for that field.

    Args:
        df: DataFrame with one form response per row.

    Returns:
        A ``defaultdict(list)`` mapping each booking number to its list of
        record dicts, in row order. Rows without a ``Booking Number`` column
        are grouped under the empty string.
    """
    medical_data = defaultdict(list)
    for _, row in df.iterrows():
        # Look the booking number up once, tolerantly, so the grouping key
        # always agrees with the value stored in the record (the previous
        # direct ``row['Booking Number']`` raised KeyError when the column
        # was absent, although every other field fell back to '').
        booking = row.get('Booking Number', '')
        record = {
            'booking': booking,
            'form_name': row.get('Form Name', ''),
            'form_item': row.get('Form Item', ''),
            'response': row.get('Item Response', ''),
            'date': row.get('Interview Date', ''),
            'interviewer': row.get('Interviewer', ''),
            'description': row.get('Description', ''),
        }
        medical_data[booking].append(record)
    return medical_data
def identify_red_flags(records: Dict[Any, List[Dict[str, Any]]]) -> Dict[str, Any]:
    """Scan every patient's records for simple keyword/threshold red flags.

    Each record is tested against at most one category (the checks form an
    ``if``/``elif`` chain, so the first matching category wins):

    * symptoms    - item mentions "pain" or form mentions "symptom", and the
                    response contains "severe" or "chronic".
    * medications - form mentions "medication" or "drug", and the response
                    contains "interaction" or "allergy".
    * diagnoses   - form mentions "diagnosis", and the response contains
                    "rule out" or "possible".
    * vitals      - form mentions "vital", and the first number in the
                    response is out of range (blood pressure > 140,
                    heart rate < 50 or > 100, temperature > 38).
    * labs        - form mentions "lab" or "test", and the response contains
                    "abnormal", "high" or "low".

    All comparisons are case-insensitive substring checks.

    Args:
        records: Mapping of booking number to that patient's record dicts
            (keys ``form_name``, ``form_item``, ``response``).

    Returns:
        A dict with the keys ``symptoms``, ``medications``, ``diagnoses``,
        ``vitals``, ``labs`` and ``patients``; each value is a
        ``defaultdict(list)`` mapping a lower-cased item name to a list of
        ``(booking, lower-cased response)`` tuples. ``patients`` is never
        populated here.
    """
    red_flags = {
        'symptoms': defaultdict(list),
        'medications': defaultdict(list),
        'diagnoses': defaultdict(list),
        'vitals': defaultdict(list),
        'labs': defaultdict(list),
        'patients': defaultdict(list)
    }
    for booking, patient_records in records.items():
        for record in patient_records:
            # Spreadsheet cells may hold numbers (e.g. a heart rate of 120),
            # so coerce to str before lower-casing instead of assuming text.
            form_name = str(record.get('form_name', '')).lower()
            item = str(record.get('form_item', '')).lower()
            response = str(record.get('response', '')).lower()

            # Symptom patterns
            if 'pain' in item or 'symptom' in form_name:
                if 'severe' in response or 'chronic' in response:
                    red_flags['symptoms'][item].append((booking, response))
            # Medication checks
            elif 'medication' in form_name or 'drug' in form_name:
                if 'interaction' in response or 'allergy' in response:
                    red_flags['medications'][item].append((booking, response))
            # Diagnosis inconsistencies
            elif 'diagnosis' in form_name:
                if 'rule out' in response or 'possible' in response:
                    red_flags['diagnoses'][item].append((booking, response))
            # Abnormal vitals
            elif 'vital' in form_name:
                # Only the first number is considered (e.g. the systolic
                # value of "150/95"); responses without a number are skipped.
                number = re.search(r'\d+\.?\d*', response)
                if number is None:
                    continue
                value = float(number.group())
                if ('blood pressure' in item and value > 140) or \
                        ('heart rate' in item and (value < 50 or value > 100)) or \
                        ('temperature' in item and value > 38):
                    red_flags['vitals'][item].append((booking, response))
            # Abnormal labs
            elif 'lab' in form_name or 'test' in form_name:
                if 'abnormal' in response or 'high' in response or 'low' in response:
                    red_flags['labs'][item].append((booking, response))
    return red_flags
def generate_combined_prompt(all_records: Dict[str, Any], red_flags: Dict[str, Any]) -> str:
    """Build one analysis prompt covering every patient's records.

    The prompt contains, in order: a per-patient bullet list of all records,
    the detected red flags grouped by category and patient (or a fixed
    "no red flags" sentence), the analysis instructions, and the required
    output format.

    Args:
        all_records: Mapping of booking number to a list of record dicts with
            the keys ``form_name``, ``form_item``, ``response``, ``date``,
            ``interviewer`` and ``description``.
        red_flags: Mapping of category name to a mapping of item name to a
            list of ``(booking, response)`` tuples. Empty categories are
            omitted from the prompt.

    Returns:
        The complete prompt text.
    """
    # Create summary of all records: one heading per patient, one bullet
    # per record.
    records_summary = []
    for booking, records in all_records.items():
        records_summary.append(f"\n## Patient {booking}")
        for r in records:
            records_summary.append(
                f"- {r['form_name']}: {r['form_item']} = {r['response']} "
                f"({r['date']} by {r['interviewer']})\n {r['description']}"
            )

    # Format red flags with patient references
    red_flags_text = []
    for category, items in red_flags.items():
        if not items:
            continue
        red_flags_text.append(f"\n### {category.capitalize()} Red Flags")
        for item, entries in items.items():
            # Merge all responses of the same patient for this item into a
            # single bullet.
            patient_entries = defaultdict(list)
            for booking, response in entries:
                patient_entries[booking].append(str(response))
            for booking, responses in patient_entries.items():
                red_flags_text.append(
                    f"- {item} (Patient {booking}): {', '.join(responses)}"
                )

    # Join with newlines: every entry is a heading or a bullet and must start
    # on its own line, otherwise consecutive bullets run together.
    records_block = "\n".join(records_summary)
    if red_flags_text:
        red_flags_block = "\n".join(red_flags_text)
    else:
        red_flags_block = "No obvious red flags detected"

    prompt = f"""
**COMPREHENSIVE PATIENT ANALYSIS**
**Medical Records Summary**:
{records_block}
**Identified Red Flags Across All Patients**:
{red_flags_block}
**Analysis Instructions**:
1. Review ALL patient data holistically
2. Identify patterns that might indicate systemic issues
3. Check for recurring medication problems across patients
4. Note any common missed diagnoses
5. Flag any urgent cases needing immediate attention
6. Provide overall clinical recommendations
**Required Output Format**:
### Summary of Findings
[Overview of most significant findings across all patients]
### Common Missed Diagnoses
- [Conditions frequently overlooked across multiple patients]
- [Specific patients affected: Booking numbers]
### Recurring Medication Issues
- [Common drug interactions or inappropriate prescriptions]
- [Patients affected]
### Systemic Assessment Gaps
- [Patterns of incomplete assessments across patients]
- [Recommended additional tests]
### Critical Cases Needing Follow-up
- [Patients requiring urgent attention]
- [Specific reasons]
### Overall Recommendations
- [General recommendations for clinical practice]
- [Specific actions for different patient groups]
"""
    return prompt
def parse_excel_to_combined_prompt(file_path: str) -> str:
    """Parse an Excel workbook into a single comprehensive analysis prompt.

    Only the first sheet is read, with its first row as the header; empty
    cells are replaced by empty strings before the records are grouped,
    scanned for red flags and rendered into the prompt.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        The prompt text built from the workbook's first sheet.

    Raises:
        ValueError: If reading or processing the file fails for any reason;
            the original exception is attached as ``__cause__``.
    """
    try:
        # The context manager closes the workbook handle even when parsing
        # fails, so the uploaded file is not left open.
        with pd.ExcelFile(file_path) as xl:
            df = xl.parse(xl.sheet_names[0], header=0).fillna("")
        medical_data = extract_medical_data(df)
        red_flags = identify_red_flags(medical_data)
        prompt = generate_combined_prompt(medical_data, red_flags)
        return prompt
    except Exception as e:
        raise ValueError(f"Error parsing Excel file: {str(e)}") from e
def init_agent():
    """Create a TxAgent, load its model, and return it.

    The tool definition file ``data/new_tool.json`` (resolved against the
    current working directory) is copied into the tool cache directory the
    first time this runs; later runs reuse the cached copy.
    """
    bundled_tool_file = os.path.abspath("data/new_tool.json")
    cached_tool_file = os.path.join(tool_cache_dir, "new_tool.json")

    # Seed the cache once; an existing cached copy is never overwritten.
    if not os.path.exists(cached_tool_file):
        shutil.copy(bundled_tool_file, cached_tool_file)

    agent_options = dict(
        model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B",
        rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
        tool_files_dict={"new_tool": cached_tool_file},
        force_finish=True,
        enable_checker=True,
        step_rag_num=4,
        seed=100,
        additional_default_tools=[],
    )
    tx_agent = TxAgent(**agent_options)
    tx_agent.init_model()
    return tx_agent
def create_ui(agent):
    """Create the Gradio UI and wire it to the given agent.

    The interface has an "Analysis" tab (Excel upload, optional instructions,
    a chat panel with the results and a downloadable report) and an
    "Instructions" tab with usage notes.

    Args:
        agent: Object exposing ``run_gradio_chat(...)``; it is called once per
            analysis with the combined prompt.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="Clinical Oversight Assistant") as demo:
        gr.Markdown("# 🏥 Comprehensive Clinical Analysis")
        with gr.Tabs():
            with gr.TabItem("Analysis"):
                with gr.Row():
                    # Left column - Inputs
                    with gr.Column(scale=1):
                        file_upload = gr.File(
                            label="Upload Excel File",
                            file_types=[".xlsx"],
                            file_count="single",
                            interactive=True
                        )
                        msg_input = gr.Textbox(
                            label="Additional Instructions",
                            placeholder="Add any specific analysis requests...",
                            lines=3
                        )
                        with gr.Row():
                            clear_btn = gr.Button("Clear", variant="secondary")
                            send_btn = gr.Button("Analyze All Patients", variant="primary")
                    # Right column - Outputs
                    with gr.Column(scale=2):
                        chatbot = gr.Chatbot(
                            label="Comprehensive Analysis Results",
                            height=600,
                            bubble_full_width=False,
                            show_copy_button=True,
                            render_markdown=True
                        )
                        download_output = gr.File(
                            label="Download Full Report",
                            interactive=False
                        )
            with gr.TabItem("Instructions"):
                gr.Markdown("""
## How to Use This Tool
1. **Upload Excel File**: Select your patient records Excel file
2. **Add Instructions** (Optional): Provide any specific analysis requests
3. **Click Analyze**: The system will process ALL patient records together
4. **Review Results**: Comprehensive analysis appears in the chat window
5. **Download Report**: Get a complete text report of all findings
### Key Features
- **Holistic analysis** of all patient records
- **Pattern detection** across multiple patients
- **Systemic issues** identification
- **Prioritized recommendations** based on severity
""")

        def analyze(message: str, chat_history: List[Tuple[str, str]], file):
            """Run the combined analysis for the uploaded file.

            This is a generator: it yields ``(chat_history, report_path)``
            pairs so the chat panel updates while the agent is running. The
            report path is ``None`` until the report file has been written.

            Raises:
                gr.Error: If no file was uploaded, or if the analysis fails
                    (after the error has also been appended to the chat).
            """
            if not file:
                raise gr.Error("Please upload an Excel file first")

            # Bind the working history before the try block so the except
            # handler below can always append to it; copying also leaves the
            # caller's list untouched and tolerates a missing history.
            new_history = list(chat_history or [])
            try:
                # Initialize chat history
                new_history.append((message, None))
                new_history.append((None, "⏳ Processing all patient data..."))
                yield new_history, None

                # Generate combined prompt
                prompt = parse_excel_to_combined_prompt(file.name)

                # Forward the user's optional instructions to the agent;
                # previously they were shown in the chat but never sent.
                extra_instructions = (message or "").strip()
                if extra_instructions:
                    prompt += f"\n**Additional Instructions**:\n{extra_instructions}\n"

                # Run analysis
                # NOTE(review): every result yielded by run_gradio_chat is
                # appended to full_output; if the agent yields cumulative
                # snapshots rather than increments this duplicates text -
                # confirm against TxAgent.
                full_output = ""
                for result in agent.run_gradio_chat(
                    message=prompt,
                    history=[],
                    temperature=0.2,
                    max_new_tokens=2048,  # Increased for comprehensive analysis
                    max_token=4096,
                    call_agent=False,
                    conversation=[],
                ):
                    if isinstance(result, list):
                        for r in result:
                            if hasattr(r, 'content') and r.content:
                                cleaned = clean_response(r.content)
                                full_output += cleaned + "\n"
                    elif isinstance(result, str):
                        cleaned = clean_response(result)
                        full_output += cleaned + "\n"
                    if full_output:
                        # Replace the placeholder / previous partial answer.
                        new_history[-1] = (None, full_output.strip())
                        yield new_history, None

                # Save report; the file name combines the upload's content
                # hash with a timestamp.
                file_hash_value = file_hash(file.name)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                report_path = os.path.join(report_dir, f"comprehensive_{file_hash_value}_{timestamp}_report.md")
                with open(report_path, "w", encoding="utf-8") as f:
                    f.write("# Comprehensive Clinical Analysis Report\n\n")
                    f.write(f"**Generated on**: {timestamp}\n\n")
                    f.write(f"**Source file**: {file.name}\n\n")
                    f.write(full_output)
                yield new_history, report_path if os.path.exists(report_path) else None
            except Exception as e:
                # Show the failure in the chat first, then raise so Gradio
                # also displays its error notification.
                new_history.append((None, f"❌ Error: {str(e)}"))
                yield new_history, None
                raise gr.Error(f"Analysis failed: {str(e)}") from e

        def clear_chat():
            """Clear chat history and outputs"""
            return [], None

        # Event handlers
        send_btn.click(
            analyze,
            inputs=[msg_input, chatbot, file_upload],
            outputs=[chatbot, download_output],
            api_name="analyze"
        )
        msg_input.submit(
            analyze,
            inputs=[msg_input, chatbot, file_upload],
            outputs=[chatbot, download_output]
        )
        clear_btn.click(
            clear_chat,
            inputs=[],
            outputs=[chatbot, download_output]
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the agent and the UI, then serve the app on
    # all interfaces at port 7860. Any startup failure is printed and the
    # process exits with status 1.
    try:
        agent = init_agent()
        demo = create_ui(agent)
        queued_app = demo.queue(api_open=False, max_size=20)
        queued_app.launch(
            share=False,
            # Generated reports must be servable for the download component.
            allowed_paths=[report_dir],
            show_error=True,
            server_port=7860,
            server_name="0.0.0.0",
        )
    except Exception as launch_error:
        print("Failed to launch application: " + str(launch_error))
        sys.exit(1)