# Spaces: Running
import streamlit as st | |
import base64 | |
import requests | |
from PIL import Image, ImageDraw | |
from io import BytesIO | |
import fitz # PyMuPDF | |
import time | |
# Configuration - the API key is read from Streamlit secrets
GEMINI_API_KEY = st.secrets["GEMINI_API_KEY"]
# Gemini model ids spell out the full version number ("2.0", not "2").
# NOTE(review): confirm this id against the published Gemini model list.
GEMINI_MODEL = "gemini-2.0-flash"
# Categories offered to the model when classifying an uploaded document.
DOCUMENT_TYPES = ["Land Records", "Caste Certificates", "Property Registrations"]
# Initialize session state
def initialize_session_state():
    """Seed the session-state keys this app owns with their default values.

    Keys that already exist are left untouched, so values survive reruns.

    "uploaded_file" is deliberately not seeded here: it is the key of the
    ``st.file_uploader`` widget created in ``main``, and Streamlit raises an
    exception when a file-uploader value is assigned through Session State
    before the widget is created. The widget registers that key itself.
    """
    # Built inside the function so every fresh session gets its own list.
    defaults = {
        "chat_history": [],
        "processed_doc": None,
        "doc_preview": None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
# Reset session state
def reset_session_state():
    """Remove every per-document entry from the Streamlit session state."""
    resettable = ("chat_history", "processed_doc", "doc_preview", "uploaded_file")
    for name in resettable:
        # A default is supplied so a key that is already absent is ignored.
        st.session_state.pop(name, None)
# Encode uploaded file to base64
def encode_file(uploaded_file):
    """Render an uploaded file as a base64-encoded JPEG string.

    PDFs are rasterised from their first page only, images are decoded
    directly, and plain-text files are drawn onto a fixed 800x600 canvas.

    Args:
        uploaded_file: Object exposing ``getvalue()`` (the raw bytes) and
            ``type`` (a MIME type string).

    Returns:
        The JPEG data as base64 text, or ``None`` when the MIME type is not
        supported or any processing step fails. Both failure cases are
        reported to the user through ``st.error``.
    """
    try:
        file_bytes = uploaded_file.getvalue()
        mime_type = uploaded_file.type or ""

        if mime_type == "application/pdf":
            # Name the format explicitly and always release the document,
            # even if rendering the page fails.
            pdf = fitz.open(stream=file_bytes, filetype="pdf")
            try:
                pix = pdf[0].get_pixmap()
                img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            finally:
                pdf.close()
        elif mime_type.startswith("image/"):
            img = Image.open(BytesIO(file_bytes))
        elif mime_type == "text/plain":
            text = file_bytes.decode("utf-8")
            img = Image.new("RGB", (800, 600), color=(73, 109, 137))
            ImageDraw.Draw(img).text((10, 10), text, fill=(255, 255, 0))
        else:
            st.error("Unsupported file format")
            return None

        # JPEG cannot store alpha or palette images (e.g. transparent PNGs),
        # so normalise the mode before saving.
        img = _flatten_to_rgb(img)

        img_byte_arr = BytesIO()
        img.save(img_byte_arr, format="JPEG")
        return base64.b64encode(img_byte_arr.getvalue()).decode("utf-8")
    except Exception as e:
        # UI boundary: report the failure instead of crashing the app.
        st.error(f"File processing error: {str(e)}")
        return None


def _flatten_to_rgb(img):
    """Return *img* in RGB mode, compositing any transparency onto white."""
    has_alpha = img.mode in ("RGBA", "LA") or (
        img.mode == "P" and "transparency" in img.info
    )
    if has_alpha:
        rgba = img.convert("RGBA")
        canvas = Image.new("RGB", rgba.size, (255, 255, 255))
        # Use the alpha band as the paste mask so transparent areas become
        # white rather than black.
        canvas.paste(rgba, mask=rgba.split()[3])
        return canvas
    if img.mode != "RGB":
        return img.convert("RGB")
    return img
# Query Gemini API
def query_gemini(prompt, image_b64=None):
    """Send a prompt, optionally with a JPEG image, to the Gemini API.

    Args:
        prompt: Instruction text sent as the first content part.
        image_b64: Optional base64-encoded JPEG attached as inline data.

    Returns:
        The text of the first part of the first candidate, the string
        'No response text found' when that candidate carries no text, or
        ``None`` when the request fails. Failures are reported through
        ``st.error``.
    """
    # The key travels in a header rather than the query string: connection
    # errors raised by requests include the URL in their message, and that
    # message is shown to the user below.
    url = (
        "https://generativelanguage.googleapis.com/v1/models/"
        f"{GEMINI_MODEL}:generateContent"
    )
    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": GEMINI_API_KEY,
    }

    parts = [{"text": prompt}]
    if image_b64:
        parts.append({"inline_data": {"mime_type": "image/jpeg", "data": image_b64}})

    try:
        response = requests.post(
            url,
            json={"contents": [{"parts": parts}]},
            headers=headers,
            timeout=30,
        )
        if response.status_code != 200:
            st.error(f"API Request failed with status code: {response.status_code}")
            return None

        data = response.json()
        if 'error' in data:
            st.error(f"API Error: {data['error'].get('message', 'Unknown error')}")
            return None
        if not data.get('candidates'):
            st.error("No response candidates found in API response")
            return None

        candidate = data['candidates'][0]
        # Treat a missing or empty parts list like a part without text, so it
        # yields the fallback string instead of an IndexError.
        reply_parts = candidate.get('content', {}).get('parts') or [{}]
        return reply_parts[0].get('text', 'No response text found')
    except requests.exceptions.RequestException as e:
        st.error(f"API Request failed: {str(e)}")
        return None
    except Exception as e:
        # UI boundary: e.g. a body that is not valid JSON.
        st.error(f"Unexpected error: {str(e)}")
        return None
