File size: 7,868 Bytes
b9022c3
 
 
 
 
9087ae6
1dc1376
b9022c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9087ae6
 
 
 
 
b9022c3
9087ae6
b9022c3
 
1dc1376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b9022c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9087ae6
 
 
 
 
1dc1376
b9022c3
1dc1376
 
 
 
 
 
b9022c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1dc1376
b9022c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import streamlit as st
import base64
import requests
from PIL import Image
from io import BytesIO
import fitz  # PyMuPDF
import os

# Configuration - Replace with your API key
GEMINI_API_KEY = st.secrets["GEMINI_API_KEY"]
GEMINI_MODEL = "gemini-2.0-flash"

DOCUMENT_TYPES = [
    "Insurance Policies", "Explanation of Benefits (EOBs)",
    "Claims (Approved, Denied, or Pending)", "Visit Summaries",
    "Test Results (Lab Reports, Imaging Reports)", 
    "Prescriptions (E-Prescriptions, Handwritten)",
    "Discharge Summaries", "Medical Bills", "Payment Statements",
    "Pharmacy Receipts", "Prior Authorization Requests",
    "Consent Forms", "Referral Letters", "Others"
]

def initialize_session_state():
    """Initialize all session state variables"""
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "processed_doc" not in st.session_state:
        st.session_state.processed_doc = None
    if "doc_preview" not in st.session_state:
        st.session_state.doc_preview = None

def encode_file(uploaded_file):
    """Safely encode different file types to base64"""
    try:
        file_bytes = uploaded_file.getvalue()
        
        if uploaded_file.type == "application/pdf":
            # Convert PDF to image using PyMuPDF
            pdf = fitz.open(stream=BytesIO(file_bytes))
            page = pdf[0]  # Get the first page
            pix = page.get_pixmap()
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            img_byte_arr = BytesIO()
            img.save(img_byte_arr, format='JPEG')
            return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')
        
        elif uploaded_file.type in ["image/webp", "image/avif", "image/jpeg", "image/png", "image/gif"]:
            img = Image.open(BytesIO(file_bytes))
            img_byte_arr = BytesIO()
            img.save(img_byte_arr, format='JPEG')
            return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')
        
        elif uploaded_file.type == "text/plain":
            # Convert TXT to image
            text = file_bytes.decode('utf-8')
            img = Image.new('RGB', (800, 600), color = (73, 109, 137))
            d = ImageDraw.Draw(img)
            d.text((10,10), text, fill=(255,255,0))
            img_byte_arr = BytesIO()
            img.save(img_byte_arr, format='JPEG')
            return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')
        
        return base64.b64encode(file_bytes).decode('utf-8')
    except Exception as e:
        st.error(f"File processing error: {str(e)}")
        return None

def query_gemini(prompt, image_b64=None):
    """Handle Gemini API communication"""
    url = f"https://generativelanguage.googleapis.com/v1/models/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}"
    
    parts = [{"text": prompt}]
    if image_b64:
        parts.append({
            "inline_data": {
                "mime_type": "image/jpeg",
                "data": image_b64
            }
        })
    
    try:
        response = requests.post(
            url,
            json={"contents": [{"parts": parts}]},
            headers={"Content-Type": "application/json"},
            timeout=30
        )
        response.raise_for_status()
        return response.json()["candidates"][0]["content"]["parts"][0]["text"]
    except Exception as e:
        st.error(f"API Error: {str(e)}")
        return None

def process_document():
    """Handle document processing pipeline"""
    uploaded_file = st.session_state.uploaded_file
    if not uploaded_file:
        return
    
    try:
        with st.spinner("Analyzing document..."):
            # Convert to base64
            image_b64 = encode_file(uploaded_file)
            if not image_b64:
                return

            # Generate preview
            if uploaded_file.type == "application/pdf":
                pdf = fitz.open(stream=BytesIO(uploaded_file.getvalue()))
                page = pdf[0]  # Get the first page
                pix = page.get_pixmap()
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                st.session_state.doc_preview = img
            elif uploaded_file.type in ["image/webp", "image/avif", "image/jpeg", "image/png", "image/gif"]:
                st.session_state.doc_preview = Image.open(uploaded_file)
            elif uploaded_file.type == "text/plain":
                text = uploaded_file.getvalue().decode('utf-8')
                img = Image.new('RGB', (800, 600), color = (73, 109, 137))
                d = ImageDraw.Draw(img)
                d.text((10,10), text, fill=(255,255,0))
                st.session_state.doc_preview = img

            # Classify document
            classify_prompt = f"Classify this healthcare document into one of: {DOCUMENT_TYPES}. Respond only with the category name."
            doc_type = query_gemini(classify_prompt, image_b64) or "Others"
            
            # Store results
            st.session_state.processed_doc = {
                "type": doc_type,
                "content": image_b64,
                "summary": query_gemini("Create a detailed structured summary of this healthcare document.", image_b64)
            }
            
    except Exception as e:
        st.error(f"Processing failed: {str(e)}")
        st.session_state.processed_doc = None

def handle_chat_query():
    """Process user chat input"""
    user_input = st.session_state.chat_input
    if not user_input or not st.session_state.processed_doc:
        return
    
    prompt = f"""
    Document Context:
    - Type: {st.session_state.processed_doc['type']}
    - Summary: {st.session_state.processed_doc['summary']}
    
    Question: {user_input}
    
    Answer concisely and factually. If unsure, state "Information not found".
    """
    
    with st.spinner("Generating response..."):
        response = query_gemini(prompt, st.session_state.processed_doc['content']) 
    
    st.session_state.chat_history.append(("user", user_input))
    st.session_state.chat_history.append(("assistant", response or "Could not generate response"))

# UI Layout
def main():
    st.set_page_config(page_title="Healthcare Document Assistant", layout="wide")
    initialize_session_state()

    # Sidebar Section
    with st.sidebar:
        st.header("Document Management")
        
        # Preview above upload button
        if st.session_state.doc_preview:
            st.subheader("Preview")
            st.image(st.session_state.doc_preview, use_container_width=True)
        
        # Upload button
        st.file_uploader(
            "Upload Document",
            type=["pdf", "webp", "avif", "jpg", "jpeg", "png", "gif", "txt"],
            key="uploaded_file",
            on_change=process_document
        )
        
        # Document type below upload button
        if st.session_state.processed_doc:
            st.divider()
            st.subheader("Document Type")
            st.markdown(f"**{st.session_state.processed_doc['type']}**")

    # Main Content
    st.title("Healthcare Document Assistant")
    
    if st.session_state.processed_doc:
        # Document Summary
        st.subheader("Document Summary")
        st.markdown(st.session_state.processed_doc['summary'])
        
        # Chat Interface
        st.divider()
        st.subheader("Document Q&A")
        
        # Chat history
        for role, message in st.session_state.chat_history:
            with st.chat_message(role.capitalize()):
                st.markdown(message)
        
        # Chat input
        st.chat_input(
            "Ask about the document...",
            key="chat_input",
            on_submit=handle_chat_query
        )
    else:
        st.info("Please upload a document to begin analysis")

if __name__ == "__main__":
    main()