Improved efficiency of review page navigation, especially for large documents. Updated user guide
93b4c8a
| import os | |
| import re | |
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| from xml.etree.ElementTree import Element, SubElement, tostring, parse | |
| from xml.dom import minidom | |
| import uuid | |
| from typing import List, Tuple | |
| from gradio_image_annotation import image_annotator | |
| from gradio_image_annotation.image_annotator import AnnotatedImageData | |
| from pymupdf import Document, Rect | |
| import pymupdf | |
| from PIL import ImageDraw, Image | |
| from tools.config import OUTPUT_FOLDER, CUSTOM_BOX_COLOUR, MAX_IMAGE_PIXELS, INPUT_FOLDER | |
| from tools.file_conversion import is_pdf, convert_annotation_json_to_review_df, convert_review_df_to_annotation_json, process_single_page_for_image_conversion, multiply_coordinates_by_page_sizes, convert_annotation_data_to_dataframe, create_annotation_dicts_from_annotation_df, remove_duplicate_images_with_blank_boxes, fill_missing_ids, divide_coordinates_by_page_sizes | |
| from tools.helper_functions import get_file_name_without_type, detect_file_type | |
| from tools.file_redaction import redact_page_with_pymupdf | |
| if not MAX_IMAGE_PIXELS: Image.MAX_IMAGE_PIXELS = None | |
| def decrease_page(number:int): | |
| ''' | |
| Decrease page number for review redactions page. | |
| ''' | |
| if number > 1: | |
| return number - 1, number - 1 | |
| else: | |
| return 1, 1 | |
| def increase_page(number:int, page_image_annotator_object:AnnotatedImageData): | |
| ''' | |
| Increase page number for review redactions page. | |
| ''' | |
| if not page_image_annotator_object: | |
| return 1, 1 | |
| max_pages = len(page_image_annotator_object) | |
| if number < max_pages: | |
| return number + 1, number + 1 | |
| else: | |
| return max_pages, max_pages | |
| def update_zoom(current_zoom_level:int, annotate_current_page:int, decrease:bool=True): | |
| if decrease == False: | |
| if current_zoom_level >= 70: | |
| current_zoom_level -= 10 | |
| else: | |
| if current_zoom_level < 110: | |
| current_zoom_level += 10 | |
| return current_zoom_level, annotate_current_page | |
| def update_dropdown_list_based_on_dataframe(df:pd.DataFrame, column:str) -> List["str"]: | |
| ''' | |
| Gather unique elements from a string pandas Series, then append 'ALL' to the start and return the list. | |
| ''' | |
| if isinstance(df, pd.DataFrame): | |
| # Check if the Series is empty or all NaN | |
| if column not in df.columns or df[column].empty or df[column].isna().all(): | |
| return ["ALL"] | |
| elif column != "page": | |
| entities = df[column].astype(str).unique().tolist() | |
| entities_for_drop = sorted(entities) | |
| entities_for_drop.insert(0, "ALL") | |
| else: | |
| # Ensure the column can be converted to int - assumes it is the page column | |
| try: | |
| entities = df[column].astype(int).unique() | |
| entities_for_drop = sorted(entities) | |
| entities_for_drop = [str(e) for e in entities_for_drop] # Convert back to string | |
| entities_for_drop.insert(0, "ALL") | |
| except ValueError: | |
| return ["ALL"] # Handle case where conversion fails | |
| return entities_for_drop # Ensure to return the list | |
| else: | |
| return ["ALL"] | |
| def get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object:AnnotatedImageData, | |
| recogniser_dataframe_base:pd.DataFrame, | |
| recogniser_dropdown_value:str, | |
| text_dropdown_value:str, | |
| page_dropdown_value:str, | |
| review_df:pd.DataFrame=[], | |
| page_sizes:List[str]=[]): | |
| ''' | |
| Create a filtered recogniser dataframe and associated dropdowns based on current information in the image annotator and review data frame. | |
| ''' | |
| recogniser_entities_list = ["Redaction"] | |
| recogniser_dataframe_out = recogniser_dataframe_base | |
| recogniser_dataframe_out_gr = gr.Dataframe() | |
| review_dataframe = review_df | |
| try: | |
| #print("converting annotation json in get_filtered_recogniser...") | |
| review_dataframe = convert_annotation_json_to_review_df(page_image_annotator_object, review_df, page_sizes) | |
| recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "label") | |
| recogniser_entities_drop = gr.Dropdown(value=recogniser_dropdown_value, choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) | |
| # This is the choice list for entities when creating a new redaction box | |
| recogniser_entities_list = [entity for entity in recogniser_entities_for_drop.copy() if entity != 'Redaction' and entity != 'ALL'] # Remove any existing 'Redaction' | |
| recogniser_entities_list.insert(0, 'Redaction') # Add 'Redaction' to the start of the list | |
| text_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "text") | |
| text_entities_drop = gr.Dropdown(value=text_dropdown_value, choices=text_entities_for_drop, allow_custom_value=True, interactive=True) | |
| page_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "page") | |
| page_entities_drop = gr.Dropdown(value=page_dropdown_value, choices=page_entities_for_drop, allow_custom_value=True, interactive=True) | |
| recogniser_dataframe_out_gr = gr.Dataframe(review_dataframe[["page", "label", "text", "id"]], show_search="filter", col_count=(4, "fixed"), type="pandas", headers=["page", "label", "text", "id"], show_fullscreen_button=True, wrap=True, max_height=400, static_columns=[0,1,2,3]) | |
| recogniser_dataframe_out = review_dataframe[["page", "label", "text", "id"]] | |
| except Exception as e: | |
| print("Could not extract recogniser information:", e) | |
| recogniser_dataframe_out = recogniser_dataframe_base[["page", "label", "text", "id"]] | |
| label_choices = review_dataframe["label"].astype(str).unique().tolist() | |
| text_choices = review_dataframe["text"].astype(str).unique().tolist() | |
| page_choices = review_dataframe["page"].astype(str).unique().tolist() | |
| recogniser_entities_drop = gr.Dropdown(value=recogniser_dropdown_value, choices=label_choices, allow_custom_value=True, interactive=True) | |
| recogniser_entities_list = ["Redaction"] | |
| text_entities_drop = gr.Dropdown(value=text_dropdown_value, choices=text_choices, allow_custom_value=True, interactive=True) | |
| page_entities_drop = gr.Dropdown(value=page_dropdown_value, choices=page_choices, allow_custom_value=True, interactive=True) | |
| return recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list, text_entities_drop, page_entities_drop | |
| def update_recogniser_dataframes(page_image_annotator_object:AnnotatedImageData, recogniser_dataframe_base:pd.DataFrame, recogniser_entities_dropdown_value:str="ALL", text_dropdown_value:str="ALL", page_dropdown_value:str="ALL", review_df:pd.DataFrame=[], page_sizes:list[str]=[]): | |
| ''' | |
| Update recogniser dataframe information that appears alongside the pdf pages on the review screen. | |
| ''' | |
| recogniser_entities_list = ["Redaction"] | |
| recogniser_dataframe_out = pd.DataFrame() | |
| recogniser_dataframe_out_gr = gr.Dataframe() | |
| # If base recogniser dataframe is empy, need to create it. | |
| if recogniser_dataframe_base.empty: | |
| recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list, text_entities_drop, page_entities_drop = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes) | |
| elif recogniser_dataframe_base.iloc[0,0] == "": | |
| recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_dropdown_value, recogniser_entities_list, text_entities_drop, page_entities_drop = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes) | |
| else: | |
| recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_dropdown, recogniser_entities_list, text_dropdown, page_dropdown = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes) | |
| review_dataframe, text_entities_drop, page_entities_drop = update_entities_df_recogniser_entities(recogniser_entities_dropdown_value, recogniser_dataframe_out, page_dropdown_value, text_dropdown_value) | |
| recogniser_dataframe_out_gr = gr.Dataframe(review_dataframe[["page", "label", "text", "id"]], show_search="filter", col_count=(4, "fixed"), type="pandas", headers=["page", "label", "text", "id"], show_fullscreen_button=True, wrap=True, max_height=400, static_columns=[0,1,2,3]) | |
| recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(recogniser_dataframe_out, "label") | |
| recogniser_entities_drop = gr.Dropdown(value=recogniser_entities_dropdown_value, choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) | |
| recogniser_entities_list_base = recogniser_dataframe_out["label"].astype(str).unique().tolist() | |
| # Recogniser entities list is the list of choices that appear when you make a new redaction box | |
| recogniser_entities_list = [entity for entity in recogniser_entities_list_base if entity != 'Redaction'] | |
| recogniser_entities_list.insert(0, 'Redaction') | |
| return recogniser_entities_list, recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, text_entities_drop, page_entities_drop | |
| def undo_last_removal(backup_review_state:pd.DataFrame, backup_image_annotations_state:list[dict], backup_recogniser_entity_dataframe_base:pd.DataFrame): | |
| return backup_review_state, backup_image_annotations_state, backup_recogniser_entity_dataframe_base | |
| def update_annotator_page_from_review_df( | |
| review_df: pd.DataFrame, | |
| image_file_paths:List[str], # Note: This input doesn't seem used in the original logic flow after the first line was removed | |
| page_sizes:List[dict], | |
| current_image_annotations_state:List[str], # This should ideally be List[dict] based on its usage | |
| current_page_annotator:object, # Should be dict or a custom annotation object for one page | |
| selected_recogniser_entity_df_row:pd.DataFrame, | |
| input_folder:str, | |
| doc_full_file_name_textbox:str | |
| ) -> Tuple[object, List[dict], int, List[dict], pd.DataFrame, int]: # Correcting return types based on usage | |
| ''' | |
| Update the visible annotation object and related objects with the latest review file information, | |
| optimizing by processing only the current page's data. | |
| ''' | |
| # Assume current_image_annotations_state is List[dict] and current_page_annotator is dict | |
| out_image_annotations_state: List[dict] = list(current_image_annotations_state) # Make a copy to avoid modifying input in place | |
| out_current_page_annotator: dict = current_page_annotator | |
| # Get the target page number from the selected row | |
| # Safely access the page number, handling potential errors or empty DataFrame | |
| gradio_annotator_current_page_number: int = 0 | |
| annotate_previous_page: int = 0 # Renaming for clarity if needed, matches original output | |
| if not selected_recogniser_entity_df_row.empty and 'page' in selected_recogniser_entity_df_row.columns: | |
| try: | |
| # Use .iloc[0] and .item() for robust scalar extraction | |
| gradio_annotator_current_page_number = int(selected_recogniser_entity_df_row['page'].iloc[0]) | |
| annotate_previous_page = gradio_annotator_current_page_number # Store original page number | |
| except (IndexError, ValueError, TypeError): | |
| print("Warning: Could not extract valid page number from selected_recogniser_entity_df_row. Defaulting to page 0 (or 1).") | |
| gradio_annotator_current_page_number = 1 # Or 0 depending on 1-based vs 0-based indexing elsewhere | |
| # Ensure page number is valid and 1-based for external display/logic | |
| if gradio_annotator_current_page_number <= 0: | |
| gradio_annotator_current_page_number = 1 | |
| page_max_reported = len(out_image_annotations_state) | |
| if gradio_annotator_current_page_number > page_max_reported: | |
| gradio_annotator_current_page_number = page_max_reported # Cap at max pages | |
| page_num_reported_zero_indexed = gradio_annotator_current_page_number - 1 | |
| # Process page sizes DataFrame early, as it's needed for image path handling and potentially coordinate multiplication | |
| page_sizes_df = pd.DataFrame(page_sizes) | |
| if not page_sizes_df.empty: | |
| # Safely convert page column to numeric and then int | |
| page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") | |
| page_sizes_df.dropna(subset=["page"], inplace=True) | |
| if not page_sizes_df.empty: | |
| page_sizes_df["page"] = page_sizes_df["page"].astype(int) | |
| else: | |
| print("Warning: Page sizes DataFrame became empty after processing.") | |
| # --- OPTIMIZATION: Process only the current page's data from review_df --- | |
| if not review_df.empty: | |
| # Filter review_df for the current page | |
| # Ensure 'page' column in review_df is comparable to page_num_reported | |
| if 'page' in review_df.columns: | |
| review_df['page'] = pd.to_numeric(review_df['page'], errors='coerce').fillna(-1).astype(int) | |
| current_image_path = out_image_annotations_state[page_num_reported_zero_indexed]['image'] | |
| replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image(doc_full_file_name_textbox, current_image_path, page_sizes_df, gradio_annotator_current_page_number, input_folder) | |
| # page_sizes_df has been changed - save back to page_sizes_object | |
| page_sizes = page_sizes_df.to_dict(orient='records') | |
| review_df.loc[review_df["page"]==gradio_annotator_current_page_number, 'image'] = replaced_image_path | |
| images_list = list(page_sizes_df["image_path"]) | |
| images_list[page_num_reported_zero_indexed] = replaced_image_path | |
| out_image_annotations_state[page_num_reported_zero_indexed]['image'] = replaced_image_path | |
| current_page_review_df = review_df[review_df['page'] == gradio_annotator_current_page_number].copy() | |
| current_page_review_df = multiply_coordinates_by_page_sizes(current_page_review_df, page_sizes_df) | |
| else: | |
| print(f"Warning: 'page' column not found in review_df. Cannot filter for page {gradio_annotator_current_page_number}. Skipping update from review_df.") | |
| current_page_review_df = pd.DataFrame() # Empty dataframe if filter fails | |
| if not current_page_review_df.empty: | |
| # Convert the current page's review data to annotation list format for *this page* | |
| current_page_annotations_list = [] | |
| # Define expected annotation dict keys, including 'image', 'page', coords, 'label', 'text', 'color' etc. | |
| # Assuming review_df has compatible columns | |
| expected_annotation_keys = ['label', 'color', 'xmin', 'ymin', 'xmax', 'ymax', 'text', 'id'] # Add/remove as needed | |
| # Ensure necessary columns exist in current_page_review_df before converting rows | |
| for key in expected_annotation_keys: | |
| if key not in current_page_review_df.columns: | |
| # Add missing column with default value | |
| # Use np.nan for numeric, '' for string/object | |
| default_value = np.nan if key in ['xmin', 'ymin', 'xmax', 'ymax'] else '' | |
| current_page_review_df[key] = default_value | |
| # Convert filtered DataFrame rows to list of dicts | |
| # Using .to_dict(orient='records') is efficient for this | |
| current_page_annotations_list_raw = current_page_review_df[expected_annotation_keys].to_dict(orient='records') | |
| current_page_annotations_list = current_page_annotations_list_raw | |
| # Update the annotations state for the current page | |
| # Each entry in out_image_annotations_state seems to be a dict containing keys like 'image', 'page', 'annotations' (List[dict]) | |
| # Need to update the 'annotations' list for the specific page. | |
| # Find the entry for the current page in the state | |
| page_state_entry_found = False | |
| for i, page_state_entry in enumerate(out_image_annotations_state): | |
| # Assuming page_state_entry has a 'page' key (1-based) | |
| match = re.search(r"(\d+)\.png$", page_state_entry['image']) | |
| if match: page_no = int(match.group(1)) | |
| else: page_no = -1 | |
| if 'image' in page_state_entry and page_no == page_num_reported_zero_indexed: | |
| # Replace the annotations list for this page with the new list from review_df | |
| out_image_annotations_state[i]['boxes'] = current_page_annotations_list | |
| # Update the image path as well, based on review_df if available, or keep existing | |
| # Assuming review_df has an 'image' column for this page | |
| if 'image' in current_page_review_df.columns and not current_page_review_df.empty: | |
| # Use the image path from the first row of the filtered review_df | |
| out_image_annotations_state[i]['image'] = current_page_review_df['image'].iloc[0] | |
| page_state_entry_found = True | |
| break | |
| if not page_state_entry_found: | |
| # This scenario might happen if the current_image_annotations_state didn't initially contain | |
| # an entry for this page number. Depending on the application logic, you might need to | |
| # add a new entry here, but based on the original code's structure, it seems | |
| # out_image_annotations_state is pre-populated for all pages. | |
| print(f"Warning: Entry for page {gradio_annotator_current_page_number} not found in current_image_annotations_state. Cannot update page annotations.") | |
| # --- Image Path and Page Size Handling (already seems focused on current page, keep similar logic) --- | |
| # Get the image path for the current page from the updated state | |
| # Ensure the entry exists before accessing | |
| current_image_path = None | |
| if len(out_image_annotations_state) > page_num_reported_zero_indexed and 'image' in out_image_annotations_state[page_num_reported_zero_indexed]: | |
| current_image_path = out_image_annotations_state[page_num_reported_zero_indexed]['image'] | |
| else: | |
| print(f"Warning: Could not get image path from state for page index {page_num_reported_zero_indexed}.") | |
| # Replace placeholder image with real image path if needed | |
| if current_image_path and not page_sizes_df.empty: | |
| try: | |
| replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image( | |
| doc_full_file_name_textbox, current_image_path, page_sizes_df, | |
| gradio_annotator_current_page_number, input_folder # Use 1-based page number | |
| ) | |
| # Update state and review_df with the potentially replaced image path | |
| if len(out_image_annotations_state) > page_num_reported_zero_indexed: | |
| out_image_annotations_state[page_num_reported_zero_indexed]['image'] = replaced_image_path | |
| if 'page' in review_df.columns and 'image' in review_df.columns: | |
| review_df.loc[review_df["page"]==gradio_annotator_current_page_number, 'image'] = replaced_image_path | |
| except Exception as e: | |
| print(f"Error during image path replacement for page {gradio_annotator_current_page_number}: {e}") | |
| # Save back page_sizes_df to page_sizes list format | |
| if not page_sizes_df.empty: | |
| page_sizes = page_sizes_df.to_dict(orient='records') | |
| else: | |
| page_sizes = [] # Ensure page_sizes is a list if df is empty | |
| # --- Re-evaluate Coordinate Multiplication and Duplicate Removal --- | |
| # The original code multiplied coordinates for the *entire* document and removed duplicates | |
| # across the *entire* document *after* converting the full review_df to state. | |
| # With the optimized approach, we updated only one page's annotations in the state. | |
| # Let's assume remove_duplicate_images_with_blank_boxes expects the raw list of dicts state format: | |
| try: | |
| out_image_annotations_state = remove_duplicate_images_with_blank_boxes(out_image_annotations_state) | |
| except Exception as e: | |
| print(f"Error during duplicate removal: {e}. Proceeding without duplicate removal.") | |
| # Select the current page's annotation object from the (potentially updated) state | |
| if len(out_image_annotations_state) > page_num_reported_zero_indexed: | |
| out_current_page_annotator = out_image_annotations_state[page_num_reported_zero_indexed] | |
| else: | |
| print(f"Warning: Cannot select current page annotator object for index {page_num_reported_zero_indexed}.") | |
| out_current_page_annotator = {} # Or None, depending on expected output type | |
| # The original code returns gradio_annotator_current_page_number as the 3rd value, | |
| # which was potentially updated by bounding checks. Keep this. | |
| final_page_number_returned = gradio_annotator_current_page_number | |
| return (out_current_page_annotator, | |
| out_image_annotations_state, | |
| final_page_number_returned, | |
| page_sizes, | |
| review_df, # review_df might have its 'page' column type changed, keep it as is or revert if necessary | |
| annotate_previous_page) # The original page number from selected_recogniser_entity_df_row | |
| def exclude_selected_items_from_redaction(review_df: pd.DataFrame, | |
| selected_rows_df: pd.DataFrame, | |
| image_file_paths:List[str], | |
| page_sizes:List[dict], | |
| image_annotations_state:dict, | |
| recogniser_entity_dataframe_base:pd.DataFrame): | |
| ''' | |
| Remove selected items from the review dataframe from the annotation object and review dataframe. | |
| ''' | |
| backup_review_state = review_df | |
| backup_image_annotations_state = image_annotations_state | |
| backup_recogniser_entity_dataframe_base = recogniser_entity_dataframe_base | |
| if not selected_rows_df.empty and not review_df.empty: | |
| use_id = ( | |
| "id" in selected_rows_df.columns | |
| and "id" in review_df.columns | |
| and not selected_rows_df["id"].isnull().all() | |
| and not review_df["id"].isnull().all() | |
| ) | |
| selected_merge_cols = ["id"] if use_id else ["label", "page", "text"] | |
| # Subset and drop duplicates from selected_rows_df | |
| selected_subset = selected_rows_df[selected_merge_cols].drop_duplicates(subset=selected_merge_cols) | |
| # Perform anti-join using merge with indicator | |
| merged_df = review_df.merge(selected_subset, on=selected_merge_cols, how='left', indicator=True) | |
| out_review_df = merged_df[merged_df['_merge'] == 'left_only'].drop(columns=['_merge']) | |
| out_image_annotations_state = convert_review_df_to_annotation_json(out_review_df, image_file_paths, page_sizes) | |
| out_recogniser_entity_dataframe_base = out_review_df[["page", "label", "text", "id"]] | |
| # Either there is nothing left in the selection dataframe, or the review dataframe | |
| else: | |
| out_review_df = review_df | |
| out_recogniser_entity_dataframe_base = recogniser_entity_dataframe_base | |
| out_image_annotations_state = image_annotations_state | |
| return out_review_df, out_image_annotations_state, out_recogniser_entity_dataframe_base, backup_review_state, backup_image_annotations_state, backup_recogniser_entity_dataframe_base | |
| def replace_annotator_object_img_np_array_with_page_sizes_image_path( | |
| all_image_annotations:List[dict], | |
| page_image_annotator_object:AnnotatedImageData, | |
| page_sizes:List[dict], | |
| page:int): | |
| ''' | |
| Check if the image value in an AnnotatedImageData dict is a placeholder or np.array. If either of these, replace the value with the file path of the image that is hopefully already loaded into the app related to this page. | |
| ''' | |
| page_zero_index = page - 1 | |
| if isinstance(all_image_annotations[page_zero_index]["image"], np.ndarray) or "placeholder_image" in all_image_annotations[page_zero_index]["image"] or isinstance(page_image_annotator_object['image'], np.ndarray): | |
| page_sizes_df = pd.DataFrame(page_sizes) | |
| page_sizes_df[["page"]] = page_sizes_df[["page"]].apply(pd.to_numeric, errors="coerce") | |
| # Check for matching pages | |
| matching_paths = page_sizes_df.loc[page_sizes_df['page'] == page, "image_path"].unique() | |
| if matching_paths.size > 0: | |
| image_path = matching_paths[0] | |
| page_image_annotator_object['image'] = image_path | |
| all_image_annotations[page_zero_index]["image"] = image_path | |
| else: | |
| print(f"No image path found for page {page}.") | |
| return page_image_annotator_object, all_image_annotations | |
| def replace_placeholder_image_with_real_image(doc_full_file_name_textbox:str, current_image_path:str, page_sizes_df:pd.DataFrame, page_num_reported:int, input_folder:str): | |
| ''' If image path is still not valid, load in a new image an overwrite it. Then replace all items in the image annotation object for all pages based on the updated information.''' | |
| page_num_reported_zero_indexed = page_num_reported - 1 | |
| if not os.path.exists(current_image_path): | |
| page_num, replaced_image_path, width, height = process_single_page_for_image_conversion(doc_full_file_name_textbox, page_num_reported_zero_indexed, input_folder=input_folder) | |
| # Overwrite page_sizes values | |
| page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"] = width | |
| page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_height"] = height | |
| page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_path"] = replaced_image_path | |
| else: | |
| if not page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"].isnull().all(): | |
| width = page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"].max() | |
| height = page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_height"].max() | |
| else: | |
| image = Image.open(current_image_path) | |
| width = image.width | |
| height = image.height | |
| page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"] = width | |
| page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_height"] = height | |
| page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_path"] = current_image_path | |
| replaced_image_path = current_image_path | |
| return replaced_image_path, page_sizes_df | |
| def update_annotator_object_and_filter_df( | |
| all_image_annotations:List[AnnotatedImageData], | |
| gradio_annotator_current_page_number:int, | |
| recogniser_entities_dropdown_value:str="ALL", | |
| page_dropdown_value:str="ALL", | |
| text_dropdown_value:str="ALL", | |
| recogniser_dataframe_base:gr.Dataframe=None, # Simplified default | |
| zoom:int=100, | |
| review_df:pd.DataFrame=None, # Use None for default empty DataFrame | |
| page_sizes:List[dict]=[], | |
| doc_full_file_name_textbox:str='', | |
| input_folder:str=INPUT_FOLDER | |
| ) -> Tuple[image_annotator, gr.Number, gr.Number, int, str, gr.Dataframe, pd.DataFrame, List[str], List[str], List[dict], List[AnnotatedImageData]]: | |
| ''' | |
| Update a gradio_image_annotation object with new annotation data for the current page | |
| and update filter dataframes, optimizing by processing only the current page's data for display. | |
| ''' | |
| zoom_str = str(zoom) + '%' | |
| # Handle default empty review_df and recogniser_dataframe_base | |
| if review_df is None or not isinstance(review_df, pd.DataFrame): | |
| review_df = pd.DataFrame(columns=["image", "page", "label", "color", "xmin", "ymin", "xmax", "ymax", "text", "id"]) | |
| if recogniser_dataframe_base is None: # Create a simple default if None | |
| recogniser_dataframe_base = gr.Dataframe(pd.DataFrame(data={"page":[], "label":[], "text":[], "id":[]})) | |
| # Handle empty all_image_annotations state early | |
| if not all_image_annotations: | |
| print("No all_image_annotation object found") | |
| # Return blank/default outputs | |
| blank_annotator = gr.ImageAnnotator( | |
| value = None, boxes_alpha=0.1, box_thickness=1, label_list=[], label_colors=[], | |
| show_label=False, height=zoom_str, width=zoom_str, box_min_size=1, | |
| box_selected_thickness=2, handle_size=4, sources=None, | |
| show_clear_button=False, show_share_button=False, show_remove_button=False, | |
| handles_cursor=True, interactive=True, use_default_label=True | |
| ) | |
| blank_df_out_gr = gr.Dataframe(pd.DataFrame(columns=["page", "label", "text", "id"])) | |
| blank_df_modified = pd.DataFrame(columns=["page", "label", "text", "id"]) | |
| return (blank_annotator, gr.Number(value=1), gr.Number(value=1), 1, | |
| recogniser_entities_dropdown_value, blank_df_out_gr, blank_df_modified, | |
| [], [], [], []) # Return empty lists/defaults for other outputs | |
| # Validate and bound the current page number (1-based logic) | |
| page_num_reported = max(1, gradio_annotator_current_page_number) # Minimum page is 1 | |
| page_max_reported = len(all_image_annotations) | |
| if page_num_reported > page_max_reported: | |
| page_num_reported = page_max_reported | |
| page_num_reported_zero_indexed = page_num_reported - 1 | |
| annotate_previous_page = page_num_reported # Store the determined page number | |
| # --- Process page sizes DataFrame --- | |
| page_sizes_df = pd.DataFrame(page_sizes) | |
| if not page_sizes_df.empty: | |
| page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") | |
| page_sizes_df.dropna(subset=["page"], inplace=True) | |
| if not page_sizes_df.empty: | |
| page_sizes_df["page"] = page_sizes_df["page"].astype(int) | |
| else: | |
| print("Warning: Page sizes DataFrame became empty after processing.") | |
| # --- Handle Image Path Replacement for the Current Page --- | |
| # This modifies the specific page entry within all_image_annotations list | |
| # Assuming replace_annotator_object_img_np_array_with_page_sizes_image_path | |
| # correctly updates the image path within the list element. | |
| if len(all_image_annotations) > page_num_reported_zero_indexed: | |
| # Make a shallow copy of the list and deep copy the specific page dict before modification | |
| # to avoid modifying the input list unexpectedly if it's used elsewhere. | |
| # However, the original code modified the list in place, so we'll stick to that | |
| # pattern but acknowledge it. | |
| page_object_to_update = all_image_annotations[page_num_reported_zero_indexed] | |
| # Use the helper function to replace the image path within the page object | |
| # Note: This helper returns the potentially modified page_object and the full state. | |
| # The full state return seems redundant if only page_object_to_update is modified. | |
| # Let's call it and assume it correctly updates the item in the list. | |
| updated_page_object, all_image_annotations_after_img_replace = replace_annotator_object_img_np_array_with_page_sizes_image_path( | |
| all_image_annotations, page_object_to_update, page_sizes, page_num_reported) | |
| # The original code immediately re-assigns all_image_annotations. | |
| # We'll rely on the function modifying the list element in place or returning the updated list. | |
| # Assuming it returns the updated list for robustness: | |
| all_image_annotations = all_image_annotations_after_img_replace | |
| # Now handle the actual image file path replacement using replace_placeholder_image_with_real_image | |
| current_image_path = updated_page_object.get('image') # Get potentially updated image path | |
| if current_image_path and not page_sizes_df.empty: | |
| try: | |
| replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image( | |
| doc_full_file_name_textbox, current_image_path, page_sizes_df, | |
| page_num_reported, input_folder=input_folder # Use 1-based page num | |
| ) | |
| # Update the image path in the state and review_df for the current page | |
| # Find the correct entry in all_image_annotations list again by index | |
| if len(all_image_annotations) > page_num_reported_zero_indexed: | |
| all_image_annotations[page_num_reported_zero_indexed]['image'] = replaced_image_path | |
| # Update review_df's image path for this page | |
| if 'page' in review_df.columns and 'image' in review_df.columns: | |
| # Ensure review_df page column is numeric for filtering | |
| review_df['page'] = pd.to_numeric(review_df['page'], errors='coerce').fillna(-1).astype(int) | |
| review_df.loc[review_df["page"]==page_num_reported, 'image'] = replaced_image_path | |
| except Exception as e: | |
| print(f"Error during image path replacement for page {page_num_reported}: {e}") | |
| else: | |
| print(f"Warning: Page index {page_num_reported_zero_indexed} out of bounds for all_image_annotations list.") | |
| # Save back page_sizes_df to page_sizes list format | |
| if not page_sizes_df.empty: | |
| page_sizes = page_sizes_df.to_dict(orient='records') | |
| else: | |
| page_sizes = [] # Ensure page_sizes is a list if df is empty | |
| # --- OPTIMIZATION: Prepare data *only* for the current page for display --- | |
| current_page_image_annotator_object = None | |
| if len(all_image_annotations) > page_num_reported_zero_indexed: | |
| page_data_for_display = all_image_annotations[page_num_reported_zero_indexed] | |
| # Convert current page annotations list to DataFrame for coordinate multiplication IF needed | |
| # Assuming coordinate multiplication IS needed for display if state stores relative coords | |
| current_page_annotations_df = convert_annotation_data_to_dataframe([page_data_for_display]) | |
| if not current_page_annotations_df.empty and not page_sizes_df.empty: | |
| # Multiply coordinates *only* for this page's DataFrame | |
| try: | |
| # Need the specific page's size for multiplication | |
| page_size_row = page_sizes_df[page_sizes_df['page'] == page_num_reported] | |
| if not page_size_row.empty: | |
| current_page_annotations_df = multiply_coordinates_by_page_sizes( | |
| current_page_annotations_df, page_size_row, # Pass only the row for the current page | |
| xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax" | |
| ) | |
| except Exception as e: | |
| print(f"Warning: Error during coordinate multiplication for page {page_num_reported}: {e}. Using original coordinates.") | |
| # If error, proceed with original coordinates or handle as needed | |
| if "color" not in current_page_annotations_df.columns: | |
| current_page_annotations_df['color'] = '(0, 0, 0)' | |
| # Convert the processed DataFrame back to the list of dicts format for the annotator | |
| processed_current_page_annotations_list = current_page_annotations_df[["xmin", "xmax", "ymin", "ymax", "label", "color", "text", "id"]].to_dict(orient='records') | |
| # Construct the final object expected by the Gradio ImageAnnotator value parameter | |
| current_page_image_annotator_object: AnnotatedImageData = { | |
| 'image': page_data_for_display.get('image'), # Use the (potentially updated) image path | |
| 'boxes': processed_current_page_annotations_list | |
| } | |
| # --- Update Dropdowns and Review DataFrame --- | |
| # This external function still operates on potentially large DataFrames. | |
| # It receives all_image_annotations and a copy of review_df. | |
| try: | |
| recogniser_entities_list, recogniser_dataframe_out_gr, recogniser_dataframe_modified, recogniser_entities_dropdown_value, text_entities_drop, page_entities_drop = update_recogniser_dataframes( | |
| all_image_annotations, # Pass the updated full state | |
| recogniser_dataframe_base, | |
| recogniser_entities_dropdown_value, | |
| text_dropdown_value, | |
| page_dropdown_value, | |
| review_df.copy(), # Keep the copy as per original function call | |
| page_sizes # Pass updated page sizes | |
| ) | |
| # Generate default black colors for labels if needed by image_annotator | |
| recogniser_colour_list = [(0, 0, 0) for _ in range(len(recogniser_entities_list))] | |
| except Exception as e: | |
| print(f"Error calling update_recogniser_dataframes: {e}. Returning empty/default filter data.") | |
| recogniser_entities_list = [] | |
| recogniser_colour_list = [] | |
| recogniser_dataframe_out_gr = gr.Dataframe(pd.DataFrame(columns=["page", "label", "text", "id"])) | |
| recogniser_dataframe_modified = pd.DataFrame(columns=["page", "label", "text", "id"]) | |
| text_entities_drop = [] | |
| page_entities_drop = [] | |
| # --- Final Output Components --- | |
| page_number_reported_gradio_comp = gr.Number(label = "Current page", value=page_num_reported, precision=0) | |
| ### Present image_annotator outputs | |
| # Handle the case where current_page_image_annotator_object couldn't be prepared | |
| if current_page_image_annotator_object is None: | |
| # This should ideally be covered by the initial empty check for all_image_annotations, | |
| # but as a safeguard: | |
| print("Warning: Could not prepare annotator object for the current page.") | |
| out_image_annotator = image_annotator(value=None, interactive=False) # Present blank/non-interactive | |
| else: | |
| out_image_annotator = image_annotator( | |
| value = current_page_image_annotator_object, | |
| boxes_alpha=0.1, | |
| box_thickness=1, | |
| label_list=recogniser_entities_list, # Use labels from update_recogniser_dataframes | |
| label_colors=recogniser_colour_list, | |
| show_label=False, | |
| height=zoom_str, | |
| width=zoom_str, | |
| box_min_size=1, | |
| box_selected_thickness=2, | |
| handle_size=4, | |
| sources=None,#["upload"], | |
| show_clear_button=False, | |
| show_share_button=False, | |
| show_remove_button=False, | |
| handles_cursor=True, | |
| interactive=True # Keep interactive if data is present | |
| ) | |
| # The original code returned page_number_reported_gradio twice; | |
| # returning the Gradio component and the plain integer value. | |
| # Let's match the output signature. | |
| return (out_image_annotator, | |
| page_number_reported_gradio_comp, | |
| page_number_reported_gradio_comp, # Redundant, but matches original return signature | |
| page_num_reported, # Plain integer value | |
| recogniser_entities_dropdown_value, | |
| recogniser_dataframe_out_gr, | |
| recogniser_dataframe_modified, | |
| text_entities_drop, # List of text entities for dropdown | |
| page_entities_drop, # List of page numbers for dropdown | |
| page_sizes, # Updated page_sizes list | |
| all_image_annotations) # Return the updated full state | |
| def update_all_page_annotation_object_based_on_previous_page( | |
| page_image_annotator_object:AnnotatedImageData, | |
| current_page:int, | |
| previous_page:int, | |
| all_image_annotations:List[AnnotatedImageData], | |
| page_sizes:List[dict]=[], | |
| clear_all:bool=False | |
| ): | |
| ''' | |
| Overwrite image annotations on the page we are moving from with modifications. | |
| ''' | |
| previous_page_zero_index = previous_page -1 | |
| if not current_page: current_page = 1 | |
| # This replaces the numpy array image object with the image file path | |
| page_image_annotator_object, all_image_annotations = replace_annotator_object_img_np_array_with_page_sizes_image_path(all_image_annotations, page_image_annotator_object, page_sizes, previous_page) | |
| if clear_all == False: all_image_annotations[previous_page_zero_index] = page_image_annotator_object | |
| else: all_image_annotations[previous_page_zero_index]["boxes"] = [] | |
| return all_image_annotations, current_page, current_page | |
| def apply_redactions_to_review_df_and_files(page_image_annotator_object:AnnotatedImageData, | |
| file_paths:List[str], | |
| doc:Document, | |
| all_image_annotations:List[AnnotatedImageData], | |
| current_page:int, | |
| review_file_state:pd.DataFrame, | |
| output_folder:str = OUTPUT_FOLDER, | |
| save_pdf:bool=True, | |
| page_sizes:List[dict]=[], | |
| progress=gr.Progress(track_tqdm=True)): | |
| ''' | |
| Apply modified redactions to a pymupdf and export review files | |
| ''' | |
| output_files = [] | |
| output_log_files = [] | |
| pdf_doc = [] | |
| review_df = review_file_state | |
| page_image_annotator_object = all_image_annotations[current_page - 1] | |
| # This replaces the numpy array image object with the image file path | |
| page_image_annotator_object, all_image_annotations = replace_annotator_object_img_np_array_with_page_sizes_image_path(all_image_annotations, page_image_annotator_object, page_sizes, current_page) | |
| page_image_annotator_object['image'] = all_image_annotations[current_page - 1]["image"] | |
| if not page_image_annotator_object: | |
| print("No image annotations object found for page") | |
| return doc, all_image_annotations, output_files, output_log_files, review_df | |
| if isinstance(file_paths, str): | |
| file_paths = [file_paths] | |
| for file_path in file_paths: | |
| file_name_without_ext = get_file_name_without_type(file_path) | |
| file_name_with_ext = os.path.basename(file_path) | |
| file_extension = os.path.splitext(file_path)[1].lower() | |
| if save_pdf == True: | |
| # If working with image docs | |
| if (is_pdf(file_path) == False) & (file_extension not in '.csv'): | |
| image = Image.open(file_paths[-1]) | |
| draw = ImageDraw.Draw(image) | |
| for img_annotation_box in page_image_annotator_object['boxes']: | |
| coords = [img_annotation_box["xmin"], | |
| img_annotation_box["ymin"], | |
| img_annotation_box["xmax"], | |
| img_annotation_box["ymax"]] | |
| fill = img_annotation_box["color"] | |
| # Ensure fill is a valid RGB tuple | |
| if isinstance(fill, tuple) and len(fill) == 3: | |
| # Check if all elements are integers in the range 0-255 | |
| if all(isinstance(c, int) and 0 <= c <= 255 for c in fill): | |
| pass | |
| else: | |
| print(f"Invalid color values: {fill}. Defaulting to black.") | |
| fill = (0, 0, 0) # Default to black if invalid | |
| else: | |
| print(f"Invalid fill format: {fill}. Defaulting to black.") | |
| fill = (0, 0, 0) # Default to black if not a valid tuple | |
| # Ensure the image is in RGB mode | |
| if image.mode not in ("RGB", "RGBA"): | |
| image = image.convert("RGB") | |
| draw = ImageDraw.Draw(image) | |
| draw.rectangle(coords, fill=fill) | |
| output_image_path = output_folder + file_name_without_ext + "_redacted.png" | |
| image.save(output_folder + file_name_without_ext + "_redacted.png") | |
| output_files.append(output_image_path) | |
| doc = [image] | |
| elif file_extension in '.csv': | |
| pdf_doc = [] | |
| # If working with pdfs | |
| elif is_pdf(file_path) == True: | |
| pdf_doc = pymupdf.open(file_path) | |
| orig_pdf_file_path = file_path | |
| output_files.append(orig_pdf_file_path) | |
| number_of_pages = pdf_doc.page_count | |
| original_cropboxes = [] | |
| page_sizes_df = pd.DataFrame(page_sizes) | |
| page_sizes_df[["page"]] = page_sizes_df[["page"]].apply(pd.to_numeric, errors="coerce") | |
| for i in progress.tqdm(range(0, number_of_pages), desc="Saving redacted pages to file", unit = "pages"): | |
| image_loc = all_image_annotations[i]['image'] | |
| # Load in image object | |
| if isinstance(image_loc, np.ndarray): | |
| image = Image.fromarray(image_loc.astype('uint8')) | |
| elif isinstance(image_loc, Image.Image): | |
| image = image_loc | |
| elif isinstance(image_loc, str): | |
| if not os.path.exists(image_loc): | |
| image=page_sizes_df.loc[page_sizes_df['page']==i, "image_path"] | |
| try: | |
| image = Image.open(image_loc) | |
| except Exception as e: | |
| image = None | |
| pymupdf_page = pdf_doc.load_page(i) #doc.load_page(current_page -1) | |
| original_cropboxes.append(pymupdf_page.cropbox) | |
| pymupdf_page.set_cropbox(pymupdf_page.mediabox) | |
| pymupdf_page = redact_page_with_pymupdf(page=pymupdf_page, page_annotations=all_image_annotations[i], image=image, original_cropbox=original_cropboxes[-1], page_sizes_df= page_sizes_df) # image=image, | |
| else: | |
| print("File type not recognised.") | |
| progress(0.9, "Saving output files") | |
| #try: | |
| if pdf_doc: | |
| out_pdf_file_path = output_folder + file_name_without_ext + "_redacted.pdf" | |
| pdf_doc.save(out_pdf_file_path, garbage=4, deflate=True, clean=True) | |
| output_files.append(out_pdf_file_path) | |
| else: | |
| print("PDF input not found. Outputs not saved to PDF.") | |
| # If save_pdf is not true, then add the original pdf to the output files | |
| else: | |
| if is_pdf(file_path) == True: | |
| orig_pdf_file_path = file_path | |
| output_files.append(orig_pdf_file_path) | |
| try: | |
| #print("Saving review file.") | |
| review_df = convert_annotation_json_to_review_df(all_image_annotations, review_file_state.copy(), page_sizes=page_sizes) | |
| page_sizes_df = pd.DataFrame(page_sizes) | |
| page_sizes_df .loc[:, "page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") | |
| review_df = divide_coordinates_by_page_sizes(review_df, page_sizes_df) | |
| review_df = review_df[["image", "page", "label","color", "xmin", "ymin", "xmax", "ymax", "text", "id"]] | |
| out_review_file_file_path = output_folder + file_name_with_ext + '_review_file.csv' | |
| review_df.to_csv(out_review_file_file_path, index=None) | |
| output_files.append(out_review_file_file_path) | |
| except Exception as e: | |
| print("In apply redactions function, could not save annotations to csv file:", e) | |
| return doc, all_image_annotations, output_files, output_log_files, review_df | |
| def get_boxes_json(annotations:AnnotatedImageData): | |
| return annotations["boxes"] | |
| def update_all_entity_df_dropdowns(df:pd.DataFrame, label_dropdown_value:str, page_dropdown_value:str, text_dropdown_value:str): | |
| ''' | |
| Update all dropdowns based on rows that exist in a dataframe | |
| ''' | |
| if isinstance(label_dropdown_value, str): | |
| label_dropdown_value = [label_dropdown_value] | |
| if isinstance(page_dropdown_value, str): | |
| page_dropdown_value = [page_dropdown_value] | |
| if isinstance(text_dropdown_value, str): | |
| text_dropdown_value = [text_dropdown_value] | |
| filtered_df = df.copy() | |
| # Apply filtering based on dropdown selections | |
| # if not "ALL" in page_dropdown_value: | |
| # filtered_df = filtered_df[filtered_df["page"].astype(str).isin(page_dropdown_value)] | |
| # if not "ALL" in text_dropdown_value: | |
| # filtered_df = filtered_df[filtered_df["text"].astype(str).isin(text_dropdown_value)] | |
| # if not "ALL" in label_dropdown_value: | |
| # filtered_df = filtered_df[filtered_df["label"].astype(str).isin(label_dropdown_value)] | |
| recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") | |
| recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) | |
| text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") | |
| text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) | |
| page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") | |
| page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) | |
| return recogniser_entities_drop, text_entities_drop, page_entities_drop | |
| def update_entities_df_recogniser_entities(choice:str, df:pd.DataFrame, page_dropdown_value:str, text_dropdown_value:str): | |
| ''' | |
| Update the rows in a dataframe depending on the user choice from a dropdown | |
| ''' | |
| if isinstance(choice, str): | |
| choice = [choice] | |
| if isinstance(page_dropdown_value, str): | |
| page_dropdown_value = [page_dropdown_value] | |
| if isinstance(text_dropdown_value, str): | |
| text_dropdown_value = [text_dropdown_value] | |
| filtered_df = df.copy() | |
| # Apply filtering based on dropdown selections | |
| if not "ALL" in page_dropdown_value: | |
| filtered_df = filtered_df[filtered_df["page"].astype(str).isin(page_dropdown_value)] | |
| if not "ALL" in text_dropdown_value: | |
| filtered_df = filtered_df[filtered_df["text"].astype(str).isin(text_dropdown_value)] | |
| if not "ALL" in choice: | |
| filtered_df = filtered_df[filtered_df["label"].astype(str).isin(choice)] | |
| recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") | |
| recogniser_entities_drop = gr.Dropdown(value=choice[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) | |
| text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") | |
| text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) | |
| page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") | |
| page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) | |
| return filtered_df, text_entities_drop, page_entities_drop | |
| def update_entities_df_page(choice:str, df:pd.DataFrame, label_dropdown_value:str, text_dropdown_value:str): | |
| ''' | |
| Update the rows in a dataframe depending on the user choice from a dropdown | |
| ''' | |
| if isinstance(choice, str): | |
| choice = [choice] | |
| if isinstance(label_dropdown_value, str): | |
| label_dropdown_value = [label_dropdown_value] | |
| if isinstance(text_dropdown_value, str): | |
| text_dropdown_value = [text_dropdown_value] | |
| filtered_df = df.copy() | |
| # Apply filtering based on dropdown selections | |
| if not "ALL" in text_dropdown_value: | |
| filtered_df = filtered_df[filtered_df["text"].astype(str).isin(text_dropdown_value)] | |
| if not "ALL" in label_dropdown_value: | |
| filtered_df = filtered_df[filtered_df["label"].astype(str).isin(label_dropdown_value)] | |
| if not "ALL" in choice: | |
| filtered_df = filtered_df[filtered_df["page"].astype(str).isin(choice)] | |
| recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") | |
| recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) | |
| text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") | |
| text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) | |
| page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") | |
| page_entities_drop = gr.Dropdown(value=choice[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) | |
| return filtered_df, recogniser_entities_drop, text_entities_drop | |
| def update_entities_df_text(choice:str, df:pd.DataFrame, label_dropdown_value:str, page_dropdown_value:str): | |
| ''' | |
| Update the rows in a dataframe depending on the user choice from a dropdown | |
| ''' | |
| if isinstance(choice, str): | |
| choice = [choice] | |
| if isinstance(label_dropdown_value, str): | |
| label_dropdown_value = [label_dropdown_value] | |
| if isinstance(page_dropdown_value, str): | |
| page_dropdown_value = [page_dropdown_value] | |
| filtered_df = df.copy() | |
| # Apply filtering based on dropdown selections | |
| if not "ALL" in page_dropdown_value: | |
| filtered_df = filtered_df[filtered_df["page"].astype(str).isin(page_dropdown_value)] | |
| if not "ALL" in label_dropdown_value: | |
| filtered_df = filtered_df[filtered_df["label"].astype(str).isin(label_dropdown_value)] | |
| if not "ALL" in choice: | |
| filtered_df = filtered_df[filtered_df["text"].astype(str).isin(choice)] | |
| recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") | |
| recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) | |
| text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") | |
| text_entities_drop = gr.Dropdown(value=choice[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) | |
| page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") | |
| page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) | |
| return filtered_df, recogniser_entities_drop, page_entities_drop | |
| def reset_dropdowns(df:pd.DataFrame): | |
| ''' | |
| Return Gradio dropdown objects with value 'ALL'. | |
| ''' | |
| recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "label") | |
| recogniser_entities_drop = gr.Dropdown(value="ALL", choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) | |
| text_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "text") | |
| text_entities_drop = gr.Dropdown(value="ALL", choices=text_entities_for_drop, allow_custom_value=True, interactive=True) | |
| page_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "page") | |
| page_entities_drop = gr.Dropdown(value="ALL", choices=page_entities_for_drop, allow_custom_value=True, interactive=True) | |
| return recogniser_entities_drop, text_entities_drop, page_entities_drop | |
| def df_select_callback(df: pd.DataFrame, evt: gr.SelectData): | |
| row_value_page = evt.row_value[0] # This is the page number value | |
| row_value_label = evt.row_value[1] # This is the label number value | |
| row_value_text = evt.row_value[2] # This is the text number value | |
| row_value_id = evt.row_value[3] # This is the text number value | |
| row_value_df = pd.DataFrame(data={"page":[row_value_page], "label":[row_value_label], "text":[row_value_text], "id":[row_value_id]}) | |
| return row_value_df | |
| def df_select_callback_textract_api(df: pd.DataFrame, evt: gr.SelectData): | |
| row_value_job_id = evt.row_value[0] # This is the page number value | |
| # row_value_label = evt.row_value[1] # This is the label number value | |
| row_value_job_type = evt.row_value[2] # This is the text number value | |
| row_value_df = pd.DataFrame(data={"job_id":[row_value_job_id], "label":[row_value_job_type]}) | |
| return row_value_job_id, row_value_job_type, row_value_df | |
| def df_select_callback_cost(df: pd.DataFrame, evt: gr.SelectData): | |
| row_value_code = evt.row_value[0] # This is the value for cost code | |
| #row_value_label = evt.row_value[1] # This is the label number value | |
| #row_value_df = pd.DataFrame(data={"page":[row_value_code], "label":[row_value_label]}) | |
| return row_value_code | |
| def df_select_callback_ocr(df: pd.DataFrame, evt: gr.SelectData): | |
| row_value_page = evt.row_value[0] # This is the page_number value | |
| row_value_text = evt.row_value[1] # This is the text contents | |
| row_value_df = pd.DataFrame(data={"page":[row_value_page], "text":[row_value_text]}) | |
| return row_value_page, row_value_df | |
| def update_selected_review_df_row_colour( | |
| redaction_row_selection: pd.DataFrame, | |
| review_df: pd.DataFrame, | |
| previous_id: str = "", | |
| previous_colour: str = '(0, 0, 0)', | |
| colour: str = '(1, 0, 255)' | |
| ) -> tuple[pd.DataFrame, str, str]: | |
| ''' | |
| Update the colour of a single redaction box based on the values in a selection row | |
| (Optimized Version) | |
| ''' | |
| # Ensure 'color' column exists, default to previous_colour if previous_id is provided | |
| if "color" not in review_df.columns: | |
| review_df["color"] = previous_colour if previous_id else '(0, 0, 0)' | |
| # Ensure 'id' column exists | |
| if "id" not in review_df.columns: | |
| # Assuming fill_missing_ids is a defined function that returns a DataFrame | |
| # It's more efficient if this is handled outside if possible, | |
| # or optimized internally. | |
| print("Warning: 'id' column not found. Calling fill_missing_ids.") | |
| review_df = fill_missing_ids(review_df) # Keep this if necessary, but note it can be slow | |
| # --- Optimization 1 & 2: Reset existing highlight colours using vectorized assignment --- | |
| # Reset the color of the previously highlighted row | |
| if previous_id and previous_id in review_df["id"].values: | |
| review_df.loc[review_df["id"] == previous_id, "color"] = previous_colour | |
| # Reset the color of any row that currently has the highlight colour (handle cases where previous_id might not have been tracked correctly) | |
| # Convert to string for comparison only if the dtype might be mixed or not purely string | |
| # If 'color' is consistently string, the .astype(str) might be avoidable. | |
| # Assuming color is consistently string format like '(R, G, B)' | |
| review_df.loc[review_df["color"] == colour, "color"] = '(0, 0, 0)' | |
| if not redaction_row_selection.empty and not review_df.empty: | |
| use_id = ( | |
| "id" in redaction_row_selection.columns | |
| and "id" in review_df.columns | |
| and not redaction_row_selection["id"].isnull().all() | |
| and not review_df["id"].isnull().all() | |
| ) | |
| selected_merge_cols = ["id"] if use_id else ["label", "page", "text"] | |
| # --- Optimization 3: Use inner merge directly --- | |
| # Merge to find rows in review_df that match redaction_row_selection | |
| merged_reviews = review_df.merge( | |
| redaction_row_selection[selected_merge_cols], | |
| on=selected_merge_cols, | |
| how="inner" # Use inner join as we only care about matches | |
| ) | |
| if not merged_reviews.empty: | |
| # Assuming we only expect one match for highlighting a single row | |
| # If multiple matches are possible and you want to highlight all, | |
| # the logic for previous_id and previous_colour needs adjustment. | |
| new_previous_colour = str(merged_reviews["color"].iloc[0]) | |
| new_previous_id = merged_reviews["id"].iloc[0] | |
| # --- Optimization 1 & 2: Update color of the matched row using vectorized assignment --- | |
| if use_id: | |
| # Faster update if using unique 'id' as merge key | |
| review_df.loc[review_df["id"].isin(merged_reviews["id"]), "color"] = colour | |
| else: | |
| # More general case using multiple columns - might be slower | |
| # Create a temporary key for comparison | |
| def create_merge_key(df, cols): | |
| return df[cols].astype(str).agg('_'.join, axis=1) | |
| review_df_key = create_merge_key(review_df, selected_merge_cols) | |
| merged_reviews_key = create_merge_key(merged_reviews, selected_merge_cols) | |
| review_df.loc[review_df_key.isin(merged_reviews_key), "color"] = colour | |
| previous_colour = new_previous_colour | |
| previous_id = new_previous_id | |
| else: | |
| # No rows matched the selection | |
| print("No reviews found matching selection criteria") | |
| # The reset logic at the beginning already handles setting color to (0, 0, 0) | |
| # if it was the highlight colour and didn't match. | |
| # No specific action needed here for color reset beyond what's done initially. | |
| previous_colour = '(0, 0, 0)' # Reset previous_colour as no row was highlighted | |
| previous_id = '' # Reset previous_id | |
| else: | |
| # If selection is empty, reset any existing highlights | |
| review_df.loc[review_df["color"] == colour, "color"] = '(0, 0, 0)' | |
| previous_colour = '(0, 0, 0)' | |
| previous_id = '' | |
| # Ensure column order is maintained if necessary, though pandas generally preserves order | |
| # Creating a new DataFrame here might involve copying data, consider if this is strictly needed. | |
| if set(["image", "page", "label", "color", "xmin","ymin", "xmax", "ymax", "text", "id"]).issubset(review_df.columns): | |
| review_df = review_df[["image", "page", "label", "color", "xmin","ymin", "xmax", "ymax", "text", "id"]] | |
| else: | |
| print("Warning: Not all expected columns are present in review_df for reordering.") | |
| return review_df, previous_id, previous_colour | |
| def update_boxes_color(images: list, redaction_row_selection: pd.DataFrame, colour: tuple = (0, 255, 0)): | |
| """ | |
| Update the color of bounding boxes in the images list based on redaction_row_selection. | |
| Parameters: | |
| - images (list): List of dictionaries containing image paths and box metadata. | |
| - redaction_row_selection (pd.DataFrame): DataFrame with 'page', 'label', and optionally 'text' columns. | |
| - colour (tuple): RGB tuple for the new color. | |
| Returns: | |
| - Updated list with modified colors. | |
| """ | |
| # Convert DataFrame to a set for fast lookup | |
| selection_set = set(zip(redaction_row_selection["page"], redaction_row_selection["label"])) | |
| for page_idx, image_obj in enumerate(images): | |
| if "boxes" in image_obj: | |
| for box in image_obj["boxes"]: | |
| if (page_idx, box["label"]) in selection_set: | |
| box["color"] = colour # Update color | |
| return images | |
| def update_other_annotator_number_from_current(page_number_first_counter:int): | |
| return page_number_first_counter | |
| def convert_image_coords_to_adobe(pdf_page_width:float, pdf_page_height:float, image_width:float, image_height:float, x1:float, y1:float, x2:float, y2:float): | |
| ''' | |
| Converts coordinates from image space to Adobe PDF space. | |
| Parameters: | |
| - pdf_page_width: Width of the PDF page | |
| - pdf_page_height: Height of the PDF page | |
| - image_width: Width of the source image | |
| - image_height: Height of the source image | |
| - x1, y1, x2, y2: Coordinates in image space | |
| - page_sizes: List of dicts containing sizes of page as pymupdf page or PIL image | |
| Returns: | |
| - Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space | |
| ''' | |
| # Calculate scaling factors | |
| scale_width = pdf_page_width / image_width | |
| scale_height = pdf_page_height / image_height | |
| # Convert coordinates | |
| pdf_x1 = x1 * scale_width | |
| pdf_x2 = x2 * scale_width | |
| # Convert Y coordinates (flip vertical axis) | |
| # Adobe coordinates start from bottom-left | |
| pdf_y1 = pdf_page_height - (y1 * scale_height) | |
| pdf_y2 = pdf_page_height - (y2 * scale_height) | |
| # Make sure y1 is always less than y2 for Adobe's coordinate system | |
| if pdf_y1 > pdf_y2: | |
| pdf_y1, pdf_y2 = pdf_y2, pdf_y1 | |
| return pdf_x1, pdf_y1, pdf_x2, pdf_y2 | |
| def convert_pymupdf_coords_to_adobe(x1: float, y1: float, x2: float, y2: float, pdf_page_height: float): | |
| """ | |
| Converts coordinates from PyMuPDF (fitz) space to Adobe PDF space. | |
| Parameters: | |
| - x1, y1, x2, y2: Coordinates in PyMuPDF space | |
| - pdf_page_height: Total height of the PDF page | |
| Returns: | |
| - Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space | |
| """ | |
| # PyMuPDF uses (0,0) at the bottom-left, while Adobe uses (0,0) at the top-left | |
| adobe_y1 = pdf_page_height - y2 # Convert top coordinate | |
| adobe_y2 = pdf_page_height - y1 # Convert bottom coordinate | |
| return x1, adobe_y1, x2, adobe_y2 | |
| def create_xfdf(review_file_df:pd.DataFrame, pdf_path:str, pymupdf_doc:object, image_paths:List[str], document_cropboxes:List=[], page_sizes:List[dict]=[]): | |
| ''' | |
| Create an xfdf file from a review csv file and a pdf | |
| ''' | |
| pages_are_images = True | |
| # Create root element | |
| xfdf = Element('xfdf', xmlns="http://ns.adobe.com/xfdf/", xml_space="preserve") | |
| # Add header | |
| header = SubElement(xfdf, 'header') | |
| header.set('pdf-filepath', pdf_path) | |
| # Add annots | |
| annots = SubElement(xfdf, 'annots') | |
| # Check if page size object exists, and if current coordinates are in relative format or image coordinates format. | |
| if page_sizes: | |
| page_sizes_df = pd.DataFrame(page_sizes) | |
| # If there are no image coordinates, then convert coordinates to pymupdf coordinates prior to export | |
| pages_are_images = False | |
| if "mediabox_width" not in review_file_df.columns: | |
| review_file_df = review_file_df.merge(page_sizes_df, how="left", on = "page") | |
| # If all coordinates are less or equal to one, this is a relative page scaling - change back to image coordinates | |
| if review_file_df["xmin"].max() <= 1 and review_file_df["xmax"].max() <= 1 and review_file_df["ymin"].max() <= 1 and review_file_df["ymax"].max() <= 1: | |
| review_file_df["xmin"] = review_file_df["xmin"] * review_file_df["mediabox_width"] | |
| review_file_df["xmax"] = review_file_df["xmax"] * review_file_df["mediabox_width"] | |
| review_file_df["ymin"] = review_file_df["ymin"] * review_file_df["mediabox_height"] | |
| review_file_df["ymax"] = review_file_df["ymax"] * review_file_df["mediabox_height"] | |
| # If all nulls, then can do image coordinate conversion | |
| if len(page_sizes_df.loc[page_sizes_df["mediabox_width"].isnull(),"mediabox_width"]) == len(page_sizes_df["mediabox_width"]): | |
| pages_are_images = True | |
| review_file_df = multiply_coordinates_by_page_sizes(review_file_df, page_sizes_df, xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax") | |
| # if "image_width" not in review_file_df.columns: | |
| # review_file_df = review_file_df.merge(page_sizes_df, how="left", on = "page") | |
| # # If all coordinates are less or equal to one, this is a relative page scaling - change back to image coordinates | |
| # if review_file_df["xmin"].max() <= 1 and review_file_df["xmax"].max() <= 1 and review_file_df["ymin"].max() <= 1 and review_file_df["ymax"].max() <= 1: | |
| # review_file_df["xmin"] = review_file_df["xmin"] * review_file_df["image_width"] | |
| # review_file_df["xmax"] = review_file_df["xmax"] * review_file_df["image_width"] | |
| # review_file_df["ymin"] = review_file_df["ymin"] * review_file_df["image_height"] | |
| # review_file_df["ymax"] = review_file_df["ymax"] * review_file_df["image_height"] | |
| # Go through each row of the review_file_df, create an entry in the output Adobe xfdf file. | |
| for _, row in review_file_df.iterrows(): | |
| page_num_reported = row["page"] | |
| page_python_format = int(row["page"])-1 | |
| pymupdf_page = pymupdf_doc.load_page(page_python_format) | |
| # Load cropbox sizes. Set cropbox to the original cropbox sizes from when the document was loaded into the app. | |
| if document_cropboxes: | |
| # Extract numbers safely using regex | |
| match = re.findall(r"[-+]?\d*\.\d+|\d+", document_cropboxes[page_python_format]) | |
| if match and len(match) == 4: | |
| rect_values = list(map(float, match)) # Convert extracted strings to floats | |
| pymupdf_page.set_cropbox(Rect(*rect_values)) | |
| else: | |
| raise ValueError(f"Invalid cropbox format: {document_cropboxes[page_python_format]}") | |
| else: | |
| print("Document cropboxes not found.") | |
| pdf_page_height = pymupdf_page.mediabox.height | |
| pdf_page_width = pymupdf_page.mediabox.width | |
| # Create redaction annotation | |
| redact_annot = SubElement(annots, 'redact') | |
| # Generate unique ID | |
| annot_id = str(uuid.uuid4()) | |
| redact_annot.set('name', annot_id) | |
| # Set page number (subtract 1 as PDF pages are 0-based) | |
| redact_annot.set('page', str(int(row['page']) - 1)) | |
| # # Convert coordinates | |
| # if pages_are_images == True: | |
| # x1, y1, x2, y2 = convert_image_coords_to_adobe( | |
| # pdf_page_width, | |
| # pdf_page_height, | |
| # image_page_width, | |
| # image_page_height, | |
| # row['xmin'], | |
| # row['ymin'], | |
| # row['xmax'], | |
| # row['ymax'] | |
| # ) | |
| # else: | |
| x1, y1, x2, y2 = convert_pymupdf_coords_to_adobe(row['xmin'], | |
| row['ymin'], | |
| row['xmax'], | |
| row['ymax'], pdf_page_height) | |
| if CUSTOM_BOX_COLOUR == "grey": | |
| colour_str = "0.5,0.5,0.5" | |
| else: | |
| colour_str = row['color'].strip('()').replace(' ', '') | |
| # Set coordinates | |
| redact_annot.set('rect', f"{x1:.2f},{y1:.2f},{x2:.2f},{y2:.2f}") | |
| # Set redaction properties | |
| redact_annot.set('title', row['label']) # The type of redaction (e.g., "PERSON") | |
| redact_annot.set('contents', row['text']) # The redacted text | |
| redact_annot.set('subject', row['label']) # The redacted text | |
| redact_annot.set('mimetype', "Form") | |
| # Set appearance properties | |
| redact_annot.set('border-color', colour_str) # Black border | |
| redact_annot.set('repeat', 'false') | |
| redact_annot.set('interior-color', colour_str) | |
| #redact_annot.set('fill-color', colour_str) | |
| #redact_annot.set('outline-color', colour_str) | |
| #redact_annot.set('overlay-color', colour_str) | |
| #redact_annot.set('overlay-text', row['label']) | |
| redact_annot.set('opacity', "0.5") | |
| # Add appearance dictionary | |
| # appearanceDict = SubElement(redact_annot, 'appearancedict') | |
| # # Normal appearance | |
| # normal = SubElement(appearanceDict, 'normal') | |
| # #normal.set('appearance', 'redact') | |
| # # Color settings for the mark (before applying redaction) | |
| # markAppearance = SubElement(redact_annot, 'markappearance') | |
| # markAppearance.set('stroke-color', colour_str) # Red outline | |
| # markAppearance.set('fill-color', colour_str) # Light red fill | |
| # markAppearance.set('opacity', '0.5') # 50% opacity | |
| # # Final redaction appearance (after applying) | |
| # redactAppearance = SubElement(redact_annot, 'redactAppearance') | |
| # redactAppearance.set('fillColor', colour_str) # Black fill | |
| # redactAppearance.set('fontName', 'Helvetica') | |
| # redactAppearance.set('fontSize', '12') | |
| # redactAppearance.set('textAlignment', 'left') | |
| # redactAppearance.set('textColor', colour_str) # White text | |
| # Convert to pretty XML string | |
| xml_str = minidom.parseString(tostring(xfdf)).toprettyxml(indent=" ") | |
| return xml_str | |
| def convert_df_to_xfdf(input_files:List[str], pdf_doc:Document, image_paths:List[str], output_folder:str = OUTPUT_FOLDER, document_cropboxes:List=[], page_sizes:List[dict]=[]): | |
| ''' | |
| Load in files to convert a review file into an Adobe comment file format | |
| ''' | |
| output_paths = [] | |
| pdf_name = "" | |
| file_path_name = "" | |
| if isinstance(input_files, str): | |
| file_paths_list = [input_files] | |
| else: | |
| file_paths_list = input_files | |
| # Sort the file paths so that the pdfs come first | |
| file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json')) | |
| for file in file_paths_list: | |
| if isinstance(file, str): | |
| file_path = file | |
| else: | |
| file_path = file.name | |
| file_path_name = get_file_name_without_type(file_path) | |
| file_path_end = detect_file_type(file_path) | |
| if file_path_end == "pdf": | |
| pdf_name = os.path.basename(file_path) | |
| if file_path_end == "csv": | |
| # If no pdf name, just get the name of the file path | |
| if not pdf_name: | |
| pdf_name = file_path_name | |
| # Read CSV file | |
| review_file_df = pd.read_csv(file_path) | |
| review_file_df.fillna('', inplace=True) # Replace NaN in review file with an empty string | |
| xfdf_content = create_xfdf(review_file_df, pdf_name, pdf_doc, image_paths, document_cropboxes, page_sizes) | |
| output_path = output_folder + file_path_name + "_adobe.xfdf" | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| f.write(xfdf_content) | |
| output_paths.append(output_path) | |
| return output_paths | |
| ### Convert xfdf coordinates back to image for app | |
| def convert_adobe_coords_to_image(pdf_page_width:float, pdf_page_height:float, image_width:float, image_height:float, x1:float, y1:float, x2:float, y2:float): | |
| ''' | |
| Converts coordinates from Adobe PDF space to image space. | |
| Parameters: | |
| - pdf_page_width: Width of the PDF page | |
| - pdf_page_height: Height of the PDF page | |
| - image_width: Width of the source image | |
| - image_height: Height of the source image | |
| - x1, y1, x2, y2: Coordinates in Adobe PDF space | |
| Returns: | |
| - Tuple of converted coordinates (x1, y1, x2, y2) in image space | |
| ''' | |
| # Calculate scaling factors | |
| scale_width = image_width / pdf_page_width | |
| scale_height = image_height / pdf_page_height | |
| # Convert coordinates | |
| image_x1 = x1 * scale_width | |
| image_x2 = x2 * scale_width | |
| # Convert Y coordinates (flip vertical axis) | |
| # Adobe coordinates start from bottom-left | |
| image_y1 = (pdf_page_height - y1) * scale_height | |
| image_y2 = (pdf_page_height - y2) * scale_height | |
| # Make sure y1 is always less than y2 for image's coordinate system | |
| if image_y1 > image_y2: | |
| image_y1, image_y2 = image_y2, image_y1 | |
| return image_x1, image_y1, image_x2, image_y2 | |
| def parse_xfdf(xfdf_path:str): | |
| ''' | |
| Parse the XFDF file and extract redaction annotations. | |
| Parameters: | |
| - xfdf_path: Path to the XFDF file | |
| Returns: | |
| - List of dictionaries containing redaction information | |
| ''' | |
| tree = parse(xfdf_path) | |
| root = tree.getroot() | |
| # Define the namespace | |
| namespace = {'xfdf': 'http://ns.adobe.com/xfdf/'} | |
| redactions = [] | |
| # Find all redact elements using the namespace | |
| for redact in root.findall('.//xfdf:redact', namespaces=namespace): | |
| redaction_info = { | |
| 'image': '', # Image will be filled in later | |
| 'page': int(redact.get('page')) + 1, # Convert to 1-based index | |
| 'xmin': float(redact.get('rect').split(',')[0]), | |
| 'ymin': float(redact.get('rect').split(',')[1]), | |
| 'xmax': float(redact.get('rect').split(',')[2]), | |
| 'ymax': float(redact.get('rect').split(',')[3]), | |
| 'label': redact.get('title'), | |
| 'text': redact.get('contents'), | |
| 'color': redact.get('border-color', '(0, 0, 0)') # Default to black if not specified | |
| } | |
| redactions.append(redaction_info) | |
| return redactions | |
| def convert_xfdf_to_dataframe(file_paths_list:List[str], pymupdf_doc, image_paths:List[str], output_folder:str=OUTPUT_FOLDER): | |
| ''' | |
| Convert redaction annotations from XFDF and associated images into a DataFrame. | |
| Parameters: | |
| - xfdf_path: Path to the XFDF file | |
| - pdf_doc: PyMuPDF document object | |
| - image_paths: List of PIL Image objects corresponding to PDF pages | |
| Returns: | |
| - DataFrame containing redaction information | |
| ''' | |
| output_paths = [] | |
| xfdf_paths = [] | |
| df = pd.DataFrame() | |
| # Sort the file paths so that the pdfs come first | |
| file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json')) | |
| for file in file_paths_list: | |
| if isinstance(file, str): | |
| file_path = file | |
| else: | |
| file_path = file.name | |
| file_path_name = get_file_name_without_type(file_path) | |
| file_path_end = detect_file_type(file_path) | |
| if file_path_end == "pdf": | |
| pdf_name = os.path.basename(file_path) | |
| # Add pdf to outputs | |
| output_paths.append(file_path) | |
| if file_path_end == "xfdf": | |
| if not pdf_name: | |
| message = "Original PDF needed to convert from .xfdf format" | |
| print(message) | |
| raise ValueError(message) | |
| xfdf_path = file | |
| file_path_name = get_file_name_without_type(xfdf_path) | |
| # Parse the XFDF file | |
| redactions = parse_xfdf(xfdf_path) | |
| # Create a DataFrame from the redaction information | |
| df = pd.DataFrame(redactions) | |
| df.fillna('', inplace=True) # Replace NaN with an empty string | |
| for _, row in df.iterrows(): | |
| page_python_format = int(row["page"])-1 | |
| pymupdf_page = pymupdf_doc.load_page(page_python_format) | |
| pdf_page_height = pymupdf_page.rect.height | |
| pdf_page_width = pymupdf_page.rect.width | |
| image_path = image_paths[page_python_format] | |
| if isinstance(image_path, str): | |
| image = Image.open(image_path) | |
| image_page_width, image_page_height = image.size | |
| # Convert to image coordinates | |
| image_x1, image_y1, image_x2, image_y2 = convert_adobe_coords_to_image(pdf_page_width, pdf_page_height, image_page_width, image_page_height, row['xmin'], row['ymin'], row['xmax'], row['ymax']) | |
| df.loc[_, ['xmin', 'ymin', 'xmax', 'ymax']] = [image_x1, image_y1, image_x2, image_y2] | |
| # Optionally, you can add the image path or other relevant information | |
| df.loc[_, 'image'] = image_path | |
| out_file_path = output_folder + file_path_name + "_review_file.csv" | |
| df.to_csv(out_file_path, index=None) | |
| output_paths.append(out_file_path) | |
| return output_paths |