tonic-discharge-guard / callbackmanager.py
sanggusti's picture
Attempt to fix, fingers crossed
6498b44
raw
history blame
28.6 kB
import gradio as gr
from meldrx import MeldRxAPI
import json
import os
import tempfile
from datetime import datetime
import traceback
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Import PDF utilities
from pdfutils import PDFGenerator, generate_discharge_summary
class CallbackManager:
    """Hold a ``MeldRxAPI`` client plus the auth state the UI callbacks rely on.

    The client is built from the ``APPID`` and ``WORKSPACE_URL`` environment
    variables. Patient documents (always) and patient data (only when the
    client exposes no usable ``get_patients``) come from canned demo fixtures.
    """

    # Demo FHIR searchset bundle served when the API client has no usable
    # ``get_patients``. Remove once the real API call is always available.
    _MOCK_PATIENT_BUNDLE = {
        "resourceType": "Bundle",
        "type": "searchset",
        "total": 2,
        "link": [],
        "entry": [
            {
                "resource": {
                    "resourceType": "Patient",
                    "id": "patient1",
                    "name": [
                        {
                            "use": "official",
                            "family": "Smith",
                            "given": ["John"],
                        }
                    ],
                    "gender": "male",
                    "birthDate": "1970-01-01",
                    "address": [
                        {
                            "city": "Boston",
                            "state": "MA",
                            "postalCode": "02108",
                        }
                    ],
                }
            },
            {
                "resource": {
                    "resourceType": "Patient",
                    "id": "patient2",
                    "name": [
                        {
                            "use": "official",
                            "family": "Johnson",
                            "given": ["Jane"],
                        }
                    ],
                    "gender": "female",
                    "birthDate": "1985-05-15",
                    "address": [
                        {
                            "city": "Cambridge",
                            "state": "MA",
                            "postalCode": "02139",
                        }
                    ],
                }
            },
        ],
    }

    # Demo documents returned by ``get_patient_documents``; no API call is
    # made for documents yet.
    _MOCK_DOCUMENTS = (
        {
            "doc_id": "doc123",
            "type": "clinical_note",
            "date": "2023-01-16",
            "author": "Dr. Sample Doctor",
            "content": "Patient presented with symptoms of respiratory distress...",
        },
        {
            "doc_id": "doc124",
            "type": "lab_result",
            "date": "2023-01-17",
            "author": "Lab System",
            "content": "CBC results: WBC 7.5, RBC 4.2, Hgb 14.1...",
        },
    )

    def __init__(self, redirect_uri: str, client_secret: str = None):
        """Build the API client from the environment.

        Args:
            redirect_uri: Redirect URI handed to ``MeldRxAPI``.
            client_secret: Optional client secret handed to ``MeldRxAPI``.

        Raises:
            ValueError: If ``APPID`` or ``WORKSPACE_URL`` is unset or empty.
        """
        client_id = os.getenv("APPID")
        if not client_id:
            raise ValueError("APPID environment variable not set.")
        workspace_id = os.getenv("WORKSPACE_URL")
        if not workspace_id:
            raise ValueError("WORKSPACE_URL environment variable not set.")
        self.api = MeldRxAPI(client_id, client_secret, workspace_id, redirect_uri)
        self.auth_code = None
        self.access_token = None

    def get_auth_url(self) -> str:
        """Return the authorization URL produced by the API client."""
        return self.api.get_authorization_url()

    def set_auth_code(self, code: str) -> str:
        """Store *code*, try to authenticate with it, and return a status message.

        On success the client's access token is cached on this instance.
        """
        self.auth_code = code
        if self.api.authenticate_with_code(code):
            self.access_token = self.api.access_token
            # NOTE(review): the first 10 characters of the token are echoed
            # back to the caller (and thus the UI) - confirm this is intended.
            return f"Authentication successful! Access Token: {self.access_token[:10]}... (truncated)"
        return "Authentication failed. Please check the code."

    def get_patient_data(self) -> str:
        """Fetch patient data from MeldRx.

        Returns:
            A JSON string (indent=2) on success, otherwise a plain-text status
            message starting with "Not authenticated", "Failed",
            "No patient data" or "Error retrieving".
        """
        try:
            if not self.access_token:
                logger.warning("Not authenticated when getting patient data")
                return "Not authenticated. Please provide a valid authorization code first."

            # Demo fallback: no callable ``get_patients`` on the client.
            if getattr(self.api, "get_patients", None) is None:
                logger.info("Using mock patient data (no API connection)")
                return json.dumps(self._MOCK_PATIENT_BUNDLE, indent=2)

            logger.info("Calling Meldrx API to get patients")
            patients = self.api.get_patients()
            if patients is None:
                return "Failed to retrieve patient data."
            if not patients:
                return "No patient data returned."
            return json.dumps(patients, indent=2)
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Error in get_patient_data: %s", e)
            return f"Error retrieving patient data: {str(e)}"

    def get_patient_documents(self, patient_id: str = None):
        """Return demo documents for a patient.

        Args:
            patient_id: Currently unused; the same fixture is returned for
                every patient.

        Returns:
            A "Not authenticated" message string when no access token is
            cached, otherwise a fresh list of document dicts.
        """
        if not self.access_token:
            return "Not authenticated. Please provide a valid authorization code first."
        # Copy each fixture so callers can mutate the result safely.
        return [dict(document) for document in self._MOCK_DOCUMENTS]
