"""Gradio front end for the CPS (Clinical Patient Support) assistant.

Uploaded spreadsheets and PDFs are flattened to plain text, wrapped in a fixed
clinical-review prompt and streamed through a ``TxAgent`` into a chat window.
"""
import os
import sys
from typing import List

import gradio as gr
import pandas as pd
import pdfplumber

# ✅ Fix: Add src to Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from txagent.txagent import TxAgent  # noqa: E402  (must follow the sys.path tweak)

# Emoji are written as full code points. The "\ud83d\udcc4" style (a JSON /
# JavaScript surrogate pair) yields two *lone surrogates* in a Python str:
# they never compare equal to the real emoji and cannot be encoded as UTF-8.
_PAGE_EMOJI = "\U0001F4C4"       # 📄
_CLIPBOARD_EMOJI = "\U0001F4CB"  # 📋
_BRAIN_EMOJI = "\U0001F9E0"      # 🧠

_SPREADSHEET_SUFFIXES = (".csv", ".xls", ".xlsx")


def extract_all_text_from_csv_or_excel(file_path: str, progress=None, index=0, total=1) -> str:
    """Flatten a CSV/XLS/XLSX file into pipe-separated text, one line per row.

    Args:
        file_path: Path of the spreadsheet on disk.
        progress: Optional callable ``progress(fraction, desc=...)`` used to
            report status; skipped when falsy.
        index: Zero-based position of this file among the files being read.
        total: Number of files being read (used to scale the fraction).

    Returns:
        A header line with the file name followed by the non-empty rows, or a
        human-readable error string. This function never raises for a string
        path; failures are reported in the returned text.
    """
    file_name = os.path.basename(file_path)
    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        if progress:
            # max(..., 1) keeps a caller-supplied total of 0 from dividing by zero.
            progress((index + 1) / max(total, 1), desc=f"Reading spreadsheet: {file_name}")

        # Compare the extension case-insensitively so "REPORT.CSV" is accepted.
        suffix = os.path.splitext(file_path)[1].lower()
        if suffix == ".csv":
            # The keyword is `encoding_errors` (pandas >= 1.3); `errors=` is not
            # a read_csv parameter and raised TypeError for every CSV.
            df = pd.read_csv(file_path, encoding="utf-8", encoding_errors="replace", low_memory=False)
        elif suffix == ".xlsx":
            df = pd.read_excel(file_path, engine="openpyxl")
        elif suffix == ".xls":
            # openpyxl cannot read the legacy .xls format; let pandas choose
            # the engine for it.
            df = pd.read_excel(file_path)
        else:
            return f"Unsupported spreadsheet format: {file_path}"

        # itertuples keeps each column's dtype; iterrows upcasts a mixed
        # int/float row to float, so integer cells would render as "1.0".
        lines = []
        for row in df.itertuples(index=False, name=None):
            line = " | ".join(str(cell) for cell in row if pd.notna(cell))
            if line:
                lines.append(line)
        return f"{_PAGE_EMOJI} {file_name}\n\n" + "\n".join(lines)
    except Exception as e:  # best-effort: report the failure inside the text
        return f"[Error reading {file_name}]: {str(e)}"


def extract_all_text_from_pdf(file_path: str, progress=None, index=0, total=1) -> str:
    """Extract the text of every page of a PDF.

    Args:
        file_path: Path of the PDF on disk.
        progress: Optional callable ``progress(fraction, desc=...)``.
        index: Zero-based position of this file among the files being read.
        total: Number of files being read (used to scale the fraction).

    Returns:
        A header line with the file name followed by the page texts separated
        by blank lines. A page that fails is replaced by an inline error
        marker; a failure to open the file yields a single error string.
    """
    file_name = os.path.basename(file_path)
    try:
        if not os.path.exists(file_path):
            return f"PDF not found: {file_path}"
        extracted = []
        with pdfplumber.open(file_path) as pdf:
            num_pages = len(pdf.pages)
            for i, page in enumerate(pdf.pages):
                try:
                    text = page.extract_text() or ""
                    extracted.append(text.strip())
                    if progress:
                        # (i + 1) so the last page reports this file as fully
                        # read, matching the "(i+1/num_pages)" in the label.
                        progress(
                            (index + (i + 1) / num_pages) / max(total, 1),
                            desc=f"Reading PDF: {file_name} ({i+1}/{num_pages})",
                        )
                except Exception as e:  # keep going; one bad page is not fatal
                    extracted.append(f"[Error reading page {i+1}]: {str(e)}")
        return f"{_PAGE_EMOJI} {file_name}\n\n" + "\n\n".join(extracted)
    except Exception as e:  # best-effort: report the failure inside the text
        return f"[Error reading PDF {file_name}]: {str(e)}"


def _upload_path(file):
    """Return the filesystem path of an uploaded file, or None if unknown.

    Accepts objects exposing the path as ``.name`` as well as plain path
    strings / ``os.PathLike`` values.
    """
    name = getattr(file, "name", None)
    if isinstance(name, str) and name:
        return name
    if isinstance(file, (str, os.PathLike)):
        return os.fspath(file)
    return None


def _collect_uploaded_text(uploaded_files, progress=None) -> str:
    """Extract text from every uploaded file and join the results."""
    if not uploaded_files:
        return ""
    if not isinstance(uploaded_files, (list, tuple)):
        uploaded_files = [uploaded_files]

    total_files = len(uploaded_files)
    chunks = []
    for index, file in enumerate(uploaded_files):
        path = _upload_path(file)
        if path is None:
            continue
        lowered = path.lower()
        try:
            if lowered.endswith(_SPREADSHEET_SUFFIXES):
                chunks.append(extract_all_text_from_csv_or_excel(path, progress, index, total_files))
            elif lowered.endswith(".pdf"):
                chunks.append(extract_all_text_from_pdf(path, progress, index, total_files))
            else:
                # Other accepted types are only mentioned by name, not parsed.
                chunks.append(f"(Uploaded file: {os.path.basename(path)})")
        except Exception as file_error:
            chunks.append(f"[Error processing file: {os.path.basename(path)}] — {str(file_error)}")
    return "\n".join(chunks).strip()


