Commit ebf9010 · Parent: 15026f7
Added a 'Review redactions' tab to the app. You can now visually inspect suggested redactions and modify or add to them with a point-and-click interface.
Files changed:

- app.py (+53, -9)
- redaction_review.py (+88, -0)
- requirements.txt (+1, -0)
- tools/aws_functions.py (+0, -1)
- tools/file_conversion.py (+41, -19)
- tools/file_redaction.py (+467, -293)
- tools/redaction_review.py (+211, -0)
app.py — CHANGED

@@ -4,10 +4,13 @@ import socket
 # By default TLDExtract will try to pull files from the internet. I have instead downloaded this file locally to avoid the requirement for an internet connection.
 os.environ['TLDEXTRACT_CACHE'] = 'tld/.tld_set_snapshot'
 
+from gradio_image_annotation import image_annotator
+
 from tools.helper_functions import ensure_output_folder_exists, add_folder_to_path, put_columns_in_df, get_connection_params, output_folder, get_or_create_env_var, reveal_feedback_buttons, wipe_logs, custom_regex_load
 from tools.aws_functions import upload_file_to_s3
 from tools.file_redaction import choose_and_run_redactor
 from tools.file_conversion import prepare_image_or_pdf, get_input_file_names
+from tools.redaction_review import apply_redactions, crop, get_boxes_json, modify_existing_page_redactions, decrease_page, increase_page, update_annotator
 from tools.data_anonymise import anonymise_data_files
 from tools.auth import authenticate_user
 #from tools.aws_functions import load_data_from_aws

@@ -53,6 +56,10 @@ with app:
 session_hash_state = gr.State()
 s3_output_folder_state = gr.State()
 
+pdf_doc_state = gr.State([])
+images_pdf_state = gr.State([]) # List of pdf pages converted to PIL images
+all_image_annotations_state = gr.State([])
+
 # Logging state
 feedback_logs_state = gr.State(feedback_logs_folder + 'log.csv')
 feedback_s3_logs_loc_state = gr.State(feedback_logs_folder)

@@ -65,9 +72,12 @@ with app:
 session_hash_textbox = gr.Textbox(value="", visible=False) # Invisible text box to hold the session hash/username, Textract request metadata, data file names just for logging purposes.
 textract_metadata_textbox = gr.Textbox(value="", visible=False)
 doc_file_name_textbox = gr.Textbox(value="", visible=False)
+doc_file_name_with_extension_textbox = gr.Textbox(value="", visible=False)
 data_file_name_textbox = gr.Textbox(value="", visible=False)
 s3_logs_output_textbox = gr.Textbox(label="Feedback submission logs", visible=False)
 estimated_time_taken_number = gr.Number(value=0.0, precision=1, visible=False) # This keeps track of the time taken to redact files for logging purposes.
+annotate_previous_page = gr.Number(value=1, label="Previous page", precision=0, visible=False) # Keeps track of the last page that the annotator was on
+
 
 ###
 # UI DESIGN

@@ -106,7 +116,29 @@ with app:
 pdf_further_details_text = gr.Textbox(label="Please give more detailed feedback about the results:", visible=False)
 pdf_submit_feedback_btn = gr.Button(value="Submit feedback", visible=False)
 
-
+# Object annotation
+with gr.Tab("Review redactions", id="tab_object_annotation"):
+
+    with gr.Row():
+        annotation_last_page_button = gr.Button("Previous page")
+        annotate_current_page = gr.Number(value=1, label="Current page", precision=0)
+        annotation_next_page_button = gr.Button("Next page")
+
+    annotation_button_apply = gr.Button("Apply revised redactions", variant="primary")
+
+    annotator = image_annotator(
+        label="Modify redaction boxes",
+        label_list=["Redaction"],
+        label_colors=[(0, 0, 0)],
+        sources=None,#["upload"],
+        show_clear_button=False,
+        show_remove_button=False,
+        interactive=False
+    )
+
+    output_review_files = gr.File(label="Review output files")
+
 # TEXT / TABULAR DATA TAB
 with gr.Tab(label="Open text or Excel/csv files"):
     gr.Markdown(

@@ -170,17 +202,29 @@ with app:
 ###
 # PDF/IMAGE REDACTION
 ###
-in_doc_files.upload(fn=get_input_file_names, inputs=[in_doc_files], outputs=[doc_file_name_textbox])
+in_doc_files.upload(fn=get_input_file_names, inputs=[in_doc_files], outputs=[doc_file_name_textbox, doc_file_name_with_extension_textbox])
 
-document_redact_btn.click(fn = prepare_image_or_pdf, inputs=[in_doc_files, in_redaction_method, in_allow_list, text_documents_done, output_summary, first_loop_state], outputs=[output_summary, prepared_pdf_state], api_name="prepare_doc").\
-    then(fn = choose_and_run_redactor, inputs=[in_doc_files, prepared_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list_state, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, first_loop_state, page_min, page_max, estimated_time_taken_number, handwrite_signature_checkbox, textract_metadata_textbox],
-    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number, textract_metadata_textbox], api_name="redact_doc")
+document_redact_btn.click(fn = prepare_image_or_pdf, inputs=[in_doc_files, in_redaction_method, in_allow_list, text_documents_done, output_summary, first_loop_state], outputs=[output_summary, prepared_pdf_state, images_pdf_state], api_name="prepare_doc").\
+    then(fn = choose_and_run_redactor, inputs=[in_doc_files, prepared_pdf_state, images_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list_state, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, first_loop_state, page_min, page_max, estimated_time_taken_number, handwrite_signature_checkbox, textract_metadata_textbox, all_image_annotations_state, pdf_doc_state],
+    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number, textract_metadata_textbox, pdf_doc_state, all_image_annotations_state], api_name="redact_doc").\
+    then(fn=update_annotator, inputs=[all_image_annotations_state, page_min], outputs=[annotator, annotate_current_page])
 
 # If the output file count text box changes, keep going with redacting each document until done
-text_documents_done.change(fn = prepare_image_or_pdf, inputs=[in_doc_files, in_redaction_method, in_allow_list, text_documents_done, output_summary, second_loop_state], outputs=[output_summary, prepared_pdf_state]).\
-    then(fn = choose_and_run_redactor, inputs=[in_doc_files, prepared_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list_state, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, second_loop_state, page_min, page_max, estimated_time_taken_number, handwrite_signature_checkbox, textract_metadata_textbox],
-    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number, textract_metadata_textbox]).\
-    …
+text_documents_done.change(fn = prepare_image_or_pdf, inputs=[in_doc_files, in_redaction_method, in_allow_list, text_documents_done, output_summary, second_loop_state], outputs=[output_summary, prepared_pdf_state, images_pdf_state]).\
+    then(fn = choose_and_run_redactor, inputs=[in_doc_files, prepared_pdf_state, images_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list_state, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, second_loop_state, page_min, page_max, estimated_time_taken_number, handwrite_signature_checkbox, textract_metadata_textbox, all_image_annotations_state, pdf_doc_state],
+    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number, textract_metadata_textbox, pdf_doc_state, all_image_annotations_state]).\
+    then(fn=update_annotator, inputs=[all_image_annotations_state, page_min], outputs=[annotator, annotate_current_page]).\
+    then(fn = reveal_feedback_buttons, outputs=[pdf_feedback_radio, pdf_further_details_text, pdf_submit_feedback_btn, pdf_feedback_title])
+
+annotate_current_page.change(
+    modify_existing_page_redactions, inputs = [annotator, annotate_current_page, annotate_previous_page, all_image_annotations_state], outputs = [all_image_annotations_state, annotate_previous_page]).\
+    then(update_annotator, inputs=[all_image_annotations_state, annotate_current_page], outputs = [annotator, annotate_current_page])
+
+annotation_last_page_button.click(fn=decrease_page, inputs=[annotate_current_page], outputs=[annotate_current_page])
+annotation_next_page_button.click(fn=increase_page, inputs=[annotate_current_page, all_image_annotations_state], outputs=[annotate_current_page])
+
+#annotation_button_get.click(get_boxes_json, annotator, json_boxes)
+annotation_button_apply.click(apply_redactions, inputs=[annotator, in_doc_files, pdf_doc_state, all_image_annotations_state, annotate_current_page], outputs=[pdf_doc_state, all_image_annotations_state, output_review_files], scroll_to_output=True)
 
 ###
 # TABULAR DATA REDACTION
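The wiring above relies on Gradio's event chaining: each .then() step runs after the previous one finishes, with gr.State components carrying intermediate results between the prepare, redact and review-annotation steps. A minimal, self-contained sketch of that pattern follows; the function bodies and component names here are placeholders for illustration, not the app's real ones.

import gradio as gr

def prepare(files):
    # Stand-in for prepare_image_or_pdf: summarise and pass the files forward
    return f"prepared {len(files or [])} file(s)", (files or [])

def redact(summary, prepared):
    # Stand-in for choose_and_run_redactor: produce per-page annotation state
    return summary + " -> redacted", [{"page": 1, "boxes": []} for _ in prepared]

def refresh_view(annotations):
    # Stand-in for update_annotator: refresh the review display
    return f"{len(annotations)} page(s) ready for review"

with gr.Blocks() as demo:
    files = gr.File(file_count="multiple")
    prepared_state = gr.State([])
    annotations_state = gr.State([])
    summary = gr.Textbox(label="Summary")
    review_status = gr.Textbox(label="Review status")
    run_btn = gr.Button("Redact")

    run_btn.click(prepare, inputs=[files], outputs=[summary, prepared_state]).\
        then(redact, inputs=[summary, prepared_state], outputs=[summary, annotations_state]).\
        then(refresh_view, inputs=[annotations_state], outputs=[review_status])

if __name__ == "__main__":
    demo.launch()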
redaction_review.py — ADDED

@@ -0,0 +1,88 @@
+import gradio as gr
+from gradio_image_annotation import image_annotator
+from gradio_image_annotation.image_annotator import AnnotatedImageData
+
+from tools.file_conversion import is_pdf, convert_pdf_to_images
+from tools.helper_functions import get_file_path_end, output_folder
+from tools.file_redaction import redact_page_with_pymupdf
+import json
+import pymupdf
+from PIL import ImageDraw, Image
+
+file_path = "output/page_as_img_example_complaint_letter_pages_1.png"
+#file_path = "examples/graduate-job-example-cover-letter.pdf"
+
+
+if is_pdf(file_path):
+    images = convert_pdf_to_images(file_path)
+    image = images[0]
+    doc = pymupdf.open(file_path)
+else:
+    doc = []
+
+with open('output/gradio_annotation_boxes.json', 'r') as f:
+    gradio_annotation_boxes = json.load(f)
+
+example_annotation = {
+    "image": file_path,
+    "boxes": gradio_annotation_boxes
+}
+
+def apply_redactions(image_annotated:AnnotatedImageData, file_path:str, doc=[]):
+    #print(image_annotated['image'])
+
+    file_base = get_file_path_end(file_path)
+
+    image = Image.fromarray(image_annotated['image'].astype('uint8'))
+
+    draw = ImageDraw.Draw(image)
+
+    if is_pdf(file_path) == False:
+        for img_annotation_box in image_annotated['boxes']:
+            coords = [img_annotation_box["xmin"],
+                      img_annotation_box["ymin"],
+                      img_annotation_box["xmax"],
+                      img_annotation_box["ymax"]]
+
+            fill = img_annotation_box["color"]
+
+            draw.rectangle(coords, fill=fill)
+
+        image.save(output_folder + file_base + "_additional.png")
+
+    # If it's a pdf, assume a doc object is available
+    else:
+        doc = redact_page_with_pymupdf(doc, image_annotated, 1, image)
+
+
+def crop(annotations):
+    if annotations["boxes"]:
+        box = annotations["boxes"][0]
+        return annotations["image"][
+            box["ymin"]:box["ymax"],
+            box["xmin"]:box["xmax"]
+        ]
+    return None
+
+def get_boxes_json(annotations):
+    return annotations["boxes"]
+
+with gr.Blocks() as demo:
+    with gr.Tab("Object annotation", id="tab_object_annotation"):
+
+        doc_state = gr.State(doc)
+
+        file_path_textbox = gr.Textbox(value=file_path)
+        annotator = image_annotator(
+            example_annotation,
+            label_list=["Redaction"],
+            label_colors=[(0, 0, 0)],
+        )
+        button_get = gr.Button("Get bounding boxes")
+        button_apply = gr.Button("Apply redactions")
+        json_boxes = gr.JSON()
+        button_get.click(get_boxes_json, annotator, json_boxes)
+        button_apply.click(apply_redactions, inputs=[annotator, file_path_textbox, doc_state])
+
+if __name__ == "__main__":
+    demo.launch(inbrowser=True)
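For reference, the box dicts that apply_redactions() iterates over follow the gradio_image_annotation convention of pixel coordinates plus a colour. Below is a small stand-alone sketch of that drawing step using only Pillow; the coordinates, colour and output path are made up for illustration.

from PIL import Image, ImageDraw

example_boxes = [
    {"xmin": 40, "ymin": 60, "xmax": 220, "ymax": 90, "color": (0, 0, 0), "label": "Redaction"},
]

image = Image.new("RGB", (400, 200), "white")   # stand-in for a page image
draw = ImageDraw.Draw(image)
for box in example_boxes:
    coords = [box["xmin"], box["ymin"], box["xmax"], box["ymax"]]
    draw.rectangle(coords, fill=box["color"])    # solid black fill = redaction
image.save("redacted_page_example.png")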
requirements.txt — CHANGED

@@ -14,3 +14,4 @@ boto3==1.34.158
 pyarrow==14.0.2
 openpyxl==3.1.2
 Faker==22.2.0
+gradio_image_annotation==0.2.3
tools/aws_functions.py — CHANGED

@@ -14,7 +14,6 @@ aws_var_default = "0"
 aws_var_val = get_or_create_env_var(aws_var, aws_var_default)
 print(f'The value of {aws_var} is {aws_var_val}')
 
-# Launch the Gradio app
 AWS_REGION = get_or_create_env_var('AWS_REGION', 'eu-west-2')
 print(f'The value of AWS_REGION is {AWS_REGION}')
 
tools/file_conversion.py — CHANGED

@@ -53,8 +53,18 @@ def convert_pdf_to_images(pdf_path:str, page_min:int = 0, progress=Progress(track_tqdm=True)):
 print("Converting page: ", str(page_num + 1))
 
 # Convert one page to image
-…
+out_path = pdf_path + "_" + str(page_num) + ".png"
 
+# Ensure the directory exists
+os.makedirs(os.path.dirname(out_path), exist_ok=True)
+
+# Check if the image already exists
+if os.path.exists(out_path):
+    print(f"Loading existing image from {out_path}.")
+    image = [Image.open(out_path)] # Load the existing image
+else:
+    image = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1, dpi=300, use_cropbox=True, use_pdftocairo=False)
+    image[0].save(out_path, format="PNG") # Save the new image
 
 # If no images are returned, break the loop
 if not image:

@@ -64,7 +74,7 @@ def convert_pdf_to_images(pdf_path:str, page_min:int = 0, progress=Progress(track_tqdm=True)):
 # print("Conversion of page", str(page_num), "to file succeeded.")
 # print("image:", image)
 
-…
+
 images.extend(image)

@@ -105,6 +115,8 @@ def get_input_file_names(file_input):
 
 all_relevant_files = []
 
+#print("file_input:", file_input)
+
 for file in file_input:
     file_path = file.name
     print(file_path)

@@ -114,15 +126,17 @@ def get_input_file_names(file_input):
 
 file_extension = os.path.splitext(file_path)[1].lower()
 
+file_name_with_extension = file_path_without_ext + file_extension
+
 # Check if the file is an image type
-if file_extension in ['.jpg', '.jpeg', '.png', '.xlsx', '.csv', '.parquet']:
+if file_extension in ['.jpg', '.jpeg', '.png', '.pdf', '.xlsx', '.csv', '.parquet']:
     all_relevant_files.append(file_path_without_ext)
 
 all_relevant_files_str = ", ".join(all_relevant_files)
 
-print("all_relevant_files_str:", all_relevant_files_str)
+#print("all_relevant_files_str:", all_relevant_files_str)
 
-return all_relevant_files_str
+return all_relevant_files_str, file_name_with_extension
 
 def prepare_image_or_pdf(
     file_paths: List[str],

@@ -154,7 +168,7 @@ def prepare_image_or_pdf(
 
 tic = time.perf_counter()
 
-# If out message or
+# If out message or converted_file_paths are blank, change to a list so it can be appended to
 if isinstance(out_message, str):
     out_message = [out_message]
 

@@ -162,15 +176,17 @@ def prepare_image_or_pdf(
 if first_loop_state==True:
     latest_file_completed = 0
     out_message = []
-
+    converted_file_paths = []
+    image_file_paths = []
 else:
     print("Now attempting file:", str(latest_file_completed))
-
+    converted_file_paths = []
+    image_file_paths = []
 
 if not file_paths:
     file_paths = []
 
-#
+#converted_file_paths = file_paths
 
 latest_file_completed = int(latest_file_completed)

@@ -181,7 +197,7 @@ def prepare_image_or_pdf(
 final_out_message = '\n'.join(out_message)
 else:
 final_out_message = out_message
-return final_out_message,
+return final_out_message, converted_file_paths, image_file_paths
 
 #in_allow_list_flat = [item for sublist in in_allow_list for item in sublist]
 

@@ -217,27 +233,33 @@ def prepare_image_or_pdf(
 if not file_path:
     out_message = "No file selected"
     print(out_message)
-    return out_message,
+    return out_message, converted_file_paths, image_file_paths
 
 if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
     # Analyse and redact image-based pdf or image
     if is_pdf_or_image(file_path) == False:
         out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
         print(out_message)
-        return out_message,
+        return out_message, converted_file_paths, image_file_paths
 
-    …
-    …
+    converted_file_path = process_file(file_path)
+    image_file_path = converted_file_path
+    #print("Out file path at image conversion step:", converted_file_path)
 
 elif in_redact_method == "Simple text analysis - PDFs with selectable text":
     if is_pdf(file_path) == False:
         out_message = "Please upload a PDF file for text analysis."
         print(out_message)
-        return out_message,
+        return out_message, converted_file_paths, image_file_paths
 
-    out_file_path = file_path
-    …
+    converted_file_path = file_path # Pikepdf works with the basic unconverted pdf file
+    image_file_path = process_file(file_path)
 
+converted_file_paths.append(converted_file_path)
+image_file_paths.extend(image_file_path)
+
+#print("file conversion image_file_paths:", image_file_paths)
 
 toc = time.perf_counter()
 out_time = f"File '{file_path_without_ext}' prepared in {toc - tic:0.1f} seconds."

@@ -247,7 +269,7 @@ def prepare_image_or_pdf(
 out_message.append(out_time)
 out_message_out = '\n'.join(out_message)
 
-return out_message_out,
+return out_message_out, converted_file_paths, image_file_paths
 
 def convert_text_pdf_to_img_pdf(in_file_path:str, out_text_file_path:List[str]):
     file_path_without_ext = get_file_path_end(in_file_path)

@@ -270,4 +292,4 @@ def convert_text_pdf_to_img_pdf(in_file_path:str, out_text_file_path:List[str]):
 
 #print("Out file paths:", out_file_paths)
 
-return out_message, out_file_paths
+return out_message, out_file_paths
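The main change to convert_pdf_to_images() is the per-page caching shown in its first hunk: re-use a previously rendered PNG if one exists, otherwise render the page with pdf2image and save it for next time. A minimal stand-alone version of that pattern is sketched below, assuming pdf2image and Pillow are installed; the helper name and default DPI here are illustrative rather than the repo's own.

import os
from PIL import Image
from pdf2image import convert_from_path

def load_or_render_page(pdf_path: str, page_num: int, dpi: int = 300) -> Image.Image:
    out_path = f"{pdf_path}_{page_num}.png"
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

    if os.path.exists(out_path):
        return Image.open(out_path)             # cached page image from a previous run

    pages = convert_from_path(pdf_path, first_page=page_num + 1,
                              last_page=page_num + 1, dpi=dpi, use_cropbox=True)
    pages[0].save(out_path, format="PNG")       # cache the rendered page for next time
    return pages[0]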
tools/file_redaction.py
CHANGED
@@ -4,7 +4,7 @@ import json
|
|
4 |
import io
|
5 |
import os
|
6 |
from PIL import Image, ImageChops, ImageDraw
|
7 |
-
from typing import List, Dict
|
8 |
import pandas as pd
|
9 |
|
10 |
#from presidio_image_redactor.entities import ImageRecognizerResult
|
@@ -12,13 +12,11 @@ from pdfminer.high_level import extract_pages
|
|
12 |
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine, LTTextLineHorizontal, LTAnno
|
13 |
from pikepdf import Pdf, Dictionary, Name
|
14 |
import pymupdf
|
15 |
-
from pymupdf import Rect
|
|
|
16 |
|
17 |
import gradio as gr
|
18 |
from gradio import Progress
|
19 |
-
|
20 |
-
from typing import Tuple
|
21 |
-
|
22 |
from collections import defaultdict # For efficient grouping
|
23 |
|
24 |
from tools.custom_image_analyser_engine import CustomImageAnalyzerEngine, OCRResult, combine_ocr_results, CustomImageRecognizerResult
|
@@ -50,7 +48,7 @@ def sum_numbers_before_seconds(string:str):
|
|
50 |
|
51 |
return sum_of_numbers
|
52 |
|
53 |
-
def choose_and_run_redactor(file_paths:List[str],
|
54 |
'''
|
55 |
Based on the type of redaction selected, pass the document file content onto the relevant function and return a redacted document plus processing logs.
|
56 |
'''
|
@@ -63,6 +61,7 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
63 |
latest_file_completed = 0
|
64 |
#out_message = []
|
65 |
out_file_paths = []
|
|
|
66 |
|
67 |
# If out message is string or out_file_paths are blank, change to a list so it can be appended to
|
68 |
if isinstance(out_message, str):
|
@@ -73,9 +72,11 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
73 |
|
74 |
latest_file_completed = int(latest_file_completed)
|
75 |
|
|
|
|
|
76 |
# If we have already redacted the last file, return the input out_message and file list to the relevant components
|
77 |
if latest_file_completed >= len(file_paths):
|
78 |
-
print("Last file reached")
|
79 |
# Set to a very high number so as not to mix up with subsequent file processing by the user
|
80 |
latest_file_completed = 99
|
81 |
final_out_message = '\n'.join(out_message)
|
@@ -84,7 +85,9 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
84 |
estimate_total_processing_time = sum_numbers_before_seconds(final_out_message)
|
85 |
print("Estimated total processing time:", str(estimate_total_processing_time))
|
86 |
|
87 |
-
|
|
|
|
|
88 |
|
89 |
file_paths_loop = [file_paths[int(latest_file_completed)]]
|
90 |
|
@@ -110,26 +113,26 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
110 |
else:
|
111 |
out_message = "No file selected"
|
112 |
print(out_message)
|
113 |
-
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
114 |
|
115 |
if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
|
116 |
#Analyse and redact image-based pdf or image
|
117 |
if is_pdf_or_image(file_path) == False:
|
118 |
out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
|
119 |
-
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
120 |
|
121 |
print("Redacting file " + file_path_without_ext + " as an image-based file")
|
122 |
|
123 |
-
|
124 |
|
125 |
# Save file
|
126 |
if is_pdf(file_path) == False:
|
127 |
out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
|
128 |
-
|
129 |
|
130 |
else:
|
131 |
out_image_file_path = output_folder + file_path_without_ext + "_redacted.pdf"
|
132 |
-
|
133 |
|
134 |
out_file_paths.append(out_image_file_path)
|
135 |
if logging_file_paths:
|
@@ -137,12 +140,6 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
137 |
|
138 |
out_message.append("File '" + file_path_without_ext + "' successfully redacted")
|
139 |
|
140 |
-
# Save decision making process
|
141 |
-
# output_logs_str = str(output_logs)
|
142 |
-
# logs_output_file_name = out_image_file_path + "_decision_process_output.txt"
|
143 |
-
# with open(logs_output_file_name, "w") as f:
|
144 |
-
# f.write(output_logs_str)
|
145 |
-
# log_files_output_paths.append(logs_output_file_name)
|
146 |
|
147 |
logs_output_file_name = out_image_file_path + "_decision_process_output.csv"
|
148 |
redaction_logs.to_csv(logs_output_file_name)
|
@@ -160,14 +157,15 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
160 |
|
161 |
elif in_redact_method == "Simple text analysis - PDFs with selectable text":
|
162 |
|
163 |
-
print("file_path:", file_path)
|
164 |
|
165 |
if is_pdf(file_path) == False:
|
166 |
-
|
|
|
167 |
|
168 |
# Analyse text-based pdf
|
169 |
print('Redacting file as text-based PDF')
|
170 |
-
pdf_text, decision_process_logs, page_text_outputs = redact_text_pdf(file_path, language, chosen_redact_entities, in_allow_list_flat, page_min, page_max, "Simple text analysis - PDFs with selectable text")
|
171 |
|
172 |
out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
|
173 |
pdf_text.save(out_text_file_path)
|
@@ -200,7 +198,7 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
200 |
else:
|
201 |
out_message = "No redaction method selected"
|
202 |
print(out_message)
|
203 |
-
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
204 |
|
205 |
toc = time.perf_counter()
|
206 |
out_time = f"in {toc - tic:0.1f} seconds."
|
@@ -223,11 +221,132 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
223 |
log_files_output_paths.append(all_request_metadata_file_path)
|
224 |
|
225 |
|
226 |
-
return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
227 |
|
228 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
229 |
|
230 |
-
page = doc.load_page(page_no)
|
231 |
mediabox_height = page.mediabox[3] - page.mediabox[1]
|
232 |
mediabox_width = page.mediabox[2] - page.mediabox[0]
|
233 |
rect_height = page.rect.height
|
@@ -236,62 +355,91 @@ def redact_page_with_pymupdf(doc, annotations_on_page, page_no, image = None):#,
|
|
236 |
#print("page_rect_height:", page.rect.height)
|
237 |
#print("page mediabox size:", page.mediabox[3] - page.mediabox[1])
|
238 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
239 |
for annot in annotations_on_page:
|
240 |
-
|
241 |
-
|
|
|
|
|
242 |
|
243 |
-
|
244 |
-
scale_width = rect_width / image_page_width
|
245 |
-
scale_height = rect_height / image_page_height
|
246 |
|
247 |
-
#
|
248 |
-
|
|
|
|
|
|
|
|
|
|
|
249 |
|
250 |
-
|
251 |
|
252 |
-
#
|
253 |
-
|
254 |
-
|
255 |
-
x2 = ((annot.left + annot.width) * scale_width)# + page_x_adjust # Calculate x1
|
256 |
-
new_y2 = ((annot.top + annot.height) * scale_height)# - page_y_adjust # Calculate y1 correctly
|
257 |
|
258 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
259 |
|
260 |
-
|
261 |
-
# Calculate scaling factors
|
262 |
-
scale_height = rect_height / mediabox_height if mediabox_height else 1
|
263 |
-
scale_width = rect_width / mediabox_width if mediabox_width else 1
|
264 |
|
265 |
-
|
266 |
-
|
267 |
-
|
268 |
|
269 |
-
|
270 |
-
# Extract the /Rect field
|
271 |
-
rect_field = annot["/Rect"]
|
272 |
|
273 |
-
|
274 |
-
rect_coordinates = [float(coord) for coord in rect_field]
|
275 |
|
276 |
-
|
277 |
-
|
278 |
-
x1 = x1 + page_x_adjust
|
279 |
-
new_y1 = (rect_height - y2) - page_y_adjust
|
280 |
-
x2 = x2 + page_x_adjust
|
281 |
-
new_y2 = (rect_height - y1) - page_y_adjust
|
282 |
|
283 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
284 |
|
285 |
# Convert to a PyMuPDF Rect object
|
286 |
#rect = Rect(rect_coordinates)
|
287 |
|
288 |
-
|
289 |
-
|
290 |
-
|
291 |
-
|
292 |
-
|
293 |
-
# Add a redaction annotation
|
294 |
-
#page.add_redact_annot(rect)
|
295 |
|
296 |
# Add the annotation to the middle of the character line, so that it doesn't delete text from adjacent lines
|
297 |
page.add_redact_annot(rect_single_pixel_height)
|
@@ -302,10 +450,18 @@ def redact_page_with_pymupdf(doc, annotations_on_page, page_no, image = None):#,
|
|
302 |
shape.finish(color=(0, 0, 0), fill=(0, 0, 0)) # Black fill for the rectangle
|
303 |
shape.commit()
|
304 |
|
|
|
|
|
|
|
|
|
|
|
305 |
page.apply_redactions(images=0, graphics=0)
|
306 |
page.clean_contents()
|
307 |
|
308 |
-
|
|
|
|
|
|
|
309 |
|
310 |
def bounding_boxes_overlap(box1, box2):
|
311 |
"""Check if two bounding boxes overlap."""
|
@@ -329,6 +485,7 @@ def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_result
|
|
329 |
# Reconstruct bounding boxes for substrings of interest
|
330 |
reconstructed_bboxes = []
|
331 |
for bbox in bboxes:
|
|
|
332 |
bbox_box = (bbox.left, bbox.top, bbox.left + bbox.width, bbox.top + bbox.height)
|
333 |
for line_text, line_info in combined_results.items():
|
334 |
line_box = line_info['bounding_box']
|
@@ -350,7 +507,7 @@ def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_result
|
|
350 |
current_char += 1 # +1 for space if the word doesn't already end with a space
|
351 |
|
352 |
if relevant_words:
|
353 |
-
print("Relevant words:", relevant_words)
|
354 |
left = min(word['bounding_box'][0] for word in relevant_words)
|
355 |
top = min(word['bounding_box'][1] for word in relevant_words)
|
356 |
right = max(word['bounding_box'][2] for word in relevant_words)
|
@@ -358,6 +515,11 @@ def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_result
|
|
358 |
|
359 |
# Combine the text of all relevant words
|
360 |
combined_text = " ".join(word['text'] for word in relevant_words)
|
|
|
|
|
|
|
|
|
|
|
361 |
|
362 |
reconstructed_bbox = CustomImageRecognizerResult(
|
363 |
bbox.entity_type,
|
@@ -393,12 +555,19 @@ def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_result
|
|
393 |
else:
|
394 |
new_text = merged_box.text + " " + next_box.text
|
395 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
396 |
new_left = min(merged_box.left, next_box.left)
|
397 |
new_top = min(merged_box.top, next_box.top)
|
398 |
new_width = max(merged_box.left + merged_box.width, next_box.left + next_box.width) - new_left
|
399 |
new_height = max(merged_box.top + merged_box.height, next_box.top + next_box.height) - new_top
|
400 |
merged_box = CustomImageRecognizerResult(
|
401 |
-
|
402 |
)
|
403 |
else:
|
404 |
merged_bboxes.append(merged_box)
|
@@ -408,7 +577,7 @@ def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_result
|
|
408 |
|
409 |
return merged_bboxes
|
410 |
|
411 |
-
def redact_image_pdf(file_path:str,
|
412 |
'''
|
413 |
Take an path for an image of a document, then run this image through the Presidio ImageAnalyzer and PIL to get a redacted page back. Adapted from Presidio ImageRedactorEngine.
|
414 |
'''
|
@@ -418,24 +587,25 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
418 |
fill = (0, 0, 0) # Fill colour
|
419 |
decision_process_output_str = ""
|
420 |
images = []
|
|
|
421 |
#request_metadata = {}
|
422 |
image_analyser = CustomImageAnalyzerEngine(nlp_analyser)
|
423 |
|
424 |
# Also open as pymupdf pdf to apply annotations later on
|
425 |
-
|
426 |
|
427 |
-
if not
|
428 |
out_message = "PDF does not exist as images. Converting pages to image"
|
429 |
print(out_message)
|
430 |
|
431 |
-
|
432 |
|
433 |
-
if not isinstance(
|
434 |
-
print("Converting
|
435 |
-
|
436 |
|
437 |
-
#print("Image paths:",
|
438 |
-
number_of_pages = len(
|
439 |
|
440 |
print("Number of pages:", str(number_of_pages))
|
441 |
|
@@ -464,57 +634,37 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
464 |
if analysis_type == "Quick image analysis - typed text": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".csv"
|
465 |
elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.csv"
|
466 |
|
467 |
-
for
|
468 |
handwriting_or_signature_boxes = []
|
469 |
signature_recogniser_results = []
|
470 |
handwriting_recogniser_results = []
|
471 |
|
|
|
|
|
472 |
try:
|
473 |
-
image =
|
474 |
-
print("
|
475 |
-
#print("image:", image)
|
476 |
except Exception as e:
|
477 |
-
print("Could not redact page:",
|
478 |
print(e)
|
479 |
continue
|
480 |
|
481 |
-
|
482 |
|
483 |
-
|
|
|
|
|
|
|
484 |
|
485 |
reported_page_number = str(i + 1)
|
486 |
|
487 |
print("Redacting page", reported_page_number)
|
488 |
|
489 |
-
|
490 |
-
# Assuming image_paths[i] is your PIL image object
|
491 |
-
try:
|
492 |
-
image = image_paths[0][i]#.copy()
|
493 |
-
#print("image:", image)
|
494 |
-
except Exception as e:
|
495 |
-
print("Could not redact page:", reported_page_number, "due to:")
|
496 |
-
print(e)
|
497 |
-
continue
|
498 |
|
499 |
# Need image size to convert textract OCR outputs to the correct sizes
|
500 |
page_width, page_height = image.size
|
501 |
|
502 |
-
|
503 |
-
# Get the dimensions of the page in points with pymupdf to get relative scale
|
504 |
-
#page = doc.load_page(i)
|
505 |
-
#mu_page_rect = page.rect
|
506 |
-
#mu_page_width = mu_page_rect.width
|
507 |
-
#mu_page_height = max(mu_page_rect.height, page.mediabox[3] - page.mediabox[1])
|
508 |
-
#mu_page_width = max(mu_page_rect.width, page.mediabox[2] - page.mediabox[0])
|
509 |
-
#mu_page_height = mu_page_rect.height
|
510 |
-
|
511 |
-
# Calculate scaling factors between PIL image and pymupdf
|
512 |
-
#scale_width = mu_page_width / page_width
|
513 |
-
#scale_height = mu_page_height / page_height
|
514 |
-
|
515 |
-
#scale = (scale_width, scale_height)
|
516 |
-
|
517 |
-
|
518 |
# Possibility to use different languages
|
519 |
if language == 'en':
|
520 |
ocr_lang = 'eng'
|
@@ -559,21 +709,19 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
559 |
|
560 |
line_level_ocr_results, handwriting_or_signature_boxes, signature_recogniser_results, handwriting_recogniser_results, line_level_ocr_results_with_children = json_to_ocrresult(text_blocks, page_width, page_height)
|
561 |
|
562 |
-
# Save ocr_with_children_output
|
563 |
-
# ocr_results_with_children_str = str(line_level_ocr_results_with_children)
|
564 |
-
# logs_output_file_name = output_folder + "ocr_with_children_textract.txt"
|
565 |
-
# with open(logs_output_file_name, "w") as f:
|
566 |
-
# f.write(ocr_results_with_children_str)
|
567 |
-
|
568 |
# Step 2: Analyze text and identify PII
|
569 |
-
|
570 |
-
|
571 |
-
|
572 |
-
|
573 |
-
|
574 |
-
|
575 |
-
|
576 |
-
|
|
|
|
|
|
|
|
|
577 |
|
578 |
if analysis_type == "Quick image analysis - typed text": interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".txt"
|
579 |
elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)": interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.txt"
|
@@ -586,30 +734,62 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
586 |
# Merge close bounding boxes
|
587 |
merged_redaction_bboxes = merge_img_bboxes(redaction_bboxes, line_level_ocr_results_with_children, signature_recogniser_results, handwriting_recogniser_results, handwrite_signature_checkbox)
|
588 |
|
589 |
-
|
|
|
590 |
|
591 |
# 3. Draw the merged boxes
|
|
|
592 |
if is_pdf(file_path) == False:
|
593 |
draw = ImageDraw.Draw(image)
|
594 |
|
|
|
|
|
595 |
for box in merged_redaction_bboxes:
|
|
|
|
|
596 |
x0 = box.left
|
597 |
y0 = box.top
|
598 |
x1 = x0 + box.width
|
599 |
y1 = y0 + box.height
|
600 |
-
draw.rectangle([x0, y0, x1, y1], fill=fill)
|
601 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
602 |
|
603 |
## Apply annotations with pymupdf
|
604 |
else:
|
605 |
-
|
606 |
-
|
607 |
-
#doc.save("image_redact.pdf")
|
608 |
|
609 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
610 |
|
611 |
-
|
612 |
-
#all_ocr_results.append(line_level_ocr_results_str)
|
613 |
|
614 |
# Convert to DataFrame and add to ongoing logging table
|
615 |
line_level_ocr_results_df = pd.DataFrame([{
|
@@ -623,43 +803,21 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
623 |
|
624 |
all_line_level_ocr_results_df = pd.concat([all_line_level_ocr_results_df, line_level_ocr_results_df])
|
625 |
|
626 |
-
# Convert decision process to table
|
627 |
-
# Export the decision making process
|
628 |
-
if merged_redaction_bboxes:
|
629 |
-
# for bbox in merged_redaction_bboxes:
|
630 |
-
# print(f"Entity: {bbox.entity_type}, Text: {bbox.text}, Bbox: ({bbox.left}, {bbox.top}, {bbox.width}, {bbox.height})")
|
631 |
-
|
632 |
-
#decision_process_output_str = "Page " + reported_page_number + ":\n" + str(merged_redaction_bboxes)
|
633 |
-
#all_decision_process.append(decision_process_output_str)
|
634 |
-
|
635 |
-
decision_process_table = pd.DataFrame([{
|
636 |
-
'page': reported_page_number,
|
637 |
-
'entity_type': result.entity_type,
|
638 |
-
'start': result.start,
|
639 |
-
'end': result.end,
|
640 |
-
'score': result.score,
|
641 |
-
'left': result.left,
|
642 |
-
'top': result.top,
|
643 |
-
'width': result.width,
|
644 |
-
'height': result.height,
|
645 |
-
'text': result.text
|
646 |
-
} for result in merged_redaction_bboxes])
|
647 |
-
|
648 |
-
all_decision_process_table = pd.concat([all_decision_process_table, decision_process_table])
|
649 |
-
|
650 |
if is_pdf(file_path) == False:
|
651 |
images.append(image)
|
652 |
-
|
653 |
|
654 |
-
|
655 |
-
# line_level_ocr_results_out = "\n".join(all_ocr_results)
|
656 |
-
# with open(ocr_results_file_path, "w") as f:
|
657 |
-
# f.write(line_level_ocr_results_out)
|
658 |
|
659 |
all_line_level_ocr_results_df.to_csv(ocr_results_file_path)
|
660 |
logging_file_paths.append(ocr_results_file_path)
|
661 |
|
662 |
-
return
|
|
|
|
|
|
|
|
|
|
|
663 |
|
664 |
def get_text_container_characters(text_container:LTTextContainer):
|
665 |
|
@@ -672,23 +830,27 @@ def get_text_container_characters(text_container:LTTextContainer):
|
|
672 |
return characters
|
673 |
return []
|
674 |
|
675 |
-
|
676 |
-
def analyze_text_container(text_container:OCRResult, language:str, chosen_redact_entities:List[str], score_threshold:float, allow_list:List[str]):
|
677 |
'''
|
678 |
Take text and bounding boxes in OCRResult format and analyze it for PII using spacy and the Microsoft Presidio package.
|
679 |
'''
|
680 |
|
|
|
|
|
681 |
text_to_analyze = text_container.text
|
682 |
#print("text_to_analyze:", text_to_analyze)
|
683 |
|
684 |
-
|
685 |
-
|
686 |
-
|
687 |
-
|
688 |
-
|
689 |
-
|
690 |
-
|
691 |
|
|
|
|
|
|
|
692 |
|
693 |
def create_text_bounding_boxes_from_characters(char_objects:List[LTChar]) -> Tuple[List[OCRResult], List[LTChar]]:
|
694 |
'''
|
@@ -768,16 +930,16 @@ def create_text_bounding_boxes_from_characters(char_objects:List[LTChar]) -> Tup
|
|
768 |
|
769 |
return line_level_results_out, line_level_characters_out # Return both results and character objects
|
770 |
|
771 |
-
def merge_text_bounding_boxes(
|
772 |
'''
|
773 |
Merge identified bounding boxes containing PII that are very close to one another
|
774 |
'''
|
775 |
-
|
776 |
-
if len(
|
777 |
# Extract bounding box coordinates for sorting
|
778 |
bounding_boxes = []
|
779 |
text_out = []
|
780 |
-
for result in
|
781 |
char_boxes = [char.bbox for char in characters[result.start:result.end] if isinstance(char, LTChar)]
|
782 |
char_text = [char._text for char in characters[result.start:result.end] if isinstance(char, LTChar)]
|
783 |
if char_boxes:
|
@@ -823,14 +985,21 @@ def merge_text_bounding_boxes(analyzer_results:CustomImageRecognizerResult, char
|
|
823 |
current_box[2] = char_box[2] # Extend the current box horizontally
|
824 |
current_box[3] = max(current_box[3], char_box[3]) # Ensure the top is the highest
|
825 |
current_result.end = max(current_result.end, result.end) # Extend the text range
|
|
|
|
|
|
|
|
|
826 |
# Add a space if current_text is not empty
|
827 |
if current_text:
|
828 |
current_text.append(" ") # Add space between texts
|
829 |
current_text.extend(text)
|
|
|
|
|
830 |
else:
|
831 |
merged_bounding_boxes.append(
|
832 |
{"text":"".join(current_text),"boundingBox": current_box, "result": current_result})
|
833 |
#print(f"Appending merged box: {current_box}")
|
|
|
834 |
|
835 |
# Reset current_box and current_y after appending
|
836 |
current_box = char_box
|
@@ -845,39 +1014,39 @@ def merge_text_bounding_boxes(analyzer_results:CustomImageRecognizerResult, char
|
|
845 |
#print(f"Appending final box for result: {current_box}")
|
846 |
|
847 |
if not merged_bounding_boxes:
|
848 |
-
|
849 |
{"text":text, "boundingBox": char.bbox, "result": result}
|
850 |
-
for result in
|
851 |
for char in characters[result.start:result.end]
|
852 |
if isinstance(char, LTChar)
|
853 |
)
|
854 |
else:
|
855 |
-
|
856 |
|
857 |
-
#print("Analyzed bounding boxes:\n\n",
|
858 |
|
859 |
-
return
|
860 |
|
861 |
-
def create_text_redaction_process_results(
|
862 |
decision_process_table = pd.DataFrame()
|
863 |
|
864 |
-
if len(
|
865 |
# Create summary df of annotations to be made
|
866 |
-
|
867 |
-
|
868 |
-
|
869 |
-
|
870 |
-
|
871 |
-
decision_process_table = pd.concat([decision_process_table,
|
872 |
|
873 |
#print('\n\ndecision_process_table:\n\n', decision_process_table)
|
874 |
|
875 |
return decision_process_table
|
876 |
|
877 |
-
def create_annotations_for_bounding_boxes(
|
878 |
annotations_on_page = []
|
879 |
-
for
|
880 |
-
bounding_box =
|
881 |
annotation = Dictionary(
|
882 |
Type=Name.Annot,
|
883 |
Subtype=Name.Square, #Name.Highlight,
|
@@ -887,7 +1056,7 @@ def create_annotations_for_bounding_boxes(analyzed_bounding_boxes):
|
|
887 |
C=[0, 0, 0],
|
888 |
IC=[0, 0, 0],
|
889 |
CA=1, # Transparency
|
890 |
-
T=
|
891 |
BS=Dictionary(
|
892 |
W=0, # Border width: 1 point
|
893 |
S=Name.S # Border style: solid
|
@@ -896,23 +1065,25 @@ def create_annotations_for_bounding_boxes(analyzed_bounding_boxes):
|
|
896 |
annotations_on_page.append(annotation)
|
897 |
return annotations_on_page
|
898 |
|
899 |
-
def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, page_min:int=0, page_max:int=999, analysis_type:str = "Simple text analysis - PDFs with selectable text", progress=Progress(track_tqdm=True)):
|
900 |
'''
|
901 |
Redact chosen entities from a pdf that is made up of multiple pages that are not images.
|
902 |
'''
|
903 |
annotations_all_pages = []
|
|
|
904 |
page_text_outputs_all_pages = pd.DataFrame()
|
905 |
decision_process_table_all_pages = pd.DataFrame()
|
906 |
|
907 |
combine_pixel_dist = 20 # Horizontal distance between PII bounding boxes under/equal they are combined into one
|
908 |
|
909 |
# Open with Pikepdf to get text lines
|
910 |
-
|
911 |
-
|
912 |
-
doc = pymupdf.open(filename)
|
913 |
-
page_num = 0
|
914 |
|
915 |
-
|
|
|
|
|
|
|
916 |
|
917 |
# Check that page_min and page_max are within expected ranges
|
918 |
if page_max > number_of_pages or page_max == 0:
|
@@ -920,112 +1091,115 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
|
|
920 |
#else:
|
921 |
# page_max = page_max - 1
|
922 |
|
923 |
-
if page_min <= 0:
|
924 |
-
|
925 |
-
else:
|
926 |
-
page_min = page_min - 1
|
927 |
|
928 |
-
print("Page range is",str(page_min), "to", str(page_max))
|
929 |
|
930 |
-
for page_no in range(page_min, page_max):
|
931 |
-
|
|
|
|
|
932 |
|
933 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
934 |
|
935 |
-
# The /MediaBox in a PDF specifies the size of the page [left, bottom, right, top]
|
936 |
-
#media_box = page.MediaBox
|
937 |
-
#page_width = media_box[2] - media_box[0]
|
938 |
-
#page_height = media_box[3] - media_box[1]
|
939 |
-
|
940 |
-
for page_layout in extract_pages(filename, page_numbers = [page_no], maxpages=1):
|
941 |
-
|
942 |
-
page_analyzer_results = []
|
943 |
-
page_analyzed_bounding_boxes = []
|
944 |
-
|
945 |
-
characters = []
|
946 |
-
annotations_on_page = []
|
947 |
-
decision_process_table_on_page = pd.DataFrame()
|
948 |
-
page_text_outputs = pd.DataFrame()
|
949 |
-
|
950 |
-
if analysis_type == "Simple text analysis - PDFs with selectable text":
|
951 |
-
for text_container in page_layout:
|
952 |
-
|
953 |
-
text_container_analyzer_results = []
|
954 |
-
text_container_analyzed_bounding_boxes = []
|
955 |
-
|
956 |
-
characters = get_text_container_characters(text_container)
|
957 |
-
|
958 |
-
# Create dataframe for all the text on the page
|
959 |
-
line_level_text_results_list, line_characters = create_text_bounding_boxes_from_characters(characters)
|
960 |
-
|
961 |
-
print("line_characters:", line_characters)
|
962 |
-
|
963 |
-
# Create page_text_outputs (OCR format outputs)
|
964 |
-
if line_level_text_results_list:
|
965 |
-
# Convert to DataFrame and add to ongoing logging table
|
966 |
-
line_level_text_results_df = pd.DataFrame([{
|
967 |
-
'page': page_no + 1,
|
968 |
-
'text': result.text,
|
969 |
-
'left': result.left,
|
970 |
-
'top': result.top,
|
971 |
-
'width': result.width,
|
972 |
-
'height': result.height
|
973 |
-
} for result in line_level_text_results_list])
|
974 |
-
|
975 |
-
page_text_outputs = pd.concat([page_text_outputs, line_level_text_results_df])
|
976 |
-
|
977 |
-
# Analyse each line of text in turn for PII and add to list
|
978 |
-
for i, text_line in enumerate(line_level_text_results_list):
|
979 |
-
text_line_analyzer_result = []
|
980 |
-
text_line_bounding_boxes = []
|
981 |
-
|
982 |
-
#print("text_line:", text_line.text)
|
983 |
-
|
984 |
-
text_line_analyzer_result = analyze_text_container(text_line, language, chosen_redact_entities, score_threshold, allow_list)
|
985 |
-
|
986 |
-
# Merge bounding boxes for the line if multiple found close together
|
987 |
-
if text_line_analyzer_result:
|
988 |
-
# Merge bounding boxes if very close together
|
989 |
-
print("text_line_bounding_boxes:", text_line_bounding_boxes)
|
990 |
-
print("line_characters:")
|
991 |
-
#print(line_characters[i])
|
992 |
-
print("".join(char._text for char in line_characters[i]))
|
993 |
-
text_line_bounding_boxes = merge_text_bounding_boxes(text_line_analyzer_result, line_characters[i], combine_pixel_dist, vertical_padding = 0)
|
994 |
-
|
995 |
-
text_container_analyzer_results.extend(text_line_analyzer_result)
|
996 |
-
text_container_analyzed_bounding_boxes.extend(text_line_bounding_boxes)
|
997 |
|
998 |
-
|
|
|
999 |
|
1000 |
-
|
1001 |
-
|
1002 |
-
|
|
|
|
|
|
|
|
|
1003 |
|
1004 |
-
|
|
|
1005 |
|
1006 |
-
# Annotate redactions on page
|
1007 |
-
annotations_on_page = create_annotations_for_bounding_boxes(page_analyzed_bounding_boxes)
|
1008 |
-
|
1009 |
-
# Make pymupdf redactions
|
1010 |
-
doc = redact_page_with_pymupdf(doc, annotations_on_page, page_no)
|
1011 |
-
|
1012 |
-
# Make page annotations
|
1013 |
-
#page.Annots = pdf.make_indirect(annotations_on_page)
|
1014 |
-
if annotations_on_page:
|
1015 |
annotations_all_pages.extend([annotations_on_page])
|
1016 |
|
1017 |
-
|
|
|
|
|
|
|
|
|
1018 |
|
1019 |
-
|
1020 |
-
|
1021 |
-
decision_process_table_on_page = create_text_redaction_process_results(page_analyzer_results, page_analyzed_bounding_boxes, page_num)
|
1022 |
|
1023 |
-
|
1024 |
-
|
|
|
|
|
1025 |
|
1026 |
-
|
1027 |
-
page_text_outputs = page_text_outputs.sort_values(["top", "left"], ascending=[False, False]).reset_index(drop=True)
|
1028 |
-
#page_text_outputs.to_csv("text_page_text_outputs.csv")
|
1029 |
-
page_text_outputs_all_pages = pd.concat([page_text_outputs_all_pages, page_text_outputs])
|
1030 |
|
1031 |
-
return
|
|
|
4 |
import io
|
5 |
import os
|
6 |
from PIL import Image, ImageChops, ImageDraw
|
7 |
+
from typing import List, Dict, Tuple
|
8 |
import pandas as pd
|
9 |
|
10 |
#from presidio_image_redactor.entities import ImageRecognizerResult
|
|
|
12 |
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine, LTTextLineHorizontal, LTAnno
|
13 |
from pikepdf import Pdf, Dictionary, Name
|
14 |
import pymupdf
|
15 |
+
from pymupdf import Rect
|
16 |
+
from fitz import Document, Page
|
17 |
|
18 |
import gradio as gr
|
19 |
from gradio import Progress
|
|
|
|
|
|
|
20 |
from collections import defaultdict # For efficient grouping
|
21 |
|
22 |
from tools.custom_image_analyser_engine import CustomImageAnalyzerEngine, OCRResult, combine_ocr_results, CustomImageRecognizerResult
|
|
|
48 |
|
49 |
return sum_of_numbers
|
50 |
|
51 |
+
51  + def choose_and_run_redactor(file_paths:List[str], prepared_pdf_file_paths:List[str], prepared_pdf_image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list=[], log_files_output_paths:list=[], first_loop_state:bool=False, page_min:int=0, page_max:int=999, estimated_time_taken_state:float=0.0, handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], all_request_metadata_str:str = "", all_image_annotations:dict={}, pdf_text=[], progress=gr.Progress(track_tqdm=True)):
52    '''
53    Based on the type of redaction selected, pass the document file content onto the relevant function and return a redacted document plus processing logs.
54    '''
61    latest_file_completed = 0
62    #out_message = []
63    out_file_paths = []
64  + pdf_text = []
65
66    # If out message is string or out_file_paths are blank, change to a list so it can be appended to
67    if isinstance(out_message, str):
72
73    latest_file_completed = int(latest_file_completed)
74
75  + #pdf_text = []
76  +
77    # If we have already redacted the last file, return the input out_message and file list to the relevant components
78    if latest_file_completed >= len(file_paths):
79  + #print("Last file reached")
80    # Set to a very high number so as not to mix up with subsequent file processing by the user
81    latest_file_completed = 99
82    final_out_message = '\n'.join(out_message)
85    estimate_total_processing_time = sum_numbers_before_seconds(final_out_message)
86    print("Estimated total processing time:", str(estimate_total_processing_time))
87
88  + #print("Final all_image_annotations:", all_image_annotations)
89  +
90  + return final_out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimate_total_processing_time, all_request_metadata_str, pdf_text, all_image_annotations
91
92    file_paths_loop = [file_paths[int(latest_file_completed)]]
93
113   else:
114   out_message = "No file selected"
115   print(out_message)
116 + return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations
117
118   if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
119   #Analyse and redact image-based pdf or image
120   if is_pdf_or_image(file_path) == False:
121   out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
122 + return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations
123
124   print("Redacting file " + file_path_without_ext + " as an image-based file")
125
126 + pdf_text, redaction_logs, logging_file_paths, new_request_metadata, all_image_annotations = redact_image_pdf(file_path, prepared_pdf_image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max, in_redact_method, handwrite_signature_checkbox)
127
128   # Save file
129   if is_pdf(file_path) == False:
130   out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
131 + pdf_text[0].save(out_image_file_path, "PDF" ,resolution=100.0, save_all=True, append_images=pdf_text[1:])
132
133   else:
134   out_image_file_path = output_folder + file_path_without_ext + "_redacted.pdf"
135 + pdf_text.save(out_image_file_path)
136
137   out_file_paths.append(out_image_file_path)
138   if logging_file_paths:
140
141   out_message.append("File '" + file_path_without_ext + "' successfully redacted")
142
143
144   logs_output_file_name = out_image_file_path + "_decision_process_output.csv"
145   redaction_logs.to_csv(logs_output_file_name)
157
158   elif in_redact_method == "Simple text analysis - PDFs with selectable text":
159
160 + print("file_path for selectable text analysis:", file_path)
161
162   if is_pdf(file_path) == False:
163 + out_message = "Please upload a PDF file for text analysis. If you have an image, select 'Image analysis'."
164 + return out_message, None, None
165
166   # Analyse text-based pdf
167   print('Redacting file as text-based PDF')
168 + pdf_text, decision_process_logs, page_text_outputs, all_image_annotations = redact_text_pdf(file_path, prepared_pdf_image_paths, language, chosen_redact_entities, in_allow_list_flat, page_min, page_max, "Simple text analysis - PDFs with selectable text")
169
170   out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
171   pdf_text.save(out_text_file_path)
198   else:
199   out_message = "No redaction method selected"
200   print(out_message)
201 + return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations
202
203   toc = time.perf_counter()
204   out_time = f"in {toc - tic:0.1f} seconds."
221   log_files_output_paths.append(all_request_metadata_file_path)
222
223
224 + return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str, pdf_text, all_image_annotations
225 +
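Reviewer's note, not part of the commit: every exit path of choose_and_run_redactor above returns the same ten outputs (message, two copies of the output file list, progress counter, two copies of the log file list, time taken, request metadata, the redacted document and the per-page annotations), so the Gradio components it is wired to always receive a full set of values. A minimal sketch of the dispatch it performs, using the method strings from the code above (the helper name below is hypothetical):

def pick_redaction_route(in_redact_method: str) -> str:
    # Mirrors the branching in choose_and_run_redactor; the handler names refer to
    # functions defined later in this file.
    image_methods = ("Quick image analysis - typed text",
                     "Complex image analysis - docs with handwriting/signatures (AWS Textract)")
    if in_redact_method in image_methods:
        return "redact_image_pdf"      # OCR / image route
    if in_redact_method == "Simple text analysis - PDFs with selectable text":
        return "redact_text_pdf"       # selectable-text route
    return "No redaction method selected"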
226 + def convert_pikepdf_coords_to_pymudf(pymupdf_page, annot):
227 + '''
228 + Convert annotations from pikepdf to pymupdf format
229 + '''
230 +
231 + mediabox_height = pymupdf_page.mediabox[3] - pymupdf_page.mediabox[1]
232 + mediabox_width = pymupdf_page.mediabox[2] - pymupdf_page.mediabox[0]
233 + rect_height = pymupdf_page.rect.height
234 + rect_width = pymupdf_page.rect.width
235 +
236 + # Calculate scaling factors
237 + #scale_height = rect_height / mediabox_height if mediabox_height else 1
238 + #scale_width = rect_width / mediabox_width if mediabox_width else 1
239 +
240 + # Adjust coordinates based on scaling factors
241 + page_x_adjust = (rect_width - mediabox_width) / 2 # Center adjustment
242 + page_y_adjust = (rect_height - mediabox_height) / 2 # Center adjustment
243 +
244 + #print("In the pikepdf conversion function")
245 + # Extract the /Rect field
246 + rect_field = annot["/Rect"]
247 +
248 + # Convert the extracted /Rect field to a list of floats (since pikepdf uses Decimal objects)
249 + rect_coordinates = [float(coord) for coord in rect_field]
250 +
251 + # Convert the Y-coordinates (flip using the page height)
252 + x1, y1, x2, y2 = rect_coordinates
253 + x1 = x1 + page_x_adjust
254 + new_y1 = (rect_height - y2) - page_y_adjust
255 + x2 = x2 + page_x_adjust
256 + new_y2 = (rect_height - y1) - page_y_adjust
257 +
258 + return x1, new_y1, x2, new_y2
259 +
260 + def convert_pikepdf_to_image_coords(pymupdf_page, annot, image:Image):
261 + '''
262 + Convert annotations from pikepdf coordinates to image coordinates.
263 + '''
264 +
265 + # Get the dimensions of the page in points with pymupdf
266 + rect_height = pymupdf_page.rect.height
267 + rect_width = pymupdf_page.rect.width
268 +
269 + # Get the dimensions of the image
270 + image_page_width, image_page_height = image.size
271 +
272 + # Calculate scaling factors between pymupdf and PIL image
273 + scale_width = image_page_width / rect_width
274 + scale_height = image_page_height / rect_height
275 +
276 + # Extract the /Rect field
277 + rect_field = annot["/Rect"]
278 +
279 + # Convert the extracted /Rect field to a list of floats
280 + rect_coordinates = [float(coord) for coord in rect_field]
281 +
282 + # Convert the Y-coordinates (flip using the image height)
283 + x1, y1, x2, y2 = rect_coordinates
284 + x1_image = x1 * scale_width
285 + new_y1_image = image_page_height - (y2 * scale_height) # Flip Y0 (since it starts from bottom)
286 + x2_image = x2 * scale_width
287 + new_y2_image = image_page_height - (y1 * scale_height) # Flip Y1
288 +
289 + return x1_image, new_y1_image, x2_image, new_y2_image
290 +
291 + def convert_image_coords_to_pymupdf(pymupdf_page, annot:CustomImageRecognizerResult, image:Image):
292 + '''
293 + Converts an image with redaction coordinates from a CustomImageRecognizerResult to pymupdf coordinates.
294 + '''
295 +
296 + rect_height = pymupdf_page.rect.height
297 + rect_width = pymupdf_page.rect.width
298 +
299 + image_page_width, image_page_height = image.size
300 +
301 + # Calculate scaling factors between PIL image and pymupdf
302 + scale_width = rect_width / image_page_width
303 + scale_height = rect_height / image_page_height
304
305 + # Calculate scaled coordinates
306 + x1 = (annot.left * scale_width)# + page_x_adjust
307 + new_y1 = (annot.top * scale_height)# - page_y_adjust # Flip Y0 (since it starts from bottom)
308 + x2 = ((annot.left + annot.width) * scale_width)# + page_x_adjust # Calculate x1
309 + new_y2 = ((annot.top + annot.height) * scale_height)# - page_y_adjust # Calculate y1 correctly
310 +
311 + return x1, new_y1, x2, new_y2
312 +
313 + def convert_gradio_annotation_coords_to_pymupdf(pymupdf_page:Page, annot:dict, image:Image):
314 + '''
315 + Converts an image with redaction coordinates from a gradio annotation component to pymupdf coordinates.
316 + '''
317 +
318 + rect_height = pymupdf_page.rect.height
319 + rect_width = pymupdf_page.rect.width
320 +
321 + image_page_width, image_page_height = image.size
322 +
323 + # Calculate scaling factors between PIL image and pymupdf
324 + scale_width = rect_width / image_page_width
325 + scale_height = rect_height / image_page_height
326 +
327 + # Calculate scaled coordinates
328 + x1 = (annot["xmin"] * scale_width)# + page_x_adjust
329 + new_y1 = (annot["ymin"] * scale_height)# - page_y_adjust # Flip Y0 (since it starts from bottom)
330 + x2 = ((annot["xmax"]) * scale_width)# + page_x_adjust # Calculate x1
331 + new_y2 = ((annot["ymax"]) * scale_height)# - page_y_adjust # Calculate y1 correctly
332 +
333 + return x1, new_y1, x2, new_y2
334 +
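Not part of the commit - a worked sketch of the coordinate handling shared by the four converters above: boxes are scaled between page points and image pixels, and the Y axis is flipped because PDF coordinates grow upwards from the bottom-left corner while PIL image coordinates grow downwards from the top-left. The page and image sizes below are assumptions for illustration.

def pdf_box_to_image_box(x1, y1, x2, y2, page_w, page_h, img_w, img_h):
    # Scale factors between PDF points and image pixels
    scale_x = img_w / page_w
    scale_y = img_h / page_h
    # Flip Y: the PDF box's top edge (y2) becomes the image box's ymin
    return (x1 * scale_x, img_h - (y2 * scale_y), x2 * scale_x, img_h - (y1 * scale_y))

# A 100x50 point box on an A4 page (595 x 842 points) rendered at 2x resolution:
# pdf_box_to_image_box(50, 60, 150, 110, 595, 842, 1190, 1684) -> (100.0, 1464.0, 300.0, 1564.0)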
335 + def move_page_info(file_path: str) -> str:
336 + # Split the string at '.png'
337 + base, extension = file_path.rsplit('.pdf', 1)
338 +
339 + # Extract the page info
340 + page_info = base.split('page ')[1].split(' of')[0] # Get the page number
341 + new_base = base.replace(f'page {page_info} of ', '') # Remove the page info from the original position
342 +
343 + # Construct the new file path
344 + new_file_path = f"{new_base}_page_{page_info}.png"
345 +
346 + return new_file_path
347 +
348 + def redact_page_with_pymupdf(page:Page, annotations_on_page, image = None):#, scale=(1,1)):
349
350   mediabox_height = page.mediabox[3] - page.mediabox[1]
351   mediabox_width = page.mediabox[2] - page.mediabox[0]
352   rect_height = page.rect.height
355   #print("page_rect_height:", page.rect.height)
356   #print("page mediabox size:", page.mediabox[3] - page.mediabox[1])
357
358 + out_annotation_boxes = {}
359 + all_image_annotation_boxes = []
360 + image_path = ""
361 +
362 + if isinstance(image, Image.Image):
363 + image_path = move_page_info(str(page))
364 + image.save(image_path)
365 + elif isinstance(image, str):
366 + image_path = image
367 + image = Image.open(image_path)
368 +
369 + #print("annotations_on_page:", annotations_on_page)
370 +
371 + # Check if this is an object used in the Gradio Annotation component
372 + if isinstance (annotations_on_page, dict):
373 + annotations_on_page = annotations_on_page["boxes"]
374 + #print("annotations on page:", annotations_on_page)
375 +
376   for annot in annotations_on_page:
377 + #print("annot:", annot)
378 +
379 + # Check if an Image recogniser result, or a Gradio annotation object
380 + if (isinstance(annot, CustomImageRecognizerResult)) | isinstance(annot, dict):
381
382 + img_annotation_box = {}
383
384 + # Should already be in correct format if img_annotator_box is an input
385 + if isinstance(annot, dict):
386 + img_annotation_box = annot
387 + try:
388 + img_annotation_box["label"] = annot.entity_type
389 + except:
390 + img_annotation_box["label"] = "Redaction"
391
392 + x1, pymupdf_y1, x2, pymupdf_y2 = convert_gradio_annotation_coords_to_pymupdf(page, annot, image)
393
394 + # Else should be CustomImageRecognizerResult
395 + else:
396 + x1, pymupdf_y1, x2, pymupdf_y2 = convert_image_coords_to_pymupdf(page, annot, image)
397
398 + img_annotation_box["xmin"] = annot.left
399 + img_annotation_box["ymin"] = annot.top
400 + img_annotation_box["xmax"] = annot.left + annot.width
401 + img_annotation_box["ymax"] = annot.top + annot.height
402 + img_annotation_box["color"] = (0,0,0)
403 + try:
404 + img_annotation_box["label"] = annot.entity_type
405 + except:
406 + img_annotation_box["label"] = "Redaction"
407
408 + rect = Rect(x1, pymupdf_y1, x2, pymupdf_y2) # Create the PyMuPDF Rect
409
410 + # Else it should be a pikepdf annotation object
411 + else:
412 + x1, pymupdf_y1, x2, pymupdf_y2 = convert_pikepdf_coords_to_pymudf(page, annot)
413
414 + rect = Rect(x1, pymupdf_y1, x2, pymupdf_y2)
415
416 + img_annotation_box = {}
417
418 + if image:
419 + image_x1, image_y1, image_x2, image_y2 = convert_pikepdf_to_image_coords(page, annot, image)
420
421 +
422 + img_annotation_box["xmin"] = image_x1
423 + img_annotation_box["ymin"] = image_y1
424 + img_annotation_box["xmax"] = image_x2
425 + img_annotation_box["ymax"] = image_y2
426 + img_annotation_box["color"] = (0,0,0)
427 +
428 + if isinstance(annot, Dictionary):
429 + #print("Trying to get label out of annotation", annot["/T"])
430 + img_annotation_box["label"] = str(annot["/T"])
431 + #print("Label is:", img_annotation_box["label"])
432 + else:
433 + img_annotation_box["label"] = "REDACTION"
434
435   # Convert to a PyMuPDF Rect object
436   #rect = Rect(rect_coordinates)
437
438 + all_image_annotation_boxes.append(img_annotation_box)
439 +
440 + # Calculate the middle y value and set height to 1 pixel
441 + middle_y = (pymupdf_y1 + pymupdf_y2) / 2
442 + rect_single_pixel_height = Rect(x1, middle_y - 2, x2, middle_y + 2) # Small height in middle of word to remove text
443
444   # Add the annotation to the middle of the character line, so that it doesn't delete text from adjacent lines
445   page.add_redact_annot(rect_single_pixel_height)
450   shape.finish(color=(0, 0, 0), fill=(0, 0, 0)) # Black fill for the rectangle
451   shape.commit()
452
453 + out_annotation_boxes = {
454 + "image": image_path, #Image.open(image_path), #image_path,
455 + "boxes": all_image_annotation_boxes
456 + }
457 +
458   page.apply_redactions(images=0, graphics=0)
459   page.clean_contents()
460
461 + #print("Everything is fine at end of redact_page_with_pymupdf")
462 + #print("\nout_annotation_boxes:", out_annotation_boxes)
463 +
464 + return page, out_annotation_boxes
465
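A hedged usage sketch, not taken from the commit: redact_page_with_pymupdf accepts recogniser results, Gradio annotation dicts or pikepdf annotation objects, applies black redaction rectangles to the pymupdf page, and returns the page together with a Gradio-style {"image": ..., "boxes": [...]} dict. The file names below are hypothetical.

import pymupdf

doc = pymupdf.open("example.pdf")                          # hypothetical input
page = doc.load_page(0)
boxes = [{"xmin": 100, "ymin": 200, "xmax": 260, "ymax": 220,
          "label": "PERSON", "color": (0, 0, 0)}]
page, page_annotations = redact_page_with_pymupdf(page, boxes, image="example_page_1.png")
doc.save("example_redacted.pdf")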
466   def bounding_boxes_overlap(box1, box2):
467   """Check if two bounding boxes overlap."""
485   # Reconstruct bounding boxes for substrings of interest
486   reconstructed_bboxes = []
487   for bbox in bboxes:
488 + print("bbox:", bbox)
489   bbox_box = (bbox.left, bbox.top, bbox.left + bbox.width, bbox.top + bbox.height)
490   for line_text, line_info in combined_results.items():
491   line_box = line_info['bounding_box']
507   current_char += 1 # +1 for space if the word doesn't already end with a space
508
509   if relevant_words:
510 + #print("Relevant words:", relevant_words)
511   left = min(word['bounding_box'][0] for word in relevant_words)
512   top = min(word['bounding_box'][1] for word in relevant_words)
513   right = max(word['bounding_box'][2] for word in relevant_words)
515
516   # Combine the text of all relevant words
517   combined_text = " ".join(word['text'] for word in relevant_words)
518 +
519 + # Calculate new dimensions for the merged box
520 +
521 +
522 +
523
524   reconstructed_bbox = CustomImageRecognizerResult(
525   bbox.entity_type,
555   else:
556   new_text = merged_box.text + " " + next_box.text
557
558 + if merged_box.text == next_box.text:
559 + new_text = merged_box.text
560 + new_entity_type = merged_box.entity_type # Keep the original entity type
561 + else:
562 + new_text = merged_box.text + " " + next_box.text
563 + new_entity_type = merged_box.entity_type + " - " + next_box.entity_type # Concatenate entity types
564 +
565   new_left = min(merged_box.left, next_box.left)
566   new_top = min(merged_box.top, next_box.top)
567   new_width = max(merged_box.left + merged_box.width, next_box.left + next_box.width) - new_left
568   new_height = max(merged_box.top + merged_box.height, next_box.top + next_box.height) - new_top
569   merged_box = CustomImageRecognizerResult(
570 + new_entity_type, merged_box.start, merged_box.end, merged_box.score, new_left, new_top, new_width, new_height, new_text
571   )
572   else:
573   merged_bboxes.append(merged_box)
577
578   return merged_bboxes
579
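A small worked example (not in the commit) of the merge rule applied above: when two neighbouring PII boxes are combined, the union rectangle is kept and, unless the text is identical, the entity types are concatenated. The numbers are illustrative only.

left = min(10, 40)                                   # 10
top = min(5, 6)                                      # 5
width = max(10 + 35, 40 + 30) - left                 # 60
height = max(5 + 12, 6 + 12) - top                   # 13
entity_type = "PERSON" + " - " + "PHONE_NUMBER"      # concatenated label for the merged box
print(left, top, width, height, entity_type)         # 10 5 60 13 PERSON - PHONE_NUMBER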
580 + def redact_image_pdf(file_path:str, prepared_pdf_file_paths:List[str], language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, is_a_pdf:bool=True, page_min:int=0, page_max:int=999, analysis_type:str="Quick image analysis - typed text", handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], request_metadata:str="", progress=Progress(track_tqdm=True)):
581   '''
582   Take an path for an image of a document, then run this image through the Presidio ImageAnalyzer and PIL to get a redacted page back. Adapted from Presidio ImageRedactorEngine.
583   '''
587   fill = (0, 0, 0) # Fill colour
588   decision_process_output_str = ""
589   images = []
590 + all_image_annotations = []
591   #request_metadata = {}
592   image_analyser = CustomImageAnalyzerEngine(nlp_analyser)
593
594   # Also open as pymupdf pdf to apply annotations later on
595 + pymupdf_doc = pymupdf.open(file_path)
596
597 + if not prepared_pdf_file_paths:
598   out_message = "PDF does not exist as images. Converting pages to image"
599   print(out_message)
600
601 + prepared_pdf_file_paths = process_file(file_path)
602
603 + if not isinstance(prepared_pdf_file_paths, list):
604 + print("Converting prepared_pdf_file_paths to list")
605 + prepared_pdf_file_paths = [prepared_pdf_file_paths]
606
607 + #print("Image paths:", prepared_pdf_file_paths)
608 + number_of_pages = len(prepared_pdf_file_paths)
609
610   print("Number of pages:", str(number_of_pages))
611
634   if analysis_type == "Quick image analysis - typed text": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".csv"
635   elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.csv"
636
637 + for i in range(0, number_of_pages):
638   handwriting_or_signature_boxes = []
639   signature_recogniser_results = []
640   handwriting_recogniser_results = []
641
642 +
643 + # Assuming prepared_pdf_file_paths[i] is your PIL image object
644   try:
645 + image = prepared_pdf_file_paths[i]#.copy()
646 + print("image:", image)
647   except Exception as e:
648 + print("Could not redact page:", reported_page_number, "due to:")
649   print(e)
650   continue
651
652 + image_annotations = {"image": image, "boxes": []}
653
654 + #try:
655 + print("prepared_pdf_file_paths:", prepared_pdf_file_paths)
656 +
657 + if i >= page_min and i < page_max:
658
659   reported_page_number = str(i + 1)
660
661   print("Redacting page", reported_page_number)
662
663 + pymupdf_page = pymupdf_doc.load_page(i)
664
665   # Need image size to convert textract OCR outputs to the correct sizes
666   page_width, page_height = image.size
667
668   # Possibility to use different languages
669   if language == 'en':
670   ocr_lang = 'eng'
709
710   line_level_ocr_results, handwriting_or_signature_boxes, signature_recogniser_results, handwriting_recogniser_results, line_level_ocr_results_with_children = json_to_ocrresult(text_blocks, page_width, page_height)
711
712   # Step 2: Analyze text and identify PII
713 + if chosen_redact_entities:
714 +
715 + redaction_bboxes = image_analyser.analyze_text(
716 + line_level_ocr_results,
717 + line_level_ocr_results_with_children,
718 + language=language,
719 + entities=chosen_redact_entities,
720 + allow_list=allow_list,
721 + score_threshold=score_threshold,
722 + )
723 + else:
724 + redaction_bboxes = []
725
726   if analysis_type == "Quick image analysis - typed text": interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".txt"
727   elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)": interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.txt"
734   # Merge close bounding boxes
735   merged_redaction_bboxes = merge_img_bboxes(redaction_bboxes, line_level_ocr_results_with_children, signature_recogniser_results, handwriting_recogniser_results, handwrite_signature_checkbox)
736
737 + # Save image first so that the redactions can be checked after
738 + #image.save(output_folder + "page_as_img_" + file_name + "_pages_" + str(reported_page_number) + ".png")
739
740   # 3. Draw the merged boxes
741 + #if merged_redaction_bboxes:
742   if is_pdf(file_path) == False:
743   draw = ImageDraw.Draw(image)
744
745 + all_image_annotations_boxes = []
746 +
747   for box in merged_redaction_bboxes:
748 + print("box:", box)
749 +
750   x0 = box.left
751   y0 = box.top
752   x1 = x0 + box.width
753   y1 = y0 + box.height
754
755 + try:
756 + label = box.entity_type
757 + except:
758 + label = "Redaction"
759 +
760 + # Directly append the dictionary with the required keys
761 + all_image_annotations_boxes.append({
762 + "xmin": x0,
763 + "ymin": y0,
764 + "xmax": x1,
765 + "ymax": y1,
766 + "label": label,
767 + "color": (0, 0, 0)
768 + })
769 +
770 + draw.rectangle([x0, y0, x1, y1], fill=fill) # Adjusted to use a list for rectangle
771 +
772 + image_annotations = {"image": file_path, "boxes": all_image_annotations_boxes}
773
774   ## Apply annotations with pymupdf
775   else:
776 + pymupdf_page, image_annotations = redact_page_with_pymupdf(pymupdf_page, merged_redaction_bboxes, image)#, scale)
777
778 + # Convert decision process to table
779 + decision_process_table = pd.DataFrame([{
780 + 'page': reported_page_number,
781 + 'entity_type': result.entity_type,
782 + 'start': result.start,
783 + 'end': result.end,
784 + 'score': result.score,
785 + 'left': result.left,
786 + 'top': result.top,
787 + 'width': result.width,
788 + 'height': result.height,
789 + 'text': result.text
790 + } for result in merged_redaction_bboxes])
791
792 + all_decision_process_table = pd.concat([all_decision_process_table, decision_process_table])
793
794   # Convert to DataFrame and add to ongoing logging table
795   line_level_ocr_results_df = pd.DataFrame([{
803
804   all_line_level_ocr_results_df = pd.concat([all_line_level_ocr_results_df, line_level_ocr_results_df])
805
806   if is_pdf(file_path) == False:
807   images.append(image)
808 + pymupdf_doc = images
809
810 + all_image_annotations.append(image_annotations)
811
812   all_line_level_ocr_results_df.to_csv(ocr_results_file_path)
813   logging_file_paths.append(ocr_results_file_path)
814
815 + return pymupdf_doc, all_decision_process_table, logging_file_paths, request_metadata, all_image_annotations
816 +
817 +
818 + ###
819 + # PIKEPDF TEXT PDF REDACTION
820 + ###
821
822   def get_text_container_characters(text_container:LTTextContainer):
823
830   return characters
831   return []
832
833 + def analyse_text_container(text_container:OCRResult, language:str, chosen_redact_entities:List[str], score_threshold:float, allow_list:List[str]):
834   '''
835   Take text and bounding boxes in OCRResult format and analyze it for PII using spacy and the Microsoft Presidio package.
836   '''
837
838 + analyser_results = []
839 +
840   text_to_analyze = text_container.text
841   #print("text_to_analyze:", text_to_analyze)
842
843 + if chosen_redact_entities:
844 + analyser_results = nlp_analyser.analyze(text=text_to_analyze,
845 + language=language,
846 + entities=chosen_redact_entities,
847 + score_threshold=score_threshold,
848 + return_decision_process=True,
849 + allow_list=allow_list)
850
851 + print(analyser_results)
852 +
853 + return analyser_results
854
855   def create_text_bounding_boxes_from_characters(char_objects:List[LTChar]) -> Tuple[List[OCRResult], List[LTChar]]:
856   '''
930
931   return line_level_results_out, line_level_characters_out # Return both results and character objects
932
933 + def merge_text_bounding_boxes(analyser_results:CustomImageRecognizerResult, characters:List[LTChar], combine_pixel_dist:int, vertical_padding:int=0):
934   '''
935   Merge identified bounding boxes containing PII that are very close to one another
936   '''
937 + analysed_bounding_boxes = []
938 + if len(analyser_results) > 0 and len(characters) > 0:
939   # Extract bounding box coordinates for sorting
940   bounding_boxes = []
941   text_out = []
942 + for result in analyser_results:
943   char_boxes = [char.bbox for char in characters[result.start:result.end] if isinstance(char, LTChar)]
944   char_text = [char._text for char in characters[result.start:result.end] if isinstance(char, LTChar)]
945   if char_boxes:
985   current_box[2] = char_box[2] # Extend the current box horizontally
986   current_box[3] = max(current_box[3], char_box[3]) # Ensure the top is the highest
987   current_result.end = max(current_result.end, result.end) # Extend the text range
988 + try:
989 + current_result.type = current_result.type + " - " + result.type
990 + except:
991 + print("Unable to append new result type.")
992   # Add a space if current_text is not empty
993   if current_text:
994   current_text.append(" ") # Add space between texts
995   current_text.extend(text)
996 +
997 + #print(f"Latest merged box: {current_box[-1]}")
998   else:
999   merged_bounding_boxes.append(
1000   {"text":"".join(current_text),"boundingBox": current_box, "result": current_result})
1001   #print(f"Appending merged box: {current_box}")
1002 + #print(f"Latest merged box: {merged_bounding_boxes[-1]}")
1003
1004   # Reset current_box and current_y after appending
1005   current_box = char_box
1014   #print(f"Appending final box for result: {current_box}")
1015
1016   if not merged_bounding_boxes:
1017 + analysed_bounding_boxes.extend(
1018   {"text":text, "boundingBox": char.bbox, "result": result}
1019 + for result in analyser_results
1020   for char in characters[result.start:result.end]
1021   if isinstance(char, LTChar)
1022   )
1023   else:
1024 + analysed_bounding_boxes.extend(merged_bounding_boxes)
1025
1026 + #print("Analyzed bounding boxes:\n\n", analysed_bounding_boxes)
1027
1028 + return analysed_bounding_boxes
1029
1030 + def create_text_redaction_process_results(analyser_results, analysed_bounding_boxes, page_num):
1031   decision_process_table = pd.DataFrame()
1032
1033 + if len(analyser_results) > 0:
1034   # Create summary df of annotations to be made
1035 + analysed_bounding_boxes_df_new = pd.DataFrame(analysed_bounding_boxes)
1036 + analysed_bounding_boxes_df_text = analysed_bounding_boxes_df_new['result'].astype(str).str.split(",",expand=True).replace(".*: ", "", regex=True)
1037 + analysed_bounding_boxes_df_text.columns = ["type", "start", "end", "score"]
1038 + analysed_bounding_boxes_df_new = pd.concat([analysed_bounding_boxes_df_new, analysed_bounding_boxes_df_text], axis = 1)
1039 + analysed_bounding_boxes_df_new['page'] = page_num + 1
1040 + decision_process_table = pd.concat([decision_process_table, analysed_bounding_boxes_df_new], axis = 0).drop('result', axis=1)
1041
1042   #print('\n\ndecision_process_table:\n\n', decision_process_table)
1043
1044   return decision_process_table
1045
1046 + def create_annotations_for_bounding_boxes(analysed_bounding_boxes):
1047   annotations_on_page = []
1048 + for analysed_bounding_box in analysed_bounding_boxes:
1049 + bounding_box = analysed_bounding_box["boundingBox"]
1050   annotation = Dictionary(
1051   Type=Name.Annot,
1052   Subtype=Name.Square, #Name.Highlight,
1056   C=[0, 0, 0],
1057   IC=[0, 0, 0],
1058   CA=1, # Transparency
1059 + T=analysed_bounding_box["result"].entity_type,
1060   BS=Dictionary(
1061   W=0, # Border width: 1 point
1062   S=Name.S # Border style: solid
1065   annotations_on_page.append(annotation)
1066   return annotations_on_page
1067
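Hedged aside, not from the commit: analyse_text_container above calls the Presidio analyzer held in nlp_analyser (constructed elsewhere in this repository). A stand-alone equivalent with a locally created engine would look roughly like this; the example text, entities and engine setup are assumptions.

from presidio_analyzer import AnalyzerEngine

analyzer = AnalyzerEngine()
results = analyzer.analyze(text="Contact John Smith on 07700 900123",
                           language="en",
                           entities=["PERSON", "PHONE_NUMBER"],
                           score_threshold=0.3,
                           return_decision_process=True,
                           allow_list=["Contact"])
for result in results:
    print(result.entity_type, result.start, result.end, result.score)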
1068 + def redact_text_pdf(filename:str, prepared_pdf_image_path:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, page_min:int=0, page_max:int=999, analysis_type:str = "Simple text analysis - PDFs with selectable text", progress=Progress(track_tqdm=True)):
1069   '''
1070   Redact chosen entities from a pdf that is made up of multiple pages that are not images.
1071   '''
1072   annotations_all_pages = []
1073 + all_image_annotations = []
1074   page_text_outputs_all_pages = pd.DataFrame()
1075   decision_process_table_all_pages = pd.DataFrame()
1076
1077   combine_pixel_dist = 20 # Horizontal distance between PII bounding boxes under/equal they are combined into one
1078
1079   # Open with Pikepdf to get text lines
1080 + pikepdf_pdf = Pdf.open(filename)
1081 + number_of_pages = len(pikepdf_pdf.pages)
1082
1083 + # Also open pdf with pymupdf to be able to annotate later while retaining text
1084 + pymupdf_doc = pymupdf.open(filename)
1085 +
1086 + page_num = 0
1087
1088   # Check that page_min and page_max are within expected ranges
1089   if page_max > number_of_pages or page_max == 0:
1091   #else:
1092   # page_max = page_max - 1
1093
1094 + if page_min <= 0: page_min = 0
1095 + else: page_min = page_min - 1
1096
1097 + print("Page range is",str(page_min + 1), "to", str(page_max))
1098
1099 + for page_no in range(0, number_of_pages): #range(page_min, page_max):
1100 + #print("prepared_pdf_image_path:", prepared_pdf_image_path)
1101 + #print("prepared_pdf_image_path[page_no]:", prepared_pdf_image_path[page_no])
1102 + image = prepared_pdf_image_path[page_no]
1103
1104 + image_annotations = {"image": image, "boxes": []}
1105 +
1106 + pymupdf_page = pymupdf_doc.load_page(page_no)
1107 +
1108 + print("Page number is:", str(page_no + 1))
1109 +
1110 + if page_min <= page_no < page_max:
1111 +
1112 + for page_layout in extract_pages(filename, page_numbers = [page_no], maxpages=1):
1113 +
1114 + page_analyser_results = []
1115 + page_analysed_bounding_boxes = []
1116 +
1117 + characters = []
1118 + annotations_on_page = []
1119 + decision_process_table_on_page = pd.DataFrame()
1120 + page_text_outputs = pd.DataFrame()
1121 +
1122 + if analysis_type == "Simple text analysis - PDFs with selectable text":
1123 + for text_container in page_layout:
1124 +
1125 + text_container_analyser_results = []
1126 + text_container_analysed_bounding_boxes = []
1127 +
1128 + characters = get_text_container_characters(text_container)
1129 +
1130 + # Create dataframe for all the text on the page
1131 + line_level_text_results_list, line_characters = create_text_bounding_boxes_from_characters(characters)
1132 +
1133 + #print("line_characters:", line_characters)
1134 +
1135 + # Create page_text_outputs (OCR format outputs)
1136 + if line_level_text_results_list:
1137 + # Convert to DataFrame and add to ongoing logging table
1138 + line_level_text_results_df = pd.DataFrame([{
1139 + 'page': page_no + 1,
1140 + 'text': result.text,
1141 + 'left': result.left,
1142 + 'top': result.top,
1143 + 'width': result.width,
1144 + 'height': result.height
1145 + } for result in line_level_text_results_list])
1146 +
1147 + page_text_outputs = pd.concat([page_text_outputs, line_level_text_results_df])
1148 +
1149 + # Analyse each line of text in turn for PII and add to list
1150 + for i, text_line in enumerate(line_level_text_results_list):
1151 + text_line_analyzer_result = []
1152 + text_line_bounding_boxes = []
1153 +
1154 + #print("text_line:", text_line.text)
1155 +
1156 + text_line_analyzer_result = analyse_text_container(text_line, language, chosen_redact_entities, score_threshold, allow_list)
1157 +
1158 + # Merge bounding boxes for the line if multiple found close together
1159 + if text_line_analyzer_result:
1160 + # Merge bounding boxes if very close together
1161 + #print("text_line_bounding_boxes:", text_line_bounding_boxes)
1162 + #print("line_characters:")
1163 + #print(line_characters[i])
1164 + #print("".join(char._text for char in line_characters[i]))
1165 + text_line_bounding_boxes = merge_text_bounding_boxes(text_line_analyzer_result, line_characters[i], combine_pixel_dist, vertical_padding = 0)
1166 +
1167 + text_container_analyser_results.extend(text_line_analyzer_result)
1168 + text_container_analysed_bounding_boxes.extend(text_line_bounding_boxes)
1169 +
1170 + #print("\n FINAL text_container_analyser_results:", text_container_analyser_results)
1171
1172
1173 + page_analyser_results.extend(text_container_analyser_results)
1174 + page_analysed_bounding_boxes.extend(text_container_analysed_bounding_boxes)
1175
1176 + # Annotate redactions on page
1177 + annotations_on_page = create_annotations_for_bounding_boxes(page_analysed_bounding_boxes)
1178 +
1179 +
1180 + # Make page annotations
1181 + #page.Annots = pdf.make_indirect(annotations_on_page)
1182 + #if annotations_on_page:
1183
1184 + # Make pymupdf redactions
1185 + pymupdf_page, image_annotations = redact_page_with_pymupdf(pymupdf_page, annotations_on_page, image)
1186
1187   annotations_all_pages.extend([annotations_on_page])
1188
1189 + print("For page number:", page_no, "there are", len(annotations_all_pages[page_num]), "annotations")
1190 +
1191 + # Write logs
1192 + # Create decision process table
1193 + decision_process_table_on_page = create_text_redaction_process_results(page_analyser_results, page_analysed_bounding_boxes, page_num)
1194
1195 + if not decision_process_table_on_page.empty:
1196 + decision_process_table_all_pages = pd.concat([decision_process_table_all_pages, decision_process_table_on_page])
1197
1198 + if not page_text_outputs.empty:
1199 + page_text_outputs = page_text_outputs.sort_values(["top", "left"], ascending=[False, False]).reset_index(drop=True)
1200 + #page_text_outputs.to_csv("text_page_text_outputs.csv")
1201 + page_text_outputs_all_pages = pd.concat([page_text_outputs_all_pages, page_text_outputs])
1202
1203 + all_image_annotations.append(image_annotations)
1204
1205 + return pymupdf_doc, decision_process_table_all_pages, page_text_outputs_all_pages, all_image_annotations
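Not part of the commit - a sketch of the per-page annotation structure that redact_image_pdf and redact_text_pdf now build and return for the new review tab: one dict per page holding the page image (or its file path) and the suggested redaction boxes. The values are illustrative only.

example_page_annotation = {
    "image": "output/example_page_1.png",   # hypothetical page image path
    "boxes": [
        {"xmin": 102.0, "ymin": 220.5, "xmax": 260.0, "ymax": 238.0,
         "label": "PERSON", "color": (0, 0, 0)},
    ],
}
all_image_annotations = [example_page_annotation]    # one entry per page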
tools/redaction_review.py
ADDED
@@ -0,0 +1,211 @@
1 |
+
import gradio as gr
2   + import numpy as np
3   + from typing import List
4   + from gradio_image_annotation import image_annotator
5   + from gradio_image_annotation.image_annotator import AnnotatedImageData
6   +
7   + from tools.file_conversion import is_pdf, convert_pdf_to_images
8   + from tools.helper_functions import get_file_path_end, output_folder
9   + from tools.file_redaction import redact_page_with_pymupdf
10  + import json
11  + import pymupdf
12  + from fitz import Document
13  + from PIL import ImageDraw, Image
14  +
15  + def decrease_page(number:int):
16  + '''
17  + Decrease page number for review redactions page.
18  + '''
19  + #print("number:", str(number))
20  + if number > 1:
21  + return number - 1
22  + else:
23  + return 1
24  +
25  + def increase_page(number:int, image_annotator_object:AnnotatedImageData):
26  + '''
27  + Increase page number for review redactions page.
28  + '''
29  +
30  + if not image_annotator_object:
31  + return 1
32  +
33  + max_pages = len(image_annotator_object)
34  +
35  + if number < max_pages:
36  + return number + 1
37  + else:
38  + return max_pages
39  +
40  + def update_annotator(image_annotator_object:AnnotatedImageData, page_num:int):
41  + #print("\nImage annotator object:", image_annotator_object[0])
42  +
43  + if not image_annotator_object:
44  + return image_annotator(
45  + label="Modify redaction boxes",
46  + #label_list=["Redaction"],
47  + #label_colors=[(0, 0, 0)],
48  + sources=["upload"],
49  + show_clear_button=False,
50  + show_remove_button=False,
51  + interactive=False
52  + ), gr.Number(label = "Current page", value=1, precision=0)
53  +
54  + # Check bounding values for current page and page max
55  + if page_num > 0:
56  + page_num_reported = page_num
57  + #page_num = page_num - 1
58  + elif page_num == 0: page_num_reported = 1
59  + else:
60  + page_num = 0
61  + page_num_reported = 1
62  +
63  + page_max_reported = len(image_annotator_object)
64  +
65  + if page_num_reported > page_max_reported:
66  + page_num_reported = page_max_reported
67  +
68  + out_image_annotator = image_annotator(value = image_annotator_object[page_num_reported - 1],
69  + boxes_alpha=0.1,
70  + box_thickness=1,
71  + #label_list=["Redaction"],
72  + #label_colors=[(0, 0, 0)],
73  + height='60%',
74  + width='60%',
75  + box_min_size=1,
76  + box_selected_thickness=2,
77  + handle_size=4,
78  + sources=None,#["upload"],
79  + show_clear_button=False,
80  + show_remove_button=False,
81  + handles_cursor=True,
82  + interactive=True
83  + )
84  +
85  + number_reported = gr.Number(label = "Current page", value=page_num_reported, precision=0)
86  +
87  + return out_image_annotator, number_reported
88  +
89  + def modify_existing_page_redactions(image_annotated:AnnotatedImageData, current_page:int, previous_page:int, all_image_annotations:List[AnnotatedImageData]):
90  + '''
91  + Overwrite current image annotations with modifications
92  + '''
93  + print("all_image_annotations before:",all_image_annotations)
94  +
95  + image_annotated['image'] = all_image_annotations[previous_page - 1]["image"]
96  +
97  + #print("image_annotated:", image_annotated)
98  +
99  + all_image_annotations[previous_page - 1] = image_annotated
100 +
101 + print("all_image_annotations after:",all_image_annotations)
102 +
103 + return all_image_annotations, current_page
104 +
105 + def apply_redactions(image_annotated:AnnotatedImageData, file_paths:str, doc:Document, all_image_annotations:List[AnnotatedImageData], current_page:int):
106 + '''
107 + Apply modified redactions to a pymupdf
108 + '''
109 +
110 + output_files = []
111 +
112 + image_annotated['image'] = all_image_annotations[current_page - 1]["image"]
113 +
114 + all_image_annotations[current_page - 1] = image_annotated
115 +
116 + if not image_annotated:
117 + print("No image annotations found")
118 + return doc, all_image_annotations
119 +
120 + file_path = file_paths[-1].name
121 + print("file_path:", file_path)
122 + file_base = get_file_path_end(file_path)
123 +
124 + # If working with image docs
125 + if is_pdf(file_path) == False:
126 + unredacted_doc = Image.open(file_paths[-1])
127 +
128 + image = unredacted_doc
129 +
130 + # try:
131 + # image = Image.open(image_annotated['image'])
132 + # except:
133 + # image = Image.fromarray(image_annotated['image'].astype('uint8'))
134 +
135 + draw = ImageDraw.Draw(unredacted_doc)
136 +
137 + for img_annotation_box in image_annotated['boxes']:
138 + coords = [img_annotation_box["xmin"],
139 + img_annotation_box["ymin"],
140 + img_annotation_box["xmax"],
141 + img_annotation_box["ymax"]]
142 +
143 + fill = img_annotation_box["color"]
144 +
145 + draw.rectangle(coords, fill=fill)
146 +
147 + image.save(output_folder + file_base + "_redacted_mod.png")
148 +
149 + doc = [image]
150 +
151 + # If working with pdfs
152 + else:
153 + unredacted_doc = pymupdf.open(file_path)
154 +
155 + number_of_pages = unredacted_doc.page_count
156 +
157 + for i in range(0, number_of_pages):
158 +
159 + print("Re-redacting page", str(i))
160 +
161 + image_loc = all_image_annotations[i]['image']
162 + print("Image location:", image_loc)
163 +
164 + # Load in image
165 + if isinstance(image_loc, Image.Image):
166 + # Save to file so the image annotator can pick it up
167 + image_out_folder = output_folder + file_base + "_page_" + str(i) + ".png"
168 + image_loc.save(image_out_folder)
169 + image = image_out_folder
170 + elif isinstance(image_loc, str):
171 + image = Image.open(image_loc)
172 + else:
173 + image = Image.fromarray(image_loc.astype('uint8'))
174 +
175 + pymupdf_page = unredacted_doc.load_page(i) #doc.load_page(current_page -1)
176 + pymupdf_page = redact_page_with_pymupdf(pymupdf_page, all_image_annotations[i], image)
177 +
178 + #try:
179 + out_pdf_file_path = output_folder + file_base + "_redacted_mod.pdf"
180 + unredacted_doc.save(out_pdf_file_path)
181 + output_files.append(out_pdf_file_path)
182 +
183 + # Save the gradio_annotation_boxes to a JSON file
184 + out_annotation_file_path = output_folder + file_base + '_modified_redactions.json'
185 + all_image_annotations_with_lists = all_image_annotations
186 +
187 + # Convert image arrays to lists for JSON serialization
188 + for annotation in all_image_annotations_with_lists:
189 + if isinstance(annotation['image'], np.ndarray):
190 + annotation['image'] = annotation['image'].tolist()
191 + elif isinstance(annotation['image'], Image.Image):
192 + annotation['image'] = image_out_folder
193 +
194 + with open(out_annotation_file_path, 'w') as f:
195 + json.dump(all_image_annotations_with_lists, f)
196 +
197 + output_files.append(out_annotation_file_path)
198 +
199 + return doc, all_image_annotations, output_files
200 +
201 + def crop(annotations:AnnotatedImageData):
202 + if annotations["boxes"]:
203 + box = annotations["boxes"][0]
204 + return annotations["image"][
205 + box["ymin"]:box["ymax"],
206 + box["xmin"]:box["xmax"]
207 + ]
208 + return None
209 +
210 + def get_boxes_json(annotations:AnnotatedImageData):
211 + return annotations["boxes"]
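A minimal wiring sketch, not part of the commit (the real wiring lives in app.py): it shows how the callbacks above could be hooked up to an image_annotator component and a page-number box in a small stand-alone demo. The layout and component names are assumptions.

import gradio as gr
from gradio_image_annotation import image_annotator
from tools.redaction_review import update_annotator, increase_page, modify_existing_page_redactions

with gr.Blocks() as demo:
    all_annotations = gr.State([])     # list of {"image": ..., "boxes": [...]} dicts, one per page
    current_page = gr.Number(value=1, precision=0, label="Current page")
    previous_page = gr.Number(value=1, precision=0, visible=False)
    annotator = image_annotator(label="Modify redaction boxes", interactive=True)
    next_btn = gr.Button("Next page")

    # Save edits on the page being left, then move on and redraw the annotator
    next_btn.click(modify_existing_page_redactions,
                   inputs=[annotator, current_page, previous_page, all_annotations],
                   outputs=[all_annotations, previous_page]).\
        then(increase_page, inputs=[current_page, all_annotations], outputs=[current_page]).\
        then(update_annotator, inputs=[all_annotations, current_page], outputs=[annotator, current_page])

demo.launch()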