def display_form(
    first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
    doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
    doctor_city, doctor_state, doctor_zip,
    admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
    diagnosis, procedures, medications, preparer_name, preparer_job_title
):
    """Render the discharge-form field values as a Markdown bullet summary."""
    # The leading and trailing empty items reproduce the newline that opens
    # and closes the rendered text.
    rows = [
        "",
        "**Patient Discharge Form**",
        f"- Name: {first_name} {middle_initial} {last_name}",
        f"- Date of Birth: {dob}, Age: {age}, Sex: {sex}",
        f"- Address: {address}, {city}, {state}, {zip_code}",
        f"- Doctor: {doctor_first_name} {doctor_middle_initial} {doctor_last_name}",
        f"- Hospital/Clinic: {hospital_name}",
        f"- Doctor Address: {doctor_address}, {doctor_city}, {doctor_state}, {doctor_zip}",
        f"- Admission Date: {admission_date}, Source: {referral_source}, Method: {admission_method}",
        f"- Discharge Date: {discharge_date}, Reason: {discharge_reason}",
        f"- Date of Death: {date_of_death}",
        f"- Diagnosis: {diagnosis}",
        f"- Procedures: {procedures}",
        f"- Medications: {medications}",
        f"- Prepared By: {preparer_name}, {preparer_job_title}",
        "",
    ]
    return "\n".join(rows)
def process_fhir_bundle(fhir_bundle):
    """Extract the Patient resources from a FHIR bundle.

    Args:
        fhir_bundle: A bundle dict, or a JSON string/bytes encoding one.

    Returns:
        The ``resource`` dicts whose ``resourceType`` is ``"Patient"``, in
        bundle order. Returns an empty list when the input cannot be decoded,
        is not a JSON object, or contains no patients.
    """
    if not isinstance(fhir_bundle, dict):
        try:
            fhir_bundle = json.loads(fhir_bundle)
        except (TypeError, ValueError):
            # TypeError: not str/bytes; ValueError covers JSONDecodeError.
            return []
        if not isinstance(fhir_bundle, dict):
            # Valid JSON that is not an object (list, number, ...) has no entries.
            return []

    patients = []
    # ``or []`` also tolerates an explicit ``"entry": null``.
    for entry in fhir_bundle.get("entry") or []:
        resource = entry.get("resource") if isinstance(entry, dict) else None
        if isinstance(resource, dict) and resource.get("resourceType") == "Patient":
            patients.append(resource)
    return patients
def _fhir_value(mapping, key, default):
    """Return ``mapping[key]``, treating a missing key and an explicit null alike."""
    value = mapping.get(key)
    return default if value is None else value


