Added the ability to specify allowed hosts. Fixed some tests to return more reliably. Fixed issues where file path checks were not working correctly. Redaction now returns review files correctly at the redaction and apply-changes stages.
b1f183d
import os
import time
from pathlib import Path
from typing import Dict, List, Tuple

import gradio as gr
import pandas as pd
from gradio import Progress
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from tools.config import (
    DO_INITIAL_TABULAR_DATA_CLEAN,
    MAX_SIMULTANEOUS_FILES,
    MAX_TABLE_ROWS,
    REMOVE_DUPLICATE_ROWS,
)
from tools.data_anonymise import initial_clean
from tools.helper_functions import OUTPUT_FOLDER, read_file
from tools.load_spacy_model_custom_recognisers import nlp
from tools.secure_path_utils import secure_join


def clean_and_stem_text_series(
    df: pd.DataFrame,
    column: str,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
):
    """
    Clean and stem text columns in a data frame for tabular data.
    """

    # Function to apply lemmatisation and remove stopwords
    def _apply_lemmatization(text):
        doc = nlp(text)
        # Keep only alphabetic tokens and remove stopwords
        lemmatized_words = [
            token.lemma_ for token in doc if token.is_alpha and not token.is_stop
        ]
        return " ".join(lemmatized_words)

    # Always create text_clean column first
    if do_initial_clean_dup:
        df["text_clean"] = initial_clean(df[column])
    else:
        df["text_clean"] = df[column]

    df["text_clean"] = df["text_clean"].apply(_apply_lemmatization)
    df["text_clean"] = df[
        "text_clean"
    ].str.lower()  # .str.replace(r'[^\w\s]', '', regex=True)

    return df
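
# Illustrative usage (a minimal sketch, not called anywhere in this module): applying
# clean_and_stem_text_series to a toy DataFrame. Assumes the spaCy pipeline loaded above
# as `nlp` is available; with do_initial_clean_dup=False the initial_clean step is skipped.
#
#   demo_df = pd.DataFrame({"text": ["The cats were running quickly", "A cat runs quickly"]})
#   demo_df = clean_and_stem_text_series(demo_df, "text", do_initial_clean_dup=False)
#   print(demo_df["text_clean"].tolist())  # e.g. ["cat run quickly", "cat run quickly"]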


def convert_tabular_data_to_analysis_format(
    df: pd.DataFrame, file_name: str, text_columns: List[str] = None
) -> List[Tuple[str, pd.DataFrame]]:
    """
    Convert tabular data (CSV/XLSX) to the format needed for duplicate analysis.

    Args:
        df (pd.DataFrame): The input DataFrame
        file_name (str): Name of the file
        text_columns (List[str], optional): Columns to analyze for duplicates.
            If None, uses all string columns.

    Returns:
        List[Tuple[str, pd.DataFrame]]: List containing a (file_name, processed_df) tuple
    """
    if text_columns is None:
        # Auto-detect text columns (string-type columns)
        text_columns = df.select_dtypes(include=["object", "string"]).columns.tolist()

    text_columns = [col for col in text_columns if col in df.columns]

    if not text_columns:
        print(f"No text columns found in {file_name}")
        return list()

    # Create a copy to avoid modifying the original
    df_copy = df.copy()

    # Create a combined text column from all text columns
    df_copy["combined_text"] = (
        df_copy[text_columns].fillna("").astype(str).agg(" ".join, axis=1)
    )

    # Add a row identifier
    df_copy["row_id"] = df_copy.index

    # Create the format expected by the duplicate detection system,
    # using 'row_number' as the row number and 'text' as the combined text
    processed_df = pd.DataFrame(
        {
            "row_number": df_copy["row_id"],
            "text": df_copy["combined_text"],
            "file": file_name,
        }
    )

    # Add the original row data for reference
    for col in text_columns:
        processed_df[f"original_{col}"] = df_copy[col]

    return [(file_name, processed_df)]
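
# Illustrative usage (a minimal sketch, not part of the original module): converting a small
# DataFrame into the (file_name, processed_df) analysis format used by the duplicate search.
# The column names are assumptions for illustration only.
#
#   demo_df = pd.DataFrame({"Name": ["Alice", "Bob"], "Comment": ["Hello there", "Hello there"]})
#   converted = convert_tabular_data_to_analysis_format(demo_df, "demo.csv", ["Comment"])
#   # converted[0][1] has columns: row_number, text, file, original_Comment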


def find_duplicate_cells_in_tabular_data(
    input_files: List[str],
    similarity_threshold: float = 0.95,
    min_word_count: int = 3,
    text_columns: List[str] = [],
    output_folder: str = OUTPUT_FOLDER,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
    progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
    """
    Find duplicate cells/text in tabular data files (CSV, XLSX, Parquet).

    Args:
        input_files (List[str]): List of file paths to analyze
        similarity_threshold (float): Minimum similarity score to consider duplicates
        min_word_count (int): Minimum word count for text to be considered
        text_columns (List[str], optional): Specific columns to analyze
        output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to drop duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Excel sheet names to search for in each workbook
        progress (Progress): Progress tracking object

    Returns:
        Tuple containing:
        - results_df: DataFrame with duplicate matches
        - output_paths: List of output file paths
        - full_data_by_file: Dictionary of processed data by file
    """
    if not input_files:
        raise gr.Error("Please upload files to analyze.")

    progress(0.1, desc="Loading and processing files...")

    all_data_to_process = list()
    full_data_by_file = dict()
    file_paths = list()

    # Process each file
    for file_path in input_files:
        try:
            if file_path.endswith(".xlsx") or file_path.endswith(".xls"):
                temp_df = pd.DataFrame()

                # Try finding each sheet in the given list until a match is found
                for sheet_name in in_excel_tabular_sheets:
                    temp_df = read_file(file_path, excel_sheet_name=sheet_name)

                    # If the sheet was successfully loaded
                    if not temp_df.empty:
                        if temp_df.shape[0] > MAX_TABLE_ROWS:
                            out_message = f"Number of rows in {file_path} for sheet {sheet_name} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
                            print(out_message)
                            raise Exception(out_message)

                        file_name = os.path.basename(file_path) + "_" + sheet_name
                        file_paths.append(file_path)

                        # Convert to analysis format
                        processed_data = convert_tabular_data_to_analysis_format(
                            temp_df, file_name, text_columns
                        )

                        if processed_data:
                            all_data_to_process.extend(processed_data)
                            full_data_by_file[file_name] = processed_data[0][1]

                        temp_df = pd.DataFrame()
            else:
                temp_df = read_file(file_path)

                if temp_df.shape[0] > MAX_TABLE_ROWS:
                    out_message = f"Number of rows in {file_path} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
                    print(out_message)
                    raise Exception(out_message)

                file_name = os.path.basename(file_path)
                file_paths.append(file_path)

                # Convert to analysis format
                processed_data = convert_tabular_data_to_analysis_format(
                    temp_df, file_name, text_columns
                )

                if processed_data:
                    all_data_to_process.extend(processed_data)
                    full_data_by_file[file_name] = processed_data[0][1]
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue

    if not all_data_to_process:
        raise gr.Error("No valid data found in uploaded files.")

    progress(0.2, desc="Combining data...")

    # Combine all data
    combined_df = pd.concat(
        [data[1] for data in all_data_to_process], ignore_index=True
    )
    combined_df = combined_df.drop_duplicates(subset=["row_number", "file"])

    progress(0.3, desc="Cleaning and preparing text...")

    # Clean and prepare text
    combined_df = clean_and_stem_text_series(
        combined_df, "text", do_initial_clean_dup=do_initial_clean_dup
    )

    # Filter by minimum word count
    combined_df["word_count"] = (
        combined_df["text_clean"].str.split().str.len().fillna(0)
    )
    combined_df = combined_df[combined_df["word_count"] >= min_word_count].copy()

    if len(combined_df) < 2:
        return pd.DataFrame(), [], full_data_by_file

    progress(0.4, desc="Calculating similarities...")

    # Calculate pairwise TF-IDF cosine similarities
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(combined_df["text_clean"])
    similarity_matrix = cosine_similarity(tfidf_matrix, dense_output=False)

    # Find similar pairs (upper triangle only, at or above the threshold)
    coo_matrix = similarity_matrix.tocoo()
    similar_pairs = [
        (r, c, v)
        for r, c, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data)
        if r < c and v >= similarity_threshold
    ]

    if not similar_pairs:
        gr.Info("No duplicate cells found.")
        return pd.DataFrame(), [], full_data_by_file

    progress(0.7, desc="Processing results...")

    # Create the results DataFrame
    results_data = []
    for row1, row2, similarity in similar_pairs:
        row1_data = combined_df.iloc[row1]
        row2_data = combined_df.iloc[row2]

        results_data.append(
            {
                "File1": row1_data["file"],
                "Row1": int(row1_data["row_number"]),
                "File2": row2_data["file"],
                "Row2": int(row2_data["row_number"]),
                "Similarity_Score": round(similarity, 3),
                "Text1": (
                    row1_data["text"][:200] + "..."
                    if len(row1_data["text"]) > 200
                    else row1_data["text"]
                ),
                "Text2": (
                    row2_data["text"][:200] + "..."
                    if len(row2_data["text"]) > 200
                    else row2_data["text"]
                ),
                "Original_Index1": row1,
                "Original_Index2": row2,
            }
        )

    results_df = pd.DataFrame(results_data)
    results_df = results_df.sort_values(["File1", "Row1", "File2", "Row2"])

    progress(0.9, desc="Saving results...")

    # Save results
    output_paths = save_tabular_duplicate_results(
        results_df,
        output_folder,
        file_paths,
        remove_duplicate_rows=remove_duplicate_rows,
        in_excel_tabular_sheets=in_excel_tabular_sheets,
    )

    gr.Info(f"Found {len(results_df)} duplicate cell matches")

    return results_df, output_paths, full_data_by_file
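
# Illustrative usage (a minimal sketch, assuming two CSV files exist at the given paths):
# running the full duplicate-cell search directly, outside the Gradio interface. The file
# paths and column name are assumptions for illustration only.
#
#   results_df, output_paths, full_data = find_duplicate_cells_in_tabular_data(
#       input_files=["data/file_a.csv", "data/file_b.csv"],  # hypothetical paths
#       similarity_threshold=0.95,
#       min_word_count=3,
#       text_columns=["Comment"],  # hypothetical column name
#       output_folder=OUTPUT_FOLDER,
#   )
#   # results_df lists matched row pairs; output_paths points at the saved CSV reports.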


def save_tabular_duplicate_results(
    results_df: pd.DataFrame,
    output_folder: str,
    file_paths: List[str],
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
) -> List[str]:
    """
    Save tabular duplicate detection results to files.

    Args:
        results_df (pd.DataFrame): Results DataFrame
        output_folder (str): Output folder path
        file_paths (List[str]): List of input file paths
        remove_duplicate_rows (bool): Whether to remove duplicate rows from the output files
        in_excel_tabular_sheets (List[str]): Excel sheet names associated with the input files

    Returns:
        List[str]: List of output file paths
    """
    output_paths = list()
    output_folder_path = Path(output_folder)
    output_folder_path.mkdir(exist_ok=True)

    if results_df.empty:
        print("No duplicate matches to save.")
        return list()

    # Save main results
    results_file = output_folder_path / "tabular_duplicate_results.csv"
    results_df.to_csv(results_file, index=False, encoding="utf-8-sig")
    output_paths.append(str(results_file))

    # Group results by original file to handle Excel files properly
    excel_files_processed = dict()  # Track which Excel files have been processed

    # Save per-file duplicate lists
    for file_name, group in results_df.groupby("File2"):
        # Check for matches with original file names
        for original_file in file_paths:
            original_file_name = os.path.basename(original_file)

            if original_file_name in file_name:
                original_file_extension = os.path.splitext(original_file)[-1]

                if original_file_extension in [".xlsx", ".xls"]:
                    # Split the string using secure regex to handle both .xlsx_ and .xls_ delimiters
                    from tools.secure_regex_utils import safe_split_filename

                    parts = safe_split_filename(
                        os.path.basename(file_name), [".xlsx_", ".xls_"]
                    )
                    # The sheet name is the last part after splitting
                    file_sheet_name = parts[-1]
                    file_path = original_file

                    # Initialize Excel file tracking if not already done
                    if file_path not in excel_files_processed:
                        excel_files_processed[file_path] = {
                            "sheets_data": dict(),
                            "all_sheets": list(),
                            "processed_sheets": set(),
                        }

                    # Read the original Excel file to get all sheet names
                    if not excel_files_processed[file_path]["all_sheets"]:
                        try:
                            excel_file = pd.ExcelFile(file_path)
                            excel_files_processed[file_path][
                                "all_sheets"
                            ] = excel_file.sheet_names
                        except Exception as e:
                            print(f"Error reading Excel file {file_path}: {e}")
                            continue

                    # Read the current sheet
                    df = read_file(file_path, excel_sheet_name=file_sheet_name)

                    # Create the duplicate rows file for this sheet
                    file_stem = Path(file_name).stem
                    duplicate_rows_file = (
                        output_folder_path
                        / f"{file_stem}_{file_sheet_name}_duplicate_rows.csv"
                    )

                    # Get unique row numbers to remove
                    rows_to_remove = sorted(group["Row2"].unique())
                    duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
                    duplicate_df.to_csv(duplicate_rows_file, index=False)
                    output_paths.append(str(duplicate_rows_file))

                    # Process the sheet data
                    df_cleaned = df.copy()
                    df_cleaned["duplicated"] = False
                    df_cleaned.loc[rows_to_remove, "duplicated"] = True

                    if remove_duplicate_rows:
                        df_cleaned = df_cleaned.drop(index=rows_to_remove)

                    # Store the processed sheet data
                    excel_files_processed[file_path]["sheets_data"][
                        file_sheet_name
                    ] = df_cleaned
                    excel_files_processed[file_path]["processed_sheets"].add(
                        file_sheet_name
                    )
                else:
                    file_sheet_name = ""
                    file_path = original_file

                    print("file_path after match:", file_path)

                    file_base_name = os.path.basename(file_path)
                    df = read_file(file_path)

                    file_stem = Path(file_name).stem
                    duplicate_rows_file = (
                        output_folder_path / f"{file_stem}_duplicate_rows.csv"
                    )

                    # Get unique row numbers to remove
                    rows_to_remove = sorted(group["Row2"].unique())
                    duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
                    duplicate_df.to_csv(duplicate_rows_file, index=False)
                    output_paths.append(str(duplicate_rows_file))

                    df_cleaned = df.copy()
                    df_cleaned["duplicated"] = False
                    df_cleaned.loc[rows_to_remove, "duplicated"] = True

                    if remove_duplicate_rows:
                        df_cleaned = df_cleaned.drop(index=rows_to_remove)

                    file_ext = os.path.splitext(file_name)[-1]
                    if file_ext in [".parquet"]:
                        output_path = secure_join(
                            output_folder, f"{file_base_name}_deduplicated.parquet"
                        )
                        df_cleaned.to_parquet(output_path, index=False)
                    else:
                        output_path = secure_join(
                            output_folder, f"{file_base_name}_deduplicated.csv"
                        )
                        df_cleaned.to_csv(
                            output_path, index=False, encoding="utf-8-sig"
                        )
                    output_paths.append(str(output_path))

                break

    # Process Excel files to create complete deduplicated files
    for file_path, file_data in excel_files_processed.items():
        try:
            # Create the output filename
            file_base_name = os.path.splitext(os.path.basename(file_path))[0]
            file_ext = os.path.splitext(file_path)[-1]
            output_path = secure_join(
                output_folder, f"{file_base_name}_deduplicated{file_ext}"
            )

            # Create the Excel writer
            with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
                # Write all sheets
                for sheet_name in file_data["all_sheets"]:
                    if sheet_name in file_data["processed_sheets"]:
                        # Use the processed (deduplicated) version
                        file_data["sheets_data"][sheet_name].to_excel(
                            writer, sheet_name=sheet_name, index=False
                        )
                    else:
                        # Use the original sheet (no duplicates found)
                        original_df = read_file(file_path, excel_sheet_name=sheet_name)
                        original_df.to_excel(writer, sheet_name=sheet_name, index=False)

            output_paths.append(str(output_path))
            print(f"Created deduplicated Excel file: {output_path}")
        except Exception as e:
            print(f"Error creating deduplicated Excel file for {file_path}: {e}")
            continue

    return output_paths
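
# Note on outputs (a summary of the logic above, for reference): save_tabular_duplicate_results
# writes "tabular_duplicate_results.csv", one "*_duplicate_rows.csv" per matched file or sheet,
# and a "*_deduplicated.csv/.parquet/.xlsx" copy of each matched input with a "duplicated" flag
# column (flagged rows are dropped when remove_duplicate_rows is True).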


def remove_duplicate_rows_from_tabular_data(
    file_path: str,
    duplicate_rows: List[int],
    output_folder: str = OUTPUT_FOLDER,
    in_excel_tabular_sheets: str = "",
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
) -> str:
    """
    Remove duplicate rows from a tabular data file.

    Args:
        file_path (str): Path to the input file
        duplicate_rows (List[int]): List of row indices to remove
        output_folder (str): Output folder for the cleaned file
        in_excel_tabular_sheets (str): Name of the Excel sheet to save the results to
        remove_duplicate_rows (bool): Whether to remove duplicate rows

    Returns:
        str: Path to the cleaned file
    """
    try:
        # Load the file
        df = read_file(
            file_path,
            excel_sheet_name=in_excel_tabular_sheets if in_excel_tabular_sheets else "",
        )

        # Remove duplicate rows (0-indexed)
        df_cleaned = df.drop(index=duplicate_rows).reset_index(drop=True)

        # Save the cleaned file
        file_name = os.path.basename(file_path)
        file_stem = os.path.splitext(file_name)[0]
        file_ext = os.path.splitext(file_name)[-1]
        output_path = secure_join(output_folder, f"{file_stem}_deduplicated{file_ext}")

        if file_ext in [".xlsx", ".xls"]:
            df_cleaned.to_excel(
                output_path,
                index=False,
                sheet_name=(
                    in_excel_tabular_sheets if in_excel_tabular_sheets else "Sheet1"
                ),
            )
        elif file_ext in [".parquet"]:
            df_cleaned.to_parquet(output_path, index=False)
        else:
            df_cleaned.to_csv(output_path, index=False, encoding="utf-8-sig")

        return output_path
    except Exception as e:
        print(f"Error removing duplicates from {file_path}: {e}")
        raise
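
# Illustrative usage (a minimal sketch, assuming "data/file_a.csv" exists): dropping rows
# previously flagged as duplicates and writing the cleaned copy to the output folder.
#
#   cleaned_path = remove_duplicate_rows_from_tabular_data(
#       file_path="data/file_a.csv",  # hypothetical path
#       duplicate_rows=[3, 7, 12],    # 0-indexed rows to remove
#       output_folder=OUTPUT_FOLDER,
#   )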


def run_tabular_duplicate_analysis(
    files: List[str],
    threshold: float,
    min_words: int,
    text_columns: List[str] = [],
    output_folder: str = OUTPUT_FOLDER,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
    progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
    """
    Main function to run tabular duplicate analysis.

    Args:
        files (List[str]): List of file paths
        threshold (float): Similarity threshold
        min_words (int): Minimum word count
        text_columns (List[str], optional): Specific columns to analyze
        output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to drop duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Excel sheet names to search for in each workbook
        progress (Progress): Progress tracking

    Returns:
        Tuple containing the results DataFrame, output paths, and full data by file
    """
    return find_duplicate_cells_in_tabular_data(
        input_files=files,
        similarity_threshold=threshold,
        min_word_count=min_words,
        text_columns=text_columns if text_columns else [],
        output_folder=output_folder,
        do_initial_clean_dup=do_initial_clean_dup,
        in_excel_tabular_sheets=(
            in_excel_tabular_sheets if in_excel_tabular_sheets else []
        ),
        remove_duplicate_rows=remove_duplicate_rows,
    )


# Function to update column choices when files are uploaded
def update_tabular_column_choices(files, in_excel_tabular_sheets: List[str] = []):
    if not files:
        return gr.update(choices=[])

    all_columns = set()
    for file in files:
        try:
            file_extension = os.path.splitext(file.name)[-1]

            if file_extension in [".xlsx", ".xls"]:
                for sheet_name in in_excel_tabular_sheets:
                    df = read_file(file.name, excel_sheet_name=sheet_name)

                    # Get text columns
                    text_cols = df.select_dtypes(
                        include=["object", "string"]
                    ).columns.tolist()
                    all_columns.update(text_cols)
            else:
                df = read_file(file.name)

                # Get text columns
                text_cols = df.select_dtypes(
                    include=["object", "string"]
                ).columns.tolist()
                all_columns.update(text_cols)
        except Exception as e:
            print(f"Error reading {file.name}: {e}")
            continue

    return gr.Dropdown(choices=sorted(list(all_columns)))
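
# Illustrative wiring (a minimal sketch; the component names in_files and in_colnames are
# assumptions about the Blocks layout): refreshing the column dropdown when files are uploaded.
#
#   in_files.upload(
#       fn=update_tabular_column_choices,
#       inputs=[in_files],
#       outputs=[in_colnames],
#   )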


# Function to handle tabular duplicate detection
def run_tabular_duplicate_detection(
    files,
    threshold,
    min_words,
    text_columns,
    output_folder: str = OUTPUT_FOLDER,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
    in_excel_tabular_sheets: List[str] = [],
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
    if not files:
        print("No files uploaded")
        return pd.DataFrame(), [], gr.Dropdown(choices=[]), 0, "deduplicate"

    start_time = time.time()
    task_textbox = "deduplicate"

    # If output folder doesn't end with a forward slash, add one
    if not output_folder.endswith("/"):
        output_folder = output_folder + "/"

    file_paths = list()
    if isinstance(files, str):
        # If 'files' is a single string, treat it as a list with one element
        file_paths.append(files)
    elif isinstance(files, list):
        # If 'files' is a list, iterate through its elements
        for f_item in files:
            if isinstance(f_item, str):
                # If an element is a string, it's a direct file path
                file_paths.append(f_item)
            elif hasattr(f_item, "name"):
                # If an element has a '.name' attribute (e.g., a Gradio File object), use its name
                file_paths.append(f_item.name)
            else:
                # Log a warning for unexpected element types within the list
                print(
                    f"Warning: Skipping an element in 'files' list that is neither a string nor has a '.name' attribute: {type(f_item)}"
                )
    elif hasattr(files, "name"):
        # Handle the case where a single file object (e.g., gr.File) is passed directly, not in a list
        file_paths.append(files.name)
    else:
        # Raise an error for any other unexpected type of the 'files' argument itself
        raise TypeError(
            f"Unexpected type for 'files' argument: {type(files)}. Expected str, list of str/file objects, or a single file object."
        )

    if len(file_paths) > MAX_SIMULTANEOUS_FILES:
        out_message = f"Number of files to deduplicate is greater than {MAX_SIMULTANEOUS_FILES}. Please submit a smaller number of files."
        print(out_message)
        raise Exception(out_message)

    results_df, output_paths, full_data = run_tabular_duplicate_analysis(
        files=file_paths,
        threshold=threshold,
        min_words=min_words,
        text_columns=text_columns if text_columns else [],
        output_folder=output_folder,
        do_initial_clean_dup=do_initial_clean_dup,
        in_excel_tabular_sheets=(
            in_excel_tabular_sheets if in_excel_tabular_sheets else None
        ),
        remove_duplicate_rows=remove_duplicate_rows,
    )

    # Update file choices for cleaning
    file_choices = list(set(file_paths))

    end_time = time.time()
    processing_time = round(end_time - start_time, 2)

    return (
        results_df,
        output_paths,
        gr.Dropdown(choices=file_choices),
        processing_time,
        task_textbox,
    )


# Function to handle row selection for preview
def handle_tabular_row_selection(results_df, evt: gr.SelectData):
    if not evt:
        return None, "", ""
    if not isinstance(results_df, pd.DataFrame):
        return None, "", ""
    elif results_df.empty:
        return None, "", ""

    selected_index = evt.index[0]
    if selected_index >= len(results_df):
        return None, "", ""

    row = results_df.iloc[selected_index]
    return selected_index, row["Text1"], row["Text2"]


# Function to clean duplicates from selected file
def clean_tabular_duplicates(
    file_name,
    results_df,
    output_folder,
    in_excel_tabular_sheets: str = "",
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
    if not file_name or results_df.empty:
        return None

    # Get duplicate rows for this file
    file_duplicates = results_df[results_df["File2"] == file_name]["Row2"].tolist()

    if not file_duplicates:
        return None

    try:
        # Find the original file path
        # This is a simplified approach - in practice you might want to store file paths
        cleaned_file = remove_duplicate_rows_from_tabular_data(
            file_path=file_name,
            duplicate_rows=file_duplicates,
            output_folder=output_folder,
            in_excel_tabular_sheets=in_excel_tabular_sheets,
            remove_duplicate_rows=remove_duplicate_rows,
        )
        return cleaned_file
    except Exception as e:
        print(f"Error cleaning duplicates: {e}")
        return None
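
# Illustrative end-to-end sketch (comment only, not executed): calling the Gradio-facing
# handler directly with a local file. The file path and column name are assumptions for
# illustration only.
#
#   results_df, output_paths, file_dropdown, elapsed, task = run_tabular_duplicate_detection(
#       files=["data/file_a.csv"],   # hypothetical path
#       threshold=0.95,
#       min_words=3,
#       text_columns=["Comment"],    # hypothetical column name
#   )
#   # results_df can then be shown in a DataFrame component; handle_tabular_row_selection
#   # and clean_tabular_duplicates are wired to row selection and the clean/apply button.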