Spaces:
Sleeping
Sleeping
import streamlit as st | |
import base64 | |
import requests | |
from PIL import Image | |
from io import BytesIO | |
import fitz # PyMuPDF | |
import os | |
# Configuration - Replace with your API key | |
GEMINI_API_KEY = st.secrets["GEMINI_API_KEY"] | |
GEMINI_MODEL = "gemini-2.0-flash" | |
DOCUMENT_TYPES = [ | |
"Insurance Policies", "Explanation of Benefits (EOBs)", | |
"Claims (Approved, Denied, or Pending)", "Visit Summaries", | |
"Test Results (Lab Reports, Imaging Reports)", | |
"Prescriptions (E-Prescriptions, Handwritten)", | |
"Discharge Summaries", "Medical Bills", "Payment Statements", | |
"Pharmacy Receipts", "Prior Authorization Requests", | |
"Consent Forms", "Referral Letters", "Others" | |
] | |
def initialize_session_state(): | |
"""Initialize all session state variables""" | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
if "processed_doc" not in st.session_state: | |
st.session_state.processed_doc = None | |
if "doc_preview" not in st.session_state: | |
st.session_state.doc_preview = None | |
def encode_file(uploaded_file): | |
"""Safely encode different file types to base64""" | |
try: | |
file_bytes = uploaded_file.getvalue() | |
if uploaded_file.type == "application/pdf": | |
# Convert PDF to image using PyMuPDF | |
pdf = fitz.open(stream=BytesIO(file_bytes)) | |
page = pdf[0] # Get the first page | |
pix = page.get_pixmap() | |
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
img_byte_arr = BytesIO() | |
img.save(img_byte_arr, format='JPEG') | |
return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8') | |
elif uploaded_file.type in ["image/webp", "image/avif", "image/jpeg", "image/png", "image/gif"]: | |
img = Image.open(BytesIO(file_bytes)) | |
img_byte_arr = BytesIO() | |
img.save(img_byte_arr, format='JPEG') | |
return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8') | |
elif uploaded_file.type == "text/plain": | |
# Convert TXT to image | |
text = file_bytes.decode('utf-8') | |
img = Image.new('RGB', (800, 600), color = (73, 109, 137)) | |
d = ImageDraw.Draw(img) | |
d.text((10,10), text, fill=(255,255,0)) | |
img_byte_arr = BytesIO() | |
img.save(img_byte_arr, format='JPEG') | |
return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8') | |
return base64.b64encode(file_bytes).decode('utf-8') | |
except Exception as e: | |
st.error(f"File processing error: {str(e)}") | |
return None | |
def query_gemini(prompt, image_b64=None): | |
"""Handle Gemini API communication""" | |
url = f"https://generativelanguage.googleapis.com/v1/models/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}" | |
parts = [{"text": prompt}] | |
if image_b64: | |
parts.append({ | |
"inline_data": { | |
"mime_type": "image/jpeg", | |
"data": image_b64 | |
} | |
}) | |
try: | |
response = requests.post( | |
url, | |
json={"contents": [{"parts": parts}]}, | |
headers={"Content-Type": "application/json"}, | |
timeout=30 | |
) | |
response.raise_for_status() | |
return response.json()["candidates"][0]["content"]["parts"][0]["text"] | |
except Exception as e: | |
st.error(f"API Error: {str(e)}") | |
return None | |
def process_document(): | |
"""Handle document processing pipeline""" | |
uploaded_file = st.session_state.uploaded_file | |
if not uploaded_file: | |
return | |
try: | |
with st.spinner("Analyzing document..."): | |
# Convert to base64 | |
image_b64 = encode_file(uploaded_file) | |
if not image_b64: | |
return | |
# Generate preview | |
if uploaded_file.type == "application/pdf": | |
pdf = fitz.open(stream=BytesIO(uploaded_file.getvalue())) | |
page = pdf[0] # Get the first page | |
pix = page.get_pixmap() | |
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
st.session_state.doc_preview = img | |
elif uploaded_file.type in ["image/webp", "image/avif", "image/jpeg", "image/png", "image/gif"]: | |
st.session_state.doc_preview = Image.open(uploaded_file) | |
elif uploaded_file.type == "text/plain": | |
text = uploaded_file.getvalue().decode('utf-8') | |
img = Image.new('RGB', (800, 600), color = (73, 109, 137)) | |
d = ImageDraw.Draw(img) | |
d.text((10,10), text, fill=(255,255,0)) | |
st.session_state.doc_preview = img | |
# Classify document | |
classify_prompt = f"Classify this healthcare document into one of: {DOCUMENT_TYPES}. Respond only with the category name." | |
doc_type = query_gemini(classify_prompt, image_b64) or "Others" | |
# Store results | |
st.session_state.processed_doc = { | |
"type": doc_type, | |
"content": image_b64, | |
"summary": query_gemini("Create a detailed structured summary of this healthcare document.", image_b64) | |
} | |
except Exception as e: | |
st.error(f"Processing failed: {str(e)}") | |
st.session_state.processed_doc = None | |
def handle_chat_query(): | |
"""Process user chat input""" | |
user_input = st.session_state.chat_input | |
if not user_input or not st.session_state.processed_doc: | |
return | |
prompt = f""" | |
Document Context: | |
- Type: {st.session_state.processed_doc['type']} | |
- Summary: {st.session_state.processed_doc['summary']} | |
Question: {user_input} | |
Answer concisely and factually. If unsure, state "Information not found". | |
""" | |
with st.spinner("Generating response..."): | |
response = query_gemini(prompt, st.session_state.processed_doc['content']) | |
st.session_state.chat_history.append(("user", user_input)) | |
st.session_state.chat_history.append(("assistant", response or "Could not generate response")) | |
# UI Layout | |
def main(): | |
st.set_page_config(page_title="Healthcare Document Assistant", layout="wide") | |
initialize_session_state() | |
# Sidebar Section | |
with st.sidebar: | |
st.header("Document Management") | |
# Preview above upload button | |
if st.session_state.doc_preview: | |
st.subheader("Preview") | |
st.image(st.session_state.doc_preview, use_container_width=True) | |
# Upload button | |
st.file_uploader( | |
"Upload Document", | |
type=["pdf", "webp", "avif", "jpg", "jpeg", "png", "gif", "txt"], | |
key="uploaded_file", | |
on_change=process_document | |
) | |
# Document type below upload button | |
if st.session_state.processed_doc: | |
st.divider() | |
st.subheader("Document Type") | |
st.markdown(f"**{st.session_state.processed_doc['type']}**") | |
# Main Content | |
st.title("Healthcare Document Assistant") | |
if st.session_state.processed_doc: | |
# Document Summary | |
st.subheader("Document Summary") | |
st.markdown(st.session_state.processed_doc['summary']) | |
# Chat Interface | |
st.divider() | |
st.subheader("Document Q&A") | |
# Chat history | |
for role, message in st.session_state.chat_history: | |
with st.chat_message(role.capitalize()): | |
st.markdown(message) | |
# Chat input | |
st.chat_input( | |
"Ask about the document...", | |
key="chat_input", | |
on_submit=handle_chat_query | |
) | |
else: | |
st.info("Please upload a document to begin analysis") | |
if __name__ == "__main__": | |
main() |