def create_patient_stripe_data(patient):
    """Flatten a FHIR Patient resource into the fields shown on a patient card.

    Args:
        patient: Patient resource dict.

    Returns:
        Dict with keys ``name``, ``gender``, ``birth_date``, ``patient_id``,
        ``city``, ``state`` and ``postal_code``. Missing or null source fields
        fall back to ``"Unknown"`` (``""`` for state and postal code).
    """
    # Prefer the name flagged "official"; otherwise use the first name entry.
    names = [n for n in _fhir_value(patient, "name", []) if isinstance(n, dict)]
    official_name = next(
        (n for n in names if n.get("use") == "official"),
        names[0] if names else {},
    )
    given_names = " ".join(_fhir_value(official_name, "given", ["Unknown"]))
    family_name = _fhir_value(official_name, "family", "Unknown")

    # Only the first address entry is displayed.
    addresses = _fhir_value(patient, "address", [])
    address = (addresses[0] or {}) if addresses else {}

    return {
        "name": f"{given_names} {family_name}",
        "gender": _fhir_value(patient, "gender", "unknown").capitalize(),
        "birth_date": _fhir_value(patient, "birthDate", "Unknown"),
        "patient_id": _fhir_value(patient, "id", "Unknown"),
        "city": _fhir_value(address, "city", "Unknown"),
        "state": _fhir_value(address, "state", ""),
        "postal_code": _fhir_value(address, "postalCode", ""),
    }
# Prefixes of the plain-text (non-JSON) status messages that
# CallbackManager.get_patient_data can return instead of a bundle.
_PATIENT_DATA_STATUS_PREFIXES = (
    "Not authenticated",
    "Failed",
    "Error retrieving",
    "No patient data",
)


def _patient_display_result(metadata, patients=(), has_pagination=False, raw_data=None):
    """Build the result dict shape returned by ``update_patient_display``."""
    return {
        "has_pagination": has_pagination,
        "patients": list(patients),
        "metadata": metadata,
        "raw_data": raw_data,
    }


