import sys
import os
import re
from typing import List

import pandas as pd
import pdfplumber
import gradio as gr

# ✅ Fix: add the project's src directory to the Python path so txagent can be imported
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from txagent.txagent import TxAgent


def sanitize_utf8(text: str) -> str:
    """Strip characters that cannot be encoded as UTF-8."""
    return text.encode("utf-8", "ignore").decode("utf-8")


def clean_final_response(response: str) -> str:
    """Remove tool calls and other special-token artifacts from the final response."""
    # Drop everything after the first [TOOL_CALLS] marker, if present
    if "[TOOL_CALLS]" in response:
        response = response.split("[TOOL_CALLS]")[0]
    # Remove any remaining bracketed special tokens
    response = re.sub(r"\[[A-Z_]+\]", "", response)
    return response.strip()


def chunk_text(text: str, max_tokens: int = 8000) -> List[str]:
    """Split text into chunks based on a rough token-count estimate."""
    words = text.split()
    chunks = []
    current_chunk = []
    current_tokens = 0
    for word in words:
        # Rough estimate: about one token per four characters
        word_tokens = len(word) // 4 + 1
        if current_tokens + word_tokens > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_tokens = word_tokens
        else:
            current_chunk.append(word)
            current_tokens += word_tokens
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks


def extract_all_text_from_csv_or_excel(file_path: str, progress=None, index=0, total=1) -> str:
    """Extract text from spreadsheet files with error handling."""
    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        if progress:
            progress((index + 1) / total, desc=f"Reading spreadsheet: {os.path.basename(file_path)}")
        if file_path.endswith(".csv"):
            # Fix: pandas' read_csv takes 'encoding_errors', not 'errors'
            df = pd.read_csv(file_path, encoding="utf-8", encoding_errors="replace", low_memory=False)
        elif file_path.endswith((".xls", ".xlsx")):
            # Let pandas pick the engine: openpyxl handles .xlsx but not legacy .xls
            df = pd.read_excel(file_path)
        else:
            return f"Unsupported spreadsheet format: {file_path}"
        lines = []
        for _, row in df.iterrows():
            line = " | ".join(str(cell) for cell in row if pd.notna(cell))
            if line:
                lines.append(line)
        return f"📄 {os.path.basename(file_path)}\n\n" + "\n".join(lines)
    except Exception as e:
        return f"[Error reading {os.path.basename(file_path)}]: {str(e)}"


def extract_all_text_from_pdf(file_path: str, progress=None, index=0, total=1) -> str:
    """Extract text from PDF files with error handling."""
    try:
        if not os.path.exists(file_path):
            return f"PDF not found: {file_path}"
        extracted = []
        with pdfplumber.open(file_path) as pdf:
            num_pages = len(pdf.pages)
            for i, page in enumerate(pdf.pages):
                try:
                    text = page.extract_text() or ""
                    extracted.append(text.strip())
                    if progress:
                        progress(
                            (index + (i / num_pages)) / total,
                            desc=f"Reading PDF: {os.path.basename(file_path)} ({i + 1}/{num_pages})",
                        )
                except Exception as e:
                    extracted.append(f"[Error reading page {i + 1}]: {str(e)}")
        return f"📄 {os.path.basename(file_path)}\n\n" + "\n\n".join(extracted)
    except Exception as e:
        return f"[Error reading PDF {os.path.basename(file_path)}]: {str(e)}"


def create_ui(agent: TxAgent):
    """Build the Gradio Blocks UI around a TxAgent instance."""
    with gr.Blocks(theme=gr.themes.Soft(), title="Clinical Patient Support System") as demo:
        gr.Markdown("# 📋 CPS: Clinical Patient Support System")
") # Fix: Changed type to 'messages' to match Gradio requirements chatbot = gr.Chatbot(label="CPS Assistant", height=600, type="messages") file_upload = gr.File( label="Upload Medical File", file_types=[".pdf", ".txt", ".docx", ".jpg", ".png", ".csv", ".xls", ".xlsx"], file_count="multiple" ) message_input = gr.Textbox( placeholder="Ask a biomedical question or just upload the files...", show_label=False ) send_button = gr.Button("Send", variant="primary") conversation_state = gr.State([]) def handle_chat(message: str, history: list, conversation: list, uploaded_files: list, progress=gr.Progress()): context = ( "You are an expert clinical AI assistant reviewing medical form or interview data. " "Your job is to analyze this data and reason about any information or red flags that a human doctor might have overlooked. " "Provide a **detailed and structured response**, including examples, supporting evidence from the form, and clinical rationale for why these items matter. " "Ensure the output is informative and helpful for improving patient care. " "Do not hallucinate. Base the response only on the provided form content. " "End with a section labeled '🧠 Final Analysis' where you summarize key findings the doctor may have missed." ) try: # Show processing message immediately history.append((message, "⏳ Processing your request...")) yield history extracted_text = "" if uploaded_files and isinstance(uploaded_files, list): total_files = len(uploaded_files) for index, file in enumerate(uploaded_files): if not hasattr(file, 'name'): continue path = file.name try: if path.endswith((".csv", ".xls", ".xlsx")): extracted_text += extract_all_text_from_csv_or_excel(path, progress, index, total_files) + "\n" elif path.endswith(".pdf"): extracted_text += extract_all_text_from_pdf(path, progress, index, total_files) + "\n" else: extracted_text += f"(Uploaded file: {os.path.basename(path)})\n" except Exception as file_error: extracted_text += f"[Error processing {os.path.basename(path)}]: {str(file_error)}\n" sanitized = sanitize_utf8(extracted_text.strip()) chunks = chunk_text(sanitized) full_response = "" for i, chunk in enumerate(chunks): chunked_prompt = ( f"{context}\n\n--- Uploaded File Content (Chunk {i+1}/{len(chunks)}) ---\n\n{chunk}\n\n" f"--- End of Chunk ---\n\nNow begin your analysis:" ) generator = agent.run_gradio_chat( message=chunked_prompt, history=[], temperature=0.3, max_new_tokens=1024, max_token=8192, call_agent=False, conversation=conversation, uploaded_files=uploaded_files, max_round=30 ) # Collect all updates from the generator chunk_response = "" for update in generator: if isinstance(update, str): chunk_response += update elif isinstance(update, list): # Handle list of messages for msg in update: if hasattr(msg, 'content'): chunk_response += msg.content full_response += chunk_response + "\n\n" # Clean up the final response full_response = clean_final_response(full_response.strip()) # Remove the processing message and add the final response history[-1] = (message, full_response) yield history except Exception as chat_error: print(f"Chat handling error: {chat_error}") error_msg = "An error occurred while processing your request. Please try again." 
                # Replace the placeholder if it is still showing, otherwise append the error
                if history and history[-1].get("role") == "assistant" and str(history[-1].get("content", "")).startswith("⏳"):
                    history[-1]["content"] = error_msg
                else:
                    history.append({"role": "assistant", "content": error_msg})
                yield history

        inputs = [message_input, chatbot, conversation_state, file_upload]
        send_button.click(fn=handle_chat, inputs=inputs, outputs=chatbot)
        message_input.submit(fn=handle_chat, inputs=inputs, outputs=chatbot)

        gr.Examples([
            ["Upload your medical form and ask what the doctor might've missed."],
            ["This patient was treated with antibiotics for UTI. What else should we check?"],
            ["Is there anything abnormal in the attached blood work report?"]
        ], inputs=message_input)

    return demo
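

# The module defines create_ui() but never launches it. Below is a minimal launch
# sketch, assuming TxAgent can be constructed without arguments; the constructor
# call is hypothetical, so adjust it to the real txagent API (model name, tool
# files, device, etc.) before running.
if __name__ == "__main__":
    # NOTE: TxAgent() with no arguments is an assumption, not the confirmed API.
    agent = TxAgent()
    demo = create_ui(agent)
    # queue() enables generator-based (streaming) handlers such as handle_chat
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)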