from pdf2image import convert_from_path, pdfinfo_from_path
from PIL import Image, ImageFile
import os
import re
import time
import json
import gradio as gr
import numpy as np
import pymupdf
from pymupdf import Document, Page, Rect
import pandas as pd
import shutil
import zipfile
from collections import defaultdict
from tqdm import tqdm
from gradio import Progress
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from scipy.spatial import cKDTree
import random
import string
import warnings

from tools.config import OUTPUT_FOLDER, INPUT_FOLDER, IMAGES_DPI, LOAD_TRUNCATED_IMAGES, MAX_IMAGE_PIXELS, CUSTOM_BOX_COLOUR, COMPRESS_REDACTED_PDF, TESSERACT_TEXT_EXTRACT_OPTION, SELECTABLE_TEXT_EXTRACT_OPTION, TEXTRACT_TEXT_EXTRACT_OPTION
from tools.helper_functions import get_file_name_without_type, read_file

IMAGE_NUM_REGEX = re.compile(r'_(\d+)\.png$')

pd.set_option('future.no_silent_downcasting', True)

# Config values arrive as strings from the environment, so coerce them to the right types here
image_dpi = float(IMAGES_DPI)

if not MAX_IMAGE_PIXELS:
    Image.MAX_IMAGE_PIXELS = None
else:
    Image.MAX_IMAGE_PIXELS = MAX_IMAGE_PIXELS

ImageFile.LOAD_TRUNCATED_IMAGES = LOAD_TRUNCATED_IMAGES.lower() == "true"
COMPRESS_REDACTED_PDF = COMPRESS_REDACTED_PDF.lower() == "true"

def is_pdf_or_image(filename):
    """
    Check if a file name is a PDF or an image file.

    Args:
        filename (str): The name of the file.

    Returns:
        bool: True if the file name ends with ".pdf", ".jpg", ".jpeg", or ".png", False otherwise.
    """
    return filename.lower().endswith((".pdf", ".jpg", ".jpeg", ".png"))

def is_pdf(filename):
    """
    Check if a file name is a PDF.

    Args:
        filename (str): The name of the file.

    Returns:
        bool: True if the file name ends with ".pdf", False otherwise.
    """
    return filename.lower().endswith(".pdf")

def check_image_size_and_reduce(out_path: str, image: Image):
    '''
    Check if a given image file is above around 4.5MB, and reduce its size if necessary. 5MB is the maximum possible to submit to AWS Textract.
    '''
    all_img_details = list()
    page_num = 0

    max_size = 4.5 * 1024 * 1024  # 4.5 MB in bytes
    file_size = os.path.getsize(out_path)

    width = image.width
    height = image.height

    if file_size > max_size:
        print(f"Image size before {width}x{height}, original file_size: {file_size}")

        while file_size > max_size:
            # Halve the dimensions on each pass until the file fits under the limit
            new_width = int(width * 0.5)
            new_height = int(height * 0.5)
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

            image.save(out_path, format="PNG", optimize=True)

            # Track the shrunken dimensions so repeated passes keep reducing the image
            width, height = new_width, new_height

            file_size = os.path.getsize(out_path)
            print(f"Resized to {new_width}x{new_height}, new file_size: {file_size}")
    else:
        new_width = width
        new_height = height

    all_img_details.append((page_num, image, new_width, new_height))

    return image, new_width, new_height, all_img_details, out_path

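# Usage sketch (illustrative only; "input/page_0.png" is a hypothetical path):
# shrink an on-disk PNG in place until it fits under the ~4.5MB Textract limit.
#
#     img = Image.open("input/page_0.png")
#     img, w, h, details, path = check_image_size_and_reduce("input/page_0.png", img)
#     print(f"Final size: {w}x{h}, saved at {path}")
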
def process_single_page_for_image_conversion(pdf_path: str, page_num: int, image_dpi: float = image_dpi, create_images: bool = True, input_folder: str = INPUT_FOLDER) -> tuple[int, str, float, float]:
    '''
    Convert a single page of a PDF (or a standalone image file) to a greyscale PNG. Returns the page number, output path, and image dimensions, or placeholder values if image creation is skipped or fails.
    '''
    out_path_placeholder = "placeholder_image_" + str(page_num) + ".png"

    if create_images:
        try:
            image_output_dir = os.path.join(os.getcwd(), input_folder)
            out_path = os.path.join(image_output_dir, f"{os.path.basename(pdf_path)}_{page_num}.png")
            os.makedirs(os.path.dirname(out_path), exist_ok=True)

            if os.path.exists(out_path):
                # Reuse an image that has already been created for this page
                image = Image.open(out_path)
            elif pdf_path.lower().endswith(".pdf"):
                # Convert the single PDF page to a greyscale image
                image_l = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1,
                                            dpi=image_dpi, use_cropbox=False, use_pdftocairo=False)
                image = image_l[0]
                image = image.convert("L")

                image.save(out_path, format="PNG")
            elif pdf_path.lower().endswith((".jpg", ".jpeg", ".png")):
                image = Image.open(pdf_path)
                image.save(out_path, format="PNG")
            else:
                raise Warning("Could not create image.")

            width, height = image.size

            # Shrink the image if it exceeds the AWS Textract size limit
            image, width, height, all_img_details, img_path = check_image_size_and_reduce(out_path, image)

            return page_num, out_path, width, height

        except Exception as e:
            print(f"Error processing page {page_num + 1}: {e}")
            return page_num, out_path_placeholder, pd.NA, pd.NA
    else:
        # Image creation disabled: return placeholder values
        return page_num, out_path_placeholder, pd.NA, pd.NA

def convert_pdf_to_images(pdf_path: str, prepare_for_review: bool = False, page_min: int = 0, page_max: int = 0, create_images: bool = True, image_dpi: float = image_dpi, num_threads: int = 8, input_folder: str = INPUT_FOLDER):
    '''
    Convert a range of PDF pages to images in parallel, returning the image paths, widths, heights, and the raw per-page results.
    '''
    page_count = pdfinfo_from_path(pdf_path)['Pages']

    # When preparing files for review, all pages need to be converted
    if prepare_for_review:
        page_min = 0
        page_max = page_count

    print(f"Creating images. Number of pages in PDF: {page_count}")

    if page_max == 0: page_max = page_count

    results = list()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = list()
        for page_num in range(page_min, page_max):
            futures.append(executor.submit(process_single_page_for_image_conversion, pdf_path, page_num, image_dpi, create_images=create_images, input_folder=input_folder))

        for future in tqdm(as_completed(futures), total=len(futures), unit="pages", desc="Converting pages to image"):
            page_num, img_path, width, height = future.result()
            if img_path:
                results.append((page_num, img_path, width, height))
            else:
                print(f"Page {page_num + 1} failed to process.")
                results.append((page_num, "placeholder_image_" + str(page_num) + ".png", pd.NA, pd.NA))

    # Sort the results by page number and unpack into parallel lists
    results.sort(key=lambda x: x[0])
    images = [result[1] for result in results]
    widths = [result[2] for result in results]
    heights = [result[3] for result in results]

    return images, widths, heights, results

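# Usage sketch (illustrative only; "input/example.pdf" is a hypothetical path):
# convert the first three pages of a PDF to PNGs on eight worker threads.
#
#     images, widths, heights, results = convert_pdf_to_images(
#         "input/example.pdf", page_min=0, page_max=3)
#     print(images)   # paths to the generated page images, in page order
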
def process_file_for_image_creation(file_path: str, prepare_for_review: bool = False, input_folder: str = INPUT_FOLDER, create_images: bool = True):
    '''
    Route a file to the appropriate image-creation path depending on whether it is an image or a PDF.
    '''
    file_extension = os.path.splitext(file_path)[1].lower()

    # Check if the file is an image type
    if file_extension in ['.jpg', '.jpeg', '.png']:
        print(f"{file_path} is an image file.")

        img_object = [file_path]

        # Check the image size and reduce if necessary
        image = Image.open(file_path)
        img_object, image_sizes_width, image_sizes_height, all_img_details, img_path = check_image_size_and_reduce(file_path, image)

        # Wrap single-image outputs in lists so the return shape matches the PDF path
        if not isinstance(image_sizes_width, list):
            img_path = [img_path]
            image_sizes_width = [image_sizes_width]
            image_sizes_height = [image_sizes_height]
            all_img_details = [all_img_details]

    elif file_extension == '.pdf':
        img_path, image_sizes_width, image_sizes_height, all_img_details = convert_pdf_to_images(file_path, prepare_for_review, input_folder=input_folder, create_images=create_images)

    else:
        print(f"{file_path} is not an image or PDF file.")
        img_path = list()
        image_sizes_width = list()
        image_sizes_height = list()
        all_img_details = list()

    return img_path, image_sizes_width, image_sizes_height, all_img_details

def get_input_file_names(file_input: List[str]):
    '''
    Get list of input files to report to logs.
    '''
    all_relevant_files = list()
    file_name_with_extension = ""
    full_file_name = ""
    total_pdf_page_count = 0

    if isinstance(file_input, dict):
        file_input = os.path.abspath(file_input["name"])

    if isinstance(file_input, str):
        file_input_list = [file_input]
    else:
        file_input_list = file_input

    for file in file_input_list:
        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name

        file_path_without_ext = get_file_name_without_type(file_path)

        file_extension = os.path.splitext(file_path)[1].lower()

        # Exclude review-file and OCR-output sidecar files from the reported list
        if (file_extension in ['.jpg', '.jpeg', '.png', '.pdf', '.xlsx', '.csv', '.parquet', '.docx']) and ("review_file" not in file_path_without_ext) and ("ocr_output" not in file_path_without_ext) and ("ocr_results_with_words" not in file_path_without_ext):
            all_relevant_files.append(file_path_without_ext)
            file_name_with_extension = file_path_without_ext + file_extension
            full_file_name = file_path

        # Count pages: open PDFs for a real page count, treat other files as one page
        if file_extension in ['.pdf']:
            pdf_document = pymupdf.open(file_path)
            page_count = pdf_document.page_count
            pdf_document.close()
        else:
            page_count = 1

        total_pdf_page_count += page_count

    all_relevant_files_str = ", ".join(all_relevant_files)

    return all_relevant_files_str, file_name_with_extension, full_file_name, all_relevant_files, total_pdf_page_count

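# Usage sketch (illustrative; file names are hypothetical): sidecar files such
# as review files are excluded from the reported names, and page counts are summed.
#
#     names_str, name_ext, full_name, names, pages = get_input_file_names(
#         ["input/doc.pdf", "input/doc_review_file.csv"])
#     # names contains only "doc"; pages is doc.pdf's page count plus 1,
#     # since non-PDF files each count as a single page.
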
def convert_color_to_range_0_1(color):
    return tuple(component / 255 for component in color)

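# Example: scale an 8-bit RGB tuple to the 0-1 range that PyMuPDF expects.
#
#     convert_color_to_range_0_1((255, 128, 0))
#     # -> (1.0, 0.5019607843137255, 0.0)
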
def redact_single_box(pymupdf_page: Page, pymupdf_rect: Rect, img_annotation_box: dict, custom_colours: bool = False):
    '''
    Commit a single redaction box to a PyMuPDF page: a slightly shrunken redaction annotation removes the underlying text, then a filled rectangle is drawn over the full box.
    '''
    pymupdf_x1 = pymupdf_rect[0]
    pymupdf_y1 = pymupdf_rect[1]
    pymupdf_x2 = pymupdf_rect[2]
    pymupdf_y2 = pymupdf_rect[3]

    # Shrink the redaction band vertically by two points on each side
    redact_bottom_y = pymupdf_y1 + 2
    redact_top_y = pymupdf_y2 - 2

    # If the shrunken band would be less than one point high, centre a two-point band on the box instead
    if (redact_top_y - redact_bottom_y) < 1:
        middle_y = (pymupdf_y1 + pymupdf_y2) / 2
        redact_bottom_y = middle_y - 1
        redact_top_y = middle_y + 1

    rect_small_pixel_height = Rect(pymupdf_x1, redact_bottom_y, pymupdf_x2, redact_top_y)

    # Add the reduced-height redaction annotation to remove the underlying text
    pymupdf_page.add_redact_annot(rect_small_pixel_height)

    # Draw the visible box over the full original rect
    shape = pymupdf_page.new_shape()
    shape.draw_rect(pymupdf_rect)

    if custom_colours:
        if img_annotation_box["color"][0] > 1:
            out_colour = convert_color_to_range_0_1(img_annotation_box["color"])
        else:
            out_colour = img_annotation_box["color"]
    else:
        if CUSTOM_BOX_COLOUR == "grey":
            out_colour = (0.5, 0.5, 0.5)
        else:
            out_colour = (0, 0, 0)

    shape.finish(color=out_colour, fill=out_colour)

    shape.commit()

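# Usage sketch (illustrative; assumes "doc" is an open pymupdf Document): black
# out a 200x20 point box near the top-left of the first page. Note that the
# redaction annotation only takes effect once redactions are applied to the page.
#
#     page = doc.load_page(0)
#     box = {"color": (0, 0, 0), "label": "example"}
#     redact_single_box(page, Rect(50, 100, 250, 120), box)
#     page.apply_redactions()
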
def convert_pymupdf_to_image_coords(pymupdf_page: Page, x1: float, y1: float, x2: float, y2: float, image: Image = None, image_dimensions: dict = {}):
    '''
    Converts coordinates from pymupdf format to image coordinates,
    accounting for mediabox dimensions and offset.
    '''
    # Get rect dimensions
    rect = pymupdf_page.rect
    rect_width = rect.width
    rect_height = rect.height

    # Get mediabox dimensions
    mediabox = pymupdf_page.mediabox
    mediabox_width = mediabox.width
    mediabox_height = mediabox.height

    # Get target image dimensions
    if image:
        image_page_width, image_page_height = image.size
    elif image_dimensions:
        image_page_width, image_page_height = image_dimensions['image_width'], image_dimensions['image_height']
    else:
        image_page_width, image_page_height = mediabox_width, mediabox_height

    # Calculate scaling factors
    image_to_mediabox_x_scale = image_page_width / mediabox_width
    image_to_mediabox_y_scale = image_page_height / mediabox_height

    # Apply scaling to each coordinate
    x1_image = x1 * image_to_mediabox_x_scale
    x2_image = x2 * image_to_mediabox_x_scale
    y1_image = y1 * image_to_mediabox_y_scale
    y2_image = y2 * image_to_mediabox_y_scale

    # Correct for any difference between the rect and mediabox sizes
    if mediabox_width != rect_width:
        mediabox_to_rect_x_scale = mediabox_width / rect_width
        mediabox_to_rect_y_scale = mediabox_height / rect_height

        mediabox_rect_x_diff = (mediabox_width - rect_width) * (image_to_mediabox_x_scale / 2)
        mediabox_rect_y_diff = (mediabox_height - rect_height) * (image_to_mediabox_y_scale / 2)

        x1_image -= mediabox_rect_x_diff
        x2_image -= mediabox_rect_x_diff
        y1_image += mediabox_rect_y_diff
        y2_image += mediabox_rect_y_diff

        x1_image *= mediabox_to_rect_x_scale
        x2_image *= mediabox_to_rect_x_scale
        y1_image *= mediabox_to_rect_y_scale
        y2_image *= mediabox_to_rect_y_scale

    return x1_image, y1_image, x2_image, y2_image

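# Worked example (assuming "page" is a pymupdf Page whose rect equals its
# mediabox, e.g. a 500x800 point page rendered to a 1000x1600 pixel image):
# every coordinate is simply scaled by image_size / mediabox_size, so
#
#     convert_pymupdf_to_image_coords(page, 50, 100, 150, 200,
#         image_dimensions={"image_width": 1000, "image_height": 1600})
#     # -> (100.0, 200.0, 300.0, 400.0)
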
def redact_whole_pymupdf_page(rect_height: float, rect_width: float, page: Page, custom_colours: bool = False, border: float = 5, redact_pdf: bool = True):
    '''
    Redact an entire page, leaving a small border, and return the equivalent image annotation box.
    '''
    whole_page_x1, whole_page_y1 = 0 + border, 0 + border

    # A border below 0.1 is treated as a relative (0-1) coordinate system
    if border < 0.1:
        whole_page_x2, whole_page_y2 = 1 - border, 1 - border
    else:
        whole_page_x2, whole_page_y2 = rect_width - border, rect_height - border

    whole_page_rect = Rect(whole_page_x1, whole_page_y1, whole_page_x2, whole_page_y2)

    # Create the annotation box representing the whole-page redaction
    whole_page_img_annotation_box = {}
    whole_page_img_annotation_box["xmin"] = whole_page_x1
    whole_page_img_annotation_box["ymin"] = whole_page_y1
    whole_page_img_annotation_box["xmax"] = whole_page_x2
    whole_page_img_annotation_box["ymax"] = whole_page_y2
    whole_page_img_annotation_box["color"] = (0, 0, 0)
    whole_page_img_annotation_box["label"] = "Whole page"

    if redact_pdf:
        redact_single_box(page, whole_page_rect, whole_page_img_annotation_box, custom_colours)

    return whole_page_img_annotation_box

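# Usage sketch (assumes "doc" is an open pymupdf Document): black out the first
# page entirely, keeping a 5-point margin, and collect the matching annotation box.
#
#     page = doc.load_page(0)
#     box = redact_whole_pymupdf_page(page.rect.height, page.rect.width, page)
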
def create_page_size_objects(pymupdf_doc: Document, image_sizes_width: List[float], image_sizes_height: List[float], image_file_paths: List[str]):
    page_sizes = list()
    original_cropboxes = list()

    for page_no, page in enumerate(pymupdf_doc):
        reported_page_no = page_no + 1

        pymupdf_page = pymupdf_doc.load_page(page_no)
        original_cropboxes.append(pymupdf_page.cropbox)

        # Create a page size object holding the PDF and image dimensions for this page
        out_page_image_sizes = {
            "page": reported_page_no,
            "mediabox_width": pymupdf_page.mediabox.width,
            "mediabox_height": pymupdf_page.mediabox.height,
            "cropbox_width": pymupdf_page.cropbox.width,
            "cropbox_height": pymupdf_page.cropbox.height,
            "original_cropbox": original_cropboxes[-1],
            "image_path": image_file_paths[page_no]}

        # Horizontal offset of the cropbox from the mediabox
        out_page_image_sizes['cropbox_x_offset'] = pymupdf_page.cropbox.x0 - pymupdf_page.mediabox.x0

        # Vertical offset of the cropbox from the top of the mediabox (y1 is the top edge in PDF coordinates)
        out_page_image_sizes['cropbox_y_offset_from_top'] = pymupdf_page.mediabox.y1 - pymupdf_page.cropbox.y1

        if image_sizes_width and image_sizes_height:
            out_page_image_sizes["image_width"] = image_sizes_width[page_no]
            out_page_image_sizes["image_height"] = image_sizes_height[page_no]

        page_sizes.append(out_page_image_sizes)

    return page_sizes, original_cropboxes

def word_level_ocr_output_to_dataframe(ocr_results: dict) -> pd.DataFrame:
    '''
    Convert a list of per-page OCR result dictionaries to a word-level dataframe.
    '''
    rows = list()

    for ocr_result in ocr_results:
        page_number = int(ocr_result['page'])

        for line_key, line_data in ocr_result['results'].items():
            line_number = int(line_data['line'])
            for word in line_data['words']:
                rows.append({
                    'page': page_number,
                    'line': line_number,
                    'word_text': word['text'],
                    'word_x0': word['bounding_box'][0],
                    'word_y0': word['bounding_box'][1],
                    'word_x1': word['bounding_box'][2],
                    'word_y1': word['bounding_box'][3],
                    'line_text': "",
                    'line_x0': line_data['bounding_box'][0],
                    'line_y0': line_data['bounding_box'][1],
                    'line_x1': line_data['bounding_box'][2],
                    'line_y1': line_data['bounding_box'][3],
                })

    return pd.DataFrame(rows)

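# Shape sketch (illustrative data): each page dict carries a "results" mapping
# of lines, and each line carries its words with bounding boxes.
#
#     ocr_results = [{
#         "page": 1,
#         "results": {
#             "line_1": {
#                 "line": 1,
#                 "bounding_box": [10, 10, 120, 30],
#                 "words": [{"text": "Hello", "bounding_box": [10, 10, 60, 30]}],
#             }
#         },
#     }]
#     df = word_level_ocr_output_to_dataframe(ocr_results)
#     # df has one row with page=1, line=1, word_text="Hello"
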
def prepare_image_or_pdf(
    file_paths: List[str],
    text_extract_method: str,
    all_line_level_ocr_results_df: pd.DataFrame,
    all_page_line_level_ocr_results_with_words_df: pd.DataFrame,
    latest_file_completed: int = 0,
    out_message: List[str] = list(),
    first_loop_state: bool = False,
    number_of_pages: int = 0,
    all_annotations_object: List = list(),
    prepare_for_review: bool = False,
    in_fully_redacted_list: List[int] = list(),
    output_folder: str = OUTPUT_FOLDER,
    input_folder: str = INPUT_FOLDER,
    prepare_images: bool = True,
    page_sizes: list[dict] = list(),
    pymupdf_doc: Document = list(),
    textract_output_found: bool = False,
    relevant_ocr_output_with_words_found: bool = False,
    progress: Progress = Progress(track_tqdm=True)
) -> tuple[List[str], List[str]]:
    """
    Prepare and process image or text PDF files for redaction.

    This function takes a list of file paths, processes each file based on the specified redaction method,
    and returns the output messages and processed file paths.

    Args:
        file_paths (List[str]): List of file paths to process.
        text_extract_method (str): The redaction method to use.
        all_line_level_ocr_results_df (pd.DataFrame): Line-level OCR results loaded so far.
        all_page_line_level_ocr_results_with_words_df (pd.DataFrame): Word-level OCR results loaded so far.
        latest_file_completed (optional, int): Index of the last completed file.
        out_message (optional, List[str]): List to store output messages.
        first_loop_state (optional, bool): Flag indicating if this is the first iteration.
        number_of_pages (optional, int): Integer indicating the number of pages in the document.
        all_annotations_object (optional, List of annotation objects): All annotations for the current document.
        prepare_for_review (optional, bool): Is this preparation step preparing PDFs and JSON files to review current redactions?
        in_fully_redacted_list (optional, List of int): A list of pages to fully redact.
        output_folder (optional, str): The output folder for file saves.
        input_folder (optional, str): The input folder for page images.
        prepare_images (optional, bool): A boolean indicating whether to create images for each PDF page. Defaults to True.
        page_sizes (optional, List[dict]): A list of dicts containing information about page sizes in various formats.
        pymupdf_doc (optional, Document): A pymupdf document object that indicates the existing PDF document object.
        textract_output_found (optional, bool): A boolean indicating whether Textract analysis output has already been found. Defaults to False.
        relevant_ocr_output_with_words_found (optional, bool): A boolean indicating whether local OCR analysis output has already been found. Defaults to False.
        progress (optional, Progress): Progress tracker for the operation.

    Returns:
        tuple[List[str], List[str]]: A tuple containing the output messages and processed file paths.
    """

    tic = time.perf_counter()
    json_from_csv = False
    original_cropboxes = list()
    converted_file_paths = list()
    image_file_paths = list()
    log_files_output_paths = list()  # Needed when loading existing OCR-with-words JSON below
    all_img_details = list()
    review_file_csv = pd.DataFrame()
    out_textract_path = ""
    combined_out_message = ""
    final_out_message = ""

    if isinstance(in_fully_redacted_list, pd.DataFrame):
        if not in_fully_redacted_list.empty:
            in_fully_redacted_list = in_fully_redacted_list.iloc[:, 0].tolist()

    # If this is the first loop, reset the progress state
    if first_loop_state == True:
        latest_file_completed = 0
        out_message = list()
        all_annotations_object = list()
    else:
        print("Now redacting file", str(latest_file_completed))

    if isinstance(out_message, str): out_message = [out_message]

    if not file_paths: file_paths = list()

    if isinstance(file_paths, dict): file_paths = os.path.abspath(file_paths["name"])

    if isinstance(file_paths, str): file_path_number = 1
    else: file_path_number = len(file_paths)

    latest_file_completed = int(latest_file_completed)

    # If all files have already been processed, return the accumulated outputs
    if latest_file_completed >= file_path_number:
        print("Last file reached, returning files:", str(latest_file_completed))
        if isinstance(out_message, list):
            final_out_message = '\n'.join(out_message)
        else:
            final_out_message = out_message

        return final_out_message, converted_file_paths, image_file_paths, number_of_pages, number_of_pages, pymupdf_doc, all_annotations_object, review_file_csv, original_cropboxes, page_sizes, textract_output_found, all_img_details, all_line_level_ocr_results_df, relevant_ocr_output_with_words_found, all_page_line_level_ocr_results_with_words_df

    progress(0.1, desc='Preparing file')

    if isinstance(file_paths, str):
        file_paths_list = [file_paths]
        file_paths_loop = file_paths_list
    else:
        file_paths_list = file_paths
        # Process PDFs first, then JSON sidecars, then everything else (see the ordering sketch after this function)
        file_paths_loop = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json'))

    for file in file_paths_loop:
        converted_file_path = list()
        image_file_path = list()

        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name
        file_path_without_ext = get_file_name_without_type(file_path)
        file_name_with_ext = os.path.basename(file_path)

        print("Loading file:", file_name_with_ext)

        if not file_path:
            out_message = "Please select at least one file."
            print(out_message)
            raise Warning(out_message)

        file_extension = os.path.splitext(file_path)[1].lower()

        # If a PDF, load it as a pymupdf document and optionally create page images
        if is_pdf(file_path):
            print(f"File {file_name_with_ext} is a PDF")
            pymupdf_doc = pymupdf.open(file_path)
            pymupdf_pages = pymupdf_doc.page_count

            converted_file_path = file_path

            if prepare_images == True:
                image_file_paths, image_sizes_width, image_sizes_height, all_img_details = process_file_for_image_creation(file_path, prepare_for_review, input_folder, create_images=True)
            else:
                image_file_paths, image_sizes_width, image_sizes_height, all_img_details = process_file_for_image_creation(file_path, prepare_for_review, input_folder, create_images=False)

            page_sizes, original_cropboxes = create_page_size_objects(pymupdf_doc, image_sizes_width, image_sizes_height, image_file_paths)

            # Create empty annotation objects for each page if none exist yet
            if (not all_annotations_object) and (prepare_for_review == True):
                all_annotations_object = list()

                for image_path in image_file_paths:
                    annotation = {}
                    annotation["image"] = image_path
                    annotation["boxes"] = list()

                    all_annotations_object.append(annotation)

        elif is_pdf_or_image(file_path):  # Alternatively, the file is an image
            print(f"File {file_name_with_ext} is an image")

            # Selectable-text extraction makes no sense for an image, so fall back to local OCR
            if file_extension in ['.jpg', '.jpeg', '.png'] and text_extract_method == SELECTABLE_TEXT_EXTRACT_OPTION:
                text_extract_method = TESSERACT_TEXT_EXTRACT_OPTION

            # Convert the image into a single-page pymupdf document
            pymupdf_doc = pymupdf.open()

            img = Image.open(file_path)
            rect = pymupdf.Rect(0, 0, img.width, img.height)
            pymupdf_page = pymupdf_doc.new_page(width=img.width, height=img.height)
            pymupdf_page.insert_image(rect, filename=file_path)
            pymupdf_page = pymupdf_doc.load_page(0)

            file_path_str = str(file_path)

            image_file_paths, image_sizes_width, image_sizes_height, all_img_details = process_file_for_image_creation(file_path_str, prepare_for_review, input_folder, create_images=True)

            page_sizes, original_cropboxes = create_page_size_objects(pymupdf_doc, image_sizes_width, image_sizes_height, image_file_paths)

            converted_file_path = output_folder + file_name_with_ext

            pymupdf_doc.save(converted_file_path, garbage=4, deflate=True, clean=True)

        elif file_extension in ['.csv']:
            if '_review_file' in file_path_without_ext:
                review_file_csv = read_file(file_path)
                all_annotations_object = convert_review_df_to_annotation_json(review_file_csv, image_file_paths, page_sizes)
                json_from_csv = True
            elif '_ocr_output' in file_path_without_ext:
                all_line_level_ocr_results_df = read_file(file_path)

                if "line" not in all_line_level_ocr_results_df.columns:
                    all_line_level_ocr_results_df["line"] = ""

                json_from_csv = False
            elif '_ocr_results_with_words' in file_path_without_ext:
                all_page_line_level_ocr_results_with_words_df = read_file(file_path)
                json_from_csv = False

        # If the file is JSON (or annotations were derived from a review csv above), decide what kind of JSON it is:
        # an annotations object for review, Textract output, or local OCR output with words
        if (file_extension in ['.json']) or (json_from_csv == True):

            if (file_extension in ['.json']) and (prepare_for_review == True):
                if isinstance(file_path, str):
                    with open(file_path, 'r') as json_file:
                        all_annotations_object = json.load(json_file)
                else:
                    # Assume a file object with a JSON payload
                    all_annotations_object = json.loads(file_path)

            # If a Textract JSON file, copy it to the output folder under a standardised name
            elif (file_extension in ['.json']) and '_textract' in file_path_without_ext:
                print("Saving Textract output")

                if not file_path.endswith("_textract.json"): output_textract_json_file_name = file_path_without_ext + "_textract.json"
                else: output_textract_json_file_name = file_path_without_ext + ".json"

                out_textract_path = os.path.join(output_folder, output_textract_json_file_name)

                # Copy the Textract output file to the output folder
                shutil.copy2(file_path, out_textract_path)
                textract_output_found = True
                continue

            elif (file_extension in ['.json']) and '_ocr_results_with_words' in file_path_without_ext:
                print("Saving local OCR output with words")

                output_ocr_results_with_words_json_file_name = file_path_without_ext + ".json"

                out_ocr_results_with_words_path = os.path.join(output_folder, output_ocr_results_with_words_json_file_name)

                # Copy the OCR results file to the output folder
                shutil.copy2(file_path, out_ocr_results_with_words_path)

                if prepare_for_review == True:
                    print("Converting local OCR output with words to csv")
                    page_sizes_df = pd.DataFrame(page_sizes)
                    all_page_line_level_ocr_results_with_words, is_missing, log_files_output_paths = load_and_convert_ocr_results_with_words_json(out_ocr_results_with_words_path, log_files_output_paths, page_sizes_df)
                    all_page_line_level_ocr_results_with_words_df = word_level_ocr_output_to_dataframe(all_page_line_level_ocr_results_with_words)

                    # Convert absolute word and line coordinates to relative (0-1) coordinates
                    all_page_line_level_ocr_results_with_words_df = divide_coordinates_by_page_sizes(all_page_line_level_ocr_results_with_words_df, page_sizes_df, xmin="word_x0", xmax="word_x1", ymin="word_y0", ymax="word_y1")
                    all_page_line_level_ocr_results_with_words_df = divide_coordinates_by_page_sizes(all_page_line_level_ocr_results_with_words_df, page_sizes_df, xmin="line_x0", xmax="line_x1", ymin="line_y0", ymax="line_y1")

                if text_extract_method == SELECTABLE_TEXT_EXTRACT_OPTION and file_path.endswith("_ocr_results_with_words_local_text.json"): relevant_ocr_output_with_words_found = True
                if text_extract_method == TESSERACT_TEXT_EXTRACT_OPTION and file_path.endswith("_ocr_results_with_words_local_ocr.json"): relevant_ocr_output_with_words_found = True
                if text_extract_method == TEXTRACT_TEXT_EXTRACT_OPTION and file_path.endswith("_ocr_results_with_words_textract.json"): relevant_ocr_output_with_words_found = True
                continue

            # If an annotations object was loaded, assign annotation boxes to the correct page images
            if all_annotations_object:

                # Extract page numbers from the image file names
                image_file_paths_pages = [
                    int(IMAGE_NUM_REGEX.search(os.path.basename(s)).group(1))
                    for s in image_file_paths
                    if IMAGE_NUM_REGEX.search(os.path.basename(s))
                ]

                if image_file_paths:
                    for i, image_file_path in enumerate(image_file_paths):

                        if i < len(all_annotations_object):
                            annotation = all_annotations_object[i]
                        else:
                            annotation = {}
                            all_annotations_object.append(annotation)

                        try:
                            if not annotation:
                                annotation = {"image": "", "boxes": []}
                                annotation_page_number = int(IMAGE_NUM_REGEX.search(image_file_path).group(1))
                            else:
                                annotation_page_number = int(IMAGE_NUM_REGEX.search(annotation["image"]).group(1))
                        except Exception as e:
                            print("Extracting page number from image failed due to:", e)
                            annotation_page_number = 0

                        # Check whether the annotation page number has a matching image file
                        if annotation_page_number in image_file_paths_pages:
                            correct_image_page = annotation_page_number
                            annotation["image"] = image_file_paths[correct_image_page]
                        else:
                            print("Page", annotation_page_number, "image file not found.")

                        all_annotations_object[i] = annotation

                if isinstance(in_fully_redacted_list, list):
                    in_fully_redacted_list = pd.DataFrame(data={"fully_redacted_pages_list": in_fully_redacted_list})

                # Get the list of pages that are to be fully redacted and redact them
                if not in_fully_redacted_list.empty:
                    print("Redacting whole pages")

                    for i, image in enumerate(image_file_paths):
                        page = pymupdf_doc.load_page(i)
                        rect_height = page.rect.height
                        rect_width = page.rect.width
                        whole_page_img_annotation_box = redact_whole_pymupdf_page(rect_height, rect_width, page, custom_colours=False, border=5)

                        all_annotations_object.append(whole_page_img_annotation_box)

                out_folder = output_folder + file_path_without_ext + ".json"

                continue

        # If a zip file, try to extract a Textract JSON from inside it
        if file_extension in ['.zip']:

            # The Textract JSON inside the zip is renamed to match the document
            out_folder = os.path.join(output_folder, file_path_without_ext + "_textract.json")

            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                json_files = [f for f in zip_ref.namelist() if f.lower().endswith('.json')]

                if len(json_files) == 1:  # Ensure exactly one JSON file exists in the zip
                    json_filename = json_files[0]

                    # Extract the JSON file to the same directory as the zip file
                    extracted_path = os.path.join(os.path.dirname(file_path), json_filename)
                    zip_ref.extract(json_filename, os.path.dirname(file_path))

                    # Move the extracted JSON to the output folder under the standardised name
                    shutil.move(extracted_path, out_folder)

                    textract_output_found = True
                else:
                    print(f"Skipping {file_path}: Expected 1 JSON file, found {len(json_files)}")

        converted_file_paths.append(converted_file_path)
        image_file_paths.extend(image_file_path)

        toc = time.perf_counter()
        out_time = f"File '{file_name_with_ext}' prepared in {toc - tic:0.1f} seconds."

        print(out_time)

        out_message.append(out_time)
        combined_out_message = '\n'.join(out_message)

    number_of_pages = len(page_sizes)

    print("Finished loading in files")

    return combined_out_message, converted_file_paths, image_file_paths, number_of_pages, number_of_pages, pymupdf_doc, all_annotations_object, review_file_csv, original_cropboxes, page_sizes, textract_output_found, all_img_details, all_line_level_ocr_results_df, relevant_ocr_output_with_words_found, all_page_line_level_ocr_results_with_words_df

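# Note on the file ordering used in prepare_image_or_pdf (illustrative): the
# sort key places PDFs first, then JSON sidecars, so the document itself is
# loaded before any annotation or OCR output files that reference it.
#
#     paths = ["doc_review_file.csv", "doc.pdf", "doc_textract.json"]
#     sorted(paths, key=lambda x: (os.path.splitext(x)[1] != '.pdf',
#                                  os.path.splitext(x)[1] != '.json'))
#     # -> ['doc.pdf', 'doc_textract.json', 'doc_review_file.csv']
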
def load_and_convert_ocr_results_with_words_json(ocr_results_with_words_json_file_path: str, log_files_output_paths: str, page_sizes_df: pd.DataFrame):
    """
    Loads OCR-results-with-words JSON from a file, detects if conversion is needed, and converts if necessary.
    """

    if not os.path.exists(ocr_results_with_words_json_file_path):
        print("No existing OCR results file found.")
        return [], True, log_files_output_paths

    no_ocr_results_with_words_file = False
    print("Found existing OCR results json results file.")

    # Track the JSON file in the log outputs
    if ocr_results_with_words_json_file_path not in log_files_output_paths:
        log_files_output_paths.append(ocr_results_with_words_json_file_path)

    try:
        with open(ocr_results_with_words_json_file_path, 'r', encoding='utf-8') as json_file:
            ocr_results_with_words_data = json.load(json_file)
    except json.JSONDecodeError:
        print("Error: Failed to parse OCR results JSON file. Returning empty data.")
        return [], True, log_files_output_paths

    # Check whether the JSON is already in the app's expected format
    if "page" in ocr_results_with_words_data[0] and "results" in ocr_results_with_words_data[0]:
        print("JSON already in the correct format for app. No changes needed.")
        return ocr_results_with_words_data, False, log_files_output_paths

    else:
        print("Invalid OCR result JSON format: 'page' or 'results' key missing.")
        return [], True, log_files_output_paths

def convert_text_pdf_to_img_pdf(in_file_path: str, out_text_file_path: List[str], image_dpi: float = image_dpi, output_folder: str = OUTPUT_FOLDER, input_folder: str = INPUT_FOLDER):
    file_path_without_ext = get_file_name_without_type(in_file_path)

    print("In convert_text_pdf_to_img_pdf function, file_path_without_ext:", file_path_without_ext)

    out_file_paths = out_text_file_path

    # Convert the redacted text-based PDF to page images
    pdf_text_image_paths, image_sizes_width, image_sizes_height, all_img_details = process_file_for_image_creation(out_file_paths[0], input_folder=input_folder)
    out_text_image_file_path = output_folder + file_path_without_ext + "_text_redacted_as_img.pdf"

    # process_file_for_image_creation returns image file paths, so open them before combining into a single PDF
    pdf_text_images = [Image.open(image_path) for image_path in pdf_text_image_paths]
    pdf_text_images[0].save(out_text_image_file_path, "PDF", resolution=image_dpi, save_all=True, append_images=pdf_text_images[1:])

    out_file_paths = [out_text_image_file_path]

    out_message = "PDF " + file_path_without_ext + " converted to image-based file."
    print(out_message)

    return out_message, out_file_paths

def save_pdf_with_or_without_compression(pymupdf_doc: object, out_redacted_pdf_file_path, COMPRESS_REDACTED_PDF: bool = COMPRESS_REDACTED_PDF):
    '''
    Save a pymupdf document either with full compression options or with basic cleaning only. Minimal cleaning can be useful on low-memory systems to avoid crashing on large PDFs.
    '''
    if COMPRESS_REDACTED_PDF == True:
        pymupdf_doc.save(out_redacted_pdf_file_path, garbage=4, deflate=True, clean=True)
    else:
        pymupdf_doc.save(out_redacted_pdf_file_path, garbage=1, clean=True)

def join_values_within_threshold(df1: pd.DataFrame, df2: pd.DataFrame):
    '''
    Left-join df1 to df2 on bounding-box coordinates, matching rows whose coordinates all lie within a small threshold of each other.
    '''
    # Threshold for matching coordinates, in the units of the coordinate columns
    threshold = 5

    # Perform a cross join by merging on a constant key
    df1['key'] = 1
    df2['key'] = 1
    merged = pd.merge(df1, df2, on='key').drop(columns=['key'])

    # Keep only pairs where every coordinate is within the threshold
    conditions = (
        (abs(merged['xmin_x'] - merged['xmin_y']) <= threshold) &
        (abs(merged['xmax_x'] - merged['xmax_y']) <= threshold) &
        (abs(merged['ymin_x'] - merged['ymin_y']) <= threshold) &
        (abs(merged['ymax_x'] - merged['ymax_y']) <= threshold)
    )

    filtered = merged[conditions]

    # Keep at most one match per df1 row
    result = filtered.drop_duplicates(subset=['xmin_x', 'xmax_x', 'ymin_x', 'ymax_x'])

    # Join the matches back onto df1 so unmatched rows are preserved
    final_df = pd.merge(df1, result, left_on=['xmin', 'xmax', 'ymin', 'ymax'], right_on=['xmin_x', 'xmax_x', 'ymin_x', 'ymax_x'], how='left')

    final_df = final_df.drop(columns=['key'])

    return final_df

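# Usage sketch (illustrative data): boxes within 5 units on every coordinate
# are treated as the same box and joined; unmatched rows keep NaN match columns.
#
#     df_a = pd.DataFrame({"xmin": [10], "xmax": [50], "ymin": [10], "ymax": [20]})
#     df_b = pd.DataFrame({"xmin": [12], "xmax": [52], "ymin": [11], "ymax": [19]})
#     joined = join_values_within_threshold(df_a, df_b)
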
def remove_duplicate_images_with_blank_boxes(data: List[dict]) -> List[dict]:
    '''
    Remove items from the annotator object where the same page exists twice, preferring the version that actually carries redaction boxes.
    '''
    # Group items by their image path
    image_groups = defaultdict(list)
    for item in data:
        image_groups[item['image']].append(item)

    # For each group, keep one representative entry
    result = list()
    for image, items in image_groups.items():
        # Find the entries that have non-empty boxes, if any
        non_empty_boxes = [item for item in items if item.get('boxes')]

        if non_empty_boxes:
            # Keep the first entry that carries boxes
            result.append(non_empty_boxes[0])
        else:
            # If all entries have blank boxes, keep the first entry as-is
            result.append(items[0])

    return result

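# Usage sketch (illustrative data): two entries for the same page image collapse
# to the one that carries boxes.
#
#     data = [
#         {"image": "doc.pdf_0.png", "boxes": []},
#         {"image": "doc.pdf_0.png", "boxes": [{"label": "NAME"}]},
#     ]
#     remove_duplicate_images_with_blank_boxes(data)
#     # -> [{"image": "doc.pdf_0.png", "boxes": [{"label": "NAME"}]}]
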
def divide_coordinates_by_page_sizes(
    review_file_df: pd.DataFrame,
    page_sizes_df: pd.DataFrame,
    xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax"
) -> pd.DataFrame:
    """
    Optimized function to convert absolute image coordinates (>1) to relative coordinates (<=1).

    Identifies rows with absolute coordinates, merges page size information,
    divides coordinates by dimensions, and combines with already-relative rows.

    Args:
        review_file_df: Input DataFrame with potentially mixed coordinate systems.
        page_sizes_df: DataFrame with page dimensions ('page', 'image_width',
                       'image_height', 'mediabox_width', 'mediabox_height').
        xmin, xmax, ymin, ymax: Names of the coordinate columns.

    Returns:
        DataFrame with coordinates converted to relative system, sorted.
    """
    if review_file_df.empty or xmin not in review_file_df.columns:
        return review_file_df

    # Work on a copy with numeric coordinate and page columns
    coord_cols = [xmin, xmax, ymin, ymax]
    cols_to_convert = coord_cols + ["page"]
    temp_df = review_file_df.copy()

    for col in cols_to_convert:
        if col in temp_df.columns:
            temp_df[col] = pd.to_numeric(temp_df[col], errors="coerce")
        else:
            # A required column is missing entirely
            if col == 'page' or col in coord_cols:
                print(f"Warning: Required column '{col}' not found in review_file_df. Returning original DataFrame.")
                return review_file_df

    # Rows are treated as absolute when every coordinate is a number greater than 1
    is_absolute_mask = (
        (temp_df[xmin] > 1) & (temp_df[xmin].notna()) &
        (temp_df[xmax] > 1) & (temp_df[xmax].notna()) &
        (temp_df[ymin] > 1) & (temp_df[ymin].notna()) &
        (temp_df[ymax] > 1) & (temp_df[ymax].notna())
    )

    # Split into already-relative rows and absolute rows that need conversion
    df_rel = temp_df[~is_absolute_mask]
    df_abs = temp_df[is_absolute_mask].copy()

    if not df_abs.empty:
        # Merge page size information if it is not already present
        if "image_width" not in df_abs.columns and not page_sizes_df.empty:
            ps_df_copy = page_sizes_df.copy()

            # Ensure the page column is numeric for the merge
            ps_df_copy['page'] = pd.to_numeric(ps_df_copy['page'], errors='coerce')

            # Select only the dimension columns that actually exist
            merge_cols = ['page', 'image_width', 'image_height', 'mediabox_width', 'mediabox_height']
            available_merge_cols = [col for col in merge_cols if col in ps_df_copy.columns]
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for col in ['image_width', 'image_height', 'mediabox_width', 'mediabox_height']: | 
					
						
						|  | if col in ps_df_copy.columns: | 
					
						
						|  |  | 
					
						
						|  | if ps_df_copy[col].dtype == 'object': | 
					
						
						|  | ps_df_copy[col] = ps_df_copy[col].replace("<NA>", pd.NA) | 
					
						
						|  |  | 
					
						
						|  | ps_df_copy[col] = pd.to_numeric(ps_df_copy[col], errors='coerce') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if 'page' in available_merge_cols: | 
					
						
						|  | df_abs = df_abs.merge( | 
					
						
						|  | ps_df_copy[available_merge_cols], | 
					
						
						|  | on="page", | 
					
						
						|  | how="left" | 
					
						
						|  | ) | 
					
						
						|  | else: | 
					
						
						|  | print("Warning: 'page' column not found in page_sizes_df. Cannot merge dimensions.") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if "image_width" in df_abs.columns and "mediabox_width" in df_abs.columns: | 
					
						
						|  |  | 
					
						
						|  | if df_abs["image_width"].isna().all(): | 
					
						
						|  |  | 
					
						
						|  | df_abs["image_width"] = df_abs["image_width"].fillna(df_abs["mediabox_width"]) | 
					
						
						|  | df_abs["image_height"] = df_abs["image_height"].fillna(df_abs["mediabox_height"]) | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | pass | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | divisors_numeric = True | 
					
						
						|  | for col in ["image_width", "image_height"]: | 
					
						
						|  | if col in df_abs.columns: | 
					
						
						|  | df_abs[col] = pd.to_numeric(df_abs[col], errors='coerce') | 
					
						
						|  | else: | 
					
						
						|  | print(f"Warning: Dimension column '{col}' missing. Cannot perform division.") | 
					
						
						|  | divisors_numeric = False | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if divisors_numeric and "image_width" in df_abs.columns and "image_height" in df_abs.columns: | 
					
						
						|  |  | 
					
						
						|  | with np.errstate(divide='ignore', invalid='ignore'): | 
					
						
						|  | df_abs[xmin] = round(df_abs[xmin] / df_abs["image_width"],6) | 
					
						
						|  | df_abs[xmax] = round(df_abs[xmax] / df_abs["image_width"],6) | 
					
						
						|  | df_abs[ymin] = round(df_abs[ymin] / df_abs["image_height"],6) | 
					
						
						|  | df_abs[ymax] = round(df_abs[ymax] / df_abs["image_height"],6) | 
					
						
						|  |  | 
					
						
						|  | df_abs.replace([np.inf, -np.inf], np.nan, inplace=True) | 
					
						
						|  | else: | 
					
						
						|  | print("Skipping coordinate division due to missing or non-numeric dimension columns.") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | dfs_to_concat = [df for df in [df_rel, df_abs] if not df.empty] | 
					
						
						|  |  | 
					
						
						|  | if dfs_to_concat: | 
					
						
						|  | final_df = pd.concat(dfs_to_concat, ignore_index=True) | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  | print("Warning: Both relative and absolute splits resulted in empty DataFrames.") | 
					
						
						|  | final_df = pd.DataFrame(columns=review_file_df.columns) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | required_sort_columns = {"page", xmin, ymin} | 
					
						
						|  | if not final_df.empty and required_sort_columns.issubset(final_df.columns): | 
					
						
						|  |  | 
					
						
						|  | final_df['page'] = pd.to_numeric(final_df['page'], errors='coerce') | 
					
						
						|  | final_df[ymin] = pd.to_numeric(final_df[ymin], errors='coerce') | 
					
						
						|  | final_df[xmin] = pd.to_numeric(final_df[xmin], errors='coerce') | 
					
						
						|  |  | 
					
						
						|  | final_df.sort_values(["page", ymin, xmin], inplace=True, na_position='last') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | cols_to_drop = ["image_width", "image_height", "mediabox_width", "mediabox_height"] | 
					
						
						|  | final_df = final_df.drop(columns=cols_to_drop, errors="ignore") | 
					
						
						|  |  | 
					
						
						|  | return final_df | 
					
						
						|  |  | 
					
						
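# Example (hypothetical numbers): a box at x = 100..200 px on a 1000 px-wide
# page becomes xmin = 0.1, xmax = 0.2 after division.
#
#   boxes = pd.DataFrame([{"page": 1, "xmin": 100, "xmax": 200, "ymin": 300, "ymax": 350}])
#   sizes = pd.DataFrame([{"page": 1, "image_width": 1000, "image_height": 1400,
#                          "mediabox_width": 595, "mediabox_height": 842}])
#   divide_coordinates_by_page_sizes(boxes, sizes)
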
def multiply_coordinates_by_page_sizes(
    review_file_df: pd.DataFrame,
    page_sizes_df: pd.DataFrame,
    xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax"
):
    """
    Convert relative coordinates (<=1) to absolute pixel coordinates based on page sizes.

    Separates relative (<=1) and absolute (>1) coordinates, merges page sizes
    for the relative rows, calculates absolute pixel values, and recombines.
    """
    if review_file_df.empty or xmin not in review_file_df.columns:
        return review_file_df

    coord_cols = [xmin, xmax, ymin, ymax]

    for col in coord_cols + ["page"]:
        if col in review_file_df.columns:
            review_file_df[col] = pd.to_numeric(review_file_df[col], errors="coerce")

    # Rows where all four coordinates are <= 1 are treated as relative
    is_relative_mask = (
        (review_file_df[xmin].le(1) & review_file_df[xmin].notna()) &
        (review_file_df[xmax].le(1) & review_file_df[xmax].notna()) &
        (review_file_df[ymin].le(1) & review_file_df[ymin].notna()) &
        (review_file_df[ymax].le(1) & review_file_df[ymax].notna())
    )

    df_abs = review_file_df[~is_relative_mask].copy()
    df_rel = review_file_df[is_relative_mask].copy()

    if df_rel.empty:
        # Nothing to convert; return the absolute rows, sorted if possible
        if not df_abs.empty and {"page", xmin, ymin}.issubset(df_abs.columns):
            df_abs.sort_values(["page", xmin, ymin], inplace=True, na_position='last')
        return df_abs

    # Merge page dimensions if they are not already present
    if "image_width" not in df_rel.columns and not page_sizes_df.empty:
        page_sizes_df = page_sizes_df.copy()
        page_sizes_df['page'] = pd.to_numeric(page_sizes_df['page'], errors='coerce')

        page_sizes_df[['image_width', 'image_height']] = page_sizes_df[['image_width', 'image_height']].replace("<NA>", pd.NA)
        page_sizes_df['image_width'] = pd.to_numeric(page_sizes_df['image_width'], errors='coerce')
        page_sizes_df['image_height'] = pd.to_numeric(page_sizes_df['image_height'], errors='coerce')

        df_rel = df_rel.merge(
            page_sizes_df[['page', 'image_width', 'image_height']],
            on="page",
            how="left"
        )

    # Multiply out only the rows for which dimensions are known
    if "image_width" in df_rel.columns:
        has_size_mask = df_rel["image_width"].notna() & df_rel["image_height"].notna()

        df_rel.loc[has_size_mask, xmin] *= df_rel.loc[has_size_mask, "image_width"]
        df_rel.loc[has_size_mask, xmax] *= df_rel.loc[has_size_mask, "image_width"]
        df_rel.loc[has_size_mask, ymin] *= df_rel.loc[has_size_mask, "image_height"]
        df_rel.loc[has_size_mask, ymax] *= df_rel.loc[has_size_mask, "image_height"]

    # Recombine the absolute and newly-converted rows
    dfs_to_concat = [df for df in [df_abs, df_rel] if not df.empty]

    if not dfs_to_concat:
        return pd.DataFrame()

    final_df = pd.concat(dfs_to_concat, ignore_index=True)

    required_sort_columns = {"page", xmin, ymin}
    if not final_df.empty and required_sort_columns.issubset(final_df.columns):
        final_df.sort_values(["page", xmin, ymin], inplace=True, na_position='last')

    return final_df

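# Example (hypothetical numbers): the inverse of the division above; a relative
# box at xmin = 0.1 on a 1000 px-wide page becomes xmin = 100.
#
#   boxes = pd.DataFrame([{"page": 1, "xmin": 0.1, "xmax": 0.2, "ymin": 0.3, "ymax": 0.35}])
#   sizes = pd.DataFrame([{"page": 1, "image_width": 1000, "image_height": 1400}])
#   multiply_coordinates_by_page_sizes(boxes, sizes)
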
def do_proximity_match_by_page_for_text(df1: pd.DataFrame, df2: pd.DataFrame):
    '''
    Match text from one dataframe to another based on proximity matching of coordinates page by page.
    '''
    if 'text' not in df2.columns: df2['text'] = ''
    if 'text' not in df1.columns: df1['text'] = ''

    # First try an exact match on coordinates, label and page
    merge_keys = ['xmin', 'ymin', 'xmax', 'ymax', 'label', 'page']
    df1['key'] = df1[merge_keys].astype(str).agg('_'.join, axis=1)
    df2['key'] = df2[merge_keys].astype(str).agg('_'.join, axis=1)

    merged_df = df1.merge(df2[['key', 'text']], on='key', how='left', suffixes=('', '_duplicate'))

    # Keep existing text where present, otherwise take the exact-match text
    merged_df['text'] = np.where(
        merged_df['text'].isna() | (merged_df['text'] == ''),
        merged_df.pop('text_duplicate'),
        merged_df['text']
    )

    # Fall back to a nearest-neighbour search within a small tolerance
    tolerance = 0.02

    # Build a KD-tree of df2 boxes for each page
    page_trees = {}
    for page in df2['page'].unique():
        df2_page = df2[df2['page'] == page]
        coords = df2_page[['xmin', 'ymin', 'xmax', 'ymax']].values
        if np.all(np.isfinite(coords)) and len(coords) > 0:
            page_trees[page] = (cKDTree(coords), df2_page)

    # Query the tree for each df1 box on the same page
    for i, row in df1.iterrows():
        page_number = row['page']

        if page_number in page_trees:
            tree, df2_page = page_trees[page_number]

            dist, idx = tree.query([row[['xmin', 'ymin', 'xmax', 'ymax']].values], distance_upper_bound=tolerance)

            if dist[0] < tolerance and idx[0] < len(df2_page):
                merged_df.at[i, 'text'] = df2_page.iloc[idx[0]]['text']

    merged_df.drop(columns=['key'], inplace=True)

    return merged_df

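# Example (hypothetical data): boxes on the same page are matched when their
# (xmin, ymin, xmax, ymax) vectors are within 0.02 of each other.
#
#   review = pd.DataFrame([{"page": 1, "xmin": 0.100, "ymin": 0.200, "xmax": 0.300,
#                           "ymax": 0.220, "label": "REDACTION", "text": ""}])
#   ocr = pd.DataFrame([{"page": 1, "xmin": 0.101, "ymin": 0.201, "xmax": 0.301,
#                        "ymax": 0.221, "label": "REDACTION", "text": "example text"}])
#   do_proximity_match_by_page_for_text(review, ocr)  # review rows gain the OCR text
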
def do_proximity_match_all_pages_for_text(df1: pd.DataFrame, df2: pd.DataFrame, threshold: float = 0.03):
    '''
    Match text from one dataframe to another based on proximity matching of coordinates across all pages.
    '''
    if 'text' not in df2.columns: df2['text'] = ''
    if 'text' not in df1.columns: df1['text'] = ''

    for col in ['xmin', 'ymin', 'xmax', 'ymax']:
        df1[col] = pd.to_numeric(df1[col], errors='coerce')
        df2[col] = pd.to_numeric(df2[col], errors='coerce')

    # First try an exact match on coordinates, label and page
    merge_keys = ['xmin', 'ymin', 'xmax', 'ymax', 'label', 'page']
    df1['key'] = df1[merge_keys].astype(str).agg('_'.join, axis=1)
    df2['key'] = df2[merge_keys].astype(str).agg('_'.join, axis=1)

    merged_df = df1.merge(df2[['key', 'text']], on='key', how='left', suffixes=('', '_duplicate'))

    # Keep existing text where present, otherwise take the exact-match text
    merged_df['text'] = np.where(
        merged_df['text'].isna() | (merged_df['text'] == ''),
        merged_df.pop('text_duplicate'),
        merged_df['text']
    )

    # Fall back to a nearest-neighbour search over all pages at once
    query_coords = np.array(df1[['xmin', 'ymin', 'xmax', 'ymax']].values, dtype=float)

    # Track which df1 rows are being queried so that results stay aligned with
    # merged_df even when rows with non-finite coordinates are dropped
    finite_mask = np.isfinite(query_coords).all(axis=1)
    query_indices = np.where(finite_mask)[0]
    query_coords = query_coords[finite_mask]

    if query_coords.size > 0:
        finite_mask_df2 = np.isfinite(df2[['xmin', 'ymin', 'xmax', 'ymax']].values).all(axis=1)
        df2_finite = df2[finite_mask_df2]

        # Build a KD-tree on the df2 boxes and query it with the df1 boxes
        tree = cKDTree(df2_finite[['xmin', 'ymin', 'xmax', 'ymax']].values)

        tolerance = threshold
        distances, indices = tree.query(query_coords, distance_upper_bound=tolerance)

        for pos, (dist, idx) in enumerate(zip(distances, indices)):
            if dist < tolerance and idx < len(df2_finite):
                merged_df.at[query_indices[pos], 'text'] = df2_finite.iloc[idx]['text']

    merged_df.drop(columns=['key'], inplace=True)

    return merged_df

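# Usage note: this variant builds a single KD-tree over every page at once, so it
# avoids the per-page Python loop of the function above on large inputs. Because
# the tree only sees coordinates, not page numbers, boxes on different pages can
# in principle match if their relative coordinates coincide within the threshold.
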
def _extract_page_number(image_path: Any) -> int:
    """Helper function to safely extract a 1-based page number from an image path."""
    if not isinstance(image_path, str):
        return 1
    match = IMAGE_NUM_REGEX.search(image_path)
    if match:
        try:
            return int(match.group(1)) + 1
        except (ValueError, TypeError):
            return 1
    return 1

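# Example: IMAGE_NUM_REGEX captures the trailing image index, which is zero-based,
# so 1 is added to get the page number.
#
#   _extract_page_number("outputs/doc_3.png")  # -> 4
#   _extract_page_number(None)                 # -> 1 (fallback)
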
def convert_annotation_data_to_dataframe(all_annotations: List[Dict[str, Any]]):
    '''
    Convert an annotation list to a DataFrame using Pandas explode and json_normalize.
    '''
    if not all_annotations:
        print("No annotations found, returning empty dataframe")
        return pd.DataFrame(columns=["image", "page", "xmin", "xmax", "ymin", "ymax", "text", "id"])

    # Build a two-column frame of image paths and their box lists
    df = pd.DataFrame({
        "image": [anno.get("image") for anno in all_annotations],
        "boxes": [
            anno.get("boxes") if isinstance(anno.get("boxes"), list)
            else [anno.get("boxes")] if isinstance(anno.get("boxes"), dict)
            else []
            for anno in all_annotations
        ]
    })

    df['page'] = df['image'].apply(_extract_page_number)

    # Use a placeholder box for pages with no boxes so they survive the explode
    placeholder_box = {"xmin": pd.NA, "xmax": pd.NA, "ymin": pd.NA, "ymax": pd.NA, "text": pd.NA, "id": pd.NA}
    df['boxes'] = df['boxes'].apply(lambda x: x if x else [placeholder_box])

    # One row per box
    df_exploded = df.explode('boxes', ignore_index=True)

    # Normalise the box dictionaries into columns
    mask = df_exploded['boxes'].notna() & df_exploded['boxes'].apply(isinstance, args=(dict,))
    normalized_boxes = pd.json_normalize(df_exploded.loc[mask, 'boxes'])

    final_df = df_exploded.loc[mask, ['image', 'page']].reset_index(drop=True).join(normalized_boxes)

    # Make sure the essential box columns exist
    essential_box_cols = ["xmin", "xmax", "ymin", "ymax", "text", "id", "label"]
    for col in essential_box_cols:
        if col not in final_df.columns:
            final_df[col] = pd.NA
        final_df[col] = final_df[col].replace({None: pd.NA})

    base_cols = ["image"]
    extra_box_cols = [col for col in final_df.columns if col not in base_cols and col not in essential_box_cols]
    final_col_order = base_cols + essential_box_cols + sorted(extra_box_cols)

    final_df = final_df.reindex(columns=final_col_order, fill_value=pd.NA)
    final_df = final_df.dropna(subset=["xmin", "xmax", "ymin", "ymax", "text", "id", "label"], how="all")
    final_df = final_df.replace({None: pd.NA})

    return final_df

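# Example (hypothetical annotation): one page with a single box becomes a
# one-row DataFrame with the box fields as columns.
#
#   annos = [{"image": "doc_0.png",
#             "boxes": [{"xmin": 0.1, "xmax": 0.2, "ymin": 0.3, "ymax": 0.35,
#                        "label": "REDACTION", "text": "", "id": "abcdefghijkl"}]}]
#   convert_annotation_data_to_dataframe(annos)
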
def create_annotation_dicts_from_annotation_df(
    all_image_annotations_df: pd.DataFrame,
    page_sizes: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    '''
    Convert an annotation DataFrame back to a list of dicts using a dictionary lookup.
    Ensures all images from page_sizes are present without duplicates.
    '''
    # Create a dict entry (with empty boxes) for every image in page_sizes
    image_dict: Dict[str, Dict[str, Any]] = {}
    for item in page_sizes:
        image_path = item.get("image_path")
        if image_path:
            image_dict[image_path] = {"image": image_path, "boxes": []}

    if all_image_annotations_df.empty or 'image' not in all_image_annotations_df.columns:
        return list(image_dict.values())

    # Only keep box columns that actually exist in the DataFrame
    box_cols = ['xmin', 'ymin', 'xmax', 'ymax', 'color', 'label', 'text', 'id']
    available_cols = [col for col in box_cols if col in all_image_annotations_df.columns]

    if 'text' in all_image_annotations_df.columns:
        all_image_annotations_df['text'] = all_image_annotations_df['text'].fillna('')

    if not available_cols:
        print(f"Warning: None of the expected box columns ({box_cols}) found in DataFrame.")
        return list(image_dict.values())

    # Drop rows without valid coordinates
    coord_cols = ['xmin', 'ymin', 'xmax', 'ymax']
    valid_box_df = all_image_annotations_df.dropna(
        subset=[col for col in coord_cols if col in available_cols]
    ).copy()

    if valid_box_df.empty:
        print("Warning: No valid annotation rows found in DataFrame after dropping NA coordinates.")
        return list(image_dict.values())

    # Group boxes by image and attach them to the corresponding dict entries
    try:
        for image_path, group in valid_box_df.groupby('image', observed=True, sort=False):
            if image_path in image_dict:
                boxes = group[available_cols].to_dict(orient='records')
                image_dict[image_path]['boxes'] = boxes
    except KeyError:
        print("Error: Issue grouping DataFrame by 'image'.")
        return list(image_dict.values())

    result = list(image_dict.values())

    return result

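# Example usage (a sketch): this is the inverse of
# convert_annotation_data_to_dataframe, keyed on the image paths listed in
# page_sizes. `df` and the size values below are hypothetical.
#
#   sizes = [{"page": 1, "image_path": "doc_0.png", "image_width": 1000, "image_height": 1400}]
#   create_annotation_dicts_from_annotation_df(df, sizes)
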
def convert_annotation_json_to_review_df(
    all_annotations: List[dict],
    redaction_decision_output: pd.DataFrame = pd.DataFrame(),
    page_sizes: List[dict] = list(),
    do_proximity_match: bool = True
) -> pd.DataFrame:
    '''
    Convert the annotation json data to a dataframe format.
    Add on any text from the initial review_file dataframe by joining based on 'id' if available
    in both sources, otherwise falling back to joining on pages/coordinates (if the option is selected).

    Refactored for improved efficiency, prioritising the ID-based join and conditionally applying
    coordinate division and proximity matching.
    '''
    # Convert the annotation list to a DataFrame
    review_file_df = convert_annotation_data_to_dataframe(all_annotations)

    # Drop rows that are missing any coordinate
    review_file_df.dropna(subset=['xmin', 'ymin', 'xmax', 'ymax'], how='any', inplace=True)

    if review_file_df.empty:
        # Return an empty DataFrame with the standard column layout
        standard_cols = ["image", "page", "label", "color", "xmin", "ymin", "xmax", "ymax", "text"]
        if 'id' in review_file_df.columns:
            standard_cols.append('id')
        return pd.DataFrame(columns=standard_cols)

    # Make sure both sources have an 'id' column
    if 'id' not in review_file_df.columns:
        review_file_df['id'] = ''

    if not redaction_decision_output.empty and 'id' not in redaction_decision_output.columns:
        redaction_decision_output['id'] = ''

    # Prepare the page sizes DataFrame for coordinate division
    page_sizes_df = pd.DataFrame()
    if page_sizes:
        page_sizes_df = pd.DataFrame(page_sizes)
        if not page_sizes_df.empty:
            page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
            page_sizes_df.dropna(subset=["page"], inplace=True)
            if not page_sizes_df.empty:
                page_sizes_df["page"] = page_sizes_df["page"].astype(int)
            else:
                print("Warning: Page sizes DataFrame became empty after processing, coordinate division will be skipped.")

    # Try to join text from the redaction output by 'id' first
    text_added_successfully = False

    if not redaction_decision_output.empty:
        # An ID join is only meaningful if both sides have non-blank IDs
        id_col_exists_in_review = 'id' in review_file_df.columns and not review_file_df['id'].isnull().all() and not (review_file_df['id'] == '').all()
        id_col_exists_in_redaction = 'id' in redaction_decision_output.columns and not redaction_decision_output['id'].isnull().all() and not (redaction_decision_output['id'] == '').all()

        if id_col_exists_in_review and id_col_exists_in_redaction:
            try:
                review_file_df['id'] = review_file_df['id'].astype(str)

                redaction_copy = redaction_decision_output.copy()
                redaction_copy['id'] = redaction_copy['id'].astype(str)

                cols_to_merge = ['id']
                if 'text' in redaction_copy.columns:
                    cols_to_merge.append('text')
                else:
                    print("Warning: 'text' column not found in redaction_decision_output. Cannot merge text using 'id'.")

                # Suffix the incoming text column if the review DataFrame already has one
                original_text_col_exists = 'text' in review_file_df.columns
                merge_suffix = '_redaction' if original_text_col_exists else ''

                merged_df = pd.merge(
                    review_file_df,
                    redaction_copy[cols_to_merge],
                    on='id',
                    how='left',
                    suffixes=('', merge_suffix)
                )

                if 'text' + merge_suffix in merged_df.columns:
                    redaction_text_col = 'text' + merge_suffix
                    if original_text_col_exists:
                        # Prefer the redaction text, falling back to the existing text
                        merged_df['text'] = merged_df[redaction_text_col].combine_first(merged_df['text'])
                        merged_df = merged_df.drop(columns=[redaction_text_col])
                    else:
                        merged_df = merged_df.rename(columns={redaction_text_col: 'text'})

                    text_added_successfully = True

                review_file_df = merged_df

            except Exception as e:
                print(f"Error during 'id'-based merge: {e}. Checking for proximity match fallback.")

    # Fall back to proximity matching if the ID join was not possible or failed
    if not text_added_successfully and do_proximity_match:
        # Coerce page columns to integers and drop unparseable rows
        if 'page' in review_file_df.columns:
            review_file_df['page'] = pd.to_numeric(review_file_df['page'], errors='coerce').fillna(-1).astype(int)
            review_file_df = review_file_df[review_file_df['page'] != -1]
        if not redaction_decision_output.empty and 'page' in redaction_decision_output.columns:
            redaction_decision_output['page'] = pd.to_numeric(redaction_decision_output['page'], errors='coerce').fillna(-1).astype(int)
            redaction_decision_output = redaction_decision_output[redaction_decision_output['page'] != -1]

        # Proximity matching works on relative coordinates
        if not page_sizes_df.empty:
            review_file_df = divide_coordinates_by_page_sizes(review_file_df, page_sizes_df)
            if not redaction_decision_output.empty:
                redaction_decision_output = divide_coordinates_by_page_sizes(redaction_decision_output, page_sizes_df)

        if not redaction_decision_output.empty:
            try:
                review_file_df = do_proximity_match_all_pages_for_text(
                    df1=review_file_df,
                    df2=redaction_decision_output
                )
                if 'text' in review_file_df.columns:
                    text_added_successfully = True
            except Exception as e:
                print(f"Error during proximity match: {e}. Text data may not be added.")

    elif not text_added_successfully and not do_proximity_match:
        print("Skipping joining text data (ID join not possible/failed, proximity match disabled).")

    # Select and order the output columns
    required_columns_base = ["image", "page", "label", "color", "xmin", "ymin", "xmax", "ymax"]
    final_columns = required_columns_base[:]

    if 'id' in review_file_df.columns:
        final_columns.append('id')
    if 'text' in review_file_df.columns:
        final_columns.append('text')

    # Add any expected columns that are still missing
    for col in final_columns:
        if col not in review_file_df.columns:
            review_file_df[col] = ''

    review_file_df = review_file_df[[col for col in final_columns if col in review_file_df.columns]]

    # Colours stored as lists are not hashable; convert them to tuples
    if 'color' in review_file_df.columns:
        if review_file_df['color'].apply(lambda x: isinstance(x, list)).any():
            review_file_df.loc[:, "color"] = review_file_df.loc[:, "color"].apply(lambda x: tuple(x) if isinstance(x, list) else x)

    # Sort the output for a stable review order
    sort_columns = ['page', 'ymin', 'xmin', 'label']
    valid_sort_columns = [col for col in sort_columns if col in review_file_df.columns]
    if valid_sort_columns and not review_file_df.empty:
        try:
            review_file_df = review_file_df.sort_values(valid_sort_columns)
        except TypeError as e:
            print(f"Warning: Could not sort DataFrame due to type error in sort columns: {e}")

    base_cols = ["xmin", "xmax", "ymin", "ymax", "text", "id", "label"]

    for col in base_cols:
        if col not in review_file_df.columns:
            review_file_df[col] = pd.NA

    review_file_df = review_file_df.dropna(subset=base_cols, how="all")

    return review_file_df

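# Example usage (a sketch; `annotations` would come from the Gradio annotator and
# `ocr_results_df`/`sizes` from earlier processing steps in this module):
#
#   review_df = convert_annotation_json_to_review_df(
#       annotations, redaction_decision_output=ocr_results_df,
#       page_sizes=sizes, do_proximity_match=True)
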
def fill_missing_ids_in_list(data_list: list) -> list:
    """
    Generates unique alphanumeric IDs for dictionaries in a list where the 'id' is
    missing, blank, or not a 12-character string.

    Args:
        data_list (list): A list of dictionaries, each potentially with an 'id' key.

    Returns:
        list: The input list with missing/invalid IDs filled.
              Note: The function modifies the input list in place.
    """
    if not isinstance(data_list, list):
        raise TypeError("Input 'data_list' must be a list.")

    if not data_list:
        return data_list

    id_length = 12
    character_set = string.ascii_letters + string.digits

    # Collect the valid IDs that already exist so new ones do not collide
    existing_ids = set()
    for item in data_list:
        if not isinstance(item, dict):
            continue
        item_id = item.get('id')
        if isinstance(item_id, str) and len(item_id) == id_length:
            existing_ids.add(item_id)

    generated_ids_set = set()
    num_filled = 0

    for item in data_list:
        if not isinstance(item, dict):
            continue

        item_id = item.get('id')

        # An ID needs replacing if it is missing, not a string, blank, or the wrong length
        needs_new_id = (
            item_id is None or
            not isinstance(item_id, str) or
            item_id.strip() == "" or
            len(item_id) != id_length
        )

        if needs_new_id:
            attempts = 0
            while True:
                candidate_id = ''.join(random.choices(character_set, k=id_length))

                if candidate_id not in existing_ids and candidate_id not in generated_ids_set:
                    generated_ids_set.add(candidate_id)
                    item['id'] = candidate_id
                    num_filled += 1
                    break
                attempts += 1

                if attempts > len(data_list) * 100 + 1000:
                    raise RuntimeError(f"Failed to generate a unique ID after {attempts} attempts. Check ID length or existing IDs.")

    return data_list

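# Example (hypothetical data): the second dict gets a fresh 12-character ID,
# while the valid existing ID is left untouched.
#
#   items = [{"id": "abcdefghijkl"}, {"id": ""}]
#   fill_missing_ids_in_list(items)
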
def fill_missing_box_ids(data_input: dict) -> dict:
    """
    Generates a unique alphanumeric ID for a single bounding box dictionary
    where the 'id' is missing, blank, or not a 12-character string.

    Args:
        data_input (dict): A single box dictionary, potentially with an 'id' key.

    Returns:
        dict: The input dictionary with a missing/invalid ID filled.
              Note: The function modifies the input dictionary in place.
    """
    if not isinstance(data_input, dict):
        raise TypeError("Input 'data_input' must be a dictionary.")

    boxes = data_input
    id_length = 12
    character_set = string.ascii_letters + string.digits

    # Record the existing ID if it is already valid, so a new one cannot collide
    existing_ids = set()
    box_id = boxes.get('id')
    if isinstance(box_id, str) and len(box_id) == id_length:
        existing_ids.add(box_id)

    generated_ids_set = set()
    num_filled = 0

    box_id = boxes.get('id')

    # An ID needs replacing if it is missing, not a string, blank, or the wrong length
    needs_new_id = (
        box_id is None or
        not isinstance(box_id, str) or
        box_id.strip() == "" or
        len(box_id) != id_length
    )

    if needs_new_id:
        attempts = 0
        while True:
            candidate_id = ''.join(random.choices(character_set, k=id_length))

            if candidate_id not in existing_ids and candidate_id not in generated_ids_set:
                generated_ids_set.add(candidate_id)
                boxes['id'] = candidate_id
                num_filled += 1
                break
            attempts += 1

            if attempts > len(boxes) * 100 + 1000:
                raise RuntimeError(f"Failed to generate a unique ID after {attempts} attempts. Check ID length or existing IDs.")

    return data_input

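# Usage note: unlike fill_missing_box_ids_each_box below, this helper operates on
# a single box dictionary rather than a page dict with a 'boxes' list.
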
def fill_missing_box_ids_each_box(data_input: Dict) -> Dict:
    """
    Generates unique alphanumeric IDs for bounding boxes in a list
    where the 'id' is missing, blank, or not a 12-character string.

    Args:
        data_input (Dict): The input dictionary containing 'image' and 'boxes' keys.
                           'boxes' should be a list of dictionaries, each potentially
                           with an 'id' key.

    Returns:
        Dict: The input dictionary with missing/invalid box IDs filled.
              Note: The function modifies the input dictionary in place.
    """
    if not isinstance(data_input, dict):
        raise TypeError("Input 'data_input' must be a dictionary.")
    if 'boxes' not in data_input or not isinstance(data_input.get('boxes'), list):
        return data_input

    boxes_list = data_input['boxes']
    id_length = 12
    character_set = string.ascii_letters + string.digits

    # Collect the valid IDs that already exist so new ones do not collide
    existing_ids = set()
    for box in boxes_list:
        if isinstance(box, dict):
            box_id = box.get('id')
            if isinstance(box_id, str) and len(box_id) == id_length:
                existing_ids.add(box_id)

    generated_ids_this_run = set()
    num_filled = 0

    for box in boxes_list:
        if not isinstance(box, dict):
            continue

        box_id = box.get('id')

        # An ID needs replacing if it is missing, not a string, blank, or the wrong length
        needs_new_id = (
            box_id is None or
            not isinstance(box_id, str) or
            box_id.strip() == "" or
            len(box_id) != id_length
        )

        if needs_new_id:
            while True:
                candidate_id = ''.join(random.choices(character_set, k=id_length))

                if candidate_id not in existing_ids and candidate_id not in generated_ids_this_run:
                    generated_ids_this_run.add(candidate_id)
                    box['id'] = candidate_id
                    num_filled += 1
                    break

    if num_filled > 0:
        print(f"Successfully filled {num_filled} missing or invalid box IDs.")

    return data_input

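# Example (hypothetical data): every box in the 'boxes' list is given a unique
# 12-character ID where one is missing or invalid.
#
#   page = {"image": "doc_0.png", "boxes": [{"label": "REDACTION"}, {"id": "short"}]}
#   fill_missing_box_ids_each_box(page)
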
def fill_missing_ids(df: pd.DataFrame, column_name: str = 'id', length: int = 12) -> pd.DataFrame:
    """
    Generates unique alphanumeric IDs for rows in a DataFrame column
    where the value is missing (NaN, None) or an empty/whitespace string.

    Args:
        df (pd.DataFrame): The input Pandas DataFrame.
        column_name (str): The name of the column to check and fill (defaults to 'id').
                           This column will be added if it doesn't exist.
        length (int): The desired length of the generated IDs (defaults to 12).

    Returns:
        pd.DataFrame: The DataFrame with missing/empty IDs filled in the specified column.
                      Note: The function modifies the DataFrame directly (in place).
    """
    # Validate inputs
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a Pandas DataFrame.")
    if not isinstance(column_name, str) or not column_name:
        raise ValueError("'column_name' must be a non-empty string.")
    if not isinstance(length, int) or length <= 0:
        raise ValueError("'length' must be a positive integer.")

    # Add the column if it does not exist
    if column_name not in df.columns:
        df[column_name] = None
        original_dtype = object
    else:
        original_dtype = df[column_name].dtype

    # Identify rows that are null or contain only whitespace
    is_null = df[column_name].isna()

    is_empty_str = pd.Series(False, index=df.index)
    if not is_null.all():
        temp_str_col = df.loc[~is_null, column_name].astype(str).str.strip()
        is_empty_str.loc[~is_null] = (temp_str_col == '')

    is_missing_or_empty = is_null | is_empty_str

    rows_to_fill_index = df.index[is_missing_or_empty]
    num_needed = len(rows_to_fill_index)

    if num_needed == 0:
        return df

    # Collect existing IDs (as stripped strings) so new ones do not collide
    valid_rows = df.loc[~is_missing_or_empty, column_name]
    valid_rows = valid_rows.dropna()
    existing_ids = set(valid_rows.astype(str).str.strip())

    # An empty string is not a valid ID
    existing_ids.discard('')

    # Generate the required number of unique IDs
    character_set = string.ascii_letters + string.digits
    generated_ids_set = set()
    new_ids_list = list()

    max_possible_ids = len(character_set) ** length
    if num_needed > max_possible_ids:
        raise ValueError(f"Cannot generate {num_needed} unique IDs with length {length}. Maximum possible is {max_possible_ids}.")

    max_attempts_per_id = max(1000, num_needed * 10)

    for i in range(num_needed):
        attempts = 0
        while True:
            candidate_id = ''.join(random.choices(character_set, k=length))

            if candidate_id not in existing_ids and candidate_id not in generated_ids_set:
                generated_ids_set.add(candidate_id)
                new_ids_list.append(candidate_id)
                break
            attempts += 1
            if attempts > max_attempts_per_id:
                raise RuntimeError(f"Failed to generate a unique ID after {attempts} attempts. Check length, character set, or density of existing IDs.")

    # Cast the target column to string if needed so the IDs can be assigned
    if not pd.api.types.is_object_dtype(original_dtype) and not pd.api.types.is_string_dtype(original_dtype):
        df[column_name] = df[column_name].astype(str, errors="ignore")

    df.loc[rows_to_fill_index, column_name] = new_ids_list
    print(f"Successfully assigned {len(new_ids_list)} new unique IDs to column '{column_name}'.")

    return df

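# Example (hypothetical data): rows with NaN or whitespace-only IDs are filled in place.
#
#   df = pd.DataFrame({"id": ["abcdefghijkl", None, " "]})
#   fill_missing_ids(df)  # the last two rows receive new 12-character IDs
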
						|  | def convert_review_df_to_annotation_json( | 
					
						
						|  | review_file_df: pd.DataFrame, | 
					
						
						|  | image_paths: List[str], | 
					
						
						|  | page_sizes: List[Dict], | 
					
						
						|  | xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax" | 
					
						
						|  | ) -> List[Dict]: | 
					
						
						|  | """ | 
					
						
						|  | Optimized function to convert review DataFrame to Gradio Annotation JSON format. | 
					
						
						|  |  | 
					
						
						|  | Ensures absolute coordinates, handles missing IDs, deduplicates based on key fields, | 
					
						
						|  | selects final columns, and structures data per image/page based on page_sizes. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | review_file_df: Input DataFrame with annotation data. | 
					
						
						|  | image_paths: List of image file paths (Note: currently unused if page_sizes provides paths). | 
					
						
						|  | page_sizes: REQUIRED list of dictionaries, each containing 'page', | 
					
						
						|  | 'image_path', 'image_width', and 'image_height'. Defines | 
					
						
						|  | output structure and dimensions for coordinate conversion. | 
					
						
						|  | xmin, xmax, ymin, ymax: Names of the coordinate columns. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | List of dictionaries suitable for Gradio Annotation output, one dict per image/page. | 
					
						
						|  | """ | 
					
						
						|  | base_cols = ["xmin", "xmax", "ymin", "ymax", "text", "id", "label"] | 
					
						
						|  |  | 
					
						
						|  | for col in base_cols: | 
					
						
						|  | if col not in review_file_df.columns: | 
					
						
						|  | review_file_df[col] = pd.NA | 
					
						
						|  |  | 
					
						
						|  | review_file_df = review_file_df.dropna(subset=["xmin", "xmax", "ymin", "ymax", "text", "id", "label"], how='all') | 
					
						
						|  |  | 
					
						
						|  | if not page_sizes: | 
					
						
						|  | raise ValueError("page_sizes argument is required and cannot be empty.") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | page_sizes_df = pd.DataFrame(page_sizes) | 
					
						
						|  | required_ps_cols = {'page', 'image_path', 'image_width', 'image_height'} | 
					
						
						|  | if not required_ps_cols.issubset(page_sizes_df.columns): | 
					
						
						|  | missing = required_ps_cols - set(page_sizes_df.columns) | 
					
						
						|  | raise ValueError(f"page_sizes is missing required keys: {missing}") | 
					
						
						|  |  | 
					
						
						|  | page_sizes_df['page'] = pd.to_numeric(page_sizes_df['page'], errors='coerce') | 
					
						
						|  | page_sizes_df['image_width'] = pd.to_numeric(page_sizes_df['image_width'], errors='coerce') | 
					
						
						|  | page_sizes_df['image_height'] = pd.to_numeric(page_sizes_df['image_height'], errors='coerce') | 
					
						
						|  |  | 
					
						
						|  | page_sizes_df['page'] = page_sizes_df['page'].astype('Int64') | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | raise ValueError(f"Error processing page_sizes: {e}") from e | 
					
						
    if review_file_df.empty:
        print("Input review_file_df is empty. Proceeding to generate JSON structure with empty boxes.")
        # Ensure the essential columns exist so the later steps do not fail
        for col in [xmin, xmax, ymin, ymax, "page", "label", "color", "id", "text"]:
            if col not in review_file_df.columns:
                review_file_df[col] = pd.NA
    else:
        # Heuristic: any coordinate value <= 1 suggests relative (0-1) coordinates,
        # which need scaling up to absolute values
        coord_cols_to_check = [c for c in [xmin, xmax, ymin, ymax] if c in review_file_df.columns]
        needs_multiplication = False
        if coord_cols_to_check:
            temp_df_numeric = review_file_df[coord_cols_to_check].apply(pd.to_numeric, errors='coerce')
            if temp_df_numeric.le(1).any().any():
                needs_multiplication = True

        if needs_multiplication:
            review_file_df = multiply_coordinates_by_page_sizes(
                review_file_df.copy(),
                page_sizes_df,
                xmin, xmax, ymin, ymax
            )
        else:
            # Coordinates already look absolute; just make sure they are numeric
            cols_to_convert = [c for c in [xmin, xmax, ymin, ymax, "page"] if c in review_file_df.columns]
            for col in cols_to_convert:
                review_file_df[col] = pd.to_numeric(review_file_df[col], errors='coerce')

        if review_file_df.empty:
            print("DataFrame became empty after coordinate processing.")
            for col in [xmin, xmax, ymin, ymax, "page", "label", "color", "id", "text"]:
                if col not in review_file_df.columns:
                    review_file_df[col] = pd.NA
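    # Worked example of the relative-coordinate heuristic above (illustrative
    # numbers; multiply_coordinates_by_page_sizes is defined earlier in this
    # file and is assumed here to scale by the page's image dimensions):
    # xmin = 0.25 on a page with image_width = 1224 becomes 0.25 * 1224 = 306.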
					
						
    # Fill in any missing box IDs so every annotation can be referenced
    review_file_df = fill_missing_ids(review_file_df.copy())

    # Deduplicate boxes on the key fields
    base_dedupe_cols = ["page", xmin, ymin, xmax, ymax, "label", "id"]
    cols_for_dedupe = [col for col in base_dedupe_cols if col in review_file_df.columns]
    if "image" in review_file_df.columns:
        cols_for_dedupe.append("image")

    # Defensive check: cols_for_dedupe is built from existing columns, so this
    # should not trigger, but guard against surprises anyway
    for col in ['label', 'id']:
        if col in cols_for_dedupe and col not in review_file_df.columns:
            print(f"Warning: Column '{col}' needed for dedupe but not found. Adding NA.")
            review_file_df[col] = ""

    if cols_for_dedupe:
        review_file_df = review_file_df.drop_duplicates(subset=cols_for_dedupe)
    else:
        print("Skipping deduplication: No valid columns found to deduplicate by.")
					
						
    # Select and order the final output columns, adding defaults for any that are missing
    required_final_cols = ["page", "label", "color", xmin, ymin, xmax, ymax, "id", "text"]
    available_final_cols = [col for col in required_final_cols if col in review_file_df.columns]

    for col in required_final_cols:
        if col not in review_file_df.columns:
            print(f"Adding missing final column '{col}' with default value.")
            if col in ['label', 'id', 'text']:
                review_file_df[col] = ""
            elif col == 'color':
                review_file_df[col] = None
            else:
                review_file_df[col] = pd.NA
            available_final_cols.append(col)

    review_file_df = review_file_df[available_final_cols]

    if not review_file_df.empty:
        # Lists are unhashable, so convert colours to tuples before grouping
        if 'color' in review_file_df.columns:
            review_file_df['color'] = review_file_df['color'].apply(
                lambda x: tuple(x) if isinstance(x, list) else x
            )
        # Nullable Int64 matches the dtype used for page_sizes_df['page']
        if 'page' in review_file_df.columns:
            review_file_df['page'] = review_file_df['page'].astype('Int64')

    # Group annotations by page for fast per-page lookup
    if 'page' in review_file_df.columns:
        grouped_annotations = review_file_df.groupby('page')
        group_keys = set(grouped_annotations.groups.keys())
    else:
        print("Error: 'page' column missing, cannot group annotations.")
        grouped_annotations = None
        group_keys = set()

    json_data = list()
    output_cols_for_boxes = [col for col in ["label", "color", xmin, ymin, xmax, ymax, "id", "text"] if col in review_file_df.columns]
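    # Each box record built below has one key per entry in output_cols_for_boxes,
    # typically of the shape:
    #   {'label': ..., 'color': ..., 'xmin': ..., 'ymin': ..., 'xmax': ..., 'ymax': ..., 'id': ..., 'text': ...}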
					
						
    # Build one output entry per page listed in page_sizes, even when a page has no boxes
    for _, row in page_sizes_df.iterrows():
        page_num = row['page']
        pdf_image_path = row['image_path']
        annotation_boxes = list()

        if pd.notna(page_num) and page_num in group_keys and grouped_annotations is not None:
            try:
                page_group_df = grouped_annotations.get_group(page_num)
                # NaN is not valid JSON, so replace it with None before serialising
                annotation_boxes = page_group_df[output_cols_for_boxes].replace({np.nan: None}).to_dict(orient='records')
            except KeyError:
                print(f"Warning: Group key {page_num} not found despite being in group_keys (should not happen).")
                annotation_boxes = list()

        json_data.append({
            "image": pdf_image_path,
            "boxes": annotation_boxes
        })

    return json_data
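# A minimal usage sketch (hedged: paths, sizes, and box values are illustrative only):
#
#   page_sizes = [{'page': 1, 'image_path': 'doc_page_1.png',
#                  'image_width': 1224, 'image_height': 1584}]
#   boxes = pd.DataFrame([{'page': 1, 'label': 'NAME', 'color': (0, 0, 0),
#                          'xmin': 100, 'ymin': 200, 'xmax': 300, 'ymax': 240,
#                          'id': 'abc123XYZ000', 'text': 'Jane Doe'}])
#   annotations = convert_review_df_to_annotation_json(boxes, [], page_sizes)
#   # -> [{'image': 'doc_page_1.png', 'boxes': [{'label': 'NAME', ...}]}]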
					
						