File size: 44,004 Bytes
e9c4101 66e145d bafcf39 e3365ed 5345e1f 44d987c bafcf39 ed5f8c7 bafcf39 f333cf5 44d987c bafcf39 e9c4101 bafcf39 6ea0852 bafcf39 6ea0852 bafcf39 6ea0852 bafcf39 f333cf5 5f824f4 bafcf39 d3e6a24 f333cf5 d3e6a24 f333cf5 d3e6a24 5f824f4 d3e6a24 5f824f4 d3e6a24 5f824f4 d3e6a24 bafcf39 d60759d bafcf39 e2aae24 bafcf39 d3e6a24 5f824f4 d3e6a24 bafcf39 f333cf5 bafcf39 f333cf5 bafcf39 f333cf5 bafcf39 5f824f4 d3e6a24 bafcf39 d3e6a24 bafcf39 f333cf5 bafcf39 d3e6a24 bafcf39 a33b955 bafcf39 a33b955 e2aae24 bafcf39 9504619 0e9dd2d bafcf39 0e9dd2d e3365ed bafcf39 e3365ed f0c28d7 bafcf39 9504619 e3365ed bafcf39 e3365ed bafcf39 e9c4101 bafcf39 66e145d eea5c07 bafcf39 e9c4101 bafcf39 e9c4101 eea5c07 bafcf39 e9c4101 bafcf39 e9c4101 f333cf5 e9c4101 bafcf39 44d987c f333cf5 b61459d bafcf39 f333cf5 44d987c bafcf39 eea5c07 bafcf39 e9c4101 44d987c 8652429 f333cf5 6ea0852 f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c e9c4101 f93e49c 44d987c bafcf39 f333cf5 bafcf39 84c83c0 44d987c bafcf39 66e145d 0ea8b9e b61459d 66e145d bafcf39 66e145d bafcf39 66e145d 5345e1f 6a6aac2 5345e1f f957846 66e145d 0ea8b9e 66e145d bafcf39 ed5f8c7 bafcf39 66e145d bafcf39 66e145d bafcf39 0ea8b9e bafcf39 0ea8b9e b61459d 0ea8b9e f333cf5 66e145d 0ea8b9e 66e145d ed5f8c7 bafcf39 ed5f8c7 66e145d 0ea8b9e 66e145d ed5f8c7 bafcf39 ed5f8c7 bafcf39 ed5f8c7 bafcf39 ed5f8c7 bafcf39 ed5f8c7 bafcf39 ed5f8c7 bafcf39 ed5f8c7 bafcf39 ed5f8c7 0ea8b9e 66e145d 0ea8b9e f333cf5 bafcf39 f333cf5 0ea8b9e bafcf39 0ea8b9e 66e145d 0ea8b9e f333cf5 66e145d f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c f333cf5 44d987c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 |
import io
import json
import os
import time
from pathlib import Path
from typing import Any, Dict, List
import boto3
import pandas as pd
import pikepdf
from tools.config import (
AWS_ACCESS_KEY,
AWS_REGION,
AWS_SECRET_KEY,
PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS,
RUN_AWS_FUNCTIONS,
)
from tools.custom_image_analyser_engine import CustomImageRecognizerResult, OCRResult
from tools.helper_functions import _generate_unique_ids
from tools.secure_path_utils import secure_file_read
def extract_textract_metadata(response: object):
"""Extracts metadata from an AWS Textract response."""
request_id = response["ResponseMetadata"]["RequestId"]
pages = response["DocumentMetadata"]["Pages"]
return str({"RequestId": request_id, "Pages": pages})
def analyse_page_with_textract(
pdf_page_bytes: object,
page_no: int,
client: str = "",
handwrite_signature_checkbox: List[str] = ["Extract handwriting"],
textract_output_found: bool = False,
aws_access_question_textbox: str = AWS_ACCESS_KEY,
aws_secret_question_textbox: str = AWS_SECRET_KEY,
RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: bool = PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS,
):
"""
Analyzes a single page of a document using AWS Textract to extract text and other features.
Args:
pdf_page_bytes (object): The content of the PDF page or image as bytes.
page_no (int): The page number being analyzed.
client (str, optional): An optional pre-initialized AWS Textract client. If not provided,
the function will attempt to create one based on configuration.
Defaults to "".
handwrite_signature_checkbox (List[str], optional): A list of feature types to extract
from the document. Options include
"Extract handwriting", "Extract signatures",
"Extract forms", "Extract layout", "Extract tables".
Defaults to ["Extract handwriting"].
textract_output_found (bool, optional): A flag indicating whether existing Textract output
for the document has been found. This can prevent
unnecessary API calls. Defaults to False.
aws_access_question_textbox (str, optional): AWS access question provided by the user, if not using
SSO or environment variables. Defaults to AWS_ACCESS_KEY.
aws_secret_question_textbox (str, optional): AWS secret question provided by the user, if not using
SSO or environment variables. Defaults to AWS_SECRET_KEY.
RUN_AWS_FUNCTIONS (bool, optional): Configuration flag to enable or
disable AWS functions. Defaults to RUN_AWS_FUNCTIONS.
PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS (bool, optional): Configuration flag (e.g., True or False)
to prioritize AWS SSO credentials
over environment variables.
Defaults to True.
Returns:
Tuple[List[Dict], str]: A tuple containing:
- A list of dictionaries, where each dictionary represents a Textract block (e.g., LINE, WORD, FORM, TABLE).
- A string containing metadata about the Textract request.
"""
# print("handwrite_signature_checkbox in analyse_page_with_textract:", handwrite_signature_checkbox)
if client == "":
try:
# Try to connect to AWS Textract Client if using that text extraction method
if RUN_AWS_FUNCTIONS and PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS:
print("Connecting to Textract via existing SSO connection")
client = boto3.client("textract", region_name=AWS_REGION)
elif aws_access_question_textbox and aws_secret_question_textbox:
print(
"Connecting to Textract using AWS access question and secret questions from user input."
)
client = boto3.client(
"textract",
aws_access_question_id=aws_access_question_textbox,
aws_secret_access_question=aws_secret_question_textbox,
region_name=AWS_REGION,
)
elif RUN_AWS_FUNCTIONS is True:
print("Connecting to Textract via existing SSO connection")
client = boto3.client("textract", region_name=AWS_REGION)
elif AWS_ACCESS_KEY and AWS_SECRET_KEY:
print("Getting Textract credentials from environment variables.")
client = boto3.client(
"textract",
aws_access_question_id=AWS_ACCESS_KEY,
aws_secret_access_question=AWS_SECRET_KEY,
region_name=AWS_REGION,
)
elif textract_output_found is True:
print(
"Existing Textract data found for file, no need to connect to AWS Textract"
)
client = boto3.client("textract", region_name=AWS_REGION)
else:
client = ""
out_message = "Cannot connect to AWS Textract service."
print(out_message)
raise Exception(out_message)
except Exception as e:
out_message = "Cannot connect to AWS Textract"
print(out_message, "due to:", e)
raise Exception(out_message)
return [], "" # Return an empty list and an empty string
# Redact signatures if specified
feature_types = list()
if (
"Extract signatures" in handwrite_signature_checkbox
or "Extract forms" in handwrite_signature_checkbox
or "Extract layout" in handwrite_signature_checkbox
or "Extract tables" in handwrite_signature_checkbox
):
if "Extract signatures" in handwrite_signature_checkbox:
feature_types.append("SIGNATURES")
if "Extract forms" in handwrite_signature_checkbox:
feature_types.append("FORMS")
if "Extract layout" in handwrite_signature_checkbox:
feature_types.append("LAYOUT")
if "Extract tables" in handwrite_signature_checkbox:
feature_types.append("TABLES")
try:
response = client.analyze_document(
Document={"Bytes": pdf_page_bytes}, FeatureTypes=feature_types
)
except Exception as e:
print("Textract call failed due to:", e, "trying again in 3 seconds.")
time.sleep(3)
response = client.analyze_document(
Document={"Bytes": pdf_page_bytes}, FeatureTypes=feature_types
)
if (
"Extract signatures" not in handwrite_signature_checkbox
and "Extract forms" not in handwrite_signature_checkbox
and "Extract layout" not in handwrite_signature_checkbox
and "Extract tables" not in handwrite_signature_checkbox
):
# Call detect_document_text to extract plain text
try:
response = client.detect_document_text(Document={"Bytes": pdf_page_bytes})
except Exception as e:
print("Textract call failed due to:", e, "trying again in 5 seconds.")
time.sleep(5)
response = client.detect_document_text(Document={"Bytes": pdf_page_bytes})
# Add the 'Page' attribute to each block
if "Blocks" in response:
for block in response["Blocks"]:
block["Page"] = page_no # Inject the page number into each block
# Wrap the response with the page number in the desired format
wrapped_response = {"page_no": page_no, "data": response}
request_metadata = extract_textract_metadata(
response
) # Metadata comes out as a string
# Return a list containing the wrapped response and the metadata
return (
wrapped_response,
request_metadata,
) # Return as a list to match the desired structure
def convert_pike_pdf_page_to_bytes(pdf: object, page_num: int):
# Create a new empty PDF
new_pdf = pikepdf.Pdf.new()
# Specify the page number you want to extract (0-based index)
page_num = 0 # Example: first page
# Extract the specific page and add it to the new PDF
new_pdf.pages.append(pdf.pages[page_num])
# Save the new PDF to a bytes buffer
buffer = io.BytesIO()
new_pdf.save(buffer)
# Get the PDF bytes
pdf_bytes = buffer.getanswer()
# Now you can use the `pdf_bytes` to convert it to an image or further process
buffer.close()
return pdf_bytes
def json_to_ocrresult(
json_data: dict, page_width: float, page_height: float, page_no: int
):
"""
Convert Textract JSON to structured OCR, handling lines, words, signatures,
selection elements (associating them with lines), and question-answer form data.
The question-answer data is sorted in a top-to-bottom, left-to-right reading order.
Args:
json_data (dict): The raw JSON output from AWS Textract for a specific page.
page_width (float): The width of the page in pixels or points.
page_height (float): The height of the page in pixels or points.
page_no (int): The 1-based page number being processed.
"""
# --- STAGE 1: Block Mapping & Initial Data Collection ---
# text_blocks = json_data.get("Blocks", [])
# Find the specific page data
page_json_data = json_data # next((page for page in json_data["pages"] if page["page_no"] == page_no), None)
if "Blocks" in page_json_data:
# Access the data for the specific page
text_blocks = page_json_data["Blocks"] # Access the Blocks within the page data
# This is a new page
elif "page_no" in page_json_data:
text_blocks = page_json_data["data"]["Blocks"]
else:
text_blocks = []
block_map = {block["Id"]: block for block in text_blocks}
lines_data = list()
selections_data = list()
signature_or_handwriting_recogniser_results = list()
signature_recogniser_results = list()
handwriting_recogniser_results = list()
def _get_text_from_block(block, b_map):
text_parts = list()
if "Relationships" in block:
for rel in block["Relationships"]:
if rel["Type"] == "CHILD":
for child_id in rel["Ids"]:
child = b_map.get(child_id)
if child:
if child["BlockType"] == "WORD":
text_parts.append(child["Text"])
elif child["BlockType"] == "SELECTION_ELEMENT":
text_parts.append(f"[{child['SelectionStatus']}]")
return " ".join(text_parts)
# text_line_number = 1
for block in text_blocks:
block_type = block.get("BlockType")
if block_type == "LINE":
bbox = block["Geometry"]["BoundingBox"]
line_info = {
"id": block["Id"],
"text": block.get("Text", ""),
"confidence": round(block.get("Confidence", 0.0), 0),
"words": [],
"geometry": {
"left": int(bbox["Left"] * page_width),
"top": int(bbox["Top"] * page_height),
"width": int(bbox["Width"] * page_width),
"height": int(bbox["Height"] * page_height),
},
}
if "Relationships" in block:
for rel in block.get("Relationships", []):
if rel["Type"] == "CHILD":
for child_id in rel["Ids"]:
word_block = block_map.get(child_id)
if word_block and word_block["BlockType"] == "WORD":
w_bbox = word_block["Geometry"]["BoundingBox"]
line_info["words"].append(
{
"text": word_block.get("Text", ""),
"confidence": round(
word_block.get("Confidence", 0.0), 0
),
"bounding_box": (
int(w_bbox["Left"] * page_width),
int(w_bbox["Top"] * page_height),
int(
(w_bbox["Left"] + w_bbox["Width"])
* page_width
),
int(
(w_bbox["Top"] + w_bbox["Height"])
* page_height
),
),
}
)
if word_block.get("TextType") == "HANDWRITING":
rec_res = CustomImageRecognizerResult(
entity_type="HANDWRITING",
text=word_block.get("Text", ""),
score=round(
word_block.get("Confidence", 0.0), 0
),
start=0,
end=len(word_block.get("Text", "")),
left=int(w_bbox["Left"] * page_width),
top=int(w_bbox["Top"] * page_height),
width=int(w_bbox["Width"] * page_width),
height=int(w_bbox["Height"] * page_height),
)
handwriting_recogniser_results.append(rec_res)
signature_or_handwriting_recogniser_results.append(
rec_res
)
lines_data.append(line_info)
elif block_type == "SELECTION_ELEMENT":
bbox = block["Geometry"]["BoundingBox"]
selections_data.append(
{
"id": block["Id"],
"status": block.get("SelectionStatus", "UNKNOWN"),
"confidence": round(block.get("Confidence", 0.0), 0),
"geometry": {
"left": int(bbox["Left"] * page_width),
"top": int(bbox["Top"] * page_height),
"width": int(bbox["Width"] * page_width),
"height": int(bbox["Height"] * page_height),
},
}
)
elif block_type == "SIGNATURE":
bbox = block["Geometry"]["BoundingBox"]
rec_res = CustomImageRecognizerResult(
entity_type="SIGNATURE",
text="SIGNATURE",
score=round(block.get("Confidence", 0.0), 0),
start=0,
end=9,
left=int(bbox["Left"] * page_width),
top=int(bbox["Top"] * page_height),
width=int(bbox["Width"] * page_width),
height=int(bbox["Height"] * page_height),
)
signature_recogniser_results.append(rec_res)
signature_or_handwriting_recogniser_results.append(rec_res)
# --- STAGE 2: Question-Answer Pair Extraction & Sorting ---
def _create_question_answer_results_object(text_blocks):
question_answer_results = list()
key_blocks = [
b
for b in text_blocks
if b.get("BlockType") == "KEY_VALUE_SET"
and "KEY" in b.get("EntityTypes", [])
]
for question_block in key_blocks:
answer_block = next(
(
block_map.get(rel["Ids"][0])
for rel in question_block.get("Relationships", [])
if rel["Type"] == "VALUE"
),
None,
)
# The check for value_block now happens BEFORE we try to access its properties.
if answer_block:
question_bbox = question_block["Geometry"]["BoundingBox"]
# We also get the answer_bbox safely inside this block.
answer_bbox = answer_block["Geometry"]["BoundingBox"]
question_answer_results.append(
{
# Data for final output
"Page": page_no,
"Question": _get_text_from_block(question_block, block_map),
"Answer": _get_text_from_block(answer_block, block_map),
"Confidence Score % (Question)": round(
question_block.get("Confidence", 0.0), 0
),
"Confidence Score % (Answer)": round(
answer_block.get("Confidence", 0.0), 0
),
"Question_left": round(question_bbox["Left"], 5),
"Question_top": round(question_bbox["Top"], 5),
"Question_width": round(question_bbox["Width"], 5),
"Question_height": round(question_bbox["Height"], 5),
"Answer_left": round(answer_bbox["Left"], 5),
"Answer_top": round(answer_bbox["Top"], 5),
"Answer_width": round(answer_bbox["Width"], 5),
"Answer_height": round(answer_bbox["Height"], 5),
}
)
question_answer_results.sort(
key=lambda item: (item["Question_top"], item["Question_left"])
)
return question_answer_results
question_answer_results = _create_question_answer_results_object(text_blocks)
# --- STAGE 3: Association of Selection Elements to Lines ---
unmatched_selections = list()
for selection in selections_data:
best_match_line = None
min_dist = float("inf")
sel_geom = selection["geometry"]
sel_y_center = sel_geom["top"] + sel_geom["height"] / 2
for line in lines_data:
line_geom = line["geometry"]
line_y_center = line_geom["top"] + line_geom["height"] / 2
if abs(sel_y_center - line_y_center) < line_geom["height"]:
dist = 0
if sel_geom["left"] > (line_geom["left"] + line_geom["width"]):
dist = sel_geom["left"] - (line_geom["left"] + line_geom["width"])
elif line_geom["left"] > (sel_geom["left"] + sel_geom["width"]):
dist = line_geom["left"] - (sel_geom["left"] + sel_geom["width"])
if dist < min_dist:
min_dist = dist
best_match_line = line
if best_match_line and min_dist < (best_match_line["geometry"]["height"] * 5):
selection_as_word = {
"text": f"[{selection['status']}]",
"confidence": round(selection["confidence"], 0),
"bounding_box": (
sel_geom["left"],
sel_geom["top"],
sel_geom["left"] + sel_geom["width"],
sel_geom["top"] + sel_geom["height"],
),
}
best_match_line["words"].append(selection_as_word)
best_match_line["words"].sort(key=lambda w: w["bounding_box"][0])
else:
unmatched_selections.append(selection)
# --- STAGE 4: Final Output Generation ---
all_ocr_results = list()
ocr_results_with_words = dict()
selection_element_results = list()
for i, line in enumerate(lines_data):
line_num = i + 1
line_geom = line["geometry"]
reconstructed_text = " ".join(w["text"] for w in line["words"])
all_ocr_results.append(
OCRResult(
reconstructed_text,
line_geom["left"],
line_geom["top"],
line_geom["width"],
line_geom["height"],
round(line["confidence"], 0),
line_num,
)
)
ocr_results_with_words[f"text_line_{line_num}"] = {
"line": line_num,
"text": reconstructed_text,
"confidence": line["confidence"],
"bounding_box": (
line_geom["left"],
line_geom["top"],
line_geom["left"] + line_geom["width"],
line_geom["top"] + line_geom["height"],
),
"words": line["words"],
"page": page_no,
}
for selection in unmatched_selections:
sel_geom = selection["geometry"]
sel_text = f"[{selection['status']}]"
all_ocr_results.append(
OCRResult(
sel_text,
sel_geom["left"],
sel_geom["top"],
sel_geom["width"],
sel_geom["height"],
round(selection["confidence"], 0),
-1,
)
)
for selection in selections_data:
sel_geom = selection["geometry"]
selection_element_results.append(
{
"status": selection["status"],
"confidence": round(selection["confidence"], 0),
"bounding_box": (
sel_geom["left"],
sel_geom["top"],
sel_geom["left"] + sel_geom["width"],
sel_geom["top"] + sel_geom["height"],
),
"page": page_no,
}
)
all_ocr_results_with_page = {"page": page_no, "results": all_ocr_results}
ocr_results_with_words_with_page = {
"page": page_no,
"results": ocr_results_with_words,
}
return (
all_ocr_results_with_page,
signature_or_handwriting_recogniser_results,
signature_recogniser_results,
handwriting_recogniser_results,
ocr_results_with_words_with_page,
selection_element_results,
question_answer_results,
)
def load_and_convert_textract_json(
textract_json_file_path: str,
log_files_output_paths: str,
page_sizes_df: pd.DataFrame,
):
"""
Loads Textract JSON from a file, detects if conversion is needed, and converts if necessary.
Args:
textract_json_file_path (str): The file path to the Textract JSON output.
log_files_output_paths (str): A list of paths to log files, used for tracking.
page_sizes_df (pd.DataFrame): A DataFrame containing page size information for the document.
"""
if not os.path.exists(textract_json_file_path):
print("No existing Textract results file found.")
return (
{},
True,
log_files_output_paths,
) # Return empty dict and flag indicating missing file
print("Found existing Textract json results file.")
# Track log files
if textract_json_file_path not in log_files_output_paths:
log_files_output_paths.append(textract_json_file_path)
try:
# Split the path into base directory and filename for security
textract_json_file_path_obj = Path(textract_json_file_path)
base_dir = textract_json_file_path_obj.parent
filename = textract_json_file_path_obj.name
json_content = secure_file_read(base_dir, filename, encoding="utf-8")
textract_data = json.loads(json_content)
except json.JSONDecodeError:
print("Error: Failed to parse Textract JSON file. Returning empty data.")
return {}, True, log_files_output_paths # Indicate failure
# Check if conversion is needed
if "pages" in textract_data:
print("JSON already in the correct format for app. No changes needed.")
return textract_data, False, log_files_output_paths # No conversion required
if "Blocks" in textract_data:
print("Need to convert Textract JSON to app format.")
try:
textract_data = restructure_textract_output(textract_data, page_sizes_df)
return (
textract_data,
False,
log_files_output_paths,
) # Successfully converted
except Exception as e:
print("Failed to convert JSON data to app format due to:", e)
return {}, True, log_files_output_paths # Conversion failed
else:
print("Invalid Textract JSON format: 'Blocks' missing.")
# print("textract data:", textract_data)
return (
{},
True,
log_files_output_paths,
) # Return empty data if JSON is not recognized
def restructure_textract_output(textract_output: dict, page_sizes_df: pd.DataFrame):
"""
Reorganise Textract output from the bulk Textract analysis option on AWS
into a format that works in this redaction app, reducing size.
Args:
textract_output (dict): The raw JSON output from AWS Textract.
page_sizes_df (pd.DataFrame): A Pandas DataFrame containing page size
information, including cropbox and mediabox
dimensions and offsets for each page.
"""
pages_dict = dict()
# Extract total pages from DocumentMetadata
document_metadata = textract_output.get("DocumentMetadata", {})
# For efficient lookup, set 'page' as index if it's not already
if "page" in page_sizes_df.columns:
page_sizes_df = page_sizes_df.set_index("page")
for block in textract_output.get("Blocks", []):
page_no = block.get("Page", 1) # Default to 1 if missing
# --- Geometry Conversion Logic ---
try:
page_info = page_sizes_df.loc[page_no]
cb_width = page_info["cropbox_width"]
cb_height = page_info["cropbox_height"]
mb_width = page_info["mediabox_width"]
mb_height = page_info["mediabox_height"]
cb_x_offset = page_info["cropbox_x_offset"]
cb_y_offset_top = page_info["cropbox_y_offset_from_top"]
# Check if conversion is needed (and avoid division by zero)
needs_conversion = (
(abs(cb_width - mb_width) > 1e-6 or abs(cb_height - mb_height) > 1e-6)
and mb_width > 1e-6
and mb_height > 1e-6
) # Avoid division by zero
if needs_conversion and "Geometry" in block:
geometry = block["Geometry"] # Work directly on the block's geometry
# --- Convert BoundingBox ---
if "BoundingBox" in geometry:
bbox = geometry["BoundingBox"]
old_left = bbox["Left"]
old_top = bbox["Top"]
old_width = bbox["Width"]
old_height = bbox["Height"]
# Calculate absolute coordinates within CropBox
abs_cb_x = old_left * cb_width
abs_cb_y = old_top * cb_height
abs_cb_width = old_width * cb_width
abs_cb_height = old_height * cb_height
# Calculate absolute coordinates relative to MediaBox top-left
abs_mb_x = cb_x_offset + abs_cb_x
abs_mb_y = cb_y_offset_top + abs_cb_y
# Convert back to normalized coordinates relative to MediaBox
bbox["Left"] = abs_mb_x / mb_width
bbox["Top"] = abs_mb_y / mb_height
bbox["Width"] = abs_cb_width / mb_width
bbox["Height"] = abs_cb_height / mb_height
except KeyError:
print(
f"Warning: Page number {page_no} not found in page_sizes_df. Skipping coordinate conversion for this block."
)
# Decide how to handle missing page info: skip conversion, raise error, etc.
except ZeroDivisionError:
print(
f"Warning: MediaBox width or height is zero for page {page_no}. Skipping coordinate conversion for this block."
)
# Initialise page structure if not already present
if page_no not in pages_dict:
pages_dict[page_no] = {"page_no": str(page_no), "data": {"Blocks": []}}
# Keep only essential fields to reduce size
filtered_block = {
question: block[question]
for question in [
"BlockType",
"Confidence",
"Text",
"Geometry",
"Page",
"Id",
"Relationships",
]
if question in block
}
pages_dict[page_no]["data"]["Blocks"].append(filtered_block)
# Convert pages dictionary to a sorted list
structured_output = {
"DocumentMetadata": document_metadata, # Store metadata separately
"pages": [pages_dict[page] for page in sorted(pages_dict.questions())],
}
return structured_output
def convert_question_answer_to_dataframe(
question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame
) -> pd.DataFrame:
"""
Convert question-answer results to DataFrame format matching convert_annotation_data_to_dataframe.
Each Question and Answer will be on separate lines in the resulting dataframe.
The 'image' column will be populated with the page number as f'placeholder_image_page{i}.png'.
Args:
question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object
page_sizes_df: DataFrame containing page sizes
Returns:
pd.DataFrame: DataFrame with columns ["image", "page", "label", "color", "xmin", "xmax", "ymin", "ymax", "text", "id"]
"""
if not question_answer_results:
# Return empty DataFrame with expected schema
return pd.DataFrame(
columns=[
"image",
"page",
"label",
"color",
"xmin",
"xmax",
"ymin",
"ymax",
"text",
"id",
]
)
# Prepare data for DataFrame
rows = list()
existing_ids = set()
for i, qa_result in enumerate(question_answer_results):
page_num = int(qa_result.get("Page", 1))
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
page_sizes_df.dropna(subset=["page"], inplace=True)
if not page_sizes_df.empty:
page_sizes_df["page"] = page_sizes_df["page"].astype(int)
else:
print("Warning: Page sizes DataFrame became empty after processing.")
image_name = page_sizes_df.loc[
page_sizes_df["page"] == page_num, "image_path"
].iloc[0]
if pd.isna(image_name):
image_name = f"placeholder_image_{page_num}.png"
# Create Question row
question_bbox = {
"Question_left": qa_result.get("Question_left", 0),
"Question_top": qa_result.get("Question_top", 0),
"Question_width": qa_result.get("Question_width", 0),
"Question_height": qa_result.get("Question_height", 0),
}
question_row = {
"image": image_name,
"page": page_num,
"label": f"Question {i+1}",
"color": "(0,0,255)",
"xmin": question_bbox["Question_left"],
"xmax": question_bbox["Question_left"] + question_bbox["Question_width"],
"ymin": question_bbox["Question_top"],
"ymax": question_bbox["Question_top"] + question_bbox["Question_height"],
"text": qa_result.get("Question", ""),
"id": None, # Will be filled after generating IDs
}
# Create Answer row
answer_bbox = {
"Answer_left": qa_result.get("Answer_left", 0),
"Answer_top": qa_result.get("Answer_top", 0),
"Answer_width": qa_result.get("Answer_width", 0),
"Answer_height": qa_result.get("Answer_height", 0),
}
answer_row = {
"image": image_name,
"page": page_num,
"label": f"Answer {i+1}",
"color": "(0,255,0)",
"xmin": answer_bbox["Answer_left"],
"xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"],
"ymin": answer_bbox["Answer_top"],
"ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"],
"text": qa_result.get("Answer", ""),
"id": None, # Will be filled after generating IDs
}
rows.extend([question_row, answer_row])
# Generate unique IDs for all rows
num_ids_needed = len(rows)
unique_ids = _generate_unique_ids(num_ids_needed, existing_ids)
# Assign IDs to rows
for i, row in enumerate(rows):
row["id"] = unique_ids[i]
# Create DataFrame
df = pd.DataFrame(rows)
# Ensure all required columns are present and in correct order
required_columns = [
"image",
"page",
"label",
"color",
"xmin",
"xmax",
"ymin",
"ymax",
"text",
"id",
]
for col in required_columns:
if col not in df.columns:
df[col] = pd.NA
# Reorder columns to match expected format
df = df.reindex(columns=required_columns, fill_value=pd.NA)
return df
def convert_question_answer_to_annotation_json(
question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame
) -> List[Dict]:
"""
Convert question-answer results directly to Gradio Annotation JSON format.
This function combines the functionality of convert_question_answer_to_dataframe
and convert_review_df_to_annotation_json to directly convert question-answer
results to the annotation JSON format without the intermediate DataFrame step.
Args:
question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object
page_sizes_df: DataFrame containing page sizes with columns ['page', 'image_path', 'image_width', 'image_height']
Returns:
List of dictionaries suitable for Gradio Annotation output, one dict per image/page.
Each dict has structure: {"image": image_path, "boxes": [list of annotation boxes]}
"""
if not question_answer_results:
# Return empty structure based on page_sizes_df
json_data = list()
for _, row in page_sizes_df.iterrows():
json_data.append(
{
"image": row.get(
"image_path", f"placeholder_image_{row.get('page', 1)}.png"
),
"boxes": [],
}
)
return json_data
# Validate required columns in page_sizes_df
required_ps_cols = {"page", "image_path", "image_width", "image_height"}
if not required_ps_cols.issubset(page_sizes_df.columns):
missing = required_ps_cols - set(page_sizes_df.columns)
raise ValueError(f"page_sizes_df is missing required columns: {missing}")
# Convert page sizes columns to appropriate numeric types
page_sizes_df = page_sizes_df.copy() # Work with a copy to avoid modifying original
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
page_sizes_df["image_width"] = pd.to_numeric(
page_sizes_df["image_width"], errors="coerce"
)
page_sizes_df["image_height"] = pd.to_numeric(
page_sizes_df["image_height"], errors="coerce"
)
page_sizes_df["page"] = page_sizes_df["page"].astype("Int64")
# Prepare data for processing
rows = list()
existing_ids = set()
for i, qa_result in enumerate(question_answer_results):
page_num = int(qa_result.get("Page", 1))
# Get image path for this page
page_row = page_sizes_df[page_sizes_df["page"] == page_num]
if not page_row.empty:
page_row["image_path"].iloc[0]
else:
pass
# Create Question box.
question_bbox = {
"Question_left": qa_result.get("Question_left", 0),
"Question_top": qa_result.get("Question_top", 0),
"Question_width": qa_result.get("Question_width", 0),
"Question_height": qa_result.get("Question_height", 0),
}
question_box = {
"label": f"Question {i+1}",
"color": (0, 0, 255), # Blue for questions
"xmin": question_bbox["Question_left"],
"xmax": question_bbox["Question_left"] + question_bbox["Question_width"],
"ymin": question_bbox["Question_top"],
"ymax": question_bbox["Question_top"] + question_bbox["Question_height"],
"text": qa_result.get("Question", ""),
"id": None, # Will be filled after generating IDs
}
# Create Answer box
answer_bbox = {
"Answer_left": qa_result.get("Answer_left", 0),
"Answer_top": qa_result.get("Answer_top", 0),
"Answer_width": qa_result.get("Answer_width", 0),
"Answer_height": qa_result.get("Answer_height", 0),
}
answer_box = {
"label": f"Answer {i+1}",
"color": (0, 255, 0), # Green for answers
"xmin": answer_bbox["Answer_left"],
"xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"],
"ymin": answer_bbox["Answer_top"],
"ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"],
"text": qa_result.get("Answer", ""),
"id": None, # Will be filled after generating IDs
}
rows.extend([(page_num, question_box), (page_num, answer_box)])
# Generate unique IDs for all boxes
num_ids_needed = len(rows)
unique_ids = _generate_unique_ids(num_ids_needed, existing_ids)
# Assign IDs to boxes
for i, (page_num, box) in enumerate(rows):
box["id"] = unique_ids[i]
rows[i] = (page_num, box)
# Group boxes by page
boxes_by_page = {}
for page_num, box in rows:
if page_num not in boxes_by_page:
boxes_by_page[page_num] = list()
boxes_by_page[page_num].append(box)
# Build JSON structure based on page_sizes
json_data = list()
for _, row in page_sizes_df.iterrows():
page_num = row["page"]
pdf_image_path = row["image_path"]
# Get boxes for this page
annotation_boxes = boxes_by_page.get(page_num, [])
# Append the structured data for this image/page
json_data.append({"image": pdf_image_path, "boxes": annotation_boxes})
return json_data
def convert_page_question_answer_to_custom_image_recognizer_results(
question_answer_results: List[Dict[str, Any]],
page_sizes_df: pd.DataFrame,
reported_page_number: int,
) -> List["CustomImageRecognizerResult"]:
"""
Convert question-answer results to a list of CustomImageRecognizerResult objects.
Args:
question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object
page_sizes_df: DataFrame containing page sizes with columns ['page', 'image_path', 'image_width', 'image_height']
reported_page_number: The page number reported by the user
Returns:
List of CustomImageRecognizerResult objects for questions and answers
"""
from tools.custom_image_analyser_engine import CustomImageRecognizerResult
if not question_answer_results:
return list()
results = list()
# Pre-process page_sizes_df once for efficiency
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
page_sizes_df.dropna(subset=["page"], inplace=True)
if not page_sizes_df.empty:
page_sizes_df["page"] = page_sizes_df["page"].astype(int)
else:
print("Warning: Page sizes DataFrame became empty after processing.")
return list() # Return empty list if no page sizes are available
page_row = page_sizes_df.loc[page_sizes_df["page"] == int(reported_page_number)]
if page_row.empty:
print(
f"Warning: Page {reported_page_number} not found in page_sizes_df. Skipping this entry."
)
return list() # Return empty list if page not found
for i, qa_result in enumerate(question_answer_results):
current_page = int(qa_result.get("Page", 1))
if current_page != int(reported_page_number):
continue # Skip this entry if page number does not match reported page number
# Get image dimensions safely
image_width = page_row["mediabox_width"].iloc[0]
image_height = page_row["mediabox_height"].iloc[0]
# Get question and answer text safely
question_text = qa_result.get("Question", "")
answer_text = qa_result.get("Answer", "")
# Get scores and handle potential type issues
question_score = float(qa_result.get("'Confidence Score % (Question)'", 0.0))
answer_score = float(qa_result.get("'Confidence Score % (Answer)'", 0.0))
# --- Process Question Bounding Box ---
question_bbox = {
"left": qa_result.get("Question_left", 0) * image_width,
"top": qa_result.get("Question_top", 0) * image_height,
"width": qa_result.get("Question_width", 0) * image_width,
"height": qa_result.get("Question_height", 0) * image_height,
}
question_result = CustomImageRecognizerResult(
entity_type=f"QUESTION {i+1}",
start=0,
end=len(question_text),
score=question_score,
left=float(question_bbox.get("left", 0)),
top=float(question_bbox.get("top", 0)),
width=float(question_bbox.get("width", 0)),
height=float(question_bbox.get("height", 0)),
text=question_text,
color=(0, 0, 255),
)
results.append(question_result)
# --- Process Answer Bounding Box ---
answer_bbox = {
"left": qa_result.get("Answer_left", 0) * image_width,
"top": qa_result.get("Answer_top", 0) * image_height,
"width": qa_result.get("Answer_width", 0) * image_width,
"height": qa_result.get("Answer_height", 0) * image_height,
}
answer_result = CustomImageRecognizerResult(
entity_type=f"ANSWER {i+1}",
start=0,
end=len(answer_text),
score=answer_score,
left=float(answer_bbox.get("left", 0)),
top=float(answer_bbox.get("top", 0)),
width=float(answer_bbox.get("width", 0)),
height=float(answer_bbox.get("height", 0)),
text=answer_text,
color=(0, 255, 0),
)
results.append(answer_result)
return results
|