from dataclasses import dataclass
from typing import List, Optional, Union

import numpy as np
import pytesseract
from PIL import Image
from presidio_analyzer import AnalyzerEngine, RecognizerResult


@dataclass
class OCRResult:
    """A single OCR detection and its bounding box (pixel coordinates)."""
    text: str
    left: int
    top: int
    width: int
    height: int


@dataclass
class CustomImageRecognizerResult:
    """A PII hit from the analyzer mapped back onto an image bounding box."""
    entity_type: str
    start: int
    end: int
    score: float
    left: int
    top: int
    width: int
    height: int
    text: str


class CustomImageAnalyzerEngine:
    """Runs Tesseract OCR on an image and Presidio analysis on the OCR text."""

    def __init__(
        self,
        analyzer_engine: Optional[AnalyzerEngine] = None,
        tesseract_config: Optional[str] = None,
    ):
        if not analyzer_engine:
            analyzer_engine = AnalyzerEngine()
        self.analyzer_engine = analyzer_engine
        # --oem 3: default OCR engine mode; --psm 11: sparse text
        self.tesseract_config = tesseract_config or '--oem 3 --psm 11'

    def perform_ocr(self, image: Union[str, Image.Image, np.ndarray]) -> List[OCRResult]:
        # Ensure image is a PIL Image
        if isinstance(image, str):
            image = Image.open(image)
        elif isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        ocr_data = pytesseract.image_to_data(
            image, output_type=pytesseract.Output.DICT, config=self.tesseract_config
        )

        # Filter out empty strings and low-confidence results. Confidence values
        # may come back as strings or floats depending on the pytesseract version,
        # so cast via float() before comparing.
        valid_indices = [
            i for i, text in enumerate(ocr_data['text'])
            if text.strip() and float(ocr_data['conf'][i]) > 0
        ]

        return [
            OCRResult(
                text=ocr_data['text'][i],
                left=ocr_data['left'][i],
                top=ocr_data['top'][i],
                width=ocr_data['width'][i],
                height=ocr_data['height'][i],
            )
            for i in valid_indices
        ]
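
    # A minimal sketch of exercising perform_ocr on its own, assuming a local
    # file "sample.png" (hypothetical path) and a working Tesseract install:
    #
    #     engine = CustomImageAnalyzerEngine()
    #     for word in engine.perform_ocr("sample.png"):
    #         print(word.text, word.left, word.top, word.width, word.height)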

    def analyze_text(
        self,
        ocr_results: List[OCRResult],
        **text_analyzer_kwargs
    ) -> List[CustomImageRecognizerResult]:
        # Default to English if no language was specified
        if "language" not in text_analyzer_kwargs:
            text_analyzer_kwargs["language"] = "en"

        allow_list = text_analyzer_kwargs.get('allow_list', [])
        combined_results = []
        for ocr_result in ocr_results:
            # Analyze each OCR result (line) individually
            analyzer_result = self.analyzer_engine.analyze(
                text=ocr_result.text, **text_analyzer_kwargs
            )
            for result in analyzer_result:
                # Extract the relevant portion of text based on start and end
                relevant_text = ocr_result.text[result.start:result.end]

                # Create a new OCRResult with the relevant text and an
                # estimated position within the line's bounding box
                relevant_ocr_result = OCRResult(
                    text=relevant_text,
                    left=ocr_result.left + self.estimate_x_offset(
                        ocr_result.text, result.start, ocr_result.width
                    ),
                    top=ocr_result.top,
                    width=self.estimate_width(ocr_result, result.start, result.end),
                    height=ocr_result.height
                )

                # Re-base the analyzer offsets onto the extracted span so the
                # overlap test in map_analyzer_results_to_bounding_boxes compares
                # positions in the same coordinate system; otherwise matches in
                # the second half of a line are silently dropped
                rebased_result = RecognizerResult(
                    entity_type=result.entity_type,
                    start=0,
                    end=len(relevant_text),
                    score=result.score,
                )

                # Map the analyzer result to a bounding box for this line
                line_results = self.map_analyzer_results_to_bounding_boxes(
                    [rebased_result], [relevant_ocr_result], relevant_text, allow_list
                )
                # Restore the original line-level character offsets
                for line_result in line_results:
                    line_result.start = result.start
                    line_result.end = result.end
                combined_results.extend(line_results)

        return combined_results

    @staticmethod
    def map_analyzer_results_to_bounding_boxes(
        text_analyzer_results: List[RecognizerResult],
        ocr_results: List[OCRResult],
        full_text: str,
        allow_list: List[str],
    ) -> List[CustomImageRecognizerResult]:
        pii_bboxes = []
        text_position = 0
        for ocr_result in ocr_results:
            word_end = text_position + len(ocr_result.text)
            for result in text_analyzer_results:
                # Emit a box when the word's character interval overlaps the
                # analyzer result's interval and the word is not allow-listed
                if (max(text_position, result.start) < min(word_end, result.end)) \
                        and (ocr_result.text not in allow_list):
                    pii_bboxes.append(
                        CustomImageRecognizerResult(
                            entity_type=result.entity_type,
                            start=result.start,
                            end=result.end,
                            score=result.score,
                            left=ocr_result.left,
                            top=ocr_result.top,
                            width=ocr_result.width,
                            height=ocr_result.height,
                            text=ocr_result.text
                        )
                    )
                    break
            text_position = word_end + 1  # +1 for the space between words
        return pii_bboxes
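
    # Illustrative sketch of the mapping (all values hypothetical):
    #
    #     words = [OCRResult("call", 10, 50, 60, 18),
    #              OCRResult("555-0100", 75, 50, 120, 18),
    #              OCRResult("now", 200, 50, 50, 18)]
    #     hit = RecognizerResult("PHONE_NUMBER", 5, 13, 0.85)
    #     boxes = CustomImageAnalyzerEngine.map_analyzer_results_to_bounding_boxes(
    #         [hit], words, "call 555-0100 now", [])
    #
    # "call" covers positions 0-4 and "now" 14-17 (half-open intervals), so only
    # "555-0100" (positions 5-13) overlaps the hit and exactly one box is returned.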

    @staticmethod
    def estimate_x_offset(full_text: str, start: int, width: int) -> int:
        # Proportionally estimate the pixel offset of character `start` within
        # a line of the given pixel width. This assumes a roughly fixed-width
        # font and might need refinement for variable-width fonts.
        if not full_text:
            return 0
        return int(start / len(full_text) * width)

    @staticmethod
    def estimate_width(ocr_result: OCRResult, start: int, end: int) -> int:
        # Estimate the pixel width of the relevant text portion, again assuming
        # characters of roughly equal width
        full_width = ocr_result.width
        full_length = len(ocr_result.text) or 1
        return int((end - start) / full_length * full_width)
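

# A worked example of the two estimators (hypothetical numbers): for the line
# "Contact: 555-0100 now" (21 characters) rendered 420 px wide, a match over
# characters 9-17 gives estimate_x_offset = int(9 / 21 * 420) = 180 px and
# estimate_width = int((17 - 9) / 21 * 420) = 160 px.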


# Function to combine word-level OCR results into line-level results
def combine_ocr_results(ocr_results, x_threshold=20, y_threshold=10):
    # Sort OCR results by 'top' (then 'left') to ensure line order
    ocr_results = sorted(ocr_results, key=lambda x: (x.top, x.left))

    combined_results = []
    current_line = []
    current_bbox = None

    for result in ocr_results:
        if not current_line:
            # Start a new line
            current_line.append(result)
            current_bbox = result
        else:
            # Check if the result is on the same line (y-axis) and close horizontally (x-axis)
            last_result = current_line[-1]
            if abs(result.top - last_result.top) <= y_threshold and \
                    (result.left - (last_result.left + last_result.width)) <= x_threshold:
                # Update the bounding box to include the new word
                new_right = max(current_bbox.left + current_bbox.width, result.left + result.width)
                current_bbox = OCRResult(
                    text=f"{current_bbox.text} {result.text}",
                    left=current_bbox.left,
                    top=current_bbox.top,
                    width=new_right - current_bbox.left,
                    height=max(current_bbox.height, result.height)
                )
                current_line.append(result)
            else:
                # Commit the current line and start a new one
                combined_results.append(current_bbox)
                current_line = [result]
                current_bbox = result

    # Append the last line
    if current_bbox:
        combined_results.append(current_bbox)

    return combined_results
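

# A minimal end-to-end sketch of how these pieces fit together, assuming a
# local image "document.png" (hypothetical path) and a working Tesseract
# install. This is an illustration, not part of the module's API.
if __name__ == "__main__":
    engine = CustomImageAnalyzerEngine()

    # OCR the page at word level, then merge words into line-level boxes so
    # the analyzer sees whole phrases (e.g. full phone numbers)
    word_results = engine.perform_ocr("document.png")
    line_results = combine_ocr_results(word_results)

    # Run PII analysis and print one bounding box per hit
    for bbox in engine.analyze_text(line_results):
        print(bbox.entity_type, bbox.score, bbox.left, bbox.top, bbox.width, bbox.height)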