import os
import sys
from typing import List, Optional

import gradio as gr
import pandas as pd
import pdfplumber
from tabulate import tabulate

# Make the sibling ``src`` directory importable so that ``txagent`` resolves
# when this file is run from its own folder. Must run before the import below.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from txagent.txagent import TxAgent

# Assistant messages whose text starts with this marker are filtered out of
# the chat stream before it is shown to the user.
_TOOL_CALL_PREFIX = "🧰"

_SPREADSHEET_SUFFIXES = (".csv", ".xls", ".xlsx")


def safe_extract_table_data(table: List[List[Optional[str]]]) -> List[str]:
    """Flatten one extracted table into tab-separated text rows.

    Args:
        table: A list of rows, each row a list of cell values. ``None``
            cells are rendered as empty strings.

    Returns:
        One tab-joined string per row that contains at least one non-empty
        cell. Anything that is not a non-empty list (at table or row level)
        is skipped, so malformed input yields an empty list.
    """
    extracted_rows: List[str] = []
    if not table or not isinstance(table, list):
        return extracted_rows

    for row in table:
        if not row or not isinstance(row, list):
            continue
        try:
            clean_row = [str(cell) if cell is not None else "" for cell in row]
        except Exception as e:  # best effort: one bad cell must not lose the table
            print(f"Error processing table row: {e}")
            continue
        if any(clean_row):
            extracted_rows.append("\t".join(clean_row))
    return extracted_rows


def _format_table(df) -> str:
    """Render a DataFrame as a GitHub-flavoured markdown table without the index."""
    return tabulate(df, headers="keys", tablefmt="github", showindex=False)


def extract_all_text_from_csv_or_excel(file_path: str, progress=None, index=0, total=1) -> str:
    """Load a CSV/Excel file and return its content as markdown table text.

    When the sheet has a "Booking Number" or "Form Name" column (checked in
    that order) the rows are grouped by it and each group is emitted under a
    ``### Group: <value>`` heading; otherwise the whole sheet is one table.

    Args:
        file_path: Path to a ``.csv``, ``.xls`` or ``.xlsx`` file
            (extension matched case-insensitively).
        progress: Optional callable ``progress(fraction, desc=...)``.
        index: Zero-based position of this file among the uploaded files.
        total: Number of uploaded files, used to scale the progress fraction.

    Returns:
        The rendered table text, or a human-readable error string. This
        function does not raise; every failure is reported in the return value.
    """
    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"

        lowered = file_path.lower()
        if lowered.endswith(".csv"):
            # ``encoding_errors`` is the read_csv keyword for decode-error
            # handling; ``errors`` is not accepted and raises TypeError.
            df = pd.read_csv(file_path, encoding="utf-8", encoding_errors="replace", low_memory=False)
        elif lowered.endswith(".xlsx"):
            df = pd.read_excel(file_path, engine="openpyxl")
        elif lowered.endswith(".xls"):
            # openpyxl cannot read the legacy .xls format; let pandas pick
            # the engine that matches the file.
            df = pd.read_excel(file_path)
        else:
            return f"Unsupported spreadsheet format: {file_path}"

        # Compare against None instead of truth-testing: the tracker is a
        # callable object, and its truth value is not a presence check.
        if progress is not None:
            progress((index + 1) / total, desc=f"Processed table: {os.path.basename(file_path)}")

        group_column = next(
            (col for col in ("Booking Number", "Form Name") if col in df.columns),
            None,
        )
        if group_column is None:
            return _format_table(df)

        try:
            sections = []
            for group_name, group_df in df.groupby(group_column):
                if group_name is None:
                    continue
                sections.append(f"\n### Group: {group_name}\n")
                sections.append(_format_table(group_df))
            return "\n".join(sections) if sections else _format_table(df)
        except Exception as e:
            # Grouping is a nicety; fall back to the flat table if it fails.
            print(f"Error during grouping: {e}")
            return _format_table(df)
    except Exception as e:
        return f"Error parsing file {os.path.basename(file_path)}: {str(e)}"


def extract_all_text_from_pdf(file_path: str, progress=None, index=0, total=1) -> str:
    """Extract every table found in a PDF as tab-separated text rows.

    Only tables are extracted (via ``page.extract_tables()``); free-running
    page text is not read.

    Args:
        file_path: Path to the PDF file.
        progress: Optional callable ``progress(fraction, desc=...)``.
        index: Zero-based position of this file among the uploaded files.
        total: Number of uploaded files, used to scale the progress fraction.

    Returns:
        Newline-joined table rows, a "No extractable content" notice when no
        table rows were found, or a human-readable error string. This
        function does not raise.
    """
    extracted: List[str] = []
    try:
        if not os.path.exists(file_path):
            return f"PDF file not found: {file_path}"

        with pdfplumber.open(file_path) as pdf:
            pages = pdf.pages
            num_pages = len(pages)
            for i, page in enumerate(pages):
                try:
                    for table in page.extract_tables() or []:
                        extracted.extend(safe_extract_table_data(table))
                    if progress is not None:
                        # Page ``i`` is finished at this point, so report
                        # ``i + 1`` completed pages out of ``num_pages``.
                        progress(
                            (index + (i + 1) / num_pages) / total,
                            desc=f"Parsing PDF: {os.path.basename(file_path)} ({i+1}/{num_pages})",
                        )
                except Exception as page_error:
                    # A broken page should not discard the other pages.
                    print(f"Error processing page {i+1}: {page_error}")
                    continue

        if extracted:
            return "\n".join(extracted)
        return f"No extractable content found in {os.path.basename(file_path)}"
    except Exception as e:
        return f"Error parsing PDF {os.path.basename(file_path)}: {str(e)}"


