import sys
import os
import pandas as pd
import pdfplumber
import json
import gradio as gr
from typing import List
from transformers import LayoutLMv3Processor, LayoutLMv3ForTokenClassification
from PIL import Image
import torch
# ✅ Fix: Add src to Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from txagent.txagent import TxAgent
def sanitize_utf8(text: str) -> str:
return text.encode("utf-8", "ignore").decode("utf-8")
def clean_final_response(text: str) -> str:
cleaned = text.replace("[TOOL_CALLS]", "").strip()
responses = cleaned.split("[Final Analysis]")
if len(responses) <= 1:
return f"
"
panels = []
for i, section in enumerate(responses[1:], 1):
final = section.strip()
panels.append(
f""
f"
🧠 Final Analysis #{i}
"
f"
{final.replace(chr(10), '
')}
"
f"
"
)
return "".join(panels)
def use_layoutlmv3_on_image(image_path):
processor = LayoutLMv3Processor.from_pretrained("microsoft/layoutlmv3-large")
model = LayoutLMv3ForTokenClassification.from_pretrained("microsoft/layoutlmv3-large")
image = Image.open(image_path).convert("RGB")
encoding = processor(images=image, return_tensors="pt")
with torch.no_grad():
outputs = model(**encoding)
logits = outputs.logits
predicted_class = logits.argmax(-1)
tokens = processor.tokenizer.convert_ids_to_tokens(encoding['input_ids'][0])
text = " ".join([tokens[i] for i in range(len(tokens)) if predicted_class[0][i] != -100])
return json.dumps({"filename": os.path.basename(image_path), "content": text})
def convert_file_to_json(file_path: str, file_type: str) -> str:
try:
if file_type == "csv":
df = pd.read_csv(file_path, encoding_errors="replace", header=None, dtype=str, skip_blank_lines=False, on_bad_lines="skip")
elif file_type in ["xls", "xlsx"]:
try:
df = pd.read_excel(file_path, engine="openpyxl", header=None, dtype=str)
except:
df = pd.read_excel(file_path, engine="xlrd", header=None, dtype=str)
elif file_type == "pdf":
return use_layoutlmv3_on_image(file_path)
else:
return json.dumps({"error": f"Unsupported file type: {file_type}"})
if df is None or df.empty:
return json.dumps({"warning": f"No data extracted from: {file_path}"})
df = df.fillna("")
content = df.astype(str).values.tolist()
return json.dumps({"filename": os.path.basename(file_path), "rows": content})
except Exception as e:
return json.dumps({"error": f"Error reading {os.path.basename(file_path)}: {str(e)}"})
def chunk_text(text: str, max_tokens: int = 8192) -> List[str]:
chunks = []
words = text.split()
chunk = []
token_count = 0
for word in words:
token_count += len(word) // 4 + 1
if token_count > max_tokens:
chunks.append(" ".join(chunk))
chunk = [word]
token_count = len(word) // 4 + 1
else:
chunk.append(word)
if chunk:
chunks.append(" ".join(chunk))
return chunks
def create_ui(agent: TxAgent):
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("\U0001F4CB CPS: Clinical Patient Support System
")
chatbot = gr.Chatbot(label="CPS Assistant", height=600, type="tuples")
file_upload = gr.File(
label="Upload Medical File",
file_types=[".pdf", ".txt", ".docx", ".jpg", ".png", ".csv", ".xls", ".xlsx"],
file_count="multiple"
)
message_input = gr.Textbox(placeholder="Ask a biomedical question or just upload the files...", show_label=False)
send_button = gr.Button("Send", variant="primary")
conversation_state = gr.State([])
def handle_chat(message: str, history: list, conversation: list, uploaded_files: list, progress=gr.Progress()):
context = (
"You are an expert clinical AI assistant reviewing medical form or interview data. "
"Your job is to analyze this data and reason about any information or red flags that a human doctor might have overlooked. "
"Provide a **detailed and structured response**, including examples, supporting evidence from the form, and clinical rationale for why these items matter. "
"Ensure the output is informative and helpful for improving patient care. "
"Do not hallucinate. Base the response only on the provided form content. "
"End with a section labeled '[Final Analysis]' where you summarize key findings the doctor may have missed."
)
try:
history.append((message, "⏳ Processing your request..."))
yield history
extracted_text = ""
if uploaded_files and isinstance(uploaded_files, list):
total_files = len(uploaded_files)
for index, file in enumerate(uploaded_files):
if not hasattr(file, 'name'):
continue
path = file.name
extension = path.split(".")[-1].lower()
json_text = convert_file_to_json(path, extension)
extracted_text += sanitize_utf8(json_text) + "\n"
chunks = chunk_text(extracted_text.strip())
full_response = ""
for i, chunk in enumerate(chunks):
chunked_prompt = (
f"{context}\n\n--- Uploaded File Content (Chunk {i+1}/{len(chunks)}) ---\n\n{chunk}\n\n"
f"--- End of Chunk ---\n\nNow begin your analysis:"
)
generator = agent.run_gradio_chat(
message=chunked_prompt,
history=[],
temperature=0.3,
max_new_tokens=1024,
max_token=8192,
call_agent=False,
conversation=conversation,
uploaded_files=uploaded_files,
max_round=30
)
chunk_response = ""
for update in generator:
if isinstance(update, str):
chunk_response += update
elif isinstance(update, list):
for msg in update:
if hasattr(msg, 'content'):
chunk_response += msg.content
full_response += chunk_response + "\n\n"
full_response = clean_final_response(full_response.strip())
history[-1] = (message, full_response)
yield history
except Exception as chat_error:
print(f"Chat handling error: {chat_error}")
error_msg = "An error occurred while processing your request. Please try again."
if len(history) > 0 and history[-1][1].startswith("⏳"):
history[-1] = (history[-1][0], error_msg)
else:
history.append((message, error_msg))
yield history
inputs = [message_input, chatbot, conversation_state, file_upload]
send_button.click(fn=handle_chat, inputs=inputs, outputs=chatbot)
message_input.submit(fn=handle_chat, inputs=inputs, outputs=chatbot)
gr.Examples([
["Upload your medical form and ask what the doctor might've missed."],
["This patient was treated with antibiotics for UTI. What else should we check?"],
["Is there anything abnormal in the attached blood work report?"]
], inputs=message_input)
return demo