def update_patient_display(patient_data_json):
    """Parse FHIR patient data into display-ready structures.

    No UI component is touched here; the caller decides how to render.

    Args:
        patient_data_json: A FHIR bundle dict, a JSON string encoding one, or
            a plain-text status message from ``get_patient_data``.

    Returns:
        Dict with keys ``has_pagination`` (bool), ``patients`` (list of card
        dicts), ``metadata`` (text; carries the error description on
        failure) and ``raw_data`` (``{"patients", "bundle"}`` or ``None``).
        This function does not raise: unexpected errors are reported through
        ``metadata``.
    """
    try:
        if isinstance(patient_data_json, str):
            # Status messages are not JSON; report them as-is rather than as
            # a confusing JSON decode failure.
            if patient_data_json.startswith(_PATIENT_DATA_STATUS_PREFIXES):
                logger.warning(f"Error in patient data: {patient_data_json}")
                return _patient_display_result(f"Error: {patient_data_json}")
            try:
                fhir_bundle = json.loads(patient_data_json)
            except json.JSONDecodeError as e:
                logger.error(f"JSON decode error: {str(e)} - Data: {patient_data_json[:100]}...")
                return _patient_display_result(f"Invalid JSON data: {str(e)}")
        else:
            fhir_bundle = patient_data_json

        if not isinstance(fhir_bundle, dict):
            logger.error(f"Unexpected data type: {type(fhir_bundle)}")
            return _patient_display_result(f"Expected dictionary but got {type(fhir_bundle)}")

        patients = process_fhir_bundle(fhir_bundle)
        if not patients:
            logger.warning("No patients found in data")
            return _patient_display_result("No patients found in the data")

        # A bundle is paginated when it links to a previous or next page.
        links = fhir_bundle.get("link") or []
        has_pagination = any(
            link.get("relation") in ("previous", "next") for link in links
        )

        patient_data_list = []
        for patient in patients:
            try:
                patient_data_list.append(create_patient_stripe_data(patient))
            except Exception as e:
                # One malformed patient must not hide the others.
                logger.error(f"Error processing patient data: {str(e)}")

        logger.info(f"Processed {len(patient_data_list)} patients successfully")

        metadata_text = (
            "\n"
            "**Bundle Metadata**\n"
            f"Type: {fhir_bundle.get('type', 'Unknown')}\n"
            f"Total Patients: {fhir_bundle.get('total', len(patients))}\n"
            f"Last Updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        )

        return _patient_display_result(
            metadata_text,
            patients=patient_data_list,
            has_pagination=has_pagination,
            raw_data={"patients": patients, "bundle": fhir_bundle},
        )
    except Exception as e:
        error_msg = f"Error loading patient data: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return _patient_display_result(error_msg)
def fetch_and_display_patients():
    """Fetch patient data and prepare every value the patient views need.

    Returns:
        A 5-item list: the raw data returned by
        ``CALLBACK_MANAGER.get_patient_data()``, a bool telling whether the
        bundle is paginated, the patients rendered as HTML, the metadata (or
        error) text, and the raw parsed data (``None`` on failure).
        This function does not raise: unexpected errors are reported through
        the returned values.
    """
    try:
        logger.info("Fetching patient data...")
        data = CALLBACK_MANAGER.get_patient_data()
        is_text = isinstance(data, str)
        logger.info(f"Received patient data: {data[:100]}..." if is_text else "Received non-string data")

        # Plain-text status messages (not JSON) that get_patient_data may
        # return; only strings can be status messages, hence the type guard.
        status_prefixes = ("Not authenticated", "Failed", "Error retrieving", "No patient data")
        if is_text and data.startswith(status_prefixes):
            logger.warning(f"Patient data unavailable: {data}")
            processed_data = {
                "has_pagination": False,
                "patients": [],
                "metadata": f"Error: {data}",
                "raw_data": None,
            }
        else:
            try:
                processed_data = update_patient_display(data)
                logger.info("Successfully processed patient display data")
            except Exception as e:
                logger.error(f"Error in update_patient_display: {str(e)}\n{traceback.format_exc()}")
                processed_data = {
                    "has_pagination": False,
                    "patients": [],
                    "metadata": f"Error processing patient data: {str(e)}",
                    "raw_data": None,
                }

        return [
            data,  # raw JSON output
            processed_data["has_pagination"],  # pagination visibility
            format_patient_html(processed_data["patients"]),  # patient cards
            processed_data["metadata"],  # metadata text
            processed_data["raw_data"],  # state data
        ]
    except Exception as e:
        error_msg = f"Unexpected error fetching patient data: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return [
            f"Error: {str(e)}",
            False,
            "",
            error_msg,
            None,
        ]
def format_patient_html(patients):
    """Render patient card dicts as an HTML fragment.

    Args:
        patients: Iterable of dicts with keys ``name``, ``gender``,
            ``birth_date``, ``city``, ``state``, ``postal_code`` and
            ``patient_id``.

    Returns:
        ``"No patients found."`` when *patients* is empty or ``None``,
        otherwise one ``<div>`` card per patient, concatenated.
    """
    # Imported locally because the file does not import ``html`` at the top.
    from html import escape

    if not patients:
        return "No patients found."

    cards = []
    for patient in patients:
        # Patient fields come from an external API, so escape everything
        # that is placed into markup.
        name = escape(str(patient['name']))
        gender = escape(str(patient['gender']))
        birth_date = escape(str(patient['birth_date']))
        city = escape(str(patient['city']))
        state = escape(str(patient['state']))
        postal_code = escape(str(patient['postal_code']))
        patient_id = escape(str(patient['patient_id']))
        # The alert text sits inside JavaScript inside an HTML attribute:
        # json.dumps makes it a safe JS string literal, escape() then makes
        # that literal safe for the attribute.
        alert_arg = escape(json.dumps(f"Documents for patient {patient['patient_id']}"))

        cards.append(
            "\n"
            '<div style="padding: 10px; margin: 10px 0; border: 1px solid #ccc; '
            'border-radius: 5px; background-color: #f9f9f9;">\n'
            f"<h3>{name}</h3>\n"
            f"<p>{gender} | Born: {birth_date}</p>\n"
            f"<p>{city}, {state} {postal_code}</p>\n"
            f"<p>ID: {patient_id}</p>\n"
            "<hr/>\n"
            '<div style="display: flex; justify-content: space-between;">\n'
            "<span>Status: Pending Review</span>\n"
            f'<button onclick="alert({alert_arg})">View Documents</button>\n'
            "</div>\n"
            "</div>\n"
        )
    return "".join(cards)
