import time
import re
import json
import io
import os
from PIL import Image, ImageChops, ImageFile, ImageDraw  # ImageDraw is needed for drawing redaction boxes on image-only files
ImageFile.LOAD_TRUNCATED_IMAGES = True

from typing import List, Dict, Tuple
import pandas as pd

from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine, LTTextLineHorizontal, LTAnno
from pikepdf import Pdf, Dictionary, Name
import pymupdf
from pymupdf import Rect
from fitz import Document, Page

import gradio as gr
from gradio import Progress
from collections import defaultdict

from tools.custom_image_analyser_engine import CustomImageAnalyzerEngine, OCRResult, combine_ocr_results, CustomImageRecognizerResult
from tools.file_conversion import process_file, is_pdf, convert_text_pdf_to_img_pdf, is_pdf_or_image
from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold
from tools.helper_functions import get_file_path_end, output_folder
from tools.data_anonymise import generate_decision_process_output
from tools.aws_textract import analyse_page_with_textract, convert_pike_pdf_page_to_bytes, json_to_ocrresult

def sum_numbers_before_seconds(string: str):
    """Extracts numbers that precede the word 'seconds' from a string and adds them up.

    Args:
        string: The input string.

    Returns:
        The rounded sum of all numbers found before 'seconds' in the string.
    """

    numbers = re.findall(r'(\d+\.\d+)\s*seconds', string)
    numbers = [float(num) for num in numbers]
    sum_of_numbers = round(sum(numbers), 1)

    return sum_of_numbers

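# Illustrative example (comment only, not executed): the timing messages built in
# choose_and_run_redactor below look like "File 'a' successfully redacted in 4.2 seconds.",
# so for a combined log of
# "File 'a' successfully redacted in 4.2 seconds.\nFile 'b' successfully redacted in 1.3 seconds."
# the function above would return 5.5.
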
def choose_and_run_redactor(file_paths:List[str], prepared_pdf_file_paths:List[str], prepared_pdf_image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list=[], log_files_output_paths:list=[], first_loop_state:bool=False, page_min:int=0, page_max:int=999, estimated_time_taken_state:float=0.0, handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], all_request_metadata_str:str = "", all_image_annotations:dict={}, pdf_text=[], progress=gr.Progress(track_tqdm=True)):
    '''
    Based on the type of redaction selected, pass the document file content onto the relevant function and return a redacted document plus processing logs.
    '''
    tic = time.perf_counter()
    all_request_metadata = all_request_metadata_str.split('\n') if all_request_metadata_str else []

    # On the first loop, reset the state carried over from any previous run
    if first_loop_state:
        latest_file_completed = 0
        out_file_paths = []
        pdf_text = []

    if isinstance(out_message, str):
        out_message = [out_message]

    if not out_file_paths:
        out_file_paths = []

    latest_file_completed = int(latest_file_completed)

    # If all files have been processed, return the final message and estimated processing time
    if latest_file_completed >= len(file_paths):
        latest_file_completed = 99
        final_out_message = '\n'.join(out_message)
        estimate_total_processing_time = sum_numbers_before_seconds(final_out_message)
        print("Estimated total processing time:", str(estimate_total_processing_time))

        return final_out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimate_total_processing_time, all_request_metadata_str, pdf_text, all_image_annotations

    file_paths_loop = [file_paths[int(latest_file_completed)]]

    if in_allow_list is not None and not in_allow_list.empty:
        in_allow_list_flat = in_allow_list[0].tolist()
        print("In allow list:", in_allow_list_flat)
    else:
        in_allow_list_flat = []

    progress(0.5, desc="Redacting file")

    for file in file_paths_loop:
        file_path = file.name

        if file_path:
            file_path_without_ext = get_file_path_end(file_path)
            is_a_pdf = is_pdf(file_path)
            if not is_a_pdf:
                print("File is not a pdf, assuming that image analysis needs to be used.")
                in_redact_method = "Quick image analysis - typed text"
        else:
            out_message = "No file selected"
            print(out_message)
            return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations

        if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
            if is_pdf_or_image(file_path) == False:
                out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
                return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations

            print("Redacting file " + file_path_without_ext + " as an image-based file")

            pdf_text, redaction_logs, logging_file_paths, new_request_metadata, all_image_annotations = redact_image_pdf(file_path, prepared_pdf_image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max, in_redact_method, handwrite_signature_checkbox)

            if is_pdf(file_path) == False:
                out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
                pdf_text[0].save(out_image_file_path, "PDF", resolution=100.0, save_all=True, append_images=pdf_text[1:])
            else:
                out_image_file_path = output_folder + file_path_without_ext + "_redacted.pdf"
                pdf_text.save(out_image_file_path)

            out_file_paths.append(out_image_file_path)
            if logging_file_paths:
                log_files_output_paths.extend(logging_file_paths)

            out_message.append("File '" + file_path_without_ext + "' successfully redacted")

            logs_output_file_name = out_image_file_path + "_decision_process_output.csv"
            redaction_logs.to_csv(logs_output_file_name)
            log_files_output_paths.append(logs_output_file_name)

            if new_request_metadata:
                print("Request metadata:", new_request_metadata)
                all_request_metadata.append(new_request_metadata)

            if latest_file_completed != len(file_paths):
                print("Completed file number:", str(latest_file_completed))
                latest_file_completed += 1

        elif in_redact_method == "Simple text analysis - PDFs with selectable text":
            print("file_path for selectable text analysis:", file_path)

            if is_pdf(file_path) == False:
                out_message = "Please upload a PDF file for text analysis. If you have an image, select 'Image analysis'."
                return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations

            print('Redacting file as text-based PDF')
            pdf_text, decision_process_logs, page_text_outputs, all_image_annotations = redact_text_pdf(file_path, prepared_pdf_image_paths, language, chosen_redact_entities, in_allow_list_flat, page_min, page_max, "Simple text analysis - PDFs with selectable text")

            out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
            pdf_text.save(out_text_file_path)
            out_file_paths.append(out_text_file_path)

            decision_logs_output_file_name = out_text_file_path + "_decision_process_output.csv"
            decision_process_logs.to_csv(decision_logs_output_file_name)
            log_files_output_paths.append(decision_logs_output_file_name)

            all_text_output_file_name = out_text_file_path + "_all_text_output.csv"
            page_text_outputs.to_csv(all_text_output_file_name)
            log_files_output_paths.append(all_text_output_file_name)

            out_message_new = "File '" + file_path_without_ext + "' successfully redacted"
            out_message.append(out_message_new)

            if latest_file_completed != len(file_paths):
                print("Completed file number:", str(latest_file_completed), "more files to do")
                latest_file_completed += 1

        else:
            out_message = "No redaction method selected"
            print(out_message)
            return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations

    toc = time.perf_counter()
    out_time = f"in {toc - tic:0.1f} seconds."
    print(out_time)

    out_message_out = '\n'.join(out_message)
    out_message_out = out_message_out + " " + out_time

    if all_request_metadata:
        all_request_metadata_str = '\n'.join(all_request_metadata)

        all_request_metadata_file_path = output_folder + file_path_without_ext + "_textract_request_metadata.txt"

        with open(all_request_metadata_file_path, "w") as f:
            f.write(all_request_metadata_str)

        if all_request_metadata_file_path not in log_files_output_paths:
            log_files_output_paths.append(all_request_metadata_file_path)

    return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations

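# For reference, the three redaction-method option strings that choose_and_run_redactor
# dispatches on (these must match the values passed in from the Gradio UI) are:
#   "Quick image analysis - typed text"
#   "Complex image analysis - docs with handwriting/signatures (AWS Textract)"
#   "Simple text analysis - PDFs with selectable text"
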
def convert_pikepdf_coords_to_pymudf(pymupdf_page, annot):
    '''
    Convert annotations from pikepdf to pymupdf format
    '''
    mediabox_height = pymupdf_page.mediabox[3] - pymupdf_page.mediabox[1]
    mediabox_width = pymupdf_page.mediabox[2] - pymupdf_page.mediabox[0]
    rect_height = pymupdf_page.rect.height
    rect_width = pymupdf_page.rect.width

    page_x_adjust = (rect_width - mediabox_width) / 2
    page_y_adjust = (rect_height - mediabox_height) / 2

    rect_field = annot["/Rect"]
    rect_coordinates = [float(coord) for coord in rect_field]

    x1, y1, x2, y2 = rect_coordinates
    x1 = x1 + page_x_adjust
    new_y1 = (rect_height - y2) - page_y_adjust
    x2 = x2 + page_x_adjust
    new_y2 = (rect_height - y1) - page_y_adjust

    return x1, new_y1, x2, new_y2

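# Worked example (illustrative): pikepdf /Rect values are in PDF user space with a
# bottom-left origin, while pymupdf rect coordinates use a top-left origin. For a page
# whose rect is 600 points tall with no mediabox offset (page_x_adjust == page_y_adjust == 0),
# a pikepdf rect of (100, 500, 200, 520) maps to pymupdf coordinates (100, 80, 200, 100).
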
def convert_pikepdf_to_image_coords(pymupdf_page, annot, image:Image):
    '''
    Convert annotations from pikepdf coordinates to image coordinates.
    '''

    rect_height = pymupdf_page.rect.height
    rect_width = pymupdf_page.rect.width

    image_page_width, image_page_height = image.size

    scale_width = image_page_width / rect_width
    scale_height = image_page_height / rect_height

    rect_field = annot["/Rect"]
    rect_coordinates = [float(coord) for coord in rect_field]

    x1, y1, x2, y2 = rect_coordinates
    x1_image = x1 * scale_width
    new_y1_image = image_page_height - (y2 * scale_height)
    x2_image = x2 * scale_width
    new_y2_image = image_page_height - (y1 * scale_height)

    return x1_image, new_y1_image, x2_image, new_y2_image

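# Worked example (illustrative): for a 595 x 842 point page rendered to a 1190 x 1684
# pixel image (a scale factor of 2), a pikepdf rect of (100, 742, 200, 792) maps to
# image coordinates (200, 100, 400, 200), with the y axis flipped to a top-left origin.
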
def convert_image_coords_to_pymupdf(pymupdf_page, annot:CustomImageRecognizerResult, image:Image):
    '''
    Convert redaction coordinates from a CustomImageRecognizerResult (image pixel space) to pymupdf page coordinates.
    '''

    rect_height = pymupdf_page.rect.height
    rect_width = pymupdf_page.rect.width

    image_page_width, image_page_height = image.size

    scale_width = rect_width / image_page_width
    scale_height = rect_height / image_page_height

    x1 = annot.left * scale_width
    new_y1 = annot.top * scale_height
    x2 = (annot.left + annot.width) * scale_width
    new_y2 = (annot.top + annot.height) * scale_height

    return x1, new_y1, x2, new_y2

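# Worked example (illustrative): this is the inverse scaling of the function above. On the
# same 595 x 842 point page with a 1190 x 1684 pixel image, a recogniser result with
# left=200, top=100, width=200, height=100 (pixels) becomes (100, 50, 200, 100) in page points.
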
def convert_gradio_annotation_coords_to_pymupdf(pymupdf_page:Page, annot:dict, image:Image):
    '''
    Convert redaction coordinates from a gradio annotation component (image pixel space) to pymupdf page coordinates.
    '''

    rect_height = pymupdf_page.rect.height
    rect_width = pymupdf_page.rect.width

    image_page_width, image_page_height = image.size

    scale_width = rect_width / image_page_width
    scale_height = rect_height / image_page_height

    x1 = annot["xmin"] * scale_width
    new_y1 = annot["ymin"] * scale_height
    x2 = annot["xmax"] * scale_width
    new_y2 = annot["ymax"] * scale_height

    return x1, new_y1, x2, new_y2

def move_page_info(file_path: str) -> str:
    '''
    Rearrange a "page N of <name>.pdf" string into "<name>_page_N.png".
    '''
    base, extension = file_path.rsplit('.pdf', 1)

    page_info = base.split('page ')[1].split(' of')[0]
    new_base = base.replace(f'page {page_info} of ', '')

    new_file_path = f"{new_base}_page_{page_info}.png"

    return new_file_path

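# Illustrative example: str(pymupdf_page) is roughly of the form "page 0 of example.pdf",
# which move_page_info turns into "example_page_0.png"; redact_page_with_pymupdf below uses
# this to derive a path when it needs to save a PIL Image for the annotation output.
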
def redact_page_with_pymupdf(page:Page, annotations_on_page, image = None):

    mediabox_height = page.mediabox[3] - page.mediabox[1]
    mediabox_width = page.mediabox[2] - page.mediabox[0]
    rect_height = page.rect.height
    rect_width = page.rect.width

    out_annotation_boxes = {}
    all_image_annotation_boxes = []
    image_path = ""

    if isinstance(image, Image.Image):
        image_path = move_page_info(str(page))
        image.save(image_path)
    elif isinstance(image, str):
        image_path = image
        image = Image.open(image_path)

    if isinstance(annotations_on_page, dict):
        annotations_on_page = annotations_on_page["boxes"]

    for annot in annotations_on_page:
        if isinstance(annot, CustomImageRecognizerResult) or isinstance(annot, dict):
            img_annotation_box = {}

            if isinstance(annot, dict):
                img_annotation_box = annot
                # Keep any label already supplied by the annotation component, otherwise use a default
                if "label" not in img_annotation_box:
                    img_annotation_box["label"] = "Redaction"

                x1, pymupdf_y1, x2, pymupdf_y2 = convert_gradio_annotation_coords_to_pymupdf(page, annot, image)

            else:
                x1, pymupdf_y1, x2, pymupdf_y2 = convert_image_coords_to_pymupdf(page, annot, image)

                img_annotation_box["xmin"] = annot.left
                img_annotation_box["ymin"] = annot.top
                img_annotation_box["xmax"] = annot.left + annot.width
                img_annotation_box["ymax"] = annot.top + annot.height
                img_annotation_box["color"] = (0, 0, 0)
                try:
                    img_annotation_box["label"] = annot.entity_type
                except AttributeError:
                    img_annotation_box["label"] = "Redaction"

            rect = Rect(x1, pymupdf_y1, x2, pymupdf_y2)

        else:
            # pikepdf annotation (Dictionary with a /Rect field)
            x1, pymupdf_y1, x2, pymupdf_y2 = convert_pikepdf_coords_to_pymudf(page, annot)

            rect = Rect(x1, pymupdf_y1, x2, pymupdf_y2)

            img_annotation_box = {}

            if image:
                image_x1, image_y1, image_x2, image_y2 = convert_pikepdf_to_image_coords(page, annot, image)

                img_annotation_box["xmin"] = image_x1
                img_annotation_box["ymin"] = image_y1
                img_annotation_box["xmax"] = image_x2
                img_annotation_box["ymax"] = image_y2
                img_annotation_box["color"] = (0, 0, 0)

                if isinstance(annot, Dictionary):
                    img_annotation_box["label"] = str(annot["/T"])
                else:
                    img_annotation_box["label"] = "REDACTION"

        all_image_annotation_boxes.append(img_annotation_box)

        # Add a redaction annotation over a thin strip at the vertical middle of the box,
        # which is enough to remove the underlying text when redactions are applied
        middle_y = (pymupdf_y1 + pymupdf_y2) / 2
        rect_single_pixel_height = Rect(x1, middle_y - 2, x2, middle_y + 2)

        page.add_redact_annot(rect_single_pixel_height)

        # Draw a solid black box over the full redaction area
        shape = page.new_shape()
        shape.draw_rect(rect)
        shape.finish(color=(0, 0, 0), fill=(0, 0, 0))
        shape.commit()

    out_annotation_boxes = {
        "image": image_path,
        "boxes": all_image_annotation_boxes
    }

    page.apply_redactions(images=0, graphics=0)
    page.clean_contents()

    return page, out_annotation_boxes

def bounding_boxes_overlap(box1, box2):
    """Check if two bounding boxes overlap."""
    return (box1[0] < box2[2] and box2[0] < box1[2] and
            box1[1] < box2[3] and box2[1] < box1[3])

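# Illustrative example: bounding_boxes_overlap((0, 0, 10, 10), (5, 5, 15, 15)) is True,
# while boxes that merely touch along an edge, e.g. (0, 0, 10, 10) and (10, 0, 20, 10),
# return False because the comparisons are strict.
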
def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_results=[], handwriting_recogniser_results=[], handwrite_signature_checkbox: List[str]=["Redact all identified handwriting", "Redact all identified signatures"], horizontal_threshold:int=50, vertical_threshold:int=12):
    merged_bboxes = []
    grouped_bboxes = defaultdict(list)

    if signature_recogniser_results or handwriting_recogniser_results:
        if "Redact all identified handwriting" in handwrite_signature_checkbox:
            print("Handwriting boxes exist at merge:", handwriting_recogniser_results)
            bboxes.extend(handwriting_recogniser_results)

        if "Redact all identified signatures" in handwrite_signature_checkbox:
            print("Signature boxes exist at merge:", signature_recogniser_results)
            bboxes.extend(signature_recogniser_results)

    # Snap each box to the word boundaries of the OCR line it overlaps
    reconstructed_bboxes = []
    for bbox in bboxes:
        print("bbox:", bbox)
        bbox_box = (bbox.left, bbox.top, bbox.left + bbox.width, bbox.top + bbox.height)
        for line_text, line_info in combined_results.items():
            line_box = line_info['bounding_box']
            if bounding_boxes_overlap(bbox_box, line_box):
                if bbox.text in line_text:
                    start_char = line_text.index(bbox.text)
                    end_char = start_char + len(bbox.text)

                    relevant_words = []
                    current_char = 0
                    for word in line_info['words']:
                        word_end = current_char + len(word['text'])
                        if current_char <= start_char < word_end or current_char < end_char <= word_end or (start_char <= current_char and word_end <= end_char):
                            relevant_words.append(word)
                        if word_end >= end_char:
                            break
                        current_char = word_end
                        if not word['text'].endswith(' '):
                            current_char += 1

                    if relevant_words:
                        left = min(word['bounding_box'][0] for word in relevant_words)
                        top = min(word['bounding_box'][1] for word in relevant_words)
                        right = max(word['bounding_box'][2] for word in relevant_words)
                        bottom = max(word['bounding_box'][3] for word in relevant_words)

                        combined_text = " ".join(word['text'] for word in relevant_words)

                        reconstructed_bbox = CustomImageRecognizerResult(
                            bbox.entity_type,
                            bbox.start,
                            bbox.end,
                            bbox.score,
                            left,
                            top,
                            right - left,
                            bottom - top,
                            combined_text
                        )
                        reconstructed_bboxes.append(reconstructed_bbox)
                        break
        else:
            # No overlapping OCR line was found, so keep the original box
            reconstructed_bboxes.append(bbox)

    # Group boxes into approximate rows, then merge neighbours within each row
    for box in reconstructed_bboxes:
        grouped_bboxes[round(box.top / vertical_threshold)].append(box)

    for _, group in grouped_bboxes.items():
        group.sort(key=lambda box: box.left)

        merged_box = group[0]
        for next_box in group[1:]:
            if next_box.left - (merged_box.left + merged_box.width) <= horizontal_threshold:
                if merged_box.text == next_box.text:
                    new_text = merged_box.text
                    new_entity_type = merged_box.entity_type
                else:
                    new_text = merged_box.text + " " + next_box.text
                    new_entity_type = merged_box.entity_type + " - " + next_box.entity_type

                new_left = min(merged_box.left, next_box.left)
                new_top = min(merged_box.top, next_box.top)
                new_width = max(merged_box.left + merged_box.width, next_box.left + next_box.width) - new_left
                new_height = max(merged_box.top + merged_box.height, next_box.top + next_box.height) - new_top
                merged_box = CustomImageRecognizerResult(
                    new_entity_type, merged_box.start, merged_box.end, merged_box.score, new_left, new_top, new_width, new_height, new_text
                )
            else:
                merged_bboxes.append(merged_box)
                merged_box = next_box

        merged_bboxes.append(merged_box)

    return merged_bboxes

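# Note on the merge strategy above: boxes are first snapped to the word boundaries of the
# OCR line they overlap, then bucketed into rows by round(top / vertical_threshold) (so
# roughly within 12 pixels vertically by default) and merged left to right whenever the
# horizontal gap between neighbouring boxes is at most horizontal_threshold (50 pixels by default).
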
def redact_image_pdf(file_path:str, prepared_pdf_file_paths:List[str], language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, is_a_pdf:bool=True, page_min:int=0, page_max:int=999, analysis_type:str="Quick image analysis - typed text", handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], request_metadata:str="", progress=Progress(track_tqdm=True)):
    '''
    Take a path for an image of a document, then run this image through the Presidio ImageAnalyzer and PIL to get a redacted page back. Adapted from Presidio ImageRedactorEngine.
    '''
    logging_file_paths = []
    file_name = get_file_path_end(file_path)
    fill = (0, 0, 0)
    decision_process_output_str = ""
    images = []
    all_image_annotations = []

    image_analyser = CustomImageAnalyzerEngine(nlp_analyser)

    pymupdf_doc = pymupdf.open(file_path)

    if not prepared_pdf_file_paths:
        out_message = "PDF does not exist as images. Converting pages to image"
        print(out_message)

        prepared_pdf_file_paths = process_file(file_path)

    if not isinstance(prepared_pdf_file_paths, list):
        print("Converting prepared_pdf_file_paths to list")
        prepared_pdf_file_paths = [prepared_pdf_file_paths]

    number_of_pages = len(prepared_pdf_file_paths)
    print("Number of pages:", str(number_of_pages))

    out_message = "Redacting pages"
    print(out_message)

    if page_max > number_of_pages or page_max == 0:
        page_max = number_of_pages

    if page_min <= 0:
        page_min = 0
    else:
        page_min = page_min - 1

    print("Page range:", str(page_min + 1), "to", str(page_max))

    all_ocr_results = []
    all_decision_process = []
    all_line_level_ocr_results_df = pd.DataFrame()
    all_decision_process_table = pd.DataFrame()

    if analysis_type == "Quick image analysis - typed text":
        ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".csv"
    elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
        ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.csv"

    for i in range(0, number_of_pages):
        handwriting_or_signature_boxes = []
        signature_recogniser_results = []
        handwriting_recogniser_results = []

        try:
            image = prepared_pdf_file_paths[i]
            print("image:", image)
        except Exception as e:
            print("Could not redact page:", str(i + 1), "due to:")
            print(e)
            continue

        image_annotations = {"image": image, "boxes": []}

        print("prepared_pdf_file_paths:", prepared_pdf_file_paths)

        if i >= page_min and i < page_max:
            reported_page_number = str(i + 1)
            print("Redacting page", reported_page_number)

            pymupdf_page = pymupdf_doc.load_page(i)

            page_width, page_height = image.size

            if language == 'en':
                ocr_lang = 'eng'
            else:
                ocr_lang = language

            if analysis_type == "Quick image analysis - typed text":
                word_level_ocr_results = image_analyser.perform_ocr(image)

                line_level_ocr_results, line_level_ocr_results_with_children = combine_ocr_results(word_level_ocr_results)

                ocr_results_with_children_str = str(line_level_ocr_results_with_children)
                logs_output_file_name = output_folder + "ocr_with_children.txt"
                with open(logs_output_file_name, "w") as f:
                    f.write(ocr_results_with_children_str)

            if analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
                image_buffer = io.BytesIO()
                image.save(image_buffer, format='PNG')
                pdf_page_as_bytes = image_buffer.getvalue()

                json_file_path = output_folder + file_name + "_page_" + reported_page_number + "_textract.json"

                if not os.path.exists(json_file_path):
                    text_blocks, new_request_metadata = analyse_page_with_textract(pdf_page_as_bytes, json_file_path)
                    logging_file_paths.append(json_file_path)
                    request_metadata = request_metadata + "\n" + new_request_metadata
                else:
                    print("Found existing Textract json results file for this page.")
                    with open(json_file_path, 'r') as json_file:
                        text_blocks = json.load(json_file)
                    text_blocks = text_blocks['Blocks']

                line_level_ocr_results, handwriting_or_signature_boxes, signature_recogniser_results, handwriting_recogniser_results, line_level_ocr_results_with_children = json_to_ocrresult(text_blocks, page_width, page_height)

            if chosen_redact_entities:
                redaction_bboxes = image_analyser.analyze_text(
                    line_level_ocr_results,
                    line_level_ocr_results_with_children,
                    language=language,
                    entities=chosen_redact_entities,
                    allow_list=allow_list,
                    score_threshold=score_threshold,
                )
            else:
                redaction_bboxes = []

            if analysis_type == "Quick image analysis - typed text":
                interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".txt"
            elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
                interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.txt"

            bboxes_str = str(redaction_bboxes)
            with open(interim_results_file_path, "w") as f:
                f.write(bboxes_str)

            merged_redaction_bboxes = merge_img_bboxes(redaction_bboxes, line_level_ocr_results_with_children, signature_recogniser_results, handwriting_recogniser_results, handwrite_signature_checkbox)

            if is_pdf(file_path) == False:
                # Image-only file: draw the redaction boxes straight onto the image with PIL
                draw = ImageDraw.Draw(image)

                all_image_annotations_boxes = []

                for box in merged_redaction_bboxes:
                    print("box:", box)

                    x0 = box.left
                    y0 = box.top
                    x1 = x0 + box.width
                    y1 = y0 + box.height

                    try:
                        label = box.entity_type
                    except AttributeError:
                        label = "Redaction"

                    all_image_annotations_boxes.append({
                        "xmin": x0,
                        "ymin": y0,
                        "xmax": x1,
                        "ymax": y1,
                        "label": label,
                        "color": (0, 0, 0)
                    })

                    draw.rectangle([x0, y0, x1, y1], fill=fill)

                image_annotations = {"image": file_path, "boxes": all_image_annotations_boxes}

            else:
                pymupdf_page, image_annotations = redact_page_with_pymupdf(pymupdf_page, merged_redaction_bboxes, image)

            decision_process_table = pd.DataFrame([{
                'page': reported_page_number,
                'entity_type': result.entity_type,
                'start': result.start,
                'end': result.end,
                'score': result.score,
                'left': result.left,
                'top': result.top,
                'width': result.width,
                'height': result.height,
                'text': result.text
            } for result in merged_redaction_bboxes])

            all_decision_process_table = pd.concat([all_decision_process_table, decision_process_table])

            line_level_ocr_results_df = pd.DataFrame([{
                'page': reported_page_number,
                'text': result.text,
                'left': result.left,
                'top': result.top,
                'width': result.width,
                'height': result.height
            } for result in line_level_ocr_results])

            all_line_level_ocr_results_df = pd.concat([all_line_level_ocr_results_df, line_level_ocr_results_df])

        if is_pdf(file_path) == False:
            images.append(image)
            pymupdf_doc = images

        all_image_annotations.append(image_annotations)

    all_line_level_ocr_results_df.to_csv(ocr_results_file_path)
    logging_file_paths.append(ocr_results_file_path)

    return pymupdf_doc, all_decision_process_table, logging_file_paths, request_metadata, all_image_annotations

def get_text_container_characters(text_container:LTTextContainer):

    if isinstance(text_container, LTTextContainer):
        characters = [char
                      for line in text_container
                      if isinstance(line, LTTextLine) or isinstance(line, LTTextLineHorizontal)
                      for char in line]

        return characters
    return []

def analyse_text_container(text_container:OCRResult, language:str, chosen_redact_entities:List[str], score_threshold:float, allow_list:List[str]):
    '''
    Take text and bounding boxes in OCRResult format and analyse it for PII using spacy and the Microsoft Presidio package.
    '''

    analyser_results = []

    text_to_analyze = text_container.text

    if chosen_redact_entities:
        analyser_results = nlp_analyser.analyze(text=text_to_analyze,
                                                language=language,
                                                entities=chosen_redact_entities,
                                                score_threshold=score_threshold,
                                                return_decision_process=True,
                                                allow_list=allow_list)

    print(analyser_results)

    return analyser_results

def create_text_bounding_boxes_from_characters(char_objects:List[LTChar]) -> Tuple[List[OCRResult], List[LTChar]]:
    '''
    Create line-level OCRResult objects based on a list of pdfminer LTChar objects.
    '''

    line_level_results_out = []
    line_level_characters_out = []
    character_objects_out = []

    full_text = ""
    overall_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]
    word_bboxes = []

    current_word = ""
    current_word_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]

    for char in char_objects:
        character_objects_out.append(char)

        if isinstance(char, LTAnno):
            full_text += char.get_text()
            if current_word:
                word_bboxes.append((current_word, current_word_bbox))
                current_word = ""
                current_word_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]

            # A newline annotation marks the end of a line, so close off the current OCRResult
            if '\n' in char.get_text():
                if current_word:
                    word_bboxes.append((current_word, current_word_bbox))

                line_level_results_out.append(OCRResult(full_text, round(overall_bbox[0], 2), round(overall_bbox[1], 2), round(overall_bbox[2] - overall_bbox[0], 2), round(overall_bbox[3] - overall_bbox[1], 2)))
                line_level_characters_out.append(character_objects_out)

                character_objects_out = []
                full_text = ""
                overall_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]
                current_word = ""
                current_word_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]

            continue

        full_text += char.get_text()

        x0, y0, x1, y1 = char.bbox
        overall_bbox[0] = min(overall_bbox[0], x0)
        overall_bbox[1] = min(overall_bbox[1], y0)
        overall_bbox[2] = max(overall_bbox[2], x1)
        overall_bbox[3] = max(overall_bbox[3], y1)

        current_word += char.get_text()

        current_word_bbox[0] = min(current_word_bbox[0], x0)
        current_word_bbox[1] = min(current_word_bbox[1], y0)
        current_word_bbox[2] = max(current_word_bbox[2], x1)
        current_word_bbox[3] = max(current_word_bbox[3], y1)

    if current_word:
        word_bboxes.append((current_word, current_word_bbox))

    if full_text:
        line_level_results_out.append(OCRResult(full_text, round(overall_bbox[0], 2), round(overall_bbox[1], 2), round(overall_bbox[2] - overall_bbox[0], 2), round(overall_bbox[3] - overall_bbox[1], 2)))

    return line_level_results_out, line_level_characters_out

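# Illustrative example: pdfminer emits LTChar objects for visible glyphs and LTAnno objects for
# inserted whitespace/newlines, so a run of characters spelling "John Smith" followed by an LTAnno
# newline becomes one OCRResult whose fields (text, left, top, width, height) cover the whole line,
# plus the matching list of character objects that is later sliced by the analyser results.
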
def merge_text_bounding_boxes(analyser_results:CustomImageRecognizerResult, characters:List[LTChar], combine_pixel_dist:int, vertical_padding:int=0):
    '''
    Merge identified bounding boxes containing PII that are very close to one another.
    '''
    analysed_bounding_boxes = []
    if len(analyser_results) > 0 and len(characters) > 0:
        bounding_boxes = []
        text_out = []
        for result in analyser_results:
            char_boxes = [char.bbox for char in characters[result.start:result.end] if isinstance(char, LTChar)]
            char_text = [char._text for char in characters[result.start:result.end] if isinstance(char, LTChar)]
            if char_boxes:
                left = min(box[0] for box in char_boxes)
                bottom = min(box[1] for box in char_boxes)
                right = max(box[2] for box in char_boxes)
                top = max(box[3] for box in char_boxes) + vertical_padding
                bounding_boxes.append((bottom, left, result, [left, bottom, right, top], char_text))

                char_text = "".join(char_text)

        bounding_boxes.sort()

        merged_bounding_boxes = []
        current_box = None
        current_y = None
        current_result = None
        current_text = []

        for y, x, result, char_box, text in bounding_boxes:
            if current_y is None or current_box is None:
                current_box = char_box
                current_y = char_box[1]
                current_result = result
                current_text = list(text)
            else:
                vertical_diff_bboxes = abs(char_box[1] - current_y)
                horizontal_diff_bboxes = abs(char_box[0] - current_box[2])

                if vertical_diff_bboxes <= 5 and horizontal_diff_bboxes <= combine_pixel_dist:
                    # Extend the current box to cover the neighbouring match
                    current_box[2] = char_box[2]
                    current_box[3] = max(current_box[3], char_box[3])
                    current_result.end = max(current_result.end, result.end)
                    try:
                        current_result.entity_type = current_result.entity_type + " - " + result.entity_type
                    except Exception:
                        print("Unable to append new result type.")

                    if current_text:
                        current_text.append(" ")
                    current_text.extend(text)

                else:
                    merged_bounding_boxes.append(
                        {"text": "".join(current_text), "boundingBox": current_box, "result": current_result})

                    current_box = char_box
                    current_y = char_box[1]
                    current_result = result
                    current_text = list(text)

        if current_box:
            merged_bounding_boxes.append({"text": "".join(current_text), "boundingBox": current_box, "result": current_result})

        if not merged_bounding_boxes:
            analysed_bounding_boxes.extend(
                {"text": text, "boundingBox": char.bbox, "result": result}
                for result in analyser_results
                for char in characters[result.start:result.end]
                if isinstance(char, LTChar)
            )
        else:
            analysed_bounding_boxes.extend(merged_bounding_boxes)

    return analysed_bounding_boxes

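# Note on the merge above: boxes produced for separate PII matches on the same text line are
# joined when they sit within 5 points vertically and within combine_pixel_dist points
# horizontally of each other (redact_text_pdf passes combine_pixel_dist = 20), so adjacent
# matches such as a first and last name end up as a single redaction box.
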
def create_text_redaction_process_results(analyser_results, analysed_bounding_boxes, page_num):
    decision_process_table = pd.DataFrame()

    if len(analyser_results) > 0:
        analysed_bounding_boxes_df_new = pd.DataFrame(analysed_bounding_boxes)
        analysed_bounding_boxes_df_text = analysed_bounding_boxes_df_new['result'].astype(str).str.split(",", expand=True).replace(".*: ", "", regex=True)
        analysed_bounding_boxes_df_text.columns = ["type", "start", "end", "score"]
        analysed_bounding_boxes_df_new = pd.concat([analysed_bounding_boxes_df_new, analysed_bounding_boxes_df_text], axis=1)
        analysed_bounding_boxes_df_new['page'] = page_num + 1
        decision_process_table = pd.concat([decision_process_table, analysed_bounding_boxes_df_new], axis=0).drop('result', axis=1)

    return decision_process_table

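# Note: the 'result' column above holds Presidio RecognizerResult objects, whose string form is
# roughly "type: PERSON, start: 0, end: 10, score: 0.85"; splitting on "," and stripping the
# "...: " prefixes is what yields the type/start/end/score columns. This assumes that string
# format stays stable across Presidio versions.
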
def create_annotations_for_bounding_boxes(analysed_bounding_boxes):
    annotations_on_page = []
    for analysed_bounding_box in analysed_bounding_boxes:
        bounding_box = analysed_bounding_box["boundingBox"]
        annotation = Dictionary(
            Type=Name.Annot,
            Subtype=Name.Square,
            QuadPoints=[bounding_box[0], bounding_box[3], bounding_box[2], bounding_box[3],
                        bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[1]],
            Rect=[bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[3]],
            C=[0, 0, 0],
            IC=[0, 0, 0],
            CA=1,
            T=analysed_bounding_box["result"].entity_type,
            BS=Dictionary(
                W=0,
                S=Name.S
            )
        )
        annotations_on_page.append(annotation)
    return annotations_on_page

def redact_text_pdf(filename:str, prepared_pdf_image_path:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, page_min:int=0, page_max:int=999, analysis_type:str = "Simple text analysis - PDFs with selectable text", progress=Progress(track_tqdm=True)):
    '''
    Redact chosen entities from a pdf that is made up of multiple pages that are not images.
    '''
    annotations_all_pages = []
    all_image_annotations = []
    page_text_outputs_all_pages = pd.DataFrame()
    decision_process_table_all_pages = pd.DataFrame()

    combine_pixel_dist = 20

    pikepdf_pdf = Pdf.open(filename)
    number_of_pages = len(pikepdf_pdf.pages)

    pymupdf_doc = pymupdf.open(filename)

    if page_max > number_of_pages or page_max == 0:
        page_max = number_of_pages

    if page_min <= 0:
        page_min = 0
    else:
        page_min = page_min - 1

    print("Page range is", str(page_min + 1), "to", str(page_max))

    for page_no in range(0, number_of_pages):
        image = prepared_pdf_image_path[page_no]

        image_annotations = {"image": image, "boxes": []}

        pymupdf_page = pymupdf_doc.load_page(page_no)

        print("Page number is:", str(page_no + 1))

        if page_min <= page_no < page_max:
            for page_layout in extract_pages(filename, page_numbers=[page_no], maxpages=1):
                page_analyser_results = []
                page_analysed_bounding_boxes = []

                characters = []
                annotations_on_page = []
                decision_process_table_on_page = pd.DataFrame()
                page_text_outputs = pd.DataFrame()

                if analysis_type == "Simple text analysis - PDFs with selectable text":
                    for text_container in page_layout:
                        text_container_analyser_results = []
                        text_container_analysed_bounding_boxes = []

                        characters = get_text_container_characters(text_container)

                        line_level_text_results_list, line_characters = create_text_bounding_boxes_from_characters(characters)

                        if line_level_text_results_list:
                            line_level_text_results_df = pd.DataFrame([{
                                'page': page_no + 1,
                                'text': result.text,
                                'left': result.left,
                                'top': result.top,
                                'width': result.width,
                                'height': result.height
                            } for result in line_level_text_results_list])

                            page_text_outputs = pd.concat([page_text_outputs, line_level_text_results_df])

                        for i, text_line in enumerate(line_level_text_results_list):
                            text_line_analyzer_result = []
                            text_line_bounding_boxes = []

                            text_line_analyzer_result = analyse_text_container(text_line, language, chosen_redact_entities, score_threshold, allow_list)

                            if text_line_analyzer_result:
                                text_line_bounding_boxes = merge_text_bounding_boxes(text_line_analyzer_result, line_characters[i], combine_pixel_dist, vertical_padding=0)

                                text_container_analyser_results.extend(text_line_analyzer_result)
                                text_container_analysed_bounding_boxes.extend(text_line_bounding_boxes)

                        page_analyser_results.extend(text_container_analyser_results)
                        page_analysed_bounding_boxes.extend(text_container_analysed_bounding_boxes)

                annotations_on_page = create_annotations_for_bounding_boxes(page_analysed_bounding_boxes)

                pymupdf_page, image_annotations = redact_page_with_pymupdf(pymupdf_page, annotations_on_page, image)

                annotations_all_pages.append(annotations_on_page)

                print("For page number:", page_no, "there are", len(annotations_on_page), "annotations")

                decision_process_table_on_page = create_text_redaction_process_results(page_analyser_results, page_analysed_bounding_boxes, page_no)

                if not decision_process_table_on_page.empty:
                    decision_process_table_all_pages = pd.concat([decision_process_table_all_pages, decision_process_table_on_page])

                if not page_text_outputs.empty:
                    page_text_outputs = page_text_outputs.sort_values(["top", "left"], ascending=[False, False]).reset_index(drop=True)

                    page_text_outputs_all_pages = pd.concat([page_text_outputs_all_pages, page_text_outputs])

        all_image_annotations.append(image_annotations)

    return pymupdf_doc, decision_process_table_all_pages, page_text_outputs_all_pages, all_image_annotations