# document_redaction/tools/find_duplicate_tabular.py
import os
import time
from pathlib import Path
from typing import Dict, List, Tuple
import gradio as gr
import pandas as pd
from gradio import Progress
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tools.config import (
DO_INITIAL_TABULAR_DATA_CLEAN,
MAX_SIMULTANEOUS_FILES,
MAX_TABLE_ROWS,
REMOVE_DUPLICATE_ROWS,
)
from tools.data_anonymise import initial_clean
from tools.helper_functions import OUTPUT_FOLDER, read_file
from tools.load_spacy_model_custom_recognisers import nlp
from tools.secure_path_utils import secure_join
def clean_and_stem_text_series(
df: pd.DataFrame,
column: str,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
):
"""
Clean and stem text columns in a data frame for tabular data
"""
# Function to apply lemmatisation and remove stopwords
def _apply_lemmatization(text):
doc = nlp(text)
# Keep only alphabetic tokens and remove stopwords
lemmatized_words = [
token.lemma_ for token in doc if token.is_alpha and not token.is_stop
]
return " ".join(lemmatized_words)
# Always create text_clean column first
if do_initial_clean_dup:
df["text_clean"] = initial_clean(df[column])
else:
df["text_clean"] = df[column]
df["text_clean"] = df["text_clean"].apply(_apply_lemmatization)
df["text_clean"] = df[
"text_clean"
].str.lower() # .str.replace(r'[^\w\s]', '', regex=True)
return df
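# Illustrative sketch of what clean_and_stem_text_series produces; the column
# name "comment" and the sample rows are made up for demonstration only.
#
#   example_df = pd.DataFrame({"comment": ["The cats were running!", "A cat runs."]})
#   example_df = clean_and_stem_text_series(example_df, "comment", do_initial_clean_dup=False)
#   # example_df["text_clean"] now holds lemmatised, lowercased text such as "cat run"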
def convert_tabular_data_to_analysis_format(
df: pd.DataFrame, file_name: str, text_columns: List[str] = None
) -> List[Tuple[str, pd.DataFrame]]:
"""
Convert tabular data (CSV/XLSX) to the format needed for duplicate analysis.
Args:
df (pd.DataFrame): The input DataFrame
file_name (str): Name of the file
text_columns (List[str], optional): Columns to analyze for duplicates.
If None, uses all string columns.
Returns:
List[Tuple[str, pd.DataFrame]]: List containing (file_name, processed_df) tuple
"""
    if text_columns is None:
        # Auto-detect text columns (string/object dtype) when none are given
        text_columns = df.select_dtypes(include=["object", "string"]).columns.tolist()
    text_columns = [col for col in text_columns if col in df.columns]
if not text_columns:
print(f"No text columns found in {file_name}")
return list()
# Create a copy to avoid modifying original
df_copy = df.copy()
# Create a combined text column from all text columns
df_copy["combined_text"] = (
df_copy[text_columns].fillna("").astype(str).agg(" ".join, axis=1)
)
# Add row identifier
df_copy["row_id"] = df_copy.index
# Create the format expected by the duplicate detection system
# Using 'row_number' as row number and 'text' as the combined text
processed_df = pd.DataFrame(
{
"row_number": df_copy["row_id"],
"text": df_copy["combined_text"],
"file": file_name,
}
)
# Add original row data for reference
for col in text_columns:
processed_df[f"original_{col}"] = df_copy[col]
return [(file_name, processed_df)]
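# Illustrative sketch of the analysis format produced above; the column names
# ("name", "comment") and the file name are hypothetical.
#
#   df = pd.DataFrame({"name": ["A", "B"], "comment": ["hello there", "general comment"]})
#   [(fname, processed)] = convert_tabular_data_to_analysis_format(df, "example.csv", ["comment"])
#   # processed has columns: row_number, text, file, original_comment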
def find_duplicate_cells_in_tabular_data(
input_files: List[str],
similarity_threshold: float = 0.95,
min_word_count: int = 3,
text_columns: List[str] = [],
output_folder: str = OUTPUT_FOLDER,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
"""
Find duplicate cells/text in tabular data files (CSV, XLSX, Parquet).
Args:
input_files (List[str]): List of file paths to analyze
similarity_threshold (float): Minimum similarity score to consider duplicates
min_word_count (int): Minimum word count for text to be considered
text_columns (List[str], optional): Specific columns to analyze
output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to drop duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Excel sheet names to analyse
        progress (Progress): Progress tracking object
Returns:
Tuple containing:
- results_df: DataFrame with duplicate matches
- output_paths: List of output file paths
- full_data_by_file: Dictionary of processed data by file
"""
if not input_files:
raise gr.Error("Please upload files to analyze.")
progress(0.1, desc="Loading and processing files...")
all_data_to_process = list()
full_data_by_file = dict()
file_paths = list()
# Process each file
for file_path in input_files:
try:
if file_path.endswith(".xlsx") or file_path.endswith(".xls"):
temp_df = pd.DataFrame()
                # Process every sheet in the given list that loads successfully
for sheet_name in in_excel_tabular_sheets:
temp_df = read_file(file_path, excel_sheet_name=sheet_name)
                    # If the sheet was successfully loaded
if not temp_df.empty:
if temp_df.shape[0] > MAX_TABLE_ROWS:
out_message = f"Number of rows in {file_path} for sheet {sheet_name} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
print(out_message)
raise Exception(out_message)
file_name = os.path.basename(file_path) + "_" + sheet_name
file_paths.append(file_path)
# Convert to analysis format
processed_data = convert_tabular_data_to_analysis_format(
temp_df, file_name, text_columns
)
if processed_data:
all_data_to_process.extend(processed_data)
full_data_by_file[file_name] = processed_data[0][1]
temp_df = pd.DataFrame()
else:
temp_df = read_file(file_path)
if temp_df.shape[0] > MAX_TABLE_ROWS:
out_message = f"Number of rows in {file_path} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
print(out_message)
raise Exception(out_message)
file_name = os.path.basename(file_path)
file_paths.append(file_path)
# Convert to analysis format
processed_data = convert_tabular_data_to_analysis_format(
temp_df, file_name, text_columns
)
if processed_data:
all_data_to_process.extend(processed_data)
full_data_by_file[file_name] = processed_data[0][1]
except Exception as e:
print(f"Error processing {file_path}: {e}")
continue
if not all_data_to_process:
raise gr.Error("No valid data found in uploaded files.")
progress(0.2, desc="Combining data...")
# Combine all data
combined_df = pd.concat(
[data[1] for data in all_data_to_process], ignore_index=True
)
combined_df = combined_df.drop_duplicates(subset=["row_number", "file"])
progress(0.3, desc="Cleaning and preparing text...")
# Clean and prepare text
combined_df = clean_and_stem_text_series(
combined_df, "text", do_initial_clean_dup=do_initial_clean_dup
)
# Filter by minimum word count
combined_df["word_count"] = (
combined_df["text_clean"].str.split().str.len().fillna(0)
)
combined_df = combined_df[combined_df["word_count"] >= min_word_count].copy()
if len(combined_df) < 2:
return pd.DataFrame(), [], full_data_by_file
progress(0.4, desc="Calculating similarities...")
# Calculate similarities
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(combined_df["text_clean"])
similarity_matrix = cosine_similarity(tfidf_matrix, dense_output=False)
# Find similar pairs
coo_matrix = similarity_matrix.tocoo()
similar_pairs = [
(r, c, v)
for r, c, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data)
if r < c and v >= similarity_threshold
]
if not similar_pairs:
gr.Info("No duplicate cells found.")
return pd.DataFrame(), [], full_data_by_file
progress(0.7, desc="Processing results...")
# Create results DataFrame
results_data = []
for row1, row2, similarity in similar_pairs:
row1_data = combined_df.iloc[row1]
row2_data = combined_df.iloc[row2]
results_data.append(
{
"File1": row1_data["file"],
"Row1": int(row1_data["row_number"]),
"File2": row2_data["file"],
"Row2": int(row2_data["row_number"]),
"Similarity_Score": round(similarity, 3),
"Text1": (
row1_data["text"][:200] + "..."
if len(row1_data["text"]) > 200
else row1_data["text"]
),
"Text2": (
row2_data["text"][:200] + "..."
if len(row2_data["text"]) > 200
else row2_data["text"]
),
"Original_Index1": row1,
"Original_Index2": row2,
}
)
results_df = pd.DataFrame(results_data)
results_df = results_df.sort_values(["File1", "Row1", "File2", "Row2"])
progress(0.9, desc="Saving results...")
# Save results
output_paths = save_tabular_duplicate_results(
results_df,
output_folder,
file_paths,
remove_duplicate_rows=remove_duplicate_rows,
in_excel_tabular_sheets=in_excel_tabular_sheets,
)
gr.Info(f"Found {len(results_df)} duplicate cell matches")
return results_df, output_paths, full_data_by_file
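# Illustrative call (file names and the column name are placeholders):
#
#   matches, paths, per_file_data = find_duplicate_cells_in_tabular_data(
#       input_files=["survey_a.csv", "survey_b.csv"],
#       similarity_threshold=0.9,
#       text_columns=["response"],
#   )
#   # matches: one row per (File1, Row1) / (File2, Row2) pair with a Similarity_Score;
#   # per_file_data: the intermediate analysis frames keyed by file name.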
def save_tabular_duplicate_results(
results_df: pd.DataFrame,
output_folder: str,
file_paths: List[str],
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
in_excel_tabular_sheets: List[str] = [],
) -> List[str]:
"""
Save tabular duplicate detection results to files.
Args:
results_df (pd.DataFrame): Results DataFrame
output_folder (str): Output folder path
file_paths (List[str]): List of file paths
remove_duplicate_rows (bool): Whether to remove duplicate rows
        in_excel_tabular_sheets (List[str]): Excel sheet names that were analysed
Returns:
List[str]: List of output file paths
"""
output_paths = list()
output_folder_path = Path(output_folder)
output_folder_path.mkdir(exist_ok=True)
if results_df.empty:
print("No duplicate matches to save.")
return list()
# Save main results
results_file = output_folder_path / "tabular_duplicate_results.csv"
results_df.to_csv(results_file, index=False, encoding="utf-8-sig")
output_paths.append(str(results_file))
# Group results by original file to handle Excel files properly
excel_files_processed = dict() # Track which Excel files have been processed
# Save per-file duplicate lists
for file_name, group in results_df.groupby("File2"):
# Check for matches with original file names
for original_file in file_paths:
original_file_name = os.path.basename(original_file)
if original_file_name in file_name:
original_file_extension = os.path.splitext(original_file)[-1]
if original_file_extension in [".xlsx", ".xls"]:
# Split the string using secure regex to handle both .xlsx_ and .xls_ delimiters
from tools.secure_regex_utils import safe_split_filename
parts = safe_split_filename(
os.path.basename(file_name), [".xlsx_", ".xls_"]
)
# The sheet name is the last part after splitting
file_sheet_name = parts[-1]
file_path = original_file
# Initialize Excel file tracking if not already done
if file_path not in excel_files_processed:
excel_files_processed[file_path] = {
"sheets_data": dict(),
"all_sheets": list(),
"processed_sheets": set(),
}
# Read the original Excel file to get all sheet names
if not excel_files_processed[file_path]["all_sheets"]:
try:
excel_file = pd.ExcelFile(file_path)
excel_files_processed[file_path][
"all_sheets"
] = excel_file.sheet_names
except Exception as e:
print(f"Error reading Excel file {file_path}: {e}")
continue
# Read the current sheet
df = read_file(file_path, excel_sheet_name=file_sheet_name)
# Create duplicate rows file for this sheet
file_stem = Path(file_name).stem
duplicate_rows_file = (
output_folder_path
/ f"{file_stem}_{file_sheet_name}_duplicate_rows.csv"
)
# Get unique row numbers to remove
rows_to_remove = sorted(group["Row2"].unique())
duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
duplicate_df.to_csv(duplicate_rows_file, index=False)
output_paths.append(str(duplicate_rows_file))
# Process the sheet data
df_cleaned = df.copy()
df_cleaned["duplicated"] = False
df_cleaned.loc[rows_to_remove, "duplicated"] = True
if remove_duplicate_rows:
df_cleaned = df_cleaned.drop(index=rows_to_remove)
# Store the processed sheet data
excel_files_processed[file_path]["sheets_data"][
file_sheet_name
] = df_cleaned
excel_files_processed[file_path]["processed_sheets"].add(
file_sheet_name
)
else:
file_sheet_name = ""
file_path = original_file
print("file_path after match:", file_path)
file_base_name = os.path.basename(file_path)
df = read_file(file_path)
file_stem = Path(file_name).stem
duplicate_rows_file = (
output_folder_path / f"{file_stem}_duplicate_rows.csv"
)
# Get unique row numbers to remove
rows_to_remove = sorted(group["Row2"].unique())
duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
duplicate_df.to_csv(duplicate_rows_file, index=False)
output_paths.append(str(duplicate_rows_file))
df_cleaned = df.copy()
df_cleaned["duplicated"] = False
df_cleaned.loc[rows_to_remove, "duplicated"] = True
if remove_duplicate_rows:
df_cleaned = df_cleaned.drop(index=rows_to_remove)
file_ext = os.path.splitext(file_name)[-1]
if file_ext in [".parquet"]:
output_path = secure_join(
output_folder, f"{file_base_name}_deduplicated.parquet"
)
df_cleaned.to_parquet(output_path, index=False)
else:
output_path = secure_join(
output_folder, f"{file_base_name}_deduplicated.csv"
)
df_cleaned.to_csv(
output_path, index=False, encoding="utf-8-sig"
)
output_paths.append(str(output_path))
break
# Process Excel files to create complete deduplicated files
for file_path, file_data in excel_files_processed.items():
try:
# Create output filename
file_base_name = os.path.splitext(os.path.basename(file_path))[0]
file_ext = os.path.splitext(file_path)[-1]
output_path = secure_join(
output_folder, f"{file_base_name}_deduplicated{file_ext}"
)
# Create Excel writer
with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
# Write all sheets
for sheet_name in file_data["all_sheets"]:
if sheet_name in file_data["processed_sheets"]:
# Use the processed (deduplicated) version
file_data["sheets_data"][sheet_name].to_excel(
writer, sheet_name=sheet_name, index=False
)
else:
# Use the original sheet (no duplicates found)
original_df = read_file(file_path, excel_sheet_name=sheet_name)
original_df.to_excel(writer, sheet_name=sheet_name, index=False)
output_paths.append(str(output_path))
print(f"Created deduplicated Excel file: {output_path}")
except Exception as e:
print(f"Error creating deduplicated Excel file for {file_path}: {e}")
continue
return output_paths
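# Outputs written by save_tabular_duplicate_results, for reference: a combined
# "tabular_duplicate_results.csv"; one "<file>_duplicate_rows.csv" (or
# "<file>_<sheet>_duplicate_rows.csv" for Excel) per file containing duplicates;
# and a "<file>_deduplicated.<ext>" copy of each affected file with a boolean
# "duplicated" column added (duplicate rows dropped when remove_duplicate_rows
# is True).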
def remove_duplicate_rows_from_tabular_data(
file_path: str,
duplicate_rows: List[int],
output_folder: str = OUTPUT_FOLDER,
    in_excel_tabular_sheets: str = "",
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
) -> str:
"""
Remove duplicate rows from a tabular data file.
Args:
file_path (str): Path to the input file
duplicate_rows (List[int]): List of row indices to remove
output_folder (str): Output folder for cleaned file
in_excel_tabular_sheets (str): Name of the Excel sheet to save the results to
remove_duplicate_rows (bool): Whether to remove duplicate rows
Returns:
str: Path to the cleaned file
"""
try:
# Load the file
df = read_file(
file_path,
excel_sheet_name=in_excel_tabular_sheets if in_excel_tabular_sheets else "",
)
# Remove duplicate rows (0-indexed)
df_cleaned = df.drop(index=duplicate_rows).reset_index(drop=True)
# Save cleaned file
file_name = os.path.basename(file_path)
file_stem = os.path.splitext(file_name)[0]
file_ext = os.path.splitext(file_name)[-1]
output_path = secure_join(output_folder, f"{file_stem}_deduplicated{file_ext}")
        if file_ext in [".xlsx", ".xls"]:
            # to_excel requires a single, non-empty sheet name string
            df_cleaned.to_excel(
                output_path,
                index=False,
                sheet_name=(
                    in_excel_tabular_sheets if in_excel_tabular_sheets else "Sheet1"
                ),
            )
elif file_ext in [".parquet"]:
df_cleaned.to_parquet(output_path, index=False)
else:
df_cleaned.to_csv(output_path, index=False, encoding="utf-8-sig")
return output_path
except Exception as e:
print(f"Error removing duplicates from {file_path}: {e}")
raise
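# Illustrative call (file path and row indices are placeholders):
#
#   cleaned_path = remove_duplicate_rows_from_tabular_data(
#       file_path="responses_a.csv",
#       duplicate_rows=[3, 7, 12],
#   )
#   # Writes responses_a_deduplicated.csv to OUTPUT_FOLDER and returns its path.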
def run_tabular_duplicate_analysis(
files: List[str],
threshold: float,
min_words: int,
text_columns: List[str] = [],
output_folder: str = OUTPUT_FOLDER,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
in_excel_tabular_sheets: List[str] = [],
progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
"""
Main function to run tabular duplicate analysis.
Args:
files (List[str]): List of file paths
threshold (float): Similarity threshold
min_words (int): Minimum word count
text_columns (List[str], optional): Specific columns to analyze
        output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to drop duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Excel sheet names to analyse
        progress (Progress): Progress tracking
Returns:
Tuple containing results DataFrame, output paths, and full data by file
"""
return find_duplicate_cells_in_tabular_data(
input_files=files,
similarity_threshold=threshold,
min_word_count=min_words,
text_columns=text_columns if text_columns else [],
output_folder=output_folder,
do_initial_clean_dup=do_initial_clean_dup,
in_excel_tabular_sheets=(
in_excel_tabular_sheets if in_excel_tabular_sheets else []
),
remove_duplicate_rows=remove_duplicate_rows,
)
# Function to update column choices when files are uploaded
def update_tabular_column_choices(files, in_excel_tabular_sheets: List[str] = []):
if not files:
return gr.update(choices=[])
all_columns = set()
for file in files:
try:
file_extension = os.path.splitext(file.name)[-1]
if file_extension in [".xlsx", ".xls"]:
for sheet_name in in_excel_tabular_sheets:
df = read_file(file.name, excel_sheet_name=sheet_name)
text_cols = df.select_dtypes(
include=["object", "string"]
).columns.tolist()
all_columns.update(text_cols)
else:
df = read_file(file.name)
text_cols = df.select_dtypes(
include=["object", "string"]
).columns.tolist()
all_columns.update(text_cols)
except Exception as e:
print(f"Error reading {file.name}: {e}")
continue
return gr.Dropdown(choices=sorted(list(all_columns)))
# Function to handle tabular duplicate detection
def run_tabular_duplicate_detection(
files,
threshold,
min_words,
text_columns,
output_folder: str = OUTPUT_FOLDER,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
in_excel_tabular_sheets: List[str] = [],
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
if not files:
print("No files uploaded")
return pd.DataFrame(), [], gr.Dropdown(choices=[]), 0, "deduplicate"
start_time = time.time()
task_textbox = "deduplicate"
# If output folder doesn't end with a forward slash, add one
if not output_folder.endswith("/"):
output_folder = output_folder + "/"
file_paths = list()
if isinstance(files, str):
# If 'files' is a single string, treat it as a list with one element
file_paths.append(files)
elif isinstance(files, list):
# If 'files' is a list, iterate through its elements
for f_item in files:
if isinstance(f_item, str):
# If an element is a string, it's a direct file path
file_paths.append(f_item)
elif hasattr(f_item, "name"):
# If an element has a '.name' attribute (e.g., a Gradio File object), use its name
file_paths.append(f_item.name)
else:
# Log a warning for unexpected element types within the list
print(
f"Warning: Skipping an element in 'files' list that is neither a string nor has a '.name' attribute: {type(f_item)}"
)
elif hasattr(files, "name"):
# Handle the case where a single file object (e.g., gr.File) is passed directly, not in a list
file_paths.append(files.name)
else:
# Raise an error for any other unexpected type of the 'files' argument itself
raise TypeError(
f"Unexpected type for 'files' argument: {type(files)}. Expected str, list of str/file objects, or a single file object."
)
if len(file_paths) > MAX_SIMULTANEOUS_FILES:
out_message = f"Number of files to deduplicate is greater than {MAX_SIMULTANEOUS_FILES}. Please submit a smaller number of files."
print(out_message)
raise Exception(out_message)
results_df, output_paths, full_data = run_tabular_duplicate_analysis(
files=file_paths,
threshold=threshold,
min_words=min_words,
text_columns=text_columns if text_columns else [],
output_folder=output_folder,
do_initial_clean_dup=do_initial_clean_dup,
in_excel_tabular_sheets=(
in_excel_tabular_sheets if in_excel_tabular_sheets else None
),
remove_duplicate_rows=remove_duplicate_rows,
)
# Update file choices for cleaning
    file_choices = list(set(file_paths))
end_time = time.time()
processing_time = round(end_time - start_time, 2)
return (
results_df,
output_paths,
gr.Dropdown(choices=file_choices),
processing_time,
task_textbox,
)
# Function to handle row selection for preview
def handle_tabular_row_selection(results_df, evt: gr.SelectData):
if not evt:
return None, "", ""
if not isinstance(results_df, pd.DataFrame):
return None, "", ""
elif results_df.empty:
return None, "", ""
selected_index = evt.index[0]
if selected_index >= len(results_df):
return None, "", ""
row = results_df.iloc[selected_index]
return selected_index, row["Text1"], row["Text2"]
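# Illustrative Gradio wiring for the row-preview handler; component names are
# hypothetical, and the real wiring lives in the app layout code:
#
#   results_table = gr.Dataframe(label="Duplicate matches")
#   results_table.select(
#       handle_tabular_row_selection,
#       inputs=[results_table],
#       outputs=[selected_row_index_state, text1_preview, text2_preview],
#   )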
# Function to clean duplicates from selected file
def clean_tabular_duplicates(
file_name,
results_df,
output_folder,
in_excel_tabular_sheets: str = "",
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
if not file_name or results_df.empty:
return None
# Get duplicate rows for this file
file_duplicates = results_df[results_df["File2"] == file_name]["Row2"].tolist()
if not file_duplicates:
return None
try:
# Find the original file path
# This is a simplified approach - in practice you might want to store file paths
cleaned_file = remove_duplicate_rows_from_tabular_data(
file_path=file_name,
duplicate_rows=file_duplicates,
output_folder=output_folder,
in_excel_tabular_sheets=in_excel_tabular_sheets,
remove_duplicate_rows=remove_duplicate_rows,
)
return cleaned_file
except Exception as e:
print(f"Error cleaning duplicates: {e}")
return None
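# Minimal manual test sketch, assuming a local CSV exists at the placeholder
# path below; it only runs when this module is executed directly.
if __name__ == "__main__":
    example_files = ["example_data/combined_case_notes.csv"]  # placeholder path
    example_results, example_outputs, _ = run_tabular_duplicate_analysis(
        files=example_files,
        threshold=0.95,
        min_words=3,
        text_columns=["Case Note"],  # hypothetical column name
    )
    print(example_results.head())
    print("Outputs written:", example_outputs)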