def _message_field(msg, field):
    """Read ``field`` from a chat message given as a dict or as an object."""
    if isinstance(msg, dict):
        return msg.get(field)
    return getattr(msg, field, None)


def _has_role(msg) -> bool:
    """Tell whether ``msg`` carries a ``role`` (dict key or attribute)."""
    if isinstance(msg, dict):
        return "role" in msg
    return hasattr(msg, "role")


def _is_hidden_reasoning(msg) -> bool:
    """Tell whether ``msg`` is an assistant message whose text starts with 🧠."""
    content = _message_field(msg, "content")
    return (
        _message_field(msg, "role") == "assistant"
        and isinstance(content, str)
        and content.strip().startswith(_BRAIN_EMOJI)
    )


def create_ui(agent: TxAgent):
    """Build the Gradio Blocks app for the CPS assistant.

    Args:
        agent: Object providing ``run_gradio_chat(...)``, a generator of chat
            updates (message lists or strings).

    Returns:
        The ``gr.Blocks`` instance; the caller is responsible for launching it.
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        # NOTE(review): the title looks like it lost its heading markup at some
        # point; only the visible text is kept here — confirm intended styling.
        gr.Markdown(f"\n\n{_CLIPBOARD_EMOJI} CPS: Clinical Patient Support System\n\n")
        chatbot = gr.Chatbot(label="CPS Assistant", height=600, type="messages")
        file_upload = gr.File(
            label="Upload Medical File",
            file_types=[".pdf", ".txt", ".docx", ".jpg", ".png", ".csv", ".xls", ".xlsx"],
            file_count="multiple",
        )
        message_input = gr.Textbox(
            placeholder="Ask a biomedical question or just upload the files...",
            show_label=False,
        )
        send_button = gr.Button("Send", variant="primary")
        conversation_state = gr.State([])

        def handle_chat(message: str, history: list, conversation: list, uploaded_files: list, progress=gr.Progress()):
            """Stream the agent's answer for the uploaded files and question.

            Yields lists of chat messages, which is the only shape a
            ``type="messages"`` Chatbot accepts.
            """
            context = (
                "You are an expert clinical AI assistant reviewing medical form or interview data. "
                "Your job is to analyze this data and reason about any information or red flags that a human doctor might have overlooked. "
                "Provide a **detailed and structured response**, including examples, supporting evidence from the form, and clinical rationale for why these items matter. "
                "Ensure the output is informative and helpful for improving patient care. "
                "Do not hallucinate. Base the response only on the provided form content. "
                "End with a section labeled '🧠 Final Analysis' where you summarize key findings the doctor may have missed."
            )
            prior_messages = list(history or [])

            def as_assistant_reply(text):
                # A bare string is not valid output for a messages-type Chatbot.
                return prior_messages + [{"role": "assistant", "content": text}]

            try:
                extracted_text = _collect_uploaded_text(uploaded_files, progress)

                prompt = (
                    f"{context}\n\n--- Uploaded File Content ---\n\n{extracted_text}\n\n--- End of File ---\n\n"
                )
                # Previously the typed question was overwritten by the canned
                # prompt and never reached the agent.
                question = (message or "").strip()
                if question:
                    prompt += f"--- User Question ---\n\n{question}\n\n"
                prompt += "Now begin your reasoning:"

                generator = agent.run_gradio_chat(
                    message=prompt,
                    history=history,
                    temperature=0.3,
                    max_new_tokens=1024,
                    max_token=8192,
                    call_agent=False,
                    conversation=conversation,
                    uploaded_files=uploaded_files,
                    max_round=30,
                )

                for update in generator:
                    try:
                        if isinstance(update, list):
                            # Drop entries without a role and assistant entries
                            # that start with 🧠.
                            # NOTE(review): with the emoji literal corrected this
                            # filter now actually matches — confirm that hiding
                            # 🧠-prefixed assistant messages is still desired.
                            cleaned = [
                                msg for msg in update
                                if _has_role(msg) and not _is_hidden_reasoning(msg)
                            ]
                            if cleaned:
                                yield cleaned
                        elif isinstance(update, str) and not update.strip().startswith(_BRAIN_EMOJI):
                            # Replace anything that cannot be UTF-8 encoded
                            # before handing the text to the UI.
                            safe_text = update.encode("utf-8", "replace").decode("utf-8")
                            yield as_assistant_reply(safe_text)
                    except Exception as update_error:
                        print(f"Error processing update: {update_error}")
                        continue
            except Exception as chat_error:
                print(f"Chat handling error: {chat_error}")
                yield as_assistant_reply("An error occurred while processing your request. Please try again.")

        inputs = [message_input, chatbot, conversation_state, file_upload]
        send_button.click(fn=handle_chat, inputs=inputs, outputs=chatbot)
        message_input.submit(fn=handle_chat, inputs=inputs, outputs=chatbot)

        gr.Examples(
            [
                ["Upload your medical form and ask what the doctor might've missed."],
                ["This patient was treated with antibiotics for UTI. What else should we check?"],
                ["Is there anything abnormal in the attached blood work report?"],
            ],
            inputs=message_input,
        )

    return demo