import os from typing import Dict, List, Tuple import cv2 import numpy as np from tools.config import OUTPUT_FOLDER, SAVE_WORD_SEGMENTER_OUTPUT_IMAGES INITIAL_KERNEL_WIDTH_FACTOR = 0.05 # Default 0.05 INITIAL_VALLEY_THRESHOLD_FACTOR = 0.05 # Default 0.05 MAIN_VALLEY_THRESHOLD_FACTOR = 0.15 # Default 0.15 C_VALUE = 4 # Default 4 BLOCK_SIZE_FACTOR = 1.5 # Default 1.5 MIN_SPACE_FACTOR = 0.3 # Default 0.4 MATCH_TOLERANCE = 0 # Default 0 MIN_AREA_THRESHOLD = 6 # Default 6 DEFAULT_TRIM_PERCENTAGE = 0.2 # Default 0.2 class AdaptiveSegmenter: """ Line to word segmentation pipeline. It features: 1. Adaptive Thresholding. 2. Targeted Noise Removal using Connected Component Analysis to isolate the main text body. 3. The robust two-stage adaptive search (Valley -> Kernel). 4. CCA for final pixel-perfect refinement. """ def __init__(self, output_folder: str = OUTPUT_FOLDER): self.output_folder = output_folder def _correct_orientation( self, gray_image: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Detects and corrects 90-degree orientation issues (e.g., vertical text). This runs *before* the fine-grained _deskew_image function. Returns the oriented image and the transformation matrix. """ h, w = gray_image.shape center = (w // 2, h // 2) # --- Binarization (copied from _deskew_image) --- block_size = 21 if h < block_size: block_size = h if h % 2 != 0 else h - 1 if block_size > 3: binary = cv2.adaptiveThreshold( gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, block_size, 4, ) else: _, binary = cv2.threshold( gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU ) # Small noise removal opening_kernel = np.ones((2, 2), np.uint8) binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, opening_kernel) # --- Extract text pixel coordinates --- coords = np.column_stack(np.where(binary > 0)) if len(coords) < 50: # print( # "Warning: Not enough text pixels for orientation. Assuming horizontal." # ) M_orient = cv2.getRotationMatrix2D(center, 0, 1.0) return gray_image, M_orient # --- Robust bounding-box check (no minAreaRect quirks) --- ymin, xmin = coords.min(axis=0) ymax, xmax = coords.max(axis=0) box_height = ymax - ymin box_width = xmax - xmin orientation_angle = 0.0 if box_height > box_width: # print( # f"Detected vertical orientation (W:{box_width} < H:{box_height}). Applying 90-degree correction." # ) orientation_angle = 90.0 else: # print( # f"Detected horizontal orientation (W:{box_width} >= H:{box_height}). No orientation correction." # ) M_orient = cv2.getRotationMatrix2D(center, 0, 1.0) return gray_image, M_orient # --- Apply 90-degree correction --- M_orient = cv2.getRotationMatrix2D(center, orientation_angle, 1.0) # Calculate new image bounds (they will be swapped) new_w, new_h = h, w # Adjust translation part of M_orient to center the new image M_orient[0, 2] += (new_w - w) / 2 M_orient[1, 2] += (new_h - h) / 2 oriented_gray = cv2.warpAffine( gray_image, M_orient, (new_w, new_h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE, ) return oriented_gray, M_orient def _deskew_image(self, gray_image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ Detects skew using a robust method that normalizes the output of cv2.minAreaRect to correctly handle its angle/dimension ambiguity. """ h, w = gray_image.shape # Use a single, reliable binarization method for detection. block_size = 21 if h < block_size: block_size = h if h % 2 != 0 else h - 1 if block_size > 3: binary = cv2.adaptiveThreshold( gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, block_size, 4, ) else: _, binary = cv2.threshold( gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU ) opening_kernel = np.ones((2, 2), np.uint8) binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, opening_kernel) coords = np.column_stack(np.where(binary > 0)) if len(coords) < 50: # print("Warning: Not enough text pixels to detect skew. Skipping.") M = cv2.getRotationMatrix2D((w // 2, h // 2), 0, 1.0) return gray_image, M rect = cv2.minAreaRect(coords[:, ::-1]) rect_width, rect_height = rect[1] angle = rect[2] # If the rectangle is described as vertical, normalize it if rect_width < rect_height: # Swap dimensions rect_width, rect_height = rect_height, rect_width # Correct the angle angle += 90 # The angle from minAreaRect is in [-90, 0). After normalization, # our angle for a horizontal line will be close to 0 or -90/90. # We need one last correction for angles near +/- 90. if angle > 45: angle -= 90 elif angle < -45: angle += 90 correction_angle = angle # print(f"Normalized shape (W:{rect_width:.0f}, H:{rect_height:.0f}). Detected angle: {correction_angle:.2f} degrees.") # Final sanity checks on the angle MIN_SKEW_THRESHOLD = 0.5 # Ignore angles smaller than this (likely noise) MAX_SKEW_THRESHOLD = ( 15.0 # Angles larger than this are extreme and likely errors ) if abs(correction_angle) < MIN_SKEW_THRESHOLD: # print(f"Detected angle {correction_angle:.2f}° is too small (likely noise). Skipping deskew.") correction_angle = 0.0 elif abs(correction_angle) > MAX_SKEW_THRESHOLD: # print(f"Warning: Corrected angle {correction_angle:.2f}° is extreme. Skipping deskew.") correction_angle = 0.0 # Create rotation matrix and apply the final correction center = (w // 2, h // 2) M = cv2.getRotationMatrix2D(center, correction_angle, 1.0) deskewed_gray = cv2.warpAffine( gray_image, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE, ) return deskewed_gray, M def _get_boxes_from_profile( self, binary_image: np.ndarray, stable_avg_char_width: float, min_space_factor: float, valley_threshold_factor: float, ) -> List: # This helper function remains IDENTICAL. No changes needed. # ... (code from the previous version) img_h, img_w = binary_image.shape vertical_projection = np.sum(binary_image, axis=0) peaks = vertical_projection[vertical_projection > 0] if len(peaks) == 0: return [] avg_peak_height = np.mean(peaks) valley_threshold = int(avg_peak_height * valley_threshold_factor) min_space_width = int(stable_avg_char_width * min_space_factor) patched_projection = vertical_projection.copy() in_gap = False gap_start = 0 for x, col_sum in enumerate(patched_projection): if col_sum <= valley_threshold and not in_gap: in_gap = True gap_start = x elif col_sum > valley_threshold and in_gap: in_gap = False if (x - gap_start) < min_space_width: patched_projection[gap_start:x] = int(avg_peak_height) unlabeled_boxes = [] in_word = False start_x = 0 for x, col_sum in enumerate(patched_projection): if col_sum > valley_threshold and not in_word: start_x = x in_word = True elif col_sum <= valley_threshold and in_word: unlabeled_boxes.append((start_x, 0, x - start_x, img_h)) in_word = False if in_word: unlabeled_boxes.append((start_x, 0, img_w - start_x, img_h)) return unlabeled_boxes def segment( self, line_data: Dict[str, List], line_image: np.ndarray, min_space_factor=MIN_SPACE_FACTOR, match_tolerance=MATCH_TOLERANCE, image_name: str = None, ) -> Tuple[Dict[str, List], bool]: if line_image is None: # print( # f"Error: line_image is None in segment function (image_name: {image_name})" # ) return ({}, False) # Validate line_image is a valid numpy array if not isinstance(line_image, np.ndarray): # print( # f"Error: line_image is not a numpy array (type: {type(line_image)}, image_name: {image_name})" # ) return ({}, False) # Validate line_image has valid shape and size if line_image.size == 0: # print( # f"Error: line_image is empty (shape: {line_image.shape}, image_name: {image_name})" # ) return ({}, False) if len(line_image.shape) < 2: # print( # f"Error: line_image has invalid shape {line_image.shape} (image_name: {image_name})" # ) return ({}, False) # Early return if 1 or fewer words if line_data and line_data.get("text") and len(line_data["text"]) > 0: line_text = line_data["text"][0] words = line_text.split() if len(words) <= 1: return ({}, False) else: # print( # f"Error: line_data is empty or does not contain text (image_name: {image_name})" # ) return ({}, False) # print(f"line_text: {line_text}") shortened_line_text = line_text.replace(" ", "_")[:10] if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: os.makedirs(self.output_folder, exist_ok=True) output_path = f"{self.output_folder}/word_segmentation/{image_name}_{shortened_line_text}_original.png" os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) cv2.imwrite(output_path, line_image) # print(f"\nSaved original image to '{output_path}'") gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY) # --- STEP 1: Correct major orientation (e.g., 90 degrees) --- # M_orient transforms from ORIGINAL -> ORIENTED oriented_gray, M_orient = self._correct_orientation(gray) # --- STEP 2: Correct minor skew (e.g., -2 degrees) --- # M_skew transforms from ORIENTED -> DESKEWED deskewed_gray, M_skew = self._deskew_image(oriented_gray) # --- STEP 3: Combine Transformations --- # We need a single matrix 'M' that transforms from ORIGINAL -> DESKEWED # We do this by converting to 3x3 matrices and multiplying: M = M_skew * M_orient # Convert to 3x3 M_orient_3x3 = np.vstack([M_orient, [0, 0, 1]]) M_skew_3x3 = np.vstack([M_skew, [0, 0, 1]]) # Combine transformations M_total_3x3 = M_skew_3x3 @ M_orient_3x3 # Get the final 2x3 transformation matrix M = M_total_3x3[0:2, :] # --- Apply TOTAL transformation to the original color image --- # The final dimensions are those of the *last* image in the chain: deskewed_gray h, w = deskewed_gray.shape deskewed_line_image = cv2.warpAffine( line_image, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE, ) # Validate deskewed_line_image before saving if ( deskewed_line_image is None or not isinstance(deskewed_line_image, np.ndarray) or deskewed_line_image.size == 0 ): # print( # f"Error: deskewed_line_image is None or empty (image_name: {image_name})" # ) return ({}, False) # Save deskewed image (optional, only if image_name is provided) if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: os.makedirs(self.output_folder, exist_ok=True) output_path = f"{self.output_folder}/word_segmentation/{image_name}_{shortened_line_text}_deskewed.png" os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) cv2.imwrite(output_path, deskewed_line_image) # print(f"\nSaved deskewed image to '{output_path}'") # --- Step 1: Binarization and Stable Width Calculation (Unchanged) --- approx_char_count = len(line_data["text"][0].replace(" ", "")) if approx_char_count == 0: return {}, False img_h, img_w = deskewed_gray.shape avg_char_width_approx = img_w / approx_char_count block_size = int(avg_char_width_approx * BLOCK_SIZE_FACTOR) if block_size % 2 == 0: block_size += 1 # Validate deskewed_gray and ensure block_size is valid if deskewed_gray is None or not isinstance(deskewed_gray, np.ndarray): # print( # f"Error: deskewed_gray is None or not a numpy array (image_name: {image_name})" # ) return ({}, False) if len(deskewed_gray.shape) != 2: # print( # f"Error: deskewed_gray must be a 2D grayscale image (shape: {deskewed_gray.shape}, image_name: {image_name})" # ) return ({}, False) if block_size < 3: # print( # f"Warning: block_size ({block_size}) is too small for adaptiveThreshold. " # f"Using minimum value of 3. (image_name: {image_name}, " # f"img_w: {img_w}, approx_char_count: {approx_char_count}, " # f"avg_char_width_approx: {avg_char_width_approx:.2f})" # ) block_size = 3 binary = cv2.adaptiveThreshold( deskewed_gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, block_size, C_VALUE, ) # Validate binary image before saving if binary is None or not isinstance(binary, np.ndarray) or binary.size == 0: # print( # f"Error: binary image is None or empty (image_name: {image_name})" # ) return ({}, False) # Save cropped image (optional, only if image_name is provided) if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: os.makedirs(self.output_folder, exist_ok=True) output_path = f"{self.output_folder}/word_segmentation/{image_name}_{shortened_line_text}_binary.png" os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) cv2.imwrite(output_path, binary) # print(f"\nSaved cropped image to '{output_path}'") # --- NEW STEP 1.5: Post-processing with Morphology --- # This "closes" gaps in letters and joins nearby components. # Create a small kernel (e.g., 3x3 rectangle) # You may need to tune this size. kernel_size = 3 kernel = np.ones((kernel_size, kernel_size), np.uint8) # Use MORPH_CLOSE to close small holes and gaps within the letters # It's a dilation followed by an erosion closed_binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=1) # Validate closed_binary image before saving if ( closed_binary is None or not isinstance(closed_binary, np.ndarray) or closed_binary.size == 0 ): # print( # f"Error: closed_binary image is None or empty (image_name: {image_name})" # ) return ({}, False) # (Optional) You could also use a DILATE to make letters thicker # dilated_binary = cv2.dilate(closed_binary, kernel, iterations=1) # Use 'closed_binary' (or 'dilated_binary') from now on. if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: os.makedirs(self.output_folder, exist_ok=True) output_path = f"{self.output_folder}/word_segmentation/{image_name}_{shortened_line_text}_closed_binary.png" os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) cv2.imwrite(output_path, closed_binary) # print(f"\nSaved dilated binary image to '{output_path}'") # --- Step 2: Intelligent Noise Removal (Improved) --- num_labels, labels, stats, _ = cv2.connectedComponentsWithStats( closed_binary, 8, cv2.CV_32S ) clean_binary = np.zeros_like(binary) if num_labels > 1: areas = stats[ 1:, cv2.CC_STAT_AREA ] # Get all component areas, skip background (label 0) # Handle edge case of empty 'areas' array if len(areas) == 0: clean_binary = binary # print("Warning: No components found after binarization.") areas = np.array([0]) # Add a dummy value to prevent crashes # --- 1. Calculate the DEFAULT CONSERVATIVE threshold --- # This is your existing logic, which works well for *clean* lines. p1 = np.percentile(areas, 1) img_h, img_w = binary.shape estimated_char_height = img_h * 0.7 estimated_min_letter_area = max( 2, int(estimated_char_height * 0.2 * estimated_char_height * 0.15) ) # This is the "safe" threshold that protects small letters on clean lines. area_threshold = max(MIN_AREA_THRESHOLD, min(p1, estimated_min_letter_area)) # print(f"Noise Removal: Initial conservative threshold: {area_threshold:.1f} (p1={p1:.1f}, est_min={estimated_min_letter_area:.1f})") # --- 2. Find a "Noise-to-Text" Gap (to enable AGGRESSIVE mode) --- sorted_areas = np.sort(areas) has_clear_gap = False aggressive_threshold = -1 area_before_gap = -1 if len(sorted_areas) > 10: # Need enough components to analyze area_diffs = np.diff(sorted_areas) if len(area_diffs) > 0: # Use your "gap" logic: find a jump > 3x the 95th percentile jump jump_threshold = np.percentile(area_diffs, 95) significant_jump_thresh = max( 10, jump_threshold * 3 ) # Add a 10px minimum jump jump_indices = np.where(area_diffs > significant_jump_thresh)[0] if len(jump_indices) > 0: has_clear_gap = True # This is the index of the *last noise component* gap_idx = jump_indices[0] area_before_gap = sorted_areas[gap_idx] # The aggressive threshold is 1 pixel *larger* than the biggest noise component aggressive_threshold = area_before_gap + 1 # --- 3. ADAPTIVE DECISION: Override if conservative threshold is clearly noise --- if has_clear_gap: # print( # f"Noise Removal: Gap detected. Noise cluster ends at {area_before_gap}px. Aggressive threshold = {aggressive_threshold:.1f}" # ) # Only use a more aggressive threshold IF our "safe" threshold is clearly # stuck *inside* the noise cluster. # e.g., Safe threshold = 1, but noise goes up to 10. # (We use 0.8 as a buffer, so if thresh=7 and gap=8, we don't switch) if area_threshold < (area_before_gap * 0.8): # print( # f"Noise Removal: Conservative threshold ({area_threshold:.1f}) is deep in noise cluster (ends at {area_before_gap}px)." # ) # Instead of using large percentage increases, use a very small absolute increment # This preserves legitimate small letters/words that might be just above the noise # Use a minimal fixed offset (2-3 pixels) above the noise cluster end # This ensures we only remove noise, not legitimate small components small_increment = ( 2 # Fixed small increment - just 2 pixels above noise ) moderate_threshold = area_before_gap + small_increment # Also check what the actual first component after the gap is # This gives us insight into where real text starts # If the gap is very large (e.g., noise ends at 229, text starts at 500), # we want to use a threshold closer to the noise end, not the text start if gap_idx + 1 < len(sorted_areas): first_after_gap = sorted_areas[gap_idx + 1] gap_size = first_after_gap - area_before_gap # If there's a large gap, stick close to the noise end (2 pixels above) # If the gap is small, we might be cutting into text, so be even more conservative if gap_size > 50: # Large gap - safe to use noise_end + 2 final_threshold = moderate_threshold else: # Small gap - might be cutting into text, use just 1 pixel above noise final_threshold = area_before_gap + 1 else: final_threshold = moderate_threshold # Ensure we're at least 1 pixel above the noise cluster final_threshold = max(final_threshold, area_before_gap + 1) # Cap at aggressive threshold as absolute upper bound (shouldn't be needed) final_threshold = min(final_threshold, aggressive_threshold) # Cap at 15 pixels as absolute upper bound final_threshold = min(final_threshold, 15) # print( # f"Noise Removal: Using MODERATE threshold: {final_threshold:.1f} (noise ends at {area_before_gap}px, increment: {small_increment}px)" # ) area_threshold = final_threshold else: # print( # f"Noise Removal: Gap found, but conservative threshold ({area_threshold:.1f}) is sufficient. Sticking with conservative." # ) pass # --- 4. Apply the final, determined threshold --- # print(f"Noise Removal: Final area threshold: {area_threshold:.1f}") for i in range(1, num_labels): # Use >= to be inclusive of the threshold itself if stats[i, cv2.CC_STAT_AREA] >= area_threshold: clean_binary[labels == i] = 255 else: # No components found, or only background clean_binary = binary # Validate clean_binary before proceeding if ( clean_binary is None or not isinstance(clean_binary, np.ndarray) or clean_binary.size == 0 ): # print( # f"Error: clean_binary image is None or empty (image_name: {image_name})" # ) return ({}, False) # Calculate the horizontal projection profile on the cleaned image horizontal_projection = np.sum(clean_binary, axis=1) # Find the top and bottom boundaries of the text non_zero_rows = np.where(horizontal_projection > 0)[0] if len(non_zero_rows) > 0: text_top = non_zero_rows[0] text_bottom = non_zero_rows[-1] text_height = text_bottom - text_top # Define a percentage to trim off the top and bottom # This is a tunable parameter. 15% is a good starting point. trim_percentage = DEFAULT_TRIM_PERCENTAGE trim_pixels = int(text_height * trim_percentage) # Calculate new, tighter boundaries y_start = text_top + trim_pixels y_end = text_bottom - trim_pixels # Ensure the crop is valid if y_start < y_end: # print( # f"Original text height: {text_height}px. Cropping to middle {100 - (2*trim_percentage*100):.0f}% region." # ) # Slice the image to get the vertically cropped ROI analysis_image = clean_binary[y_start:y_end, :] else: # If trimming would result in an empty image, use the full text region analysis_image = clean_binary[text_top:text_bottom, :] else: # If no text is found, use the original cleaned image analysis_image = clean_binary # Validate analysis_image before proceeding if ( analysis_image is None or not isinstance(analysis_image, np.ndarray) or analysis_image.size == 0 ): # print( # f"Error: analysis_image is None or empty (image_name: {image_name})" # ) return ({}, False) # --- Step 3: Hierarchical Adaptive Search (using the new clean_binary) --- # The rest of the pipeline is identical but now operates on a superior image. words = line_data["text"][0].split() target_word_count = len(words) # print(f"Target word count: {target_word_count}") # Save cropped image (optional, only if image_name is provided) if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: os.makedirs(self.output_folder, exist_ok=True) output_path = f"{self.output_folder}/word_segmentation/{image_name}_{shortened_line_text}_clean_binary.png" os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) cv2.imwrite(output_path, analysis_image) # print(f"\nSaved cropped image to '{output_path}'") best_boxes = None successful_binary_image = None # --- Step 3: Hierarchical Adaptive Search (using the CROPPED analysis_image) --- words = line_data["text"][0].split() target_word_count = len(words) stage1_succeeded = False # print("--- Stage 1: Searching with adaptive valley threshold ---") valley_factors_to_try = np.arange(INITIAL_VALLEY_THRESHOLD_FACTOR, 0.45, 0.05) for v_factor in valley_factors_to_try: # Pass the cropped image to the helper unlabeled_boxes = self._get_boxes_from_profile( analysis_image, avg_char_width_approx, min_space_factor, v_factor ) # ... (The rest of the Stage 1 loop is the same) if abs(target_word_count - len(unlabeled_boxes)) <= match_tolerance: best_boxes = unlabeled_boxes successful_binary_image = analysis_image stage1_succeeded = True break if not stage1_succeeded: # print( # "\n--- Stage 1 failed. Starting Stage 2: Searching with adaptive kernel ---" # ) kernel_factors_to_try = np.arange(INITIAL_KERNEL_WIDTH_FACTOR, 0.5, 0.05) fixed_valley_factor = MAIN_VALLEY_THRESHOLD_FACTOR for k_factor in kernel_factors_to_try: kernel_width = max(1, int(avg_char_width_approx * k_factor)) closing_kernel = np.ones((1, kernel_width), np.uint8) # Apply closing on the original clean_binary, then crop it closed_binary = cv2.morphologyEx( clean_binary, cv2.MORPH_CLOSE, closing_kernel ) # Validate closed_binary before proceeding if ( closed_binary is None or not isinstance(closed_binary, np.ndarray) or closed_binary.size == 0 ): # print( # f"Error: closed_binary in Stage 2 is None or empty (image_name: {image_name}, k_factor: {k_factor:.2f})" # ) continue # Skip this iteration and try next kernel factor # We need to re-apply the same vertical crop to this new image if len(non_zero_rows) > 0 and y_start < y_end: analysis_image = closed_binary[y_start:y_end, :] else: analysis_image = closed_binary # Validate analysis_image before using it if ( analysis_image is None or not isinstance(analysis_image, np.ndarray) or analysis_image.size == 0 ): # print( # f"Error: analysis_image in Stage 2 is None or empty (image_name: {image_name}, k_factor: {k_factor:.2f})" # ) continue # Skip this iteration and try next kernel factor unlabeled_boxes = self._get_boxes_from_profile( analysis_image, avg_char_width_approx, min_space_factor, fixed_valley_factor, ) # print( # f"Testing kernel factor {k_factor:.2f} ({kernel_width}px): Found {len(unlabeled_boxes)} boxes." # ) if abs(target_word_count - len(unlabeled_boxes)) <= match_tolerance: # print("SUCCESS (Stage 2): Found a match.") best_boxes = unlabeled_boxes successful_binary_image = ( closed_binary # For Stage 2, the source is the closed_binary ) break final_output = None used_fallback = False if best_boxes is None: # print("\nWarning: All adaptive searches failed. Falling back.") fallback_segmenter = HybridWordSegmenter() used_fallback = True final_output = fallback_segmenter.refine_words_bidirectional( line_data, deskewed_line_image ) else: # --- CCA Refinement using the determined successful_binary_image --- unlabeled_boxes = best_boxes cca_source_image = successful_binary_image if ( successful_binary_image is analysis_image ): # This comparison might not work as intended # A safer way is to check if Stage 1 succeeded if any( v_factor in locals() and abs( target_word_count - len( self._get_boxes_from_profile( analysis_image, avg_char_width_approx, min_space_factor, v_factor, ) ) ) <= match_tolerance for v_factor in np.arange( INITIAL_VALLEY_THRESHOLD_FACTOR, 0.45, 0.05 ) ): cca_source_image = clean_binary else: # Stage 2 must have succeeded # Recreate the successful closed_binary for CCA successful_k_factor = locals().get("k_factor") if successful_k_factor is not None: kernel_width = max( 1, int(avg_char_width_approx * successful_k_factor) ) closing_kernel = np.ones((1, kernel_width), np.uint8) cca_source_image = cv2.morphologyEx( clean_binary, cv2.MORPH_CLOSE, closing_kernel ) else: cca_source_image = clean_binary # Fallback else: cca_source_image = successful_binary_image # --- Proceed with CCA Refinement --- unlabeled_boxes = best_boxes num_labels, _, stats, _ = cv2.connectedComponentsWithStats( cca_source_image, 8, cv2.CV_32S ) refined_boxes_list = [] num_to_process = min(len(words), len(unlabeled_boxes)) for i in range(num_to_process): word_label = words[i] box_x, _, box_w, _ = unlabeled_boxes[i] box_r = box_x + box_w # Box right edge components_in_box = [] for j in range(1, num_labels): # Skip background comp_x = stats[j, cv2.CC_STAT_LEFT] comp_w = stats[j, cv2.CC_STAT_WIDTH] comp_r = comp_x + comp_w # Component right edge # --- THE CRITICAL FIX: Check for OVERLAP, not strict containment --- # Old logic: if box_x <= comp_x < box_r: # New logic: if comp_x < box_r and box_x < comp_r: components_in_box.append(stats[j]) if not components_in_box: continue # The rest of the CCA union logic is unchanged min_x = min(c[cv2.CC_STAT_LEFT] for c in components_in_box) min_y = min(c[cv2.CC_STAT_TOP] for c in components_in_box) max_r = max( c[cv2.CC_STAT_LEFT] + c[cv2.CC_STAT_WIDTH] for c in components_in_box ) max_b = max( c[cv2.CC_STAT_TOP] + c[cv2.CC_STAT_HEIGHT] for c in components_in_box ) refined_boxes_list.append( { "text": word_label, "left": min_x, "top": min_y, "width": max_r - min_x, "height": max_b - min_y, "conf": line_data["conf"][0], } ) # Convert to dict format final_output = { k: [] for k in ["text", "left", "top", "width", "height", "conf"] } for box in refined_boxes_list: for key in final_output.keys(): final_output[key].append(box[key]) # --- TRANSFORM COORDINATES BACK --- # Get the inverse transformation matrix M_inv = cv2.invertAffineTransform(M) # Create a new list for the re-mapped boxes remapped_boxes_list = [] # Iterate through the boxes found on the deskewed image for i in range(len(final_output["text"])): # Get the box coordinates from the deskewed image left, top = final_output["left"][i], final_output["top"][i] width, height = final_output["width"][i], final_output["height"][i] # Define the 4 corners of this box # Use float for accurate transformation corners = np.array( [ [left, top], [left + width, top], [left + width, top + height], [left, top + height], ], dtype="float32", ) # Add a '1' to each coordinate for the 2x3 affine matrix # shape (4, 1, 2) corners_expanded = np.expand_dims(corners, axis=1) # Apply the inverse transformation # shape (4, 1, 2) original_corners = cv2.transform(corners_expanded, M_inv) # Find the new axis-aligned bounding box in the original image # original_corners is now [[ [x1,y1] ], [ [x2,y2] ], ...] # We need to squeeze it to get [ [x1,y1], [x2,y2], ...] squeezed_corners = original_corners.squeeze(axis=1) # Find the min/max x and y min_x = int(np.min(squeezed_corners[:, 0])) max_x = int(np.max(squeezed_corners[:, 0])) min_y = int(np.min(squeezed_corners[:, 1])) max_y = int(np.max(squeezed_corners[:, 1])) # Create the re-mapped box remapped_box = { "text": final_output["text"][i], "left": min_x, "top": min_y, "width": max_x - min_x, "height": max_y - min_y, "conf": final_output["conf"][i], } remapped_boxes_list.append(remapped_box) # Convert the remapped list back to the dictionary format remapped_output = {k: [] for k in final_output.keys()} for box in remapped_boxes_list: for key in remapped_output.keys(): remapped_output[key].append(box[key]) # Visualisation if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: output_path = f"{self.output_folder}/word_segmentation/{image_name}_{shortened_line_text}_final_boxes.png" os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) output_image_vis = line_image.copy() # Validate output_image_vis before saving if ( output_image_vis is None or not isinstance(output_image_vis, np.ndarray) or output_image_vis.size == 0 ): pass # print( # f"Error: output_image_vis is None or empty (image_name: {image_name})" # ) else: # print(f"\nFinal refined {len(remapped_output['text'])} words:") for i in range(len(remapped_output["text"])): word = remapped_output["text"][i] x, y, w, h = ( int(remapped_output["left"][i]), int(remapped_output["top"][i]), int(remapped_output["width"][i]), int(remapped_output["height"][i]), ) # print(f"- '{word}' at ({x}, {y}, {w}, {h})") cv2.rectangle( output_image_vis, (x, y), (x + w, y + h), (0, 255, 0), 2 ) cv2.imwrite(output_path, output_image_vis) # print(f"\nSaved visualisation to '{output_path}'") return remapped_output, used_fallback class HybridWordSegmenter: """ Implements a two-step approach for word segmentation: 1. Proportional estimation based on text. 2. Image-based refinement with a "Bounded Scan" to prevent over-correction. """ def _convert_line_to_word_level_improved( self, line_data: Dict[str, List], image_width: int, image_height: int ) -> Dict[str, List]: """ Step 1: Converts line-level OCR results to word-level by using a robust proportional estimation method. (This function is unchanged from the previous version) """ output = { "text": list(), "left": list(), "top": list(), "width": list(), "height": list(), "conf": list(), } if not line_data or not line_data.get("text"): return output i = 0 # Assuming a single line line_text = line_data["text"][i] line_left = float(line_data["left"][i]) line_top = float(line_data["top"][i]) line_width = float(line_data["width"][i]) line_height = float(line_data["height"][i]) line_conf = line_data["conf"][i] if not line_text.strip(): return output words = line_text.split() if not words: return output num_chars = len("".join(words)) num_spaces = len(words) - 1 if num_chars == 0: return output if (num_chars * 2 + num_spaces) > 0: char_space_ratio = 2.0 estimated_space_width = line_width / ( num_chars * char_space_ratio + num_spaces ) avg_char_width = estimated_space_width * char_space_ratio else: avg_char_width = line_width / (num_chars if num_chars > 0 else 1) estimated_space_width = avg_char_width current_left = line_left for word in words: word_width = len(word) * avg_char_width clamped_left = max(0, min(current_left, image_width)) clamped_width = max(0, min(word_width, image_width - clamped_left)) output["text"].append(word) output["left"].append(clamped_left) output["top"].append(line_top) output["width"].append(clamped_width) output["height"].append(line_height) output["conf"].append(line_conf) current_left += word_width + estimated_space_width return output def _run_single_pass( self, initial_boxes: List[Dict], vertical_projection: np.ndarray, max_scan_distance: int, img_w: int, direction: str = "ltr", ) -> List[Dict]: """Helper function to run one pass of refinement (either LTR or RTL).""" refined_boxes = [box.copy() for box in initial_boxes] if direction == "ltr": last_corrected_right_edge = 0 indices = range(len(refined_boxes)) else: # rtl next_corrected_left_edge = img_w indices = range(len(refined_boxes) - 1, -1, -1) for i in indices: box = refined_boxes[i] left = int(box["left"]) right = int(box["left"] + box["width"]) left = max(0, min(left, img_w - 1)) right = max(0, min(right, img_w - 1)) new_left, new_right = left, right # Bounded Scan (logic is the same for both directions) if right < img_w and vertical_projection[right] > 0: scan_limit = min(img_w, right + max_scan_distance) for x in range(right + 1, scan_limit): if vertical_projection[x] == 0: new_right = x break if left > 0 and vertical_projection[left] > 0: scan_limit = max(0, left - max_scan_distance) for x in range(left - 1, scan_limit, -1): if vertical_projection[x] == 0: new_left = x break # Directional De-overlapping if direction == "ltr": if new_left < last_corrected_right_edge: new_left = last_corrected_right_edge last_corrected_right_edge = max(last_corrected_right_edge, new_right) else: # rtl if new_right > next_corrected_left_edge: new_right = next_corrected_left_edge next_corrected_left_edge = min(next_corrected_left_edge, new_left) box["left"] = new_left box["width"] = max(1, new_right - new_left) return refined_boxes def refine_words_bidirectional( self, line_data: Dict[str, List], line_image: np.ndarray, ) -> Dict[str, List]: """ Refines boxes using a more robust bidirectional scan and averaging. """ if line_image is None: return line_data # Early return if 1 or fewer words if line_data and line_data.get("text"): words = line_data["text"][0].split() if len(words) <= 1: img_h, img_w = line_image.shape[:2] return self._convert_line_to_word_level_improved( line_data, img_w, img_h ) gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY) _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) img_h, img_w = binary.shape vertical_projection = np.sum(binary, axis=0) char_blobs = [] in_blob = False blob_start = 0 for x, col_sum in enumerate(vertical_projection): if col_sum > 0 and not in_blob: blob_start = x in_blob = True elif col_sum == 0 and in_blob: char_blobs.append((blob_start, x)) in_blob = False if in_blob: char_blobs.append((blob_start, img_w)) if not char_blobs: return self._convert_line_to_word_level_improved(line_data, img_w, img_h) avg_char_width = np.mean([end - start for start, end in char_blobs]) max_scan_distance = int(avg_char_width * 1.5) estimated_data = self._convert_line_to_word_level_improved( line_data, img_w, img_h ) if not estimated_data["text"]: return estimated_data initial_boxes = [] for i in range(len(estimated_data["text"])): initial_boxes.append( { "text": estimated_data["text"][i], "left": estimated_data["left"][i], "top": estimated_data["top"][i], "width": estimated_data["width"][i], "height": estimated_data["height"][i], "conf": estimated_data["conf"][i], } ) # 1. & 2. Perform both passes ltr_boxes = self._run_single_pass( initial_boxes, vertical_projection, max_scan_distance, img_w, "ltr" ) rtl_boxes = self._run_single_pass( initial_boxes, vertical_projection, max_scan_distance, img_w, "rtl" ) # 3. Combine the results by taking the best edge from each pass combined_boxes = [box.copy() for box in initial_boxes] for i in range(len(combined_boxes)): # Get the "expert" left boundary from the LTR pass final_left = ltr_boxes[i]["left"] # Get the "expert" right boundary from the RTL pass rtl_right = rtl_boxes[i]["left"] + rtl_boxes[i]["width"] combined_boxes[i]["left"] = final_left combined_boxes[i]["width"] = max(1, rtl_right - final_left) # 4. Final De-overlap Pass last_corrected_right_edge = 0 for i, box in enumerate(combined_boxes): if box["left"] < last_corrected_right_edge: box["width"] = max( 1, box["width"] - (last_corrected_right_edge - box["left"]) ) box["left"] = last_corrected_right_edge if box["width"] < 1: # Handle edge case where a box is completely eliminated if i < len(combined_boxes) - 1: next_left = combined_boxes[i + 1]["left"] box["width"] = max(1, next_left - box["left"]) else: box["width"] = 1 last_corrected_right_edge = box["left"] + box["width"] # Convert back to Tesseract-style output dict final_output = {k: [] for k in estimated_data.keys()} for box in combined_boxes: if box["width"] > 0: # Ensure we don't add zero-width boxes for key in final_output.keys(): final_output[key].append(box[key]) return final_output