def _upload_path(upload) -> Optional[str]:
    """Return the filesystem path of an uploaded file, or None if unknown.

    Accepts objects exposing a string ``name`` attribute as well as plain
    path strings.
    """
    name = getattr(upload, "name", None)
    if isinstance(name, str):
        return name
    return upload if isinstance(upload, str) else None


def _collect_upload_text(uploaded_files, progress) -> str:
    """Run the matching extractor on each upload and concatenate the results."""
    if not uploaded_files or not isinstance(uploaded_files, list):
        return ""

    total_files = len(uploaded_files)
    parts: List[str] = []
    for index, upload in enumerate(uploaded_files):
        path = _upload_path(upload)
        if path is None:
            continue
        lowered = path.lower()
        try:
            if lowered.endswith(_SPREADSHEET_SUFFIXES):
                parts.append(extract_all_text_from_csv_or_excel(path, progress, index, total_files) + "\n")
            elif lowered.endswith(".pdf"):
                parts.append(extract_all_text_from_pdf(path, progress, index, total_files) + "\n")
            else:
                parts.append(f"(Uploaded file: {os.path.basename(path)})\n")
                if progress is not None:
                    progress(
                        (index + 1) / total_files,
                        desc=f"Skipping unsupported file: {os.path.basename(path)}",
                    )
        except Exception as file_error:
            print(f"Error processing file {path}: {file_error}")
            parts.append(f"\n[Error processing file: {os.path.basename(path)}]\n")
    return "".join(parts)


def _is_tool_call_message(msg) -> bool:
    """Tell whether ``msg`` is an assistant message whose text starts with the tool marker."""
    if getattr(msg, "role", None) != "assistant":
        return False
    content = getattr(msg, "content", None)
    # Content may be a non-string payload; only text can carry the marker.
    return isinstance(content, str) and content.strip().startswith(_TOOL_CALL_PREFIX)


def _with_assistant_message(history, text: str) -> list:
    """Return the chat history plus one assistant message in "messages" format."""
    return [*(history or []), {"role": "assistant", "content": text}]


def create_ui(agent: TxAgent):
    """Build the Gradio Blocks UI for the clinical support assistant.

    Args:
        agent: The agent whose ``run_gradio_chat`` generator produces the
            streamed chat updates.

    Returns:
        The constructed ``gr.Blocks`` app (not launched).
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("\n\n📋 CPS: Clinical Patient Support System\n\n")
        chatbot = gr.Chatbot(label="CPS Assistant", height=600, type="messages")
        file_upload = gr.File(
            label="Upload Medical File",
            file_types=[".pdf", ".txt", ".docx", ".jpg", ".png", ".csv", ".xls", ".xlsx"],
            file_count="multiple",
        )
        message_input = gr.Textbox(
            placeholder="Ask a biomedical question or just upload the files...",
            show_label=False,
        )
        send_button = gr.Button("Send", variant="primary")
        conversation_state = gr.State([])

        def handle_chat(message: str, history: list, conversation: list, uploaded_files: list, progress=gr.Progress()):
            """Stream the agent's answer for the current question and uploads.

            Yields values for the "messages"-type Chatbot: always a list of
            chat messages, never a bare string.
            """
            context = (
                "You are an expert clinical AI assistant reviewing medical form or interview data. "
                "Your job is to analyze this data and reason about any information or red flags that a human doctor might have overlooked. "
                "Provide a **detailed and structured response**, including examples, supporting evidence from the form, and clinical rationale for why these items matter. "
                "Ensure the output is informative and helpful for improving patient care. "
                "Do not hallucinate. Base the response only on the provided form content. "
                "End with a section labeled '🧠 Final Analysis' where you summarize key findings the doctor may have missed."
            )
            try:
                extracted_text = _collect_upload_text(uploaded_files, progress)

                # Include the user's own question when there is one; with an
                # empty box the prompt is exactly context + data + trailer.
                question = (message or "").strip()
                sections = [context]
                if question:
                    sections.append(f"User question: {question}")
                sections.append(f"---\n{extracted_text.strip()}\n---")
                sections.append("Begin your reasoning.")
                prompt = "\n\n".join(sections)

                generator = agent.run_gradio_chat(
                    message=prompt,
                    history=history,
                    temperature=0.3,
                    max_new_tokens=1024,
                    max_token=8192,
                    call_agent=False,
                    conversation=conversation,
                    uploaded_files=uploaded_files,
                    max_round=30,
                )

                for update in generator:
                    try:
                        if isinstance(update, list):
                            cleaned = [
                                msg
                                for msg in update
                                if hasattr(msg, "role") and not _is_tool_call_message(msg)
                            ]
                            if cleaned:
                                yield cleaned
                        elif isinstance(update, str) and not update.strip().startswith(_TOOL_CALL_PREFIX):
                            # Replace unencodable characters, then wrap the
                            # text as a message so the Chatbot accepts it.
                            text = update.encode("utf-8", "replace").decode("utf-8")
                            yield _with_assistant_message(history, text)
                    except Exception as update_error:
                        print(f"Error processing update: {update_error}")
                        continue
            except Exception as chat_error:
                print(f"Chat handling error: {chat_error}")
                yield _with_assistant_message(
                    history,
                    "An error occurred while processing your request. Please try again.",
                )

        inputs = [message_input, chatbot, conversation_state, file_upload]
        send_button.click(fn=handle_chat, inputs=inputs, outputs=chatbot)
        message_input.submit(fn=handle_chat, inputs=inputs, outputs=chatbot)

        gr.Examples(
            [
                ["Upload your medical form and ask what the doctor might've missed."],
                ["This patient was treated with antibiotics for UTI. What else should we check?"],
                ["Is there anything abnormal in the attached blood work report?"],
            ],
            inputs=message_input,
        )

    return demo