document_redaction / tools /find_duplicate_pages.py
seanpedrickcase's picture
Corrected a couple of bugs. Now Textract whole document API call outputs will load also the input PDF into the app
10f46e9
raw
history blame
9.87 kB
import pandas as pd
#import argparse
#import glob
import os
import re
from tools.helper_functions import OUTPUT_FOLDER
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# import nltk
# from nltk.corpus import stopwords
# from nltk.tokenize import word_tokenize
# from nltk.stem import PorterStemmer
#import spacy
import numpy as np
import random
import string
from typing import List
from gradio import Progress
import en_core_web_lg #en_core_web_sm
nlp = en_core_web_lg.load()
#from tqdm import tqdm
# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('punkt_tab')
similarity_threshold = 0.9
def combine_ocr_output_text(input_files:List[str], output_folder:str=OUTPUT_FOLDER):
"""
Combines text from multiple CSV files containing page and text columns.
Groups text by file and page number, concatenating text within these groups.
Args:
input_files (list): List of paths to CSV files
Returns:
pd.DataFrame: Combined dataframe with columns [file, page, text]
"""
all_data = []
output_files = []
if isinstance(input_files, str):
file_paths_list = [input_files]
else:
file_paths_list = input_files
for file in file_paths_list:
if isinstance(file, str):
file_path = file
else:
file_path = file.name
# Read CSV file
df = pd.read_csv(file_path)
# Ensure required columns exist
if 'page' not in df.columns or 'text' not in df.columns:
print(f"Warning: Skipping {file_path} - missing required columns 'page' and 'text'")
continue
df['text'] = df['text'].fillna('').astype(str)
# Group by page and concatenate text
grouped = df.groupby('page')['text'].apply(' '.join).reset_index()
# Add filename column
grouped['file'] = os.path.basename(file_path)
all_data.append(grouped)
if not all_data:
raise ValueError("No valid CSV files were processed")
# Combine all dataframes
combined_df = pd.concat(all_data, ignore_index=True)
# Reorder columns
combined_df = combined_df[['file', 'page', 'text']]
output_combined_file_path = output_folder + "combined_ocr_output_files.csv"
combined_df.to_csv(output_combined_file_path, index=None)
output_files.append(output_combined_file_path)
return combined_df, output_files
def process_data(df:pd.DataFrame, column:str):
'''
Clean and stem text columns in a data frame
'''
def _clean_text(raw_text):
# Remove HTML tags
clean = re.sub(r'<.*?>', '', raw_text)
# clean = re.sub(r'&nbsp;', ' ', clean)
# clean = re.sub(r'\r\n', ' ', clean)
# clean = re.sub(r'&lt;', ' ', clean)
# clean = re.sub(r'&gt;', ' ', clean)
# clean = re.sub(r'<strong>', ' ', clean)
# clean = re.sub(r'</strong>', ' ', clean)
# Replace non-breaking space \xa0 with a space
# clean = clean.replace(u'\xa0', u' ')
# Remove extra whitespace
clean = ' '.join(clean.split())
# # Tokenize the text
# words = word_tokenize(clean.lower())
# # Remove punctuation and numbers
# words = [word for word in words if word.isalpha()]
# # Remove stopwords
# words = [word for word in words if word not in stop_words]
# Join the cleaned words back into a string
return clean
# Function to apply lemmatization and remove stopwords
def _apply_lemmatization(text):
doc = nlp(text)
# Keep only alphabetic tokens and remove stopwords
lemmatized_words = [token.lemma_ for token in doc if token.is_alpha and not token.is_stop]
return ' '.join(lemmatized_words)
df['text_clean'] = df[column].apply(_clean_text)
df['text_clean'] = df['text_clean'].apply(_apply_lemmatization)
return df
def identify_similar_pages(input_files: List[str], similarity_threshold: float = 0.9, output_folder:str=OUTPUT_FOLDER, progress=Progress(track_tqdm=True)):
output_paths = []
progress(0.1, desc="Cleaning input text")
# Load and clean data
df, output_files = combine_ocr_output_text(input_files)
output_paths.extend(output_files)
df = process_data(df, 'text') # Assume this returns 'text_clean', 'file', and 'page' columns
# Vectorize text
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(df['text_clean'])
progress(0.3, desc="Calculating text similarity")
# Compute sparse cosine similarity
similarity_matrix = cosine_similarity(tfidf_matrix, dense_output=False) # Keep sparse format
# Extract indices of similar pages above threshold
coo_matrix = similarity_matrix.tocoo()
similar_pages = np.array([(i, j, v) for i, j, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data) if v > similarity_threshold])
if similar_pages.size == 0:
return pd.DataFrame(), output_paths # Return empty if no matches
# Create a DataFrame for similar pairs
similarity_df = pd.DataFrame(similar_pages, columns=['Page1_Index', 'Page2_Index', 'Similarity_Score'])
# Remove duplicate pairs (keep one direction)
similarity_df = similarity_df[similarity_df['Page1_Index'] < similarity_df['Page2_Index']]
progress(0.8, desc="Mapping back results")
# Map indices to metadata
# index_map = df[['file', 'page', 'text']].to_dict(orient='index')
# similarity_df['Page1_File'] = similarity_df['Page1_Index'].map(lambda x: index_map[x]['file'])
# similarity_df['Page2_File'] = similarity_df['Page2_Index'].map(lambda x: index_map[x]['file'])
# similarity_df['Page1_Page'] = similarity_df['Page1_Index'].map(lambda x: index_map[x]['page'])
# similarity_df['Page2_Page'] = similarity_df['Page2_Index'].map(lambda x: index_map[x]['page'])
# similarity_df['Page1_Text'] = similarity_df['Page1_Index'].map(lambda x: index_map[x]['text'][0:200])
# similarity_df['Page2_Text'] = similarity_df['Page2_Index'].map(lambda x: index_map[x]['text'][0:200])
# Create a DataFrame with the metadata
metadata_df = df[['file', 'page', 'text']].reset_index()
# Merge to get the metadata for Page1
similarity_df = similarity_df.merge(metadata_df, left_on='Page1_Index', right_on='index', suffixes=('', '_Page1'))
similarity_df = similarity_df.rename(columns={'file': 'Page1_File', 'page': 'Page1_Page', 'text': 'Page1_Text'})
# Merge to get the metadata for Page2
similarity_df = similarity_df.merge(metadata_df, left_on='Page2_Index', right_on='index', suffixes=('', '_Page2'))
similarity_df = similarity_df.rename(columns={'file': 'Page2_File', 'page': 'Page2_Page', 'text': 'Page2_Text'})
# Optionally, drop the index columns if not needed
#similarity_df = similarity_df.drop(columns=['index_Page1', 'index_Page2'])
similarity_df["Similarity_Score"] = similarity_df["Similarity_Score"].round(3)
# Sort results
similarity_df_out = similarity_df[['Page1_File', 'Page1_Page', 'Page2_File', 'Page2_Page', 'Similarity_Score', 'Page1_Text', 'Page2_Text']]
similarity_df_out = similarity_df_out.sort_values(["Page1_File", "Page1_Page", "Page2_File", "Page2_Page", "Similarity_Score"], ascending=[True, True, True, True, False])
similarity_df_out['Page1_Text'] = similarity_df_out['Page1_Text'][0:100]
similarity_df_out['Page2_Text'] = similarity_df_out['Page2_Text'][0:100]
progress(0.8, desc="Saving output files")
# Save results
similarity_file_output_path = output_folder + 'page_similarity_results.csv'
similarity_df_out.to_csv(similarity_file_output_path, index=False)
output_paths.append(similarity_file_output_path)
# Save per-file redaction lists
for redact_file in similarity_df_out['Page2_File'].unique():
output_file_name = output_folder + redact_file + "_whole_page.csv"
whole_pages_to_redact_df = similarity_df_out.loc[similarity_df_out['Page2_File'] == redact_file, ['Page2_Page']].drop_duplicates(['Page2_Page']).sort_values('Page2_Page')
whole_pages_to_redact_df.to_csv(output_file_name, header=False, index=False)
output_paths.append(output_file_name)
return similarity_df_out, output_paths
# Perturb text
# Apply the perturbation function with a 10% error probability
def perturb_text_with_errors(series:pd.Series):
def _perturb_text(text, error_probability=0.1):
words = text.split() # Split text into words
perturbed_words = []
for word in words:
if random.random() < error_probability: # Add a random error
perturbation_type = random.choice(['char_error', 'extra_space', 'extra_punctuation'])
if perturbation_type == 'char_error': # Introduce a character error
idx = random.randint(0, len(word) - 1)
char = random.choice(string.ascii_lowercase) # Add a random letter
word = word[:idx] + char + word[idx:]
elif perturbation_type == 'extra_space': # Add extra space around a word
word = ' ' + word + ' '
elif perturbation_type == 'extra_punctuation': # Add punctuation to the word
punctuation = random.choice(string.punctuation)
idx = random.randint(0, len(word)) # Insert punctuation randomly
word = word[:idx] + punctuation + word[idx:]
perturbed_words.append(word)
return ' '.join(perturbed_words)
series = series.apply(lambda x: _perturb_text(x, error_probability=0.1))
return series