from transformers import MllamaForConditionalGeneration, AutoProcessor, TextIteratorStreamer from PIL import Image import requests import torch from threading import Thread import gradio as gr from gradio import FileData import time import spaces import fitz # PyMuPDF import io import numpy as np # Load model and processor ckpt = "Daemontatox/DocumentCogito" model = MllamaForConditionalGeneration.from_pretrained(ckpt, torch_dtype=torch.bfloat16).to("cuda") processor = AutoProcessor.from_pretrained(ckpt) # Document state to track uploaded files class DocumentState: def __init__(self): self.current_doc_images = [] self.current_doc_text = "" self.doc_type = None def clear(self): self.current_doc_images = [] self.current_doc_text = "" self.doc_type = None doc_state = DocumentState() # Function to convert PDF to images and extract text def process_pdf_file(file_path): """Convert PDF to images and extract text using PyMuPDF.""" doc = fitz.open(file_path) images = [] text = "" # Process each page for page_num in range(doc.page_count): page = doc[page_num] text += f"Page {page_num + 1} content:\n{page.get_text()}\n" pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72)) img_data = pix.tobytes("png") img = Image.open(io.BytesIO(img_data)) images.append(img.convert("RGB")) doc.close() return images, text # Function to process uploaded files (PDF or image) def process_file(file): """Process either PDF or image file and update document state.""" doc_state.clear() if isinstance(file, dict): file_path = file["path"] else: file_path = file if file_path.lower().endswith('.pdf'): doc_state.doc_type = 'pdf' doc_state.current_doc_images, doc_state.current_doc_text = process_pdf_file(file_path) return f"PDF processed. Total pages: {len(doc_state.current_doc_images)}. You can now ask questions about the content." else: doc_state.doc_type = 'image' doc_state.current_doc_images = [Image.open(file_path).convert("RGB")] return "Image loaded successfully. You can now ask questions about the content." # Function to handle streaming responses from the model @spaces.GPU() def bot_streaming(message, history, max_new_tokens=8192): txt = message["text"] messages = [] # Process new file if provided if message.get("files") and len(message["files"]) > 0: process_file(message["files"][0]) # Process history for i, msg in enumerate(history): if isinstance(msg[0], dict): # Multimodal message (text + files) user_content = [{"type": "text", "text": msg[0]["text"]}] if "files" in msg[0] and len(msg[0]["files"]) > 0: user_content.append({"type": "image"}) messages.append({"role": "user", "content": user_content}) messages.append({"role": "assistant", "content": [{"type": "text", "text": msg[1]}]}) elif isinstance(msg[0], str): # Text-only message messages.append({"role": "user", "content": [{"type": "text", "text": msg[0]}]}) messages.append({"role": "assistant", "content": [{"type": "text", "text": msg[1]}]}) # Include document context in the current message if doc_state.current_doc_images: context = f"\nDocument context:\n{doc_state.current_doc_text}" if doc_state.current_doc_text else "" current_msg = f"{txt}{context}" messages.append({"role": "user", "content": [{"type": "text", "text": current_msg}, {"type": "image"}]}) else: messages.append({"role": "user", "content": [{"type": "text", "text": txt}]}) # Apply chat template to messages texts = processor.apply_chat_template(messages, add_generation_prompt=True) # Process inputs based on whether we have images if doc_state.current_doc_images: inputs = processor( text=texts, images=doc_state.current_doc_images[0:1], # Only use first image return_tensors="pt" ).to("cuda") else: inputs = processor(text=texts, return_tensors="pt").to("cuda") streamer = TextIteratorStreamer(processor, skip_special_tokens=True, skip_prompt=True) generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=max_new_tokens) thread = Thread(target=model.generate, kwargs=generation_kwargs) thread.start() buffer = "" for new_text in streamer: buffer += new_text time.sleep(0.01) yield buffer # Function to clear document context def clear_context(): """Clear the current document context.""" doc_state.clear() return "Document context cleared. You can upload a new document." # Create the Gradio interface with gr.Blocks() as demo: gr.Markdown("# Document Analyzer with Chat Support") gr.Markdown("Upload a PDF or image and chat about its contents. For PDFs, all pages will be processed for visual analysis.") chatbot = gr.ChatInterface( fn=bot_streaming, title="Document Chat", examples=[ [{"text": "Which era does this piece belong to? Give details about the era.", "files":["./examples/rococo.jpg"]}, 200], [{"text": "Where do the droughts happen according to this diagram?", "files":["./examples/weather_events.png"]}, 250], [{"text": "What happens when you take out white cat from this chain?", "files":["./examples/ai2d_test.jpg"]}, 250], [{"text": "How long does it take from invoice date to due date? Be short and concise.", "files":["./examples/invoice.png"]}, 250], [{"text": "Where to find this monument? Can you give me other recommendations around the area?", "files":["./examples/wat_arun.jpg"]}, 250], ], textbox=gr.MultimodalTextbox(), additional_inputs=[ gr.Slider( minimum=10, maximum=2048, value=2048, step=10, label="Maximum number of new tokens to generate", ) ], cache_examples=False, stop_btn="Stop Generation", fill_height=True, multimodal=True ) clear_btn = gr.Button("Clear Document Context") clear_btn.click(fn=clear_context) # Update accepted file types chatbot.textbox.file_types = ["image", "pdf"] # Launch the interface demo.launch(debug=True)