import re import secrets import base64 import time import pandas as pd from faker import Faker from gradio import Progress from typing import List from presidio_analyzer import AnalyzerEngine, BatchAnalyzerEngine from presidio_anonymizer import AnonymizerEngine, BatchAnonymizerEngine from presidio_anonymizer.entities import OperatorConfig, ConflictResolutionStrategy from tools.helper_functions import output_folder, get_file_path_end, read_file, detect_file_type from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold # Use custom version of analyze_dict to be able to track progress from tools.presidio_analyzer_custom import analyze_dict fake = Faker("en_UK") def fake_first_name(x): return fake.first_name() def anon_consistent_names(df): # ## Pick out common names and replace them with the same person value df_dict = df.to_dict(orient="list") analyzer = AnalyzerEngine() batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer) analyzer_results = batch_analyzer.analyze_dict(df_dict, language="en") analyzer_results = list(analyzer_results) # + tags=[] text = analyzer_results[3].value # + tags=[] recognizer_result = str(analyzer_results[3].recognizer_results) # + tags=[] recognizer_result # + tags=[] data_str = recognizer_result # abbreviated for brevity # Adjusting the parse_dict function to handle trailing ']' # Splitting the main data string into individual list strings list_strs = data_str[1:-1].split('], [') def parse_dict(s): s = s.strip('[]') # Removing any surrounding brackets items = s.split(', ') d = {} for item in items: key, value = item.split(': ') if key == 'score': d[key] = float(value) elif key in ['start', 'end']: d[key] = int(value) else: d[key] = value return d # Re-running the improved processing code result = [] for lst_str in list_strs: # Splitting each list string into individual dictionary strings dict_strs = lst_str.split(', type: ') dict_strs = [dict_strs[0]] + ['type: ' + s for s in dict_strs[1:]] # Prepending "type: " back to the split strings # Parsing each dictionary string dicts = [parse_dict(d) for d in dict_strs] result.append(dicts) #result # + tags=[] names = [] for idx, paragraph in enumerate(text): paragraph_texts = [] for dictionary in result[idx]: if dictionary['type'] == 'PERSON': paragraph_texts.append(paragraph[dictionary['start']:dictionary['end']]) names.append(paragraph_texts) # + tags=[] # Flatten the list of lists and extract unique names unique_names = list(set(name for sublist in names for name in sublist)) # + tags=[] fake_names = pd.Series(unique_names).apply(fake_first_name) # + tags=[] mapping_df = pd.DataFrame(data={"Unique names":unique_names, "Fake names": fake_names}) # + tags=[] # Convert mapping dataframe to dictionary # Convert mapping dataframe to dictionary, adding word boundaries for full-word match name_map = {r'\b' + k + r'\b': v for k, v in zip(mapping_df['Unique names'], mapping_df['Fake names'])} # + tags=[] name_map # + tags=[] scrubbed_df_consistent_names = df.replace(name_map, regex = True) # + tags=[] scrubbed_df_consistent_names return scrubbed_df_consistent_names def anonymise_script(df, anon_strat, language:str, chosen_redact_entities:List[str], in_allow_list:List[str]=[], progress=Progress(track_tqdm=False)): key_string = "" # DataFrame to dict df_dict = df.to_dict(orient="list") if in_allow_list: in_allow_list_flat = [item for sublist in in_allow_list for item in sublist] #analyzer = nlp_analyser #AnalyzerEngine() batch_analyzer = BatchAnalyzerEngine(analyzer_engine=nlp_analyser) anonymizer = AnonymizerEngine()#conflict_resolution=ConflictResolutionStrategy.MERGE_SIMILAR_OR_CONTAINED) batch_anonymizer = BatchAnonymizerEngine(anonymizer_engine = anonymizer) # analyzer_results = batch_analyzer.analyze_dict(df_dict, language=language, # entities=chosen_redact_entities, # score_threshold=score_threshold, # return_decision_process=False, # in_allow_list=in_allow_list_flat) print("Identifying personal information") analyse_tic = time.perf_counter() print("Allow list:", in_allow_list) # Use custom analyzer to be able to track progress with Gradio analyzer_results = analyze_dict(batch_analyzer, df_dict, language=language, entities=chosen_redact_entities, score_threshold=score_threshold, return_decision_process=False, allow_list=in_allow_list_flat) analyzer_results = list(analyzer_results) #analyzer_results analyse_toc = time.perf_counter() analyse_time_out = f"Analysing the text took {analyse_toc - analyse_tic:0.1f} seconds." print(analyse_time_out) # Create faker function (note that it has to receive a value) fake = Faker("en_UK") def fake_first_name(x): return fake.first_name() # Set up the anonymization configuration WITHOUT DATE_TIME simple_replace_config = eval('{"DEFAULT": OperatorConfig("replace", {"new_value": "REDACTED"})}') replace_config = eval('{"DEFAULT": OperatorConfig("replace")}') redact_config = eval('{"DEFAULT": OperatorConfig("redact")}') hash_config = eval('{"DEFAULT": OperatorConfig("hash")}') mask_config = eval('{"DEFAULT": OperatorConfig("mask", {"masking_char":"*", "chars_to_mask":100, "from_end":True})}') people_encrypt_config = eval('{"PERSON": OperatorConfig("encrypt", {"key": key_string})}') # The encryption is using AES cypher in CBC mode and requires a cryptographic key as an input for both the encryption and the decryption. fake_first_name_config = eval('{"PERSON": OperatorConfig("custom", {"lambda": fake_first_name})}') if anon_strat == "replace with ": chosen_mask_config = simple_replace_config if anon_strat == "replace with ": chosen_mask_config = replace_config if anon_strat == "redact": chosen_mask_config = redact_config if anon_strat == "hash": chosen_mask_config = hash_config if anon_strat == "mask": chosen_mask_config = mask_config if anon_strat == "encrypt": chosen_mask_config = people_encrypt_config # Generate a 128-bit AES key. Then encode the key using base64 to get a string representation key = secrets.token_bytes(16) # 128 bits = 16 bytes key_string = base64.b64encode(key).decode('utf-8') elif anon_strat == "fake_first_name": chosen_mask_config = fake_first_name_config # I think in general people will want to keep date / times keep_date_config = eval('{"DATE_TIME": OperatorConfig("keep")}') combined_config = {**chosen_mask_config, **keep_date_config} combined_config anonymizer_results = batch_anonymizer.anonymize_dict(analyzer_results, operators=combined_config) scrubbed_df = pd.DataFrame(anonymizer_results) return scrubbed_df, key_string def anon_wrapper_func(anon_file, anon_df, chosen_cols, out_file_paths, out_file_part, out_message, excel_sheet_name, anon_strat, language, chosen_redact_entities, in_allow_list, file_type, anon_xlsx_export_file_name): def check_lists(list1, list2): return any(string in list2 for string in list1) def get_common_strings(list1, list2): """ Finds the common strings between two lists. Args: list1: The first list of strings. list2: The second list of strings. Returns: A list containing the common strings. """ common_strings = [] for string in list1: if string in list2: common_strings.append(string) return common_strings # Check for chosen col, skip file if not found all_cols_original_order = list(anon_df.columns) any_cols_found = check_lists(chosen_cols, all_cols_original_order) if any_cols_found == False: out_message = "No chosen columns found in dataframe: " + out_file_part print(out_message) else: chosen_cols_in_anon_df = get_common_strings(chosen_cols, all_cols_original_order) # Split dataframe to keep only selected columns print("Remaining columns to redact:", chosen_cols_in_anon_df) anon_df_part = anon_df[chosen_cols_in_anon_df] anon_df_remain = anon_df.drop(chosen_cols_in_anon_df, axis = 1) # Anonymise the selected columns anon_df_part_out, key_string = anonymise_script(anon_df_part, anon_strat, language, chosen_redact_entities, in_allow_list) # Rejoin the dataframe together anon_df_out = pd.concat([anon_df_part_out, anon_df_remain], axis = 1) anon_df_out = anon_df_out[all_cols_original_order] # Export file # Rename anonymisation strategy for file path naming if anon_strat == "replace with ": anon_strat_txt = "redact_simple" elif anon_strat == "replace with ": anon_strat_txt = "redact_entity_type" else: anon_strat_txt = anon_strat # If the file is an xlsx, add a new sheet to the existing xlsx. Otherwise, write to csv if file_type == 'xlsx': anon_export_file_name = anon_xlsx_export_file_name # Create a Pandas Excel writer using XlsxWriter as the engine. with pd.ExcelWriter(anon_xlsx_export_file_name, engine='openpyxl', mode='a') as writer: # Write each DataFrame to a different worksheet. anon_df_out.to_excel(writer, sheet_name=excel_sheet_name, index=None) else: anon_export_file_name = output_folder + out_file_part + "_" + excel_sheet_name + "_anon_" + anon_strat_txt + ".csv" anon_df_out.to_csv(anon_export_file_name, index = None) out_file_paths.append(anon_export_file_name) # As files are created in a loop, there is a risk of duplicate file names being output. Use set to keep uniques. out_file_paths = list(set(out_file_paths)) # Print result text to output text box if just anonymising open text if anon_file=='open_text': out_message = [anon_df_out['text'][0]] return out_file_paths, out_message, key_string def anonymise_data_files(file_paths:List[str], in_text:str, anon_strat:str, chosen_cols:List[str], language:str, chosen_redact_entities:List[str], in_allow_list:List[str]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list = [], in_excel_sheets:list=[], progress=Progress(track_tqdm=True)): tic = time.perf_counter() # Load file # If out message or out_file_paths are blank, change to a list so it can be appended to if isinstance(out_message, str): out_message = [out_message] if not out_file_paths: out_file_paths = [] if in_allow_list: in_allow_list_flat = [item for sublist in in_allow_list for item in sublist] anon_df = pd.DataFrame() #out_file_paths = [] # Check if files and text exist if not file_paths: if in_text: file_paths=['open_text'] else: out_message = "Please enter text or a file to redact." return out_message, out_file_paths, out_file_paths, latest_file_completed # If we have already redacted the last file, return the input out_message and file list to the relevant components if latest_file_completed == len(file_paths): print("Last file reached, returning files:", str(latest_file_completed)) final_out_message = '\n'.join(out_message) return final_out_message, out_file_paths, out_file_paths, latest_file_completed file_path_loop = [file_paths[int(latest_file_completed)]] for anon_file in progress.tqdm(file_path_loop, desc="Anonymising files", unit = "file"): if anon_file=='open_text': anon_df = pd.DataFrame(data={'text':[in_text]}) chosen_cols=['text'] out_file_part = anon_file else: # If file is an xlsx, we are going to run through all the Excel sheets to anonymise them separately. file_type = detect_file_type(anon_file) print("File type is:", file_type) out_file_part = get_file_path_end(anon_file.name) if file_type == 'xlsx': print("Running through all xlsx sheets") #anon_xlsx = pd.ExcelFile(anon_file) if not in_excel_sheets: out_message.append("No Excel sheets selected. Please select at least one to anonymise.") continue anon_xlsx = pd.ExcelFile(anon_file) # Create xlsx file: anon_xlsx_export_file_name = output_folder + out_file_part + ".xlsx" from openpyxl import Workbook wb = Workbook() wb.save(anon_xlsx_export_file_name) # Iterate through the sheet names for sheet_name in in_excel_sheets: # Read each sheet into a DataFrame if sheet_name not in anon_xlsx.sheet_names: continue anon_df = pd.read_excel(anon_file, sheet_name=sheet_name) # Process the DataFrame (e.g., print its contents) print(f"Sheet Name: {sheet_name}") print(anon_df.head()) # Print the first few rows out_file_paths, out_message, key_string = anon_wrapper_func(anon_file, anon_df, chosen_cols, out_file_paths, out_file_part, out_message, sheet_name, anon_strat, language, chosen_redact_entities, in_allow_list, file_type, anon_xlsx_export_file_name) else: sheet_name = "" anon_df = read_file(anon_file) out_file_part = get_file_path_end(anon_file.name) out_file_paths, out_message, key_string = anon_wrapper_func(anon_file, anon_df, chosen_cols, out_file_paths, out_file_part, out_message, sheet_name, anon_strat, language, chosen_redact_entities, in_allow_list, file_type, "") # Increase latest file completed count unless we are at the last file if latest_file_completed != len(file_paths): print("Completed file number:", str(latest_file_completed)) latest_file_completed += 1 toc = time.perf_counter() out_time = f"in {toc - tic:0.1f} seconds." print(out_time) if anon_strat == "encrypt": out_message.append(". Your decryption key is " + key_string + ".") out_message.append("Anonymisation of file '" + out_file_part + "' successfully completed in") out_message_out = '\n'.join(out_message) out_message_out = out_message_out + " " + out_time return out_message_out, out_file_paths, out_file_paths, latest_file_completed