def generate_pdf_from_form(
    first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
    doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
    doctor_city, doctor_state, doctor_zip,
    admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
    diagnosis, procedures, medications, preparer_name, preparer_job_title
):
    """Generate a PDF discharge form from the form values.

    The parameters mirror the discharge form fields. ``middle_initial``, the
    ``doctor_*`` fields, ``hospital_name`` and ``date_of_death`` are accepted
    for signature parity with ``display_form`` but are not passed to the PDF
    generator.

    Returns:
        Path of a ``.pdf`` temporary file holding the generated form. The
        file is not deleted automatically.
    """
    pdf_gen = PDFGenerator()

    patient_info = {
        "first_name": first_name,
        "last_name": last_name,
        "dob": dob,
        "age": age,
        "sex": sex,
        "mobile": "",  # Not collected in the form
        "address": address,
        "city": city,
        "state": state,
        "zip": zip_code,
    }
    discharge_info = {
        "date_of_admission": admission_date,
        "date_of_discharge": discharge_date,
        "source_of_admission": referral_source,
        "mode_of_admission": admission_method,
        "discharge_against_advice": "Yes" if discharge_reason == "Discharge Against Advice" else "No",
    }
    diagnosis_info = {
        "diagnosis": diagnosis,
        "operation_procedure": procedures,
        "treatment": "",  # Not collected in the form
        "follow_up": "",  # Not collected in the form
    }
    medication_info = {
        "medications": [medications] if medications else [],
        "instructions": "",  # Not collected in the form
    }
    prepared_by = {
        "name": preparer_name,
        "title": preparer_job_title,
        "signature": "",  # Not collected in the form
    }

    pdf_buffer = pdf_gen.generate_discharge_form(
        patient_info,
        discharge_info,
        diagnosis_info,
        medication_info,
        prepared_by,
    )

    # delete=False keeps the file on disk after the handle closes so it can be
    # served for download; the ``with`` block closes the handle even if the
    # write fails.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
        temp_file.write(pdf_buffer.read())
        return temp_file.name
def _first_patient_resource(patient_data):
    """Return the first patient dict found in *patient_data*, or ``None``.

    Accepts a FHIR Bundle (first Patient entry), a list (first dict item) or
    a single patient dict.
    """
    if isinstance(patient_data, list):
        candidates = patient_data
    elif isinstance(patient_data, dict) and patient_data.get("resourceType") == "Bundle":
        resources = (
            entry.get("resource")
            for entry in patient_data.get("entry") or []
            if isinstance(entry, dict)
        )
        candidates = [
            resource for resource in resources
            if isinstance(resource, dict) and resource.get("resourceType") == "Patient"
        ]
    else:
        candidates = [patient_data]
    return next((item for item in candidates if isinstance(item, dict)), None)


def _patient_display_name(patient):
    """Return "<first given name> <family name>" for a patient dict.

    ``name`` may be a single dict or a list of name dicts; in a list the
    entry flagged "official" wins, otherwise the first entry is used.
    """
    name = patient.get("name") or {}
    if isinstance(name, list):
        entries = [entry for entry in name if isinstance(entry, dict)]
        name = next(
            (entry for entry in entries if entry.get("use") == "official"),
            entries[0] if entries else {},
        )
    given = name.get("given") or [""]
    first_given = given[0] if isinstance(given, list) else given
    return f"{first_given} {name.get('family', '')}"


