File size: 9,337 Bytes
1777737 3a20a5b 728def5 3a20a5b ff7a915 28560cd dfe34bb 841c3cb 0e7a2f6 dfe34bb 28560cd dfe34bb 28560cd 3a20a5b 41945fe 3a20a5b 41945fe 3a20a5b ff7a915 3a20a5b ff7a915 28560cd ff7a915 5583dc0 ff7a915 dfe34bb 28560cd dfe34bb 28560cd dfe34bb 28560cd dfe34bb 28560cd dfe34bb 28560cd dfe34bb 3492c23 ff7a915 3ae42d2 3a20a5b 774fd26 3492c23 28560cd dfe34bb 4e4aafc dfe34bb 4a6ed35 28560cd dfe34bb 28560cd 841c3cb 28560cd 88317c7 3a20a5b 88317c7 3a20a5b 28560cd 3ae42d2 3a20a5b 3492c23 841c3cb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
import sys
import os
import pandas as pd
import pdfplumber
import gradio as gr
from tabulate import tabulate
from typing import List, Optional
# ✅ Fix: Add src to Python path with correct parentheses
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from txagent.txagent import TxAgent
def safe_extract_table_data(table: List[List[str]]) -> List[str]:
extracted_rows = []
if not table or not isinstance(table, list):
return extracted_rows
for row in table:
if not row or not isinstance(row, list):
continue
try:
clean_row = [str(cell) if cell is not None else "" for cell in row]
if any(clean_row):
extracted_rows.append("\t".join(clean_row))
except Exception as e:
print(f"Error processing table row: {e}")
continue
return extracted_rows
def extract_all_text_from_csv_or_excel(file_path: str, progress=None, index=0, total=1) -> str:
try:
if not os.path.exists(file_path):
return f"File not found: {file_path}"
if file_path.endswith(".csv"):
df = pd.read_csv(file_path, encoding="utf-8", errors="replace", low_memory=False)
elif file_path.endswith((".xls", ".xlsx")):
df = pd.read_excel(file_path, engine="openpyxl")
else:
return f"Unsupported spreadsheet format: {file_path}"
if progress:
progress((index + 1) / total, desc=f"Processed table: {os.path.basename(file_path)}")
group_column = None
for col in ["Booking Number", "Form Name"]:
if col in df.columns:
group_column = col
break
if group_column:
try:
groups = df.groupby(group_column)
result = []
for group_name, group_df in groups:
if group_name is None:
continue
result.append(f"\n### Group: {group_name}\n")
result.append(tabulate(group_df, headers="keys", tablefmt="github", showindex=False))
return "\n".join(result) if result else tabulate(df, headers="keys", tablefmt="github", showindex=False)
except Exception as e:
print(f"Error during grouping: {e}")
return tabulate(df, headers="keys", tablefmt="github", showindex=False)
else:
return tabulate(df, headers="keys", tablefmt="github", showindex=False)
except Exception as e:
return f"Error parsing file {os.path.basename(file_path)}: {str(e)}"
def extract_all_text_from_pdf(file_path: str, progress=None, index=0, total=1) -> str:
extracted = []
try:
if not os.path.exists(file_path):
return f"PDF file not found: {file_path}"
with pdfplumber.open(file_path) as pdf:
num_pages = len(pdf.pages) if hasattr(pdf, 'pages') else 0
for i, page in enumerate(pdf.pages if num_pages > 0 else []):
try:
tables = page.extract_tables() if hasattr(page, 'extract_tables') else []
for table in tables if tables else []:
extracted.extend(safe_extract_table_data(table))
if progress and num_pages > 0:
progress((index + (i / num_pages)) / total,
desc=f"Parsing PDF: {os.path.basename(file_path)} ({i+1}/{num_pages})")
except Exception as page_error:
print(f"Error processing page {i+1}: {page_error}")
continue
return "\n".join(extracted) if extracted else f"No extractable content found in {os.path.basename(file_path)}"
except Exception as e:
return f"Error parsing PDF {os.path.basename(file_path)}: {str(e)}"
def create_ui(agent: TxAgent):
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("<h1 style='text-align: center;'>📋 CPS: Clinical Patient Support System</h1>")
chatbot = gr.Chatbot(label="CPS Assistant", height=600, type="messages")
file_upload = gr.File(
label="Upload Medical File",
file_types=[".pdf", ".txt", ".docx", ".jpg", ".png", ".csv", ".xls", ".xlsx"],
file_count="multiple"
)
message_input = gr.Textbox(placeholder="Ask a biomedical question or just upload the files...", show_label=False)
send_button = gr.Button("Send", variant="primary")
conversation_state = gr.State([])
def handle_chat(message: str, history: list, conversation: list, uploaded_files: list, progress=gr.Progress()):
context = (
"You are an expert clinical AI assistant reviewing medical form or interview data. "
"Your job is to analyze this data and reason about any information or red flags that a human doctor might have overlooked. "
"Provide a **detailed and structured response**, including examples, supporting evidence from the form, and clinical rationale for why these items matter. "
"Ensure the output is informative and helpful for improving patient care. "
"Do not hallucinate. Base the response only on the provided form content. "
"End with a section labeled '🧠 Final Analysis' where you summarize key findings the doctor may have missed."
)
try:
extracted_text = ""
if uploaded_files and isinstance(uploaded_files, list):
total_files = len(uploaded_files)
for index, file in enumerate(uploaded_files):
if not hasattr(file, 'name'):
continue
path = file.name
try:
if path.endswith((".csv", ".xls", ".xlsx")):
extracted_text += extract_all_text_from_csv_or_excel(path, progress, index, total_files) + "\n"
elif path.endswith(".pdf"):
extracted_text += extract_all_text_from_pdf(path, progress, index, total_files) + "\n"
else:
extracted_text += f"(Uploaded file: {os.path.basename(path)})\n"
if progress:
progress((index + 1) / total_files, desc=f"Skipping unsupported file: {os.path.basename(path)}")
except Exception as file_error:
print(f"Error processing file {path}: {file_error}")
extracted_text += f"\n[Error processing file: {os.path.basename(path)}]\n"
continue
message = f"{context}\n\n---\n{extracted_text.strip()}\n---\n\nBegin your reasoning."
final_response = None
generator = agent.run_gradio_chat(
message=message,
history=history,
temperature=0.3,
max_new_tokens=1024,
max_token=8192,
call_agent=False,
conversation=conversation,
uploaded_files=uploaded_files,
max_round=30
)
for update in generator:
try:
if isinstance(update, list):
cleaned = [
msg for msg in update
if hasattr(msg, 'role')
and not (
msg.role == "assistant"
and hasattr(msg, 'content')
and msg.content.strip().startswith("🧰")
)
]
if cleaned:
final_response = cleaned
yield cleaned
else:
if isinstance(update, str) and not update.strip().startswith("🧰"):
yield update.encode("utf-8", "replace").decode("utf-8")
except Exception as update_error:
print(f"Error processing update: {update_error}")
continue
except Exception as chat_error:
print(f"Chat handling error: {chat_error}")
yield "An error occurred while processing your request. Please try again."
inputs = [message_input, chatbot, conversation_state, file_upload]
send_button.click(fn=handle_chat, inputs=inputs, outputs=chatbot)
message_input.submit(fn=handle_chat, inputs=inputs, outputs=chatbot)
gr.Examples([
["Upload your medical form and ask what the doctor might've missed."],
["This patient was treated with antibiotics for UTI. What else should we check?"],
["Is there anything abnormal in the attached blood work report?"]
], inputs=message_input)
return demo
|