# CPS-Test-Mobile / app.py
import sys
import os
import hashlib
import re
import shutil
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List

import gradio as gr
import pandas as pd

# Configuration and setup
persistent_dir = "/data/hf_cache"
os.makedirs(persistent_dir, exist_ok=True)

model_cache_dir = os.path.join(persistent_dir, "txagent_models")
tool_cache_dir = os.path.join(persistent_dir, "tool_cache")
file_cache_dir = os.path.join(persistent_dir, "cache")
report_dir = os.path.join(persistent_dir, "reports")

for directory in [model_cache_dir, tool_cache_dir, file_cache_dir, report_dir]:
    os.makedirs(directory, exist_ok=True)

# Point the Hugging Face caches at the persistent volume.
os.environ["HF_HOME"] = model_cache_dir
os.environ["TRANSFORMERS_CACHE"] = model_cache_dir

# Make the bundled src/ package importable before importing TxAgent.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.abspath(os.path.join(current_dir, "src"))
sys.path.insert(0, src_path)

from txagent.txagent import TxAgent

# Constants
MAX_TOKENS = 32768          # context budget passed to the agent
CHUNK_SIZE = 10000          # currently unused
MAX_NEW_TOKENS = 2048       # generation budget per agent call
MAX_BOOKINGS_PER_CHUNK = 5  # currently unused

def file_hash(path: str) -> str:
    """Return the MD5 hex digest of a file's bytes (currently unused)."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()

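# Hypothetical wiring (not part of the original flow): file_hash could key a
# per-upload cache under file_cache_dir so re-uploads skip reprocessing, e.g.
#   cached = os.path.join(file_cache_dir, file_hash(path) + ".json")
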
def clean_response(text: str) -> str:
    """Normalize agent output: repair bad unicode, then strip noise."""
    try:
        text = text.encode('utf-8', 'surrogatepass').decode('utf-8')
    except UnicodeError:
        text = text.encode('utf-8', 'replace').decode('utf-8')
    # Drop bracketed tool/debug fragments and bare "None" tokens.
    text = re.sub(r"\[.*?\]|\bNone\b", "", text, flags=re.DOTALL)
    # Collapse runs of three or more newlines into paragraph breaks.
    text = re.sub(r"\n{3,}", "\n\n", text)
    # Keep only markdown-friendly characters.
    text = re.sub(r"[^\n#\-\*\w\s\.,:\(\)]+", "", text)
    return text.strip()

def estimate_tokens(text: str) -> int:
    # Rough heuristic: ~3.5 characters per token. Wrap in int() to honor the
    # annotated return type (floor division by a float yields a float).
    return int(len(text) / 3.5)

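# process_patient_data below indexes the spreadsheet several ways in one pass.
# A sketch of the resulting shape (keys and values are illustrative):
#   data['bookings'][booking]  -> [entry, ...]   rows grouped per booking
#   data['medications'][item]  -> [entry, ...]   rows whose form mentions drugs
#   data['timeline']           -> all entries, sorted by 'Interview Date'
#   data['doctors']            -> set of interviewer names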
def process_patient_data(df: pd.DataFrame) -> Dict[str, Any]:
    data = {
        'bookings': defaultdict(list),
        'medications': defaultdict(list),
        'diagnoses': defaultdict(list),
        'tests': defaultdict(list),
        'procedures': defaultdict(list),
        'doctors': set(),
        'timeline': []
    }
    df = df.sort_values('Interview Date')
    for booking, group in df.groupby('Booking Number'):
        for _, row in group.iterrows():
            entry = {
                'booking': booking,
                'date': str(row['Interview Date']),
                'doctor': str(row['Interviewer']),
                'form': str(row['Form Name']),
                'item': str(row['Form Item']),
                'response': str(row['Item Response']),
                'notes': str(row['Description'])
            }
            data['bookings'][booking].append(entry)
            data['timeline'].append(entry)
            data['doctors'].add(entry['doctor'])
            # Categorize the row by keywords in its form name.
            form_lower = entry['form'].lower()
            if 'medication' in form_lower or 'drug' in form_lower:
                data['medications'][entry['item']].append(entry)
            elif 'diagnosis' in form_lower or 'condition' in form_lower:
                data['diagnoses'][entry['item']].append(entry)
            elif 'test' in form_lower or 'lab' in form_lower or 'result' in form_lower:
                data['tests'][entry['item']].append(entry)
            elif 'procedure' in form_lower or 'surgery' in form_lower:
                data['procedures'][entry['item']].append(entry)
    return data

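# Each timeline row becomes one bullet in the generated prompt, e.g.
# (hypothetical values):
#   - 2024-01-15: Medication Review - Metformin = 500 mg daily (by Dr. Smith)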
def generate_analysis_prompt(patient_data: Dict[str, Any], bookings: List[str]) -> str:
    prompt_lines = [
        "**Comprehensive Patient Analysis**",
        f"Analyzing {len(bookings)} bookings",
        "",
        "**Key Analysis Points:**",
        "- Chronological progression of symptoms",
        "- Medication changes and interactions",
        "- Diagnostic consistency across providers",
        "- Missed diagnostic opportunities",
        "- Gaps in follow-up",
        "",
        "**Patient Timeline:**"
    ]
    for entry in patient_data['timeline']:
        if entry['booking'] in bookings:
            prompt_lines.append(
                f"- {entry['date']}: {entry['form']} - {entry['item']} = {entry['response']} (by {entry['doctor']})"
            )
    prompt_lines.extend([
        "",
        "**Medication History:**",
        *[f"- {med}: " + " → ".join(
            f"{e['date']}: {e['response']}"
            for e in entries if e['booking'] in bookings
        ) for med, entries in patient_data['medications'].items()],
        "",
        "**Required Analysis Format:**",
        "### Diagnostic Patterns",
        "### Medication Analysis",
        "### Provider Consistency",
        "### Missed Opportunities",
        "### Recommendations"
    ])
    return "\n".join(prompt_lines)

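# chunk_bookings uses the longest-processing-time greedy heuristic: sort
# bookings by estimated token weight, largest first, and always place the next
# one into the currently lightest of three fixed bins. With hypothetical
# weights 900/800/300/250/200 the bins end up [900], [800], [300+250+200=750].
# Note it ignores CHUNK_SIZE and MAX_BOOKINGS_PER_CHUNK.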
def chunk_bookings(patient_data: Dict[str, Any]) -> List[List[str]]:
    """Split bookings into three chunks of roughly equal token weight."""
    all_bookings = list(patient_data['bookings'].keys())
    booking_sizes = []
    for booking in all_bookings:
        entries = patient_data['bookings'][booking]
        size = sum(estimate_tokens(str(e)) for e in entries)
        booking_sizes.append((booking, size))
    booking_sizes.sort(key=lambda x: x[1], reverse=True)
    chunks = [[] for _ in range(3)]
    chunk_sizes = [0, 0, 0]
    for booking, size in booking_sizes:
        min_chunk = chunk_sizes.index(min(chunk_sizes))
        chunks[min_chunk].append(booking)
        chunk_sizes[min_chunk] += size
    return chunks

def init_agent():
    # Copy the bundled tool manifest into the persistent cache on first run.
    default_tool_path = os.path.abspath("data/new_tool.json")
    target_tool_path = os.path.join(tool_cache_dir, "new_tool.json")
    if not os.path.exists(target_tool_path):
        shutil.copy(default_tool_path, target_tool_path)
    agent = TxAgent(
        model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B",
        rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
        tool_files_dict={"new_tool": target_tool_path},
        force_finish=True,
        enable_checker=True,
        step_rag_num=4,
        seed=100,
        additional_default_tools=[]
    )
    agent.init_model()
    return agent

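# TxAgent.run_gradio_chat streams heterogeneous results: lists of message
# objects, bare strings, or single objects exposing .content. The loop below
# normalizes all three shapes into one cleaned transcript.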
def analyze_with_agent(agent, prompt: str) -> str:
    try:
        response = ""
        for result in agent.run_gradio_chat(
            message=prompt,
            history=[],
            temperature=0.2,
            max_new_tokens=MAX_NEW_TOKENS,
            max_token=MAX_TOKENS,
            call_agent=False,
            conversation=[],
        ):
            if isinstance(result, list):
                for r in result:
                    if hasattr(r, 'content') and r.content:
                        response += clean_response(r.content) + "\n"
            elif isinstance(result, str):
                response += clean_response(result) + "\n"
            elif hasattr(result, 'content'):
                response += clean_response(result.content) + "\n"
        return response.strip()
    except Exception as e:
        return f"Error in analysis: {str(e)}"

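# Gradio note: analyze() below is a generator, so each `yield` streams the
# partial report to the Markdown output while later chunks are still running.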
def create_ui(agent):
    with gr.Blocks(theme=gr.themes.Soft(), title="Patient History Analyzer") as demo:
        gr.Markdown("# 🏥 Patient History Analyzer")
        with gr.Tabs():
            with gr.TabItem("Analysis"):
                with gr.Row():
                    with gr.Column(scale=1):
                        file_upload = gr.File(
                            label="Upload Excel File",
                            file_types=[".xlsx"],
                            file_count="single"
                        )
                        analyze_btn = gr.Button("Analyze", variant="primary")
                        status = gr.Markdown("Ready")  # static placeholder; never updated
                    with gr.Column(scale=2):
                        output = gr.Markdown()
                        report = gr.File(label="Download Report")
            with gr.TabItem("Instructions"):
                gr.Markdown("""
## How to Use
1. Upload patient history Excel
2. Click Analyze
3. View/download report

**Required Columns:**
- Booking Number
- Interview Date
- Interviewer
- Form Name
- Form Item
- Item Response
- Description
""")

        def analyze(file):
            if not file:
                raise gr.Error("Please upload a file")
            try:
                df = pd.read_excel(file.name)
                patient_data = process_patient_data(df)
                chunks = chunk_bookings(patient_data)
                full_report = []
                for i, bookings in enumerate(chunks, 1):
                    prompt = generate_analysis_prompt(patient_data, bookings)
                    response = analyze_with_agent(agent, prompt)
                    full_report.append(f"## Chunk {i}\n{response}\n")
                    yield "\n".join(full_report), None  # stream partial report
                # Give the summary call the chunk analyses themselves;
                # a bare instruction would leave the agent nothing to combine.
                if len(chunks) > 1:
                    summary_prompt = (
                        "Create a final summary combining the following chunk analyses:\n\n"
                        + "\n".join(full_report)
                    )
                    summary = analyze_with_agent(agent, summary_prompt)
                    full_report.append(f"## Final Summary\n{summary}\n")
                report_path = os.path.join(
                    report_dir, f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
                )
                with open(report_path, 'w', encoding='utf-8') as f:
                    f.write("\n".join(full_report))
                yield "\n".join(full_report), report_path
            except Exception as e:
                raise gr.Error(f"Error: {str(e)}")

        analyze_btn.click(
            analyze,
            inputs=file_upload,
            outputs=[output, report]
        )
    return demo

if __name__ == "__main__":
    try:
        agent = init_agent()
        demo = create_ui(agent)
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True
        )
    except Exception as e:
        print(f"Error: {str(e)}")
        sys.exit(1)