def generate_pdf_from_meldrx(patient_data):
    """Generate a discharge-summary PDF from MeldRx patient data.

    Args:
        patient_data: A FHIR Bundle, a patient dict, a list of patient dicts,
            or a JSON string encoding any of those.

    Returns:
        ``(pdf_path, status_message)``; ``pdf_path`` is ``None`` on failure.
    """
    if isinstance(patient_data, str):
        # Status messages and other non-JSON text end up here.
        try:
            patient_data = json.loads(patient_data)
        except (TypeError, ValueError):
            return None, "Invalid patient data format"

    if not patient_data:
        return None, "No patient data available"

    try:
        # For demonstration, only the first patient is used.
        patient = _first_patient_resource(patient_data)
        if patient is None:
            return None, "No patient data available"

        patient_info = {
            "name": _patient_display_name(patient),
            "dob": patient.get('birthDate', 'Unknown'),
            "patient_id": patient.get('id', 'Unknown'),
            "admission_date": datetime.now().strftime("%Y-%m-%d"),  # Mock data
            "physician": "Dr. Provider",  # Mock data
        }

        # Placeholder for LLM-generated content.
        llm_content = {
            "diagnosis": "Diagnosis information would be generated by LLM",
            "treatment": "Treatment summary would be generated by LLM",
            "medications": "Medication list would be generated by LLM",
            "follow_up": "Follow-up instructions would be generated by LLM",
            "special_instructions": "Special instructions would be generated by LLM",
        }

        # The directory is left on disk so the PDF can be downloaded.
        output_dir = tempfile.mkdtemp()
        pdf_path = generate_discharge_summary(patient_info, llm_content, output_dir)
        return pdf_path, "PDF generated successfully"
    except Exception as e:
        return None, f"Error generating PDF: {str(e)}"
