import os
import re
import glob
import random
import string
import argparse
from typing import List

import pandas as pd
import numpy as np
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from tools.helper_functions import output_folder

# Download the NLTK resources needed for tokenisation and stopword removal
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('punkt_tab')

# Minimum cosine similarity for two pages to be considered near-duplicates
similarity_threshold = 0.9

stop_words = set(stopwords.words('english'))

# List of words to remove from the stopword set
#words_to_remove = ['no', 'nor', 'not', 'don', "don't", 'wasn', "wasn't", 'weren', "weren't"]
# Remove the specified words from the stopwords set
#for word in words_to_remove:
#    stop_words.discard(word.lower())

stemmer = PorterStemmer()
vectorizer = TfidfVectorizer()


def combine_ocr_output_text(input_files):
    """
    Combines text from multiple CSV files containing page and text columns.
    Groups text by file and page number, concatenating text within these groups.

    Args:
        input_files (list): List of paths to CSV files

    Returns:
        tuple: (pd.DataFrame with columns [file, page, text],
                list of output file paths written to output_folder)
    """
    all_data = []
    output_files = []

    if isinstance(input_files, str):
        file_paths_list = [input_files]
    else:
        file_paths_list = input_files

    for file in file_paths_list:
        # Accept either plain path strings or file-like objects with a .name attribute
        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name

        # Read CSV file
        df = pd.read_csv(file_path)

        # Ensure required columns exist
        if 'page' not in df.columns or 'text' not in df.columns:
            print(f"Warning: Skipping {file_path} - missing required columns 'page' and 'text'")
            continue

        df['text'] = df['text'].fillna('').astype(str)

        # Group by page and concatenate text
        grouped = df.groupby('page')['text'].apply(' '.join).reset_index()

        # Add filename column
        grouped['file'] = os.path.basename(file_path)

        all_data.append(grouped)

    if not all_data:
        raise ValueError("No valid CSV files were processed")

    # Combine all dataframes
    combined_df = pd.concat(all_data, ignore_index=True)

    # Reorder columns
    combined_df = combined_df[['file', 'page', 'text']]

    output_combined_file_path = output_folder + "combined_ocr_output_files.csv"
    combined_df.to_csv(output_combined_file_path, index=False)
    output_files.append(output_combined_file_path)

    return combined_df, output_files
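
# Example usage (an illustrative sketch; the CSV paths below are hypothetical and
# assume files with 'page' and 'text' columns, as produced by the OCR step):
#
#   combined_df, written_files = combine_ocr_output_text(
#       ["output/example_doc_ocr_output.csv", "output/another_doc_ocr_output.csv"]
#   )
#   # combined_df has one row per (file, page) with the page text concatenated;
#   # written_files contains the path of the combined CSV written to output_folder.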


def process_data(df, column: str):
    '''
    Clean and stem a text column in a dataframe, writing the result to 'text_clean'.
    '''

    def _clean_text(raw_text):
        # Remove HTML tags
        clean = re.sub(r'<.*?>', '', raw_text)
        clean = re.sub(r'\r\n', ' ', clean)

        # Remove common HTML entities left over from extraction
        clean = re.sub(r'&nbsp;', ' ', clean)
        clean = re.sub(r'&lt;strong&gt;', ' ', clean)
        clean = re.sub(r'&lt;/strong&gt;', ' ', clean)
        clean = re.sub(r'&lt;', ' ', clean)
        clean = re.sub(r'&gt;', ' ', clean)

        # Replace non-breaking space \xa0 with a space
        clean = clean.replace(u'\xa0', u' ')

        # Remove extra whitespace
        clean = ' '.join(clean.split())

        # Tokenize the text
        words = word_tokenize(clean.lower())

        # Remove punctuation and numbers
        words = [word for word in words if word.isalpha()]

        # Remove stopwords
        words = [word for word in words if word not in stop_words]

        # Join the cleaned words back into a string
        return ' '.join(words)

    # Function to apply stemming
    def _apply_stemming(text):
        # Tokenize the text
        words = word_tokenize(text.lower())

        # Apply stemming to each word
        stemmed_words = [stemmer.stem(word) for word in words]

        # Join the stemmed words back into a single string
        return ' '.join(stemmed_words)

    df['text_clean'] = df[column].apply(_clean_text)
    df['text_clean'] = df['text_clean'].apply(_apply_stemming)

    return df
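
# Example (illustrative sketch with made-up text) showing the effect of process_data:
#
#   sample_df = pd.DataFrame({'text': ["The <strong>quick</strong> brown foxes are running!"]})
#   sample_df = process_data(sample_df, 'text')
#   # sample_df['text_clean'] now holds lower-cased, stopword-free, stemmed tokens,
#   # e.g. something like "quick brown fox run" (exact output depends on the stemmer).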


def identify_similar_pages(input_files: List[str]):
    '''
    Identify near-duplicate pages across the input OCR CSV files using TF-IDF
    vectors and cosine similarity, writing the results to CSV files.
    '''
    output_paths = []

    df, output_files = combine_ocr_output_text(input_files)
    output_paths.extend(output_files)

    # Clean and stem text
    df = process_data(df, 'text')

    # Vectorise text
    tfidf_matrix = vectorizer.fit_transform(df['text_clean'])

    # Calculate cosine similarity between all pairs of pages
    similarity_matrix = cosine_similarity(tfidf_matrix)

    # Find the indices of the most similar pages
    np.fill_diagonal(similarity_matrix, 0)  # Ignore self-comparisons
    similar_pages = np.argwhere(similarity_matrix > similarity_threshold)

    # Create a DataFrame for similar pairs and their scores
    similarity_df = pd.DataFrame({
        'Page1_Index': similar_pages[:, 0],
        'Page2_Index': similar_pages[:, 1],
        'Similarity_Score': similarity_matrix[similar_pages[:, 0], similar_pages[:, 1]]
    })

    # Filter out duplicate pairs (keep only one direction)
    similarity_df = similarity_df[similarity_df['Page1_Index'] < similarity_df['Page2_Index']]

    # Map the indices to their corresponding text and metadata
    similarity_df['Page1_File'] = similarity_df['Page1_Index'].map(df['file'])
    similarity_df['Page2_File'] = similarity_df['Page2_Index'].map(df['file'])

    similarity_df['Page1_Page'] = similarity_df['Page1_Index'].map(df['page'])
    similarity_df['Page2_Page'] = similarity_df['Page2_Index'].map(df['page'])

    similarity_df['Page1_Text'] = similarity_df['Page1_Index'].map(df['text'])
    similarity_df['Page2_Text'] = similarity_df['Page2_Index'].map(df['text'])

    similarity_df_out = similarity_df[['Page1_File', 'Page1_Page', 'Page2_File', 'Page2_Page', 'Similarity_Score', 'Page1_Text', 'Page2_Text']]
    similarity_df_out = similarity_df_out.sort_values(
        ["Page1_File", "Page1_Page", "Page2_File", "Page2_Page", "Similarity_Score"],
        ascending=[True, True, True, True, False]
    )

    # Save detailed results to a CSV file
    similarity_file_output_path = output_folder + 'page_similarity_results.csv'
    similarity_df_out.to_csv(similarity_file_output_path, index=False)
    output_paths.append(similarity_file_output_path)

    # For each file containing near-duplicate pages, write the page numbers that
    # can be redacted as whole pages
    if not similarity_df_out.empty:
        unique_files = similarity_df_out['Page2_File'].unique()
        for redact_file in unique_files:
            output_file_name = output_folder + redact_file + "_whole_page.csv"
            whole_pages_to_redact_df = similarity_df_out.loc[similarity_df_out['Page2_File'] == redact_file, ['Page2_Page']]
            whole_pages_to_redact_df.to_csv(output_file_name, header=False, index=False)
            output_paths.append(output_file_name)

    return similarity_df_out, output_paths
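
# Example usage (a sketch; the input path is hypothetical). Pages whose cleaned,
# stemmed TF-IDF vectors have a cosine similarity above similarity_threshold (0.9)
# are reported as near-duplicate pairs:
#
#   similarity_df, result_paths = identify_similar_pages(["output/example_doc_ocr_output.csv"])
#   # similarity_df lists matching page pairs with their similarity scores;
#   # result_paths includes page_similarity_results.csv and one *_whole_page.csv
#   # per file, listing the page numbers suggested for whole-page redaction.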


# Perturb text
# Apply the perturbation function with a 10% error probability
def perturb_text_with_errors(series):

    def _perturb_text(text, error_probability=0.1):
        words = text.split()  # Split text into words
        perturbed_words = []

        for word in words:
            if random.random() < error_probability:  # Add a random error
                perturbation_type = random.choice(['char_error', 'extra_space', 'extra_punctuation'])

                if perturbation_type == 'char_error':  # Insert a random letter into the word
                    idx = random.randint(0, len(word) - 1)
                    char = random.choice(string.ascii_lowercase)
                    word = word[:idx] + char + word[idx:]

                elif perturbation_type == 'extra_space':  # Add extra space around the word
                    word = ' ' + word + ' '

                elif perturbation_type == 'extra_punctuation':  # Insert punctuation at a random position
                    punctuation = random.choice(string.punctuation)
                    idx = random.randint(0, len(word))
                    word = word[:idx] + punctuation + word[idx:]

            perturbed_words.append(word)

        return ' '.join(perturbed_words)

    series = series.apply(lambda x: _perturb_text(x, error_probability=0.1))

    return series
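
# Example (illustrative): perturb the combined page text to check that the duplicate
# detection above still flags pages when small OCR-style errors are introduced.
# combined_df here is assumed to come from combine_ocr_output_text:
#
#   noisy_text = perturb_text_with_errors(combined_df['text'])
#   # Roughly 10% of words gain a random character, extra space or punctuation mark.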


# Run through command line
# def main():
#     parser = argparse.ArgumentParser(description='Combine text from multiple CSV files by page')
#     parser.add_argument('input_pattern', help='Input file pattern (e.g., "input/*.csv")')
#     parser.add_argument('--output', '-o', default='combined_text.csv',
#                         help='Output CSV file path (default: combined_text.csv)')
#
#     args = parser.parse_args()
#
#     # Get list of input files
#     input_files = glob.glob(args.input_pattern)
#
#     if not input_files:
#         print(f"No files found matching pattern: {args.input_pattern}")
#         return
#
#     print(f"Processing {len(input_files)} files...")
#
#     try:
#         # Combine the text from all files (the function also writes a combined CSV)
#         combined_df, _ = combine_ocr_output_text(input_files)
#
#         # Save to CSV
#         combined_df.to_csv(args.output, index=False)
#         print(f"Successfully created combined output: {args.output}")
#         print(f"Total pages processed: {len(combined_df)}")
#
#     except Exception as e:
#         print(f"Error processing files: {str(e)}")
#
# if __name__ == "__main__":
#     main()