# Process the uploaded document
def process_document():
    """Classify, extract details from, and verify the uploaded document.

    Registered as the ``on_change`` callback of the file uploader in ``main``.
    Reads the upload from ``st.session_state`` under "uploaded_file" and
    stores the outcome under "processed_doc" as a dict with the keys
    "type", "details" and "verification". Returns nothing.
    """
    # Drop the previous result first so a removed, unsupported, or unreadable
    # upload never leaves another document's summary on screen.
    st.session_state.processed_doc = None

    uploaded = st.session_state.get("uploaded_file")
    if not uploaded:
        st.error("Please upload a document first.")
        return

    try:
        with st.spinner("Analyzing document..."):
            image_b64 = encode_file(uploaded)
            if not image_b64:
                return

            classify_prompt = f"Classify this document into one of these categories: {', '.join(DOCUMENT_TYPES)}. Respond only with the category name."
            doc_type = query_gemini(classify_prompt, image_b64)

            extract_prompt = """Extract key details including:
- Names
- Dates
- Identification numbers
- Locations
Format as a bullet-point list."""
            details = query_gemini(extract_prompt, image_b64)

            verify_prompt = "Analyze this document for signs of tampering. Provide verification status."
            verification = query_gemini(verify_prompt, image_b64)

            st.session_state.processed_doc = {
                # Strip surrounding whitespace from the one-line category reply.
                "type": (doc_type or "").strip() or "Unclassified",
                "details": details or "No details extracted",
                "verification": verification or "Verification failed",
            }

        # query_gemini returns None on failure; only claim success when every
        # step actually produced a response.
        if None in (doc_type, details, verification):
            st.warning("Document processed, but some analysis steps failed.")
        else:
            st.success("Document processing complete!")
        # NOTE(review): presumably keeps the status message visible briefly
        # before the script reruns - confirm this pause is still wanted.
        time.sleep(1)
    except Exception as e:
        # UI boundary: report the failure and make sure no partial result is shown.
        st.error(f"Document processing failed: {str(e)}")
        st.session_state.processed_doc = None
# Main app function
def main():
    """Lay out the page: sidebar upload controls and the analysis summary."""
    st.set_page_config(page_title="DocVerify AI", layout="wide")
    initialize_session_state()

    sidebar = st.sidebar
    sidebar.header("Document Controls")
    sidebar.file_uploader(
        "Upload Document",
        type=["pdf", "jpg", "jpeg", "png", "txt"],
        key="uploaded_file",
        on_change=process_document,
    )

    start_over = sidebar.button("New Document")
    if start_over:
        # Clear the stored state, then restart the script run.
        reset_session_state()
        st.rerun()

    st.title("DocVerify AI - Document Analysis")

    summary = st.session_state.processed_doc
    if not summary:
        # Nothing has been analysed yet, so there is no summary to show.
        return

    st.subheader("Document Summary")
    st.markdown(f"**Type:** {summary['type']}")
    st.markdown(f"**Verification Status:** {summary['verification']}")
    st.text_area("Extracted Details", summary['details'], height=200)
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()