# Shared MeldRx session used by the Gradio callbacks in this module.
# It is built at import time, so the ValueError raised by CallbackManager
# for a missing APPID / WORKSPACE_URL surfaces on import.
CALLBACK_MANAGER = CallbackManager(
    "https://multitransformer-discharge-guard.hf.space/callback",
    client_secret=None,
)
# Gradio UI: three tabs (authentication, patient log, discharge form) followed
# by the event wiring that connects them to the callbacks defined above.
with gr.Blocks() as demo:
    gr.Markdown("# Patient Discharge Form with MeldRx Integration")

    with gr.Tab("Authenticate with MeldRx"):
        gr.Markdown("## SMART on FHIR Authentication")
        auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False)
        gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.")
        auth_code_input = gr.Textbox(label="Authorization Code")
        auth_submit = gr.Button("Submit Code")
        auth_result = gr.Textbox(label="Authentication Result")

        patient_data_button = gr.Button("Fetch Patient Data")
        patient_data_output = gr.Textbox(label="Patient Data")

        # Generate a PDF from the fetched MeldRx data
        meldrx_pdf_button = gr.Button("Generate PDF from MeldRx Data")
        meldrx_pdf_status = gr.Textbox(label="PDF Generation Status")
        meldrx_pdf_download = gr.File(label="Download Generated PDF")

    with gr.Tab("Show Patients Log"):
        gr.Markdown("## Patient Dashboard")

        # Parsed patient data kept between events
        patient_data_state = gr.State()

        # Pagination controls.
        # NOTE(review): both buttons start hidden and have no click handlers,
        # so toggling the row's visibility has no visible effect yet.
        with gr.Row() as pagination_row:
            prev_btn = gr.Button("Previous Page", visible=False)
            next_btn = gr.Button("Next Page", visible=False)

        # All patient cards are rendered into this single HTML component
        patient_display = gr.HTML("No patient data loaded.")

        metadata_md = gr.Markdown()
        refresh_btn = gr.Button("Refresh Data")

    with gr.Tab("Discharge Form"):
        gr.Markdown("## Patient Details")
        with gr.Row():
            first_name = gr.Textbox(label="First Name")
            last_name = gr.Textbox(label="Last Name")
            middle_initial = gr.Textbox(label="Middle Initial")
        with gr.Row():
            dob = gr.Textbox(label="Date of Birth")
            age = gr.Textbox(label="Age")
            sex = gr.Textbox(label="Sex")
        address = gr.Textbox(label="Address")
        with gr.Row():
            city = gr.Textbox(label="City")
            state = gr.Textbox(label="State")
            zip_code = gr.Textbox(label="Zip Code")

        gr.Markdown("## Primary Healthcare Professional Details")
        with gr.Row():
            doctor_first_name = gr.Textbox(label="Doctor's First Name")
            doctor_last_name = gr.Textbox(label="Doctor's Last Name")
            doctor_middle_initial = gr.Textbox(label="Middle Initial")
        hospital_name = gr.Textbox(label="Hospital/Clinic Name")
        doctor_address = gr.Textbox(label="Address")
        with gr.Row():
            doctor_city = gr.Textbox(label="City")
            doctor_state = gr.Textbox(label="State")
            doctor_zip = gr.Textbox(label="Zip Code")

        gr.Markdown("## Admission and Discharge Details")
        with gr.Row():
            admission_date = gr.Textbox(label="Date of Admission")
            referral_source = gr.Textbox(label="Source of Referral")
            admission_method = gr.Textbox(label="Method of Admission")
        with gr.Row():
            discharge_date = gr.Textbox(label="Date of Discharge")
            discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason")
        date_of_death = gr.Textbox(label="Date of Death (if applicable)")

        gr.Markdown("## Diagnosis & Procedures")
        diagnosis = gr.Textbox(label="Diagnosis")
        procedures = gr.Textbox(label="Operation & Procedures")

        gr.Markdown("## Medication Details")
        medications = gr.Textbox(label="Medication on Discharge")

        gr.Markdown("## Prepared By")
        with gr.Row():
            preparer_name = gr.Textbox(label="Name")
            preparer_job_title = gr.Textbox(label="Job Title")

        # Buttons for displaying the form and generating the PDF
        with gr.Row():
            submit_display = gr.Button("Display Form")
            submit_pdf = gr.Button("Generate PDF")

        # Output areas
        form_output = gr.Markdown()
        pdf_output = gr.File(label="Download PDF")

    # ---- Event wiring ----

    auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result)

    # Both form actions take the same fields, in the parameter order shared by
    # display_form and generate_pdf_from_form.
    form_inputs = [
        first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
        doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
        doctor_city, doctor_state, doctor_zip,
        admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
        diagnosis, procedures, medications, preparer_name, preparer_job_title,
    ]
    submit_display.click(display_form, inputs=form_inputs, outputs=form_output)
    submit_pdf.click(generate_pdf_from_form, inputs=form_inputs, outputs=pdf_output)

    def _refresh_patient_views():
        """Adapt ``fetch_and_display_patients`` output to the UI components.

        The pagination flag is converted into a visibility update because a
        layout Row cannot take a bare value as an event output.
        """
        raw_json, has_pagination, patients_html, metadata, state = fetch_and_display_patients()
        return [
            raw_json,
            gr.update(visible=bool(has_pagination)),
            patients_html,
            metadata,
            state,
        ]

    # Components refreshed whenever patient data is (re)loaded.
    patient_view_outputs = [
        patient_data_output, pagination_row, patient_display, metadata_md, patient_data_state,
    ]

    # Fetching shows the raw JSON and updates the patient dashboard.
    patient_data_button.click(
        fn=_refresh_patient_views,
        inputs=None,
        outputs=patient_view_outputs,
    )

    # PDF generation from the fetched MeldRx data
    meldrx_pdf_button.click(
        fn=generate_pdf_from_meldrx,
        inputs=patient_data_output,
        outputs=[meldrx_pdf_download, meldrx_pdf_status],
    )

    # Refresh button in the patient log tab
    refresh_btn.click(
        fn=_refresh_patient_views,
        inputs=None,
        outputs=patient_view_outputs,
    )

demo.launch()