#!/usr/bin/env python3
"""
Dots.OCR Gradio Demo Application

A Gradio-based web interface for demonstrating the Dots.OCR model using
Hugging Face transformers. This application provides OCR and layout analysis
capabilities for documents and images.
"""

import os
import json
import traceback
import math
from io import BytesIO
from typing import Optional, Dict, Any, Tuple, List

import requests

# LOCAL_RANK must be present in the environment before torch/transformers are
# imported below; an existing value is left untouched.
os.environ.setdefault("LOCAL_RANK", "0")

import torch
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
from transformers import AutoModelForCausalLM, AutoProcessor
from qwen_vl_utils import process_vision_info
import fitz  # PyMuPDF

# Constants
MIN_PIXELS = 3136  # default lower bound on image area (56 * 56)
MAX_PIXELS = 11289600  # default upper bound on image area (3360 * 3360)
IMAGE_FACTOR = 28  # resized image sides are multiples of this value

# Prompts, keyed by the prompt-mode name used throughout this file.
# NOTE(review): the layout prompts read like numbered/bulleted lists; if the
# upstream text had line breaks between the items they are not present here --
# confirm against the model's reference prompts.
dict_promptmode_to_prompt = {
    "prompt_layout_all_en": (
        "Please output the layout information from the PDF image, including each layout element's bbox, "
        "its category, and the corresponding text content within the bbox. "
        "1. Bbox format: [x1, y1, x2, y2] "
        "2. Layout Categories: The possible categories are ['Caption', 'Footnote', 'Formula', 'List-item', "
        "'Page-footer', 'Page-header', 'Picture', 'Section-header', 'Table', 'Text', 'Title']. "
        "3. Text Extraction & Formatting Rules: "
        "- Picture: For the 'Picture' category, the text field should be omitted. "
        "- Formula: Format its text as LaTeX. "
        "- Table: Format its text as HTML. "
        "- All Others (Text, Title, etc.): Format their text as Markdown. "
        "4. Constraints: "
        "- The output text must be the original text from the image, with no translation. "
        "- All layout elements must be sorted according to human reading order. "
        "5. Final Output: The entire output must be a single JSON object. "
    ),
    "prompt_layout_only_en": (
        "Please output the layout information from this PDF image, including each layout's bbox and its "
        "category. The bbox should be in the format [x1, y1, x2, y2]. "
        "The layout categories for the PDF document include ['Caption', 'Footnote', 'Formula', 'List-item', "
        "'Page-footer', 'Page-header', 'Picture', 'Section-header', 'Table', 'Text', 'Title']. "
        "Do not output the corresponding text. The layout result should be in JSON format."
    ),
    "prompt_ocr": "Extract the text content from this image.",
    "prompt_grounding_ocr": (
        "Extract text from the given bounding box on the image (format: [x1, y1, x2, y2]).\n"
        "Bounding Box:\n"
    ),
}
# Utility functions
def round_by_factor(number: int, factor: int) -> int:
    """Return the multiple of ``factor`` that is closest to ``number``."""
    return round(number / factor) * factor


def _floor_by_factor(number: float, factor: int) -> int:
    """Return the largest multiple of ``factor`` that is <= ``number``."""
    return math.floor(number / factor) * factor


def _ceil_by_factor(number: float, factor: int) -> int:
    """Return the smallest multiple of ``factor`` that is >= ``number``."""
    return math.ceil(number / factor) * factor


def smart_resize(
    height: int,
    width: int,
    factor: int = 28,
    min_pixels: int = 3136,
    max_pixels: int = 11289600,
) -> Tuple[int, int]:
    """Compute a target size for an image.

    The returned size satisfies:

    1. Both dimensions are divisible by ``factor``.
    2. The total number of pixels lies within ``[min_pixels, max_pixels]``
       (``max_pixels`` wins if the two cannot both be met).
    3. The aspect ratio is kept as close to the original as possible.

    Args:
        height: Original image height in pixels (must be > 0).
        width: Original image width in pixels (must be > 0).
        factor: Value both output dimensions must be a multiple of.
        min_pixels: Lower bound on ``height * width`` of the result.
        max_pixels: Upper bound on ``height * width`` of the result.

    Returns:
        ``(new_height, new_width)``.

    Raises:
        ValueError: If a dimension is not positive or the aspect ratio
            exceeds 200.
    """
    if height <= 0 or width <= 0:
        raise ValueError(f"height and width must be positive, got {height}x{width}")

    aspect_ratio = max(height, width) / min(height, width)
    if aspect_ratio > 200:
        raise ValueError(
            f"absolute aspect ratio must be smaller than 200, got {aspect_ratio}"
        )

    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))

    if h_bar * w_bar > max_pixels:
        # Shrinking: round DOWN so the product cannot end up above max_pixels
        # again (rounding to nearest could overshoot the limit).
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = max(factor, _floor_by_factor(height / beta, factor))
        w_bar = max(factor, _floor_by_factor(width / beta, factor))
    elif h_bar * w_bar < min_pixels:
        # Growing: round UP so the product cannot end up below min_pixels.
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = _ceil_by_factor(height * beta, factor)
        w_bar = _ceil_by_factor(width * beta, factor)
        if h_bar * w_bar > max_pixels:
            # Rounding up overshot the upper bound; the upper bound takes
            # priority, so shrink back below it.
            beta = math.sqrt((h_bar * w_bar) / max_pixels)
            h_bar = max(factor, _floor_by_factor(h_bar / beta, factor))
            w_bar = max(factor, _floor_by_factor(w_bar / beta, factor))

    return h_bar, w_bar


def fetch_image(
    image_input,
    min_pixels: Optional[int] = None,
    max_pixels: Optional[int] = None,
    *,
    timeout: float = 30.0,
):
    """Load an image as RGB and optionally resize it.

    Args:
        image_input: An ``http(s)://`` URL, a local file path, or a PIL image.
        min_pixels: Lower pixel-count bound for resizing. A falsy value falls
            back to ``MIN_PIXELS`` when ``max_pixels`` is given.
        max_pixels: Upper pixel-count bound for resizing. A falsy value falls
            back to ``MAX_PIXELS`` when ``min_pixels`` is given.
        timeout: Seconds to wait for a remote image before giving up.

    Returns:
        The RGB image, resized via ``smart_resize`` when at least one of
        ``min_pixels`` / ``max_pixels`` is not ``None``.

    Raises:
        ValueError: If ``image_input`` is neither a string nor a PIL image.
        requests.RequestException: If downloading a remote image fails
            (including non-2xx HTTP responses).
    """
    if isinstance(image_input, str):
        if image_input.startswith(("http://", "https://")):
            # A timeout keeps the UI from hanging on an unresponsive host, and
            # raise_for_status stops an HTTP error page from being fed to PIL.
            response = requests.get(image_input, timeout=timeout)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content)).convert('RGB')
        else:
            # convert() returns a fully loaded copy, so the file handle can be
            # released right away.
            with Image.open(image_input) as opened:
                image = opened.convert('RGB')
    elif isinstance(image_input, Image.Image):
        image = image_input.convert('RGB')
    else:
        raise ValueError(f"Invalid image input type: {type(image_input)}")

    if min_pixels is not None or max_pixels is not None:
        min_pixels = min_pixels or MIN_PIXELS
        max_pixels = max_pixels or MAX_PIXELS
        height, width = smart_resize(
            image.height,
            image.width,
            factor=IMAGE_FACTOR,
            min_pixels=min_pixels,
            max_pixels=max_pixels,
        )
        image = image.resize((width, height), Image.LANCZOS)

    return image


def load_images_from_pdf(pdf_path: str) -> List[Image.Image]:
    """Render every page of a PDF to an RGB image.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        One image per page, in page order. An empty list is returned (and the
        error printed) if the document cannot be opened or rendered.
    """
    images = []
    zoom = fitz.Matrix(2.0, 2.0)  # 2x scale in both directions for sharper pages
    try:
        # The context manager closes the document even if a page fails to render.
        with fitz.open(pdf_path) as pdf_document:
            for page in pdf_document:
                pix = page.get_pixmap(matrix=zoom)
                img_data = pix.tobytes("ppm")
                images.append(Image.open(BytesIO(img_data)).convert('RGB'))
    except Exception as e:
        print(f"Error loading PDF: {e}")
        return []
    return images
def draw_layout_on_image(image: Image.Image, layout_data: List[Dict]) -> Image.Image:
    """Draw labelled layout bounding boxes on a copy of ``image``.

    Args:
        image: Source image; it is not modified.
        layout_data: Layout items. Only dict items carrying both ``'bbox'``
            (``[x1, y1, x2, y2]``) and ``'category'`` are drawn. A single dict
            is treated as a one-item list.

    Returns:
        A copy of ``image`` with one outlined rectangle and category label per
        drawable item. Items that cannot be drawn are reported and skipped, so
        one bad entry no longer prevents the remaining boxes from being drawn.
    """
    img_copy = image.copy()
    draw = ImageDraw.Draw(img_copy)

    # Colors for different categories
    colors = {
        'Caption': '#FF6B6B',
        'Footnote': '#4ECDC4',
        'Formula': '#45B7D1',
        'List-item': '#96CEB4',
        'Page-footer': '#FFEAA7',
        'Page-header': '#DDA0DD',
        'Picture': '#FFD93D',
        'Section-header': '#6C5CE7',
        'Table': '#FD79A8',
        'Text': '#74B9FF',
        'Title': '#E17055',
    }

    if isinstance(layout_data, dict):
        layout_data = [layout_data]
    if not isinstance(layout_data, (list, tuple)):
        print(f"Error drawing layout: expected a list of items, got {type(layout_data).__name__}")
        return img_copy

    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 12)
    except Exception:
        # The DejaVu font is not available on this system.
        font = ImageFont.load_default()

    for item in layout_data:
        if not isinstance(item, dict) or 'bbox' not in item or 'category' not in item:
            continue
        try:
            x1, y1, x2, y2 = item['bbox']
            # PIL rejects rectangles whose corners are in the wrong order.
            left, right = sorted((x1, x2))
            top, bottom = sorted((y1, y2))
            category = item['category']
            color = colors.get(category, '#000000')

            # Draw rectangle
            draw.rectangle([left, top, right, bottom], outline=color, width=2)

            # Measure the label so its background fits the text
            label = str(category)
            label_bbox = draw.textbbox((0, 0), label, font=font)
            label_width = label_bbox[2] - label_bbox[0]
            label_height = label_bbox[3] - label_bbox[1]

            # Position label above the box, clamped to the top edge of the image
            label_x = left
            label_y = max(0, top - label_height - 2)

            # Draw background for label, then the text itself
            draw.rectangle(
                [label_x, label_y, label_x + label_width + 4, label_y + label_height + 2],
                fill=color,
            )
            draw.text((label_x + 2, label_y + 1), label, fill='white', font=font)
        except Exception as e:
            print(f"Error drawing layout: {e}")

    return img_copy


def _reading_order_key(item: Dict) -> Tuple[float, float]:
    """Return ``(top, left)`` of an item's bbox, or ``(0, 0)`` if it is unusable."""
    bbox = item.get('bbox')
    if (
        isinstance(bbox, (list, tuple))
        and len(bbox) >= 2
        and all(isinstance(value, (int, float)) for value in bbox[:2])
    ):
        return bbox[1], bbox[0]
    return 0, 0


def layoutjson2md(image: Image.Image, layout_data: List[Dict], text_key: str = 'text', no_page_hf: bool = False) -> str:
    """Convert layout items to a markdown string.

    Items are ordered top-to-bottom, then left-to-right by their bbox, and
    rendered according to their category. Items without text, and page
    headers/footers, are left out.

    Args:
        image: Not used by the conversion; kept for interface compatibility.
        layout_data: Layout items (dicts with ``'category'``, ``'bbox'`` and
            the text field). A single dict is treated as a one-item list and
            non-dict entries are ignored.
        text_key: Key under which each item stores its text.
        no_page_hf: When ``False``, a ``# Document Content`` heading is
            emitted first.

    Returns:
        The markdown text. If the conversion fails unexpectedly the error is
        printed and ``str(layout_data)`` is returned instead.
    """
    markdown_lines = []

    if not no_page_hf:
        markdown_lines.append("# Document Content\n")

    try:
        items = [layout_data] if isinstance(layout_data, dict) else layout_data
        items = [item for item in items if isinstance(item, dict)]

        # Sort items by reading order (top to bottom, left to right)
        for item in sorted(items, key=_reading_order_key):
            category = item.get('category', '')
            text = item.get(text_key, '')

            if not text:
                continue
            if category in ('Page-header', 'Page-footer'):
                # Skip headers and footers in main content
                continue
            if not isinstance(text, str):
                # Render numbers etc. instead of failing on .strip() below.
                text = str(text)

            if category == 'Title':
                markdown_lines.append(f"# {text}\n")
            elif category == 'Section-header':
                markdown_lines.append(f"## {text}\n")
            elif category == 'List-item':
                markdown_lines.append(f"- {text}\n")
            elif category == 'Table':
                # If text is already HTML, keep it as is
                if text.strip().startswith('<'):
                    markdown_lines.append(f"{text}\n")
                else:
                    markdown_lines.append(f"**Table:** {text}\n")
            elif category == 'Formula':
                # If text looks like LaTeX, wrap it in a display-math block
                if text.strip().startswith('$') or '\\' in text:
                    markdown_lines.append(f"$$\n{text}\n$$\n")
                else:
                    markdown_lines.append(f"**Formula:** {text}\n")
            elif category == 'Caption':
                markdown_lines.append(f"*{text}*\n")
            elif category == 'Footnote':
                markdown_lines.append(f"^{text}^\n")
            else:
                # 'Text' and any unknown category are emitted verbatim
                markdown_lines.append(f"{text}\n")

            markdown_lines.append("")  # Add spacing
    except Exception as e:
        print(f"Error converting to markdown: {e}")
        return str(layout_data)

    return "\n".join(markdown_lines)
# Initialize model and processor at script level
model_id = "rednote-hilab/dots.ocr"
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    attn_implementation="flash_attention_2",
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True
)
processor = AutoProcessor.from_pretrained(
    model_id,
    trust_remote_code=True
)

# Global state variables
device = "cuda" if torch.cuda.is_available() else "cpu"

# PDF handling state: the pages of the currently loaded file, the page being
# shown, and (once parsed) one result entry per page.
pdf_cache = {
    "images": [],
    "current_page": 0,
    "total_pages": 0,
    "file_type": None,
    "is_parsed": False,
    "results": []
}

# Processing state
processing_results = {
    'original_image': None,
    'processed_image': None,
    'layout_result': None,
    'markdown_content': None,
    'raw_output': None,
}


def inference(image: Image.Image, prompt: str, max_new_tokens: int = 24000) -> str:
    """Run the model on one image with the given prompt.

    Args:
        image: Image to analyse.
        prompt: Instruction text sent to the model together with the image.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded model output with the prompt tokens removed. On failure
        the error is printed with a traceback and a string starting with
        ``"Error during inference: "`` is returned instead of raising.
    """
    try:
        if model is None or processor is None:
            raise RuntimeError("Model not loaded. Please check model initialization.")

        # Prepare messages in the expected chat format
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": prompt},
                ],
            }
        ]

        # Apply chat template
        text = processor.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        # Process vision information
        image_inputs, video_inputs = process_vision_info(messages)

        # Prepare inputs
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )

        # The model is placed by device_map="auto", so follow the device it
        # actually reports; fall back to the module-level guess otherwise.
        inputs = inputs.to(getattr(model, "device", device))

        # Greedy decoding. No temperature is passed: it only applies when
        # sampling and is meaningless alongside do_sample=False.
        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False,
            )

        # generate() returns prompt + completion; keep only the completion
        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]

        output_text = processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False
        )

        return output_text[0] if output_text else ""

    except Exception as e:
        print(f"Error during inference: {e}")
        traceback.print_exc()
        return f"Error during inference: {str(e)}"
def _parse_layout_json(raw_output: str) -> Optional[List[Dict]]:
    """Parse model output as a list of layout items.

    Returns ``None`` when the text is not valid JSON or the decoded value is
    neither a list nor a single object; a single object is wrapped in a list.
    """
    try:
        parsed = json.loads(raw_output)
    except json.JSONDecodeError:
        return None
    if isinstance(parsed, dict):
        return [parsed]
    if isinstance(parsed, list):
        return parsed
    return None


def process_image(
    image: Image.Image,
    prompt_mode: str,
    min_pixels: Optional[int] = None,
    max_pixels: Optional[int] = None
) -> Dict[str, Any]:
    """Process a single image with the specified prompt mode.

    Args:
        image: Image to process.
        prompt_mode: Key of ``dict_promptmode_to_prompt`` selecting the prompt.
        min_pixels: Optional lower pixel bound; triggers a resize when given.
        max_pixels: Optional upper pixel bound; triggers a resize when given.

    Returns:
        A dict with the keys ``'original_image'``, ``'raw_output'``,
        ``'prompt_mode'``, ``'processed_image'``, ``'layout_result'`` and
        ``'markdown_content'``. For the layout prompt modes the raw output is
        parsed as JSON, boxes are drawn on ``'processed_image'`` and (for
        ``'prompt_layout_all_en'``) markdown is generated; if the output is
        not usable layout JSON the raw output is used as markdown. Errors are
        printed and reported in ``'raw_output'`` / ``'markdown_content'``
        rather than raised.
    """
    try:
        # Validate first so a bad mode neither wastes a resize nor surfaces
        # as an unexplained KeyError message.
        if prompt_mode not in dict_promptmode_to_prompt:
            raise ValueError(f"Unknown prompt mode: {prompt_mode!r}")

        # Resize image if needed
        if min_pixels is not None or max_pixels is not None:
            image = fetch_image(image, min_pixels=min_pixels, max_pixels=max_pixels)

        prompt = dict_promptmode_to_prompt[prompt_mode]
        raw_output = inference(image, prompt)

        result = {
            'original_image': image,
            'raw_output': raw_output,
            'prompt_mode': prompt_mode,
            'processed_image': image,
            'layout_result': None,
            'markdown_content': None
        }

        if prompt_mode not in ('prompt_layout_all_en', 'prompt_layout_only_en'):
            # For OCR prompts, use raw output as markdown
            result['markdown_content'] = raw_output
            return result

        layout_data = _parse_layout_json(raw_output)
        if layout_data is None:
            print("Failed to parse JSON output, using raw output")
            result['markdown_content'] = raw_output
            return result

        result['layout_result'] = layout_data

        # Create visualization with bounding boxes
        try:
            result['processed_image'] = draw_layout_on_image(image, layout_data)
        except Exception as e:
            print(f"Error drawing layout: {e}")
            result['processed_image'] = image

        # Generate markdown if text is available
        if prompt_mode == 'prompt_layout_all_en':
            try:
                result['markdown_content'] = layoutjson2md(image, layout_data, text_key='text')
            except Exception as e:
                print(f"Error generating markdown: {e}")
                result['markdown_content'] = raw_output

        return result

    except Exception as e:
        print(f"Error processing image: {e}")
        traceback.print_exc()
        return {
            'original_image': image,
            'raw_output': f"Error processing image: {str(e)}",
            'prompt_mode': prompt_mode,
            'processed_image': image,
            'layout_result': None,
            'markdown_content': f"Error processing image: {str(e)}"
        }
# File extensions that load_file_for_preview opens as a single image.
_PREVIEW_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.webp')


def load_file_for_preview(file_path: str) -> Tuple[Optional[Image.Image], str]:
    """Load a PDF or image file and reset the preview cache to it.

    Args:
        file_path: Path of the file to preview.

    Returns:
        ``(first_page_image, page_info)``. When nothing could be loaded the
        image is ``None`` and the string carries the reason. On success
        ``pdf_cache`` is reset to the new file with parsing state cleared.
    """
    if not file_path or not os.path.exists(file_path):
        return None, "No file selected"

    file_ext = os.path.splitext(file_path)[1].lower()

    try:
        if file_ext == '.pdf':
            images = load_images_from_pdf(file_path)
            if not images:
                return None, "Failed to load PDF"
            file_type = "pdf"
        elif file_ext in _PREVIEW_IMAGE_EXTENSIONS:
            # convert() yields a loaded copy, so the file handle can be closed.
            with Image.open(file_path) as opened:
                images = [opened.convert('RGB')]
            file_type = "image"
        else:
            return None, f"Unsupported file format: {file_ext}"

        pdf_cache.update({
            "images": images,
            "current_page": 0,
            "total_pages": len(images),
            "file_type": file_type,
            "is_parsed": False,
            "results": []
        })
        return images[0], f"Page 1 / {len(images)}"

    except Exception as e:
        print(f"Error loading file: {e}")
        return None, f"Error loading file: {str(e)}"


def turn_page(direction: str) -> Tuple[Optional[Image.Image], str, str]:
    """Move the preview to the previous or next cached page.

    Args:
        direction: ``"prev"`` or ``"next"``; any other value keeps the
            current page.

    Returns:
        ``(page_image, page_info, page_result)``. ``page_result`` is the
        page's markdown content, else its raw output, or
        ``"Page not processed yet"`` when no result is cached for the page.
        With an empty cache ``(None, "No file loaded", "No results yet")`` is
        returned.
    """
    images = pdf_cache["images"]
    if not images:
        return None, "No file loaded", "No results yet"

    index = pdf_cache["current_page"]
    if direction == "prev":
        index -= 1
    elif direction == "next":
        index += 1

    # Clamp against the pages actually cached so a stale current_page or
    # total_pages can never index past the image list.
    last_index = min(pdf_cache["total_pages"], len(images)) - 1
    index = max(0, min(index, last_index))
    pdf_cache["current_page"] = index

    current_image = images[index]
    page_info = f"Page {index + 1} / {pdf_cache['total_pages']}"

    # Get results for current page if available
    results = pdf_cache["results"]
    if pdf_cache["is_parsed"] and index < len(results) and results[index]:
        result = results[index]
        if result.get('markdown_content'):
            current_result = result['markdown_content']
        else:
            current_result = result.get('raw_output', 'No content available')
    else:
        current_result = "Page not processed yet"

    return current_image, page_info, current_result
padding: 10px; border-radius: 8px; margin: 10px 0; text-align: center; font-weight: bold; } .status-loading { background: #fff3cd; color: #856404; border: 1px solid #ffeaa7; } .status-ready { background: #d1edff; color: #0c5460; border: 1px solid #b8daff; } .status-error { background: #f8d7da; color: #721c24; border: 1px solid #f5c6cb; } """ with gr.Blocks(theme=gr.themes.Soft(), css=css, title="Dots.OCR Demo") as demo: # Header gr.HTML("""
Advanced OCR and Document Layout Analysis powered by Hugging Face Transformers