Added regex support to deny lists. Corrected Tesseract OCR to use word-level parsing. Improved review search regex capabilities. Updated documentation.
4852fb5
from typing import List

import spacy
from presidio_analyzer import (
    AnalyzerEngine,
    EntityRecognizer,
    Pattern,
    PatternRecognizer,
    RecognizerResult,
)
from presidio_analyzer.nlp_engine import (
    NerModelConfiguration,
    NlpArtifacts,
    SpacyNlpEngine,
)
from spacy.matcher import Matcher
from spaczz.matcher import FuzzyMatcher

spacy.prefer_gpu()

import os
import re

import gradio as gr
import Levenshtein
import requests
from spacy.cli.download import download

from tools.config import (
    CUSTOM_ENTITIES,
    DEFAULT_LANGUAGE,
    SPACY_MODEL_PATH,
    TESSERACT_DATA_FOLDER,
)

score_threshold = 0.001
custom_entities = CUSTOM_ENTITIES

# Create a class inheriting from SpacyNlpEngine
class LoadedSpacyNlpEngine(SpacyNlpEngine):
    def __init__(self, loaded_spacy_model, language_code: str):
        super().__init__(
            ner_model_configuration=NerModelConfiguration(
                labels_to_ignore=["CARDINAL", "ORDINAL"]
            )
        )  # Ignore non-relevant labels
        self.nlp = {language_code: loaded_spacy_model}


def _base_language_code(language: str) -> str:
    lang = _normalize_language_input(language)
    if "_" in lang:
        return lang.split("_")[0]
    return lang
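
# Illustrative behaviour (assumes the helpers in this module are unchanged):
#   _base_language_code("en_lg") -> "en"   (model-size suffix is stripped)
#   _base_language_code("de")    -> "de"
#   _base_language_code("pt-BR") -> "pt"   (hyphens are normalised to underscores first)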

def load_spacy_model(language: str = DEFAULT_LANGUAGE):
    """
    Load a spaCy model for the requested language and return it as `nlp`.
    Accepts common inputs like: "en", "en_lg", "en_sm", "de", "fr", "es", "it", "nl", "pt", "zh", "ja", "xx".
    Falls back through sensible candidates and will download if missing.
    """
    # Set spaCy data path for custom model storage (only if specified)
    if SPACY_MODEL_PATH and SPACY_MODEL_PATH.strip():
        os.environ["SPACY_DATA"] = SPACY_MODEL_PATH
        print(f"Setting spaCy model path to: {SPACY_MODEL_PATH}")
    else:
        print("Using default spaCy model storage location")

    synonyms = {
        "english": "en",
        "catalan": "ca",
        "danish": "da",
        "german": "de",
        "french": "fr",
        "greek": "el",
        "finnish": "fi",
        "croatian": "hr",
        "lithuanian": "lt",
        "macedonian": "mk",
        "norwegian_bokmaal": "nb",
        "polish": "pl",
        "russian": "ru",
        "slovenian": "sl",
        "swedish": "sv",
        "dutch": "nl",
        "portuguese": "pt",
        "chinese": "zh",
        "japanese": "ja",
        "multilingual": "xx",
    }

    lang_norm = _normalize_language_input(language)
    lang_norm = synonyms.get(lang_norm, lang_norm)
    base_lang = _base_language_code(lang_norm)

    candidates_by_lang = {
        # English - prioritize lg, then trf, then md, then sm
        "en": [
            "en_core_web_lg",
            "en_core_web_trf",
            "en_core_web_md",
            "en_core_web_sm",
        ],
        "en_lg": ["en_core_web_lg"],
        "en_trf": ["en_core_web_trf"],
        "en_md": ["en_core_web_md"],
        "en_sm": ["en_core_web_sm"],
        # Major languages (news pipelines) - prioritize lg, then md, then sm
        "ca": ["ca_core_news_lg", "ca_core_news_md", "ca_core_news_sm"],  # Catalan
        "da": ["da_core_news_lg", "da_core_news_md", "da_core_news_sm"],  # Danish
        "de": ["de_core_news_lg", "de_core_news_md", "de_core_news_sm"],  # German
        "el": ["el_core_news_lg", "el_core_news_md", "el_core_news_sm"],  # Greek
        "es": ["es_core_news_lg", "es_core_news_md", "es_core_news_sm"],  # Spanish
        "fi": ["fi_core_news_lg", "fi_core_news_md", "fi_core_news_sm"],  # Finnish
        "fr": ["fr_core_news_lg", "fr_core_news_md", "fr_core_news_sm"],  # French
        "hr": ["hr_core_news_lg", "hr_core_news_md", "hr_core_news_sm"],  # Croatian
        "it": ["it_core_news_lg", "it_core_news_md", "it_core_news_sm"],  # Italian
        "ja": ["ja_core_news_lg", "ja_core_news_md", "ja_core_news_sm"],  # Japanese
        "ko": ["ko_core_news_lg", "ko_core_news_md", "ko_core_news_sm"],  # Korean
        "lt": ["lt_core_news_lg", "lt_core_news_md", "lt_core_news_sm"],  # Lithuanian
        "mk": ["mk_core_news_lg", "mk_core_news_md", "mk_core_news_sm"],  # Macedonian
        "nb": [
            "nb_core_news_lg",
            "nb_core_news_md",
            "nb_core_news_sm",
        ],  # Norwegian Bokmål
        "nl": ["nl_core_news_lg", "nl_core_news_md", "nl_core_news_sm"],  # Dutch
        "pl": ["pl_core_news_lg", "pl_core_news_md", "pl_core_news_sm"],  # Polish
        "pt": ["pt_core_news_lg", "pt_core_news_md", "pt_core_news_sm"],  # Portuguese
        "ro": ["ro_core_news_lg", "ro_core_news_md", "ro_core_news_sm"],  # Romanian
        "ru": ["ru_core_news_lg", "ru_core_news_md", "ru_core_news_sm"],  # Russian
        "sl": ["sl_core_news_lg", "sl_core_news_md", "sl_core_news_sm"],  # Slovenian
        "sv": ["sv_core_news_lg", "sv_core_news_md", "sv_core_news_sm"],  # Swedish
        "uk": ["uk_core_news_lg", "uk_core_news_md", "uk_core_news_sm"],  # Ukrainian
        "zh": [
            "zh_core_web_lg",
            "zh_core_web_md",
            "zh_core_web_sm",
            "zh_core_web_trf",
        ],  # Chinese
        # Multilingual NER
        "xx": ["xx_ent_wiki_sm"],
    }

    if lang_norm in candidates_by_lang:
        candidates = candidates_by_lang[lang_norm]
    elif base_lang in candidates_by_lang:
        candidates = candidates_by_lang[base_lang]
    else:
        # Fallback to multilingual if unknown
        candidates = candidates_by_lang["xx"]

    last_error = None

    if language != "en":
        print(
            f"Attempting to load spaCy model for language '{language}' with candidates: {candidates}"
        )
        print(
            "Note: Models are prioritized by size (lg > md > sm) - will stop after first successful load"
        )

    for i, candidate in enumerate(candidates):
        if language != "en":
            print(f"Trying candidate {i+1}/{len(candidates)}: {candidate}")

        # Try importable package first (fast-path when installed as a package)
        try:
            module = __import__(candidate)
            print(f"✓ Successfully imported spaCy model: {candidate}")
            return module.load()
        except Exception as e:
            last_error = e

        # Try spacy.load if package is linked/installed
        try:
            nlp = spacy.load(candidate)
            print(f"✓ Successfully loaded spaCy model via spacy.load: {candidate}")
            return nlp
        except OSError:
            # Model not found, proceed with download
            print(f"Model {candidate} not found, attempting to download...")
            try:
                download(candidate)
                print(f"✓ Successfully downloaded spaCy model: {candidate}")

                # Refresh spaCy's model registry after download
                import importlib
                import sys

                importlib.reload(spacy)

                # Clear any cached imports that might interfere
                if candidate in sys.modules:
                    del sys.modules[candidate]

                # Small delay to ensure the model is fully registered
                import time

                time.sleep(0.5)

                # Try to load the downloaded model
                nlp = spacy.load(candidate)
                print(f"✓ Successfully loaded downloaded spaCy model: {candidate}")
                return nlp
            except Exception as download_error:
                print(f"✗ Failed to download or load {candidate}: {download_error}")

                # Try alternative loading methods
                try:
                    # Try importing the module directly after download
                    module = __import__(candidate)
                    print(
                        f"✓ Successfully loaded {candidate} via direct import after download"
                    )
                    return module.load()
                except Exception as import_error:
                    print(f"✗ Direct import also failed: {import_error}")

                # Try one more approach - force spaCy to refresh its model registry
                try:
                    from spacy.util import get_model_path

                    model_path = get_model_path(candidate)
                    if model_path and os.path.exists(model_path):
                        print(f"Found model at path: {model_path}")
                        nlp = spacy.load(model_path)
                        print(
                            f"✓ Successfully loaded {candidate} from path: {model_path}"
                        )
                        return nlp
                except Exception as path_error:
                    print(f"✗ Path-based loading also failed: {path_error}")

                last_error = download_error
                continue
        except Exception as e:
            print(f"✗ Failed to load {candidate}: {e}")
            last_error = e
            continue

    # Provide a more helpful error message
    error_msg = f"Failed to load spaCy model for language '{language}'"
    if last_error:
        error_msg += f". Last error: {last_error}"
    error_msg += f". Tried candidates: {candidates}"
    raise RuntimeError(error_msg)


# Language-aware spaCy model loader helper
def _normalize_language_input(language: str) -> str:
    return language.strip().lower().replace("-", "_")


# Update the global variables to use the new function
ACTIVE_LANGUAGE_CODE = _base_language_code(DEFAULT_LANGUAGE)
nlp = None  # Placeholder, loaded in create_nlp_analyser below  # load_spacy_model(DEFAULT_LANGUAGE)
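
# Illustrative usage (not executed at import; assumes the relevant models can be
# installed or downloaded in this environment):
#   nlp_en = load_spacy_model("en")       # tries en_core_web_lg, then trf/md/sm
#   nlp_sm = load_spacy_model("en_sm")    # pins the small English pipeline
#   nlp_de = load_spacy_model("german")   # synonym resolves to "de" -> de_core_news_*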

def get_tesseract_lang_code(short_code: str):
    """
    Maps a two-letter language code to the corresponding Tesseract OCR code.

    Args:
        short_code (str): The two-letter language code (e.g., "en", "de").

    Returns:
        str or None: The Tesseract language code (e.g., "eng", "deu"),
            or None if no mapping is found.
    """
    # Mapping from 2-letter codes to Tesseract 3-letter codes
    # Based on ISO 639-2/T codes.
    lang_map = {
        "en": "eng",
        "de": "deu",
        "fr": "fra",
        "es": "spa",
        "it": "ita",
        "nl": "nld",
        "pt": "por",
        "zh": "chi_sim",  # Mapping to Simplified Chinese by default
        "ja": "jpn",
        "ko": "kor",
        "lt": "lit",
        "mk": "mkd",
        "nb": "nor",
        "pl": "pol",
        "ro": "ron",
        "ru": "rus",
        "sl": "slv",
        "sv": "swe",
        "uk": "ukr",
    }

    return lang_map.get(short_code)
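
# Illustrative behaviour (values follow the mapping above):
#   get_tesseract_lang_code("en") -> "eng"
#   get_tesseract_lang_code("zh") -> "chi_sim"  (Simplified Chinese by default)
#   get_tesseract_lang_code("xx") -> None       (unmapped codes return None)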

def download_tesseract_lang_pack(
    short_lang_code: str, tessdata_dir=TESSERACT_DATA_FOLDER
):
    """
    Downloads a Tesseract language pack to a local directory.

    Args:
        short_lang_code (str): The two-letter language code (e.g., "en", "fr").
        tessdata_dir (str, optional): The directory to save the language pack.
            Defaults to TESSERACT_DATA_FOLDER.
    """
    # Create the directory if it doesn't exist
    if not os.path.exists(tessdata_dir):
        os.makedirs(tessdata_dir)

    # Get the Tesseract language code
    lang_code = get_tesseract_lang_code(short_lang_code)

    if lang_code is None:
        raise ValueError(
            f"Language code {short_lang_code} not found in Tesseract language map"
        )

    # Set the local file path
    file_path = os.path.join(tessdata_dir, f"{lang_code}.traineddata")

    # Check if the file already exists
    if os.path.exists(file_path):
        print(f"Language pack {lang_code}.traineddata already exists at {file_path}")
        return file_path

    # Construct the URL for the language pack
    url = f"https://raw.githubusercontent.com/tesseract-ocr/tessdata/main/{lang_code}.traineddata"

    # Download the file
    try:
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()  # Raise an exception for bad status codes

        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f"Successfully downloaded {lang_code}.traineddata to {file_path}")
        return file_path
    except requests.exceptions.RequestException as e:
        print(f"Error downloading {lang_code}.traineddata: {e}")
        return None
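
# Illustrative usage (not executed at import; performs a network download on first call):
#   path = download_tesseract_lang_pack("fr")
#   # -> downloads fra.traineddata into TESSERACT_DATA_FOLDER and returns its path,
#   #    or returns the existing path if the file is already present.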

#### Custom recognisers
def _is_regex_pattern(term: str) -> bool:
    """
    Detect if a term is intended to be a regex pattern or a literal string.

    Args:
        term: The term to check

    Returns:
        True if the term appears to be a regex pattern, False if it's a literal string
    """
    term = term.strip()
    if not term:
        return False

    # First, try to compile as regex to validate it
    # This catches patterns like \d\d\d-\d\d\d that use regex escape sequences
    try:
        re.compile(term)
        is_valid_regex = True
    except re.error:
        # If it doesn't compile as regex, treat as literal
        return False

    # If it compiles, check if it contains regex-like features
    # Regex metacharacters that suggest a pattern (excluding escaped literals)
    regex_metacharacters = [
        "+",
        "*",
        "?",
        "{",
        "}",
        "[",
        "]",
        "(",
        ")",
        "|",
        "^",
        "$",
        ".",
    ]
    # Common regex escape sequences that indicate regex intent
    regex_escape_sequences = [
        "\\d",
        "\\w",
        "\\s",
        "\\D",
        "\\W",
        "\\S",
        "\\b",
        "\\B",
        "\\n",
        "\\t",
        "\\r",
    ]

    # Check if the term contains regex metacharacters or escape sequences
    has_metacharacters = False
    has_escape_sequences = False

    i = 0
    while i < len(term):
        if term[i] == "\\" and i + 1 < len(term):
            # Check if it's a regex escape sequence
            escape_seq = term[i : i + 2]
            if escape_seq in regex_escape_sequences:
                has_escape_sequences = True
            # Skip the escape sequence (backslash + next char)
            i += 2
            continue
        if term[i] in regex_metacharacters:
            has_metacharacters = True
        i += 1

    # If it's a valid regex and contains regex features, treat as regex pattern
    if is_valid_regex and (has_metacharacters or has_escape_sequences):
        return True

    # If it compiles but has no regex features, it might be a literal that happens to compile
    # (e.g., "test" compiles as regex but is just literal text)
    # In this case, if it has escape sequences, it's definitely regex
    if has_escape_sequences:
        return True

    # Otherwise, treat as literal
    return False
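
# Illustrative behaviour (based on the heuristics above):
#   _is_regex_pattern(r"\d{3}-\d{4}")  -> True   (escape sequences + metacharacters)
#   _is_regex_pattern("NHS|HMRC")      -> True   (alternation metacharacter)
#   _is_regex_pattern("John Smith")    -> False  (plain literal, no regex features)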

def custom_word_list_recogniser(custom_list: List[str] = list()):
    # Create regex pattern, handling quotes carefully
    # Supports both literal strings and regex patterns
    quote_str = '"'
    replace_str = '(?:"|“|”)'  # Match straight or curly double quotes

    regex_patterns = []
    literal_patterns = []

    # Separate regex patterns from literal strings
    for term in custom_list:
        term = term.strip()
        if not term:
            continue

        if _is_regex_pattern(term):
            # Use regex pattern as-is (but wrap with word boundaries if appropriate)
            # Note: Word boundaries might not be appropriate for all regex patterns
            # (e.g., email patterns), so we'll add them conditionally
            regex_patterns.append(term)
        else:
            # Escape literal strings and add word boundaries
            escaped_term = re.escape(term).replace(quote_str, replace_str)
            literal_patterns.append(rf"(?<!\w){escaped_term}(?!\w)")

    # Combine patterns: regex patterns first, then literal patterns
    all_patterns = []

    # Add regex patterns (without word boundaries, as they may have their own)
    for pattern in regex_patterns:
        all_patterns.append(f"({pattern})")

    # Add literal patterns (with word boundaries)
    all_patterns.extend(literal_patterns)

    if not all_patterns:
        # Return an empty recogniser if no patterns were supplied
        custom_pattern = Pattern(
            name="custom_pattern", regex="(?!)", score=1
        )  # Never matches
    else:
        custom_regex = "|".join(all_patterns)
        # print(custom_regex)
        custom_pattern = Pattern(name="custom_pattern", regex=custom_regex, score=1)

    custom_recogniser = PatternRecognizer(
        supported_entity="CUSTOM",
        name="CUSTOM",
        patterns=[custom_pattern],
        global_regex_flags=re.DOTALL | re.MULTILINE | re.IGNORECASE,
    )

    return custom_recogniser
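
# Illustrative usage (hypothetical deny-list terms; mixes a literal and a regex entry):
#   recogniser = custom_word_list_recogniser(["Jane Doe", r"ID-\d{6}"])
#   # "Jane Doe" is escaped and matched with word boundaries (case-insensitive),
#   # while r"ID-\d{6}" is detected as a regex pattern and used as-is.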

# Initialise custom recogniser that will be overwritten later
custom_recogniser = custom_word_list_recogniser()

# Custom title recogniser
titles_list = [
    "Sir",
    "Ma'am",
    "Madam",
    "Mr",
    "Mr.",
    "Mrs",
    "Mrs.",
    "Ms",
    "Ms.",
    "Miss",
    "Dr",
    "Dr.",
    "Professor",
]
titles_regex = (
    "\\b" + "\\b|\\b".join(rf"{re.escape(title)}" for title in titles_list) + "\\b"
)
titles_pattern = Pattern(name="titles_pattern", regex=titles_regex, score=1)
titles_recogniser = PatternRecognizer(
    supported_entity="TITLES",
    name="TITLES",
    patterns=[titles_pattern],
    global_regex_flags=re.DOTALL | re.MULTILINE,
)

# Custom postcode recogniser
# Define the regex pattern in a Presidio `Pattern` object:
ukpostcode_pattern = Pattern(
    name="ukpostcode_pattern",
    regex=r"\b([A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2}|GIR ?0AA)\b",
    score=1,
)

# Define the recognizer with one or more patterns
ukpostcode_recogniser = PatternRecognizer(
    supported_entity="UKPOSTCODE", name="UKPOSTCODE", patterns=[ukpostcode_pattern]
)
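
# Illustrative matches for the UK postcode pattern above:
#   "SW1A 1AA", "M1 1AE" and "GIR 0AA" all match; "12345" does not.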

### Street name
def extract_street_name(text: str) -> tuple:
    """
    Extracts the street name and preceding word (that should contain at least one number) from the given text.
    Returns two lists: the start and end character positions of each match.
    """

    street_types = [
        "Street",
        "St",
        "Boulevard",
        "Blvd",
        "Highway",
        "Hwy",
        "Broadway",
        "Freeway",
        "Causeway",
        "Cswy",
        "Expressway",
        "Way",
        "Walk",
        "Lane",
        "Ln",
        "Road",
        "Rd",
        "Avenue",
        "Ave",
        "Circle",
        "Cir",
        "Cove",
        "Cv",
        "Drive",
        "Dr",
        "Parkway",
        "Pkwy",
        "Park",
        "Court",
        "Ct",
        "Square",
        "Sq",
        "Loop",
        "Place",
        "Pl",
        "Parade",
        "Estate",
        "Alley",
        "Arcade",
        "Avenue",
        "Ave",
        "Bay",
        "Bend",
        "Brae",
        "Byway",
        "Close",
        "Corner",
        "Cove",
        "Crescent",
        "Cres",
        "Cul-de-sac",
        "Dell",
        "Drive",
        "Dr",
        "Esplanade",
        "Glen",
        "Green",
        "Grove",
        "Heights",
        "Hts",
        "Mews",
        "Parade",
        "Path",
        "Piazza",
        "Promenade",
        "Quay",
        "Ridge",
        "Row",
        "Terrace",
        "Ter",
        "Track",
        "Trail",
        "View",
        "Villas",
        "Marsh",
        "Embankment",
        "Cut",
        "Hill",
        "Passage",
        "Rise",
        "Vale",
        "Side",
    ]

    # Dynamically construct the regex pattern with all possible street types
    street_types_pattern = "|".join(
        rf"{re.escape(street_type)}" for street_type in street_types
    )

    # The overall regex pattern to capture the street name and preceding word(s)
    pattern = r"(?P<preceding_word>\w*\d\w*)\s*"
    pattern += rf"(?P<street_name>\w+\s*\b(?:{street_types_pattern})\b)"

    # Find all matches in text
    matches = re.finditer(pattern, text, re.DOTALL | re.MULTILINE | re.IGNORECASE)

    start_positions = list()
    end_positions = list()

    for match in matches:
        start_pos = match.start()
        end_pos = match.end()
        # print(f"Start: {start_pos}, End: {end_pos}")
        # print(f"Preceding word: {match.group('preceding_word').strip()}")
        # print(f"Street name: {match.group('street_name').strip()}")

        start_positions.append(start_pos)
        end_positions.append(end_pos)

    return start_positions, end_positions


class StreetNameRecognizer(EntityRecognizer):

    def load(self) -> None:
        """No loading is required."""
        pass

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts
    ) -> List[RecognizerResult]:
        """
        Logic for detecting a specific PII
        """
        start_pos, end_pos = extract_street_name(text)

        results = list()

        for i in range(0, len(start_pos)):
            result = RecognizerResult(
                entity_type="STREETNAME", start=start_pos[i], end=end_pos[i], score=1
            )
            results.append(result)

        return results


street_recogniser = StreetNameRecognizer(supported_entities=["STREETNAME"])
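
# Illustrative behaviour (made-up address; positions depend on the input string):
#   extract_street_name("Please write to 45 Kingsley Road, Exampletown")
#   # -> returns one start/end pair covering the span "45 Kingsley Road"
#   extract_street_name("No address here")  # -> ([], [])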

## Custom fuzzy match recogniser for list of strings
def custom_fuzzy_word_list_regex(text: str, custom_list: List[str] = list()):
    # Create regex pattern, handling quotes carefully
    quote_str = '"'
    replace_str = '(?:"|“|”)'  # Match straight or curly double quotes

    custom_regex_pattern = "|".join(
        rf"(?<!\w){re.escape(term.strip()).replace(quote_str, replace_str)}(?!\w)"
        for term in custom_list
    )

    # Find all matches in text
    matches = re.finditer(
        custom_regex_pattern, text, re.DOTALL | re.MULTILINE | re.IGNORECASE
    )

    start_positions = list()
    end_positions = list()

    for match in matches:
        start_pos = match.start()
        end_pos = match.end()

        start_positions.append(start_pos)
        end_positions.append(end_pos)

    return start_positions, end_positions


class CustomWordFuzzyRecognizer(EntityRecognizer):
    def __init__(
        self,
        supported_entities: List[str],
        custom_list: List[str] = list(),
        spelling_mistakes_max: int = 1,
        search_whole_phrase: bool = True,
    ):
        super().__init__(supported_entities=supported_entities)
        self.custom_list = custom_list  # Store the custom_list as an instance attribute
        self.spelling_mistakes_max = (
            spelling_mistakes_max  # Store the max spelling mistakes
        )
        self.search_whole_phrase = (
            search_whole_phrase  # Store the search whole phrase flag
        )

    def load(self) -> None:
        """No loading is required."""
        pass

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts
    ) -> List[RecognizerResult]:
        """
        Logic for detecting a specific PII
        """
        start_pos, end_pos = spacy_fuzzy_search(
            text, self.custom_list, self.spelling_mistakes_max, self.search_whole_phrase
        )  # Pass new parameters

        results = list()

        for i in range(0, len(start_pos)):
            result = RecognizerResult(
                entity_type="CUSTOM_FUZZY", start=start_pos[i], end=end_pos[i], score=1
            )
            results.append(result)

        return results


custom_list_default = list()
custom_word_fuzzy_recognizer = CustomWordFuzzyRecognizer(
    supported_entities=["CUSTOM_FUZZY"], custom_list=custom_list_default
)
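
# Illustrative usage (hypothetical deny-list; allows one spelling mistake per term):
#   fuzzy_recogniser = CustomWordFuzzyRecognizer(
#       supported_entities=["CUSTOM_FUZZY"],
#       custom_list=["confidential project"],
#       spelling_mistakes_max=1,
#       search_whole_phrase=True,
#   )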

# Pass the loaded model to the new LoadedSpacyNlpEngine
# Note: at import time `nlp` is still None; create_nlp_analyser below builds its own
# engine from a freshly loaded model.
loaded_nlp_engine = LoadedSpacyNlpEngine(
    loaded_spacy_model=nlp, language_code=ACTIVE_LANGUAGE_CODE
)


def create_nlp_analyser(
    language: str = DEFAULT_LANGUAGE,
    custom_list: List[str] = None,
    spelling_mistakes_max: int = 1,
    search_whole_phrase: bool = True,
    existing_nlp_analyser: AnalyzerEngine = None,
    return_also_model: bool = False,
):
    """
    Create an nlp_analyser object based on the specified language input.

    Args:
        language (str): Language code (e.g., "en", "de", "fr", "es", etc.)
        custom_list (List[str], optional): List of custom words to recognize. Defaults to None.
        spelling_mistakes_max (int, optional): Maximum number of spelling mistakes for fuzzy matching. Defaults to 1.
        search_whole_phrase (bool, optional): Whether to search for whole phrases or individual words. Defaults to True.
        existing_nlp_analyser (AnalyzerEngine, optional): Existing nlp_analyser object to reuse. Defaults to None.
        return_also_model (bool, optional): Whether to return the nlp_model object as well. Defaults to False.

    Returns:
        AnalyzerEngine: Configured nlp_analyser object with custom recognizers
    """
    # Reuse an existing analyser if it already supports the requested language
    if existing_nlp_analyser is not None:
        if existing_nlp_analyser.supported_languages[0] == language:
            nlp_analyser = existing_nlp_analyser
            print(f"Using existing nlp_analyser for {language}")
            return nlp_analyser

    # Load spaCy model for the specified language
    nlp_model = load_spacy_model(language)

    # Get base language code
    base_lang_code = _base_language_code(language)

    # Create custom recognizers
    if custom_list is None:
        custom_list = list()

    custom_recogniser = custom_word_list_recogniser(custom_list)
    custom_word_fuzzy_recognizer = CustomWordFuzzyRecognizer(
        supported_entities=["CUSTOM_FUZZY"],
        custom_list=custom_list,
        spelling_mistakes_max=spelling_mistakes_max,
        search_whole_phrase=search_whole_phrase,
    )

    # Create NLP engine with loaded model
    loaded_nlp_engine = LoadedSpacyNlpEngine(
        loaded_spacy_model=nlp_model, language_code=base_lang_code
    )

    # Create analyzer engine
    nlp_analyser = AnalyzerEngine(
        nlp_engine=loaded_nlp_engine,
        default_score_threshold=score_threshold,
        supported_languages=[base_lang_code],
        log_decision_process=False,
    )

    # Add custom recognizers to nlp_analyser
    nlp_analyser.registry.add_recognizer(custom_recogniser)
    nlp_analyser.registry.add_recognizer(custom_word_fuzzy_recognizer)

    # Add language-specific recognizers for English
    if base_lang_code == "en":
        nlp_analyser.registry.add_recognizer(street_recogniser)
        nlp_analyser.registry.add_recognizer(ukpostcode_recogniser)
        nlp_analyser.registry.add_recognizer(titles_recogniser)

    if return_also_model:
        return nlp_analyser, nlp_model

    return nlp_analyser


# Create the default nlp_analyser using the new function
nlp_analyser, nlp = create_nlp_analyser(DEFAULT_LANGUAGE, return_also_model=True)
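
# Illustrative usage (entity names depend on the recognisers registered above):
#   results = nlp_analyser.analyze(
#       text="Contact Mr Smith at 10 Downing Street, SW1A 2AA",
#       language=ACTIVE_LANGUAGE_CODE,
#       entities=["PERSON", "TITLES", "STREETNAME", "UKPOSTCODE"],
#   )
#   # Each result carries an entity_type, start/end offsets and a confidence score.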

def spacy_fuzzy_search(
    text: str,
    custom_query_list: List[str] = list(),
    spelling_mistakes_max: int = 1,
    search_whole_phrase: bool = True,
    nlp=nlp,
    progress=gr.Progress(track_tqdm=True),
):
    """Conduct fuzzy match on a list of text data."""

    all_matches = list()
    all_start_positions = list()
    all_end_positions = list()
    all_ratios = list()

    # print("custom_query_list:", custom_query_list)

    if not text:
        out_message = "No text data found. Skipping page."
        print(out_message)
        return all_start_positions, all_end_positions

    for string_query in custom_query_list:

        query = nlp(string_query)

        if search_whole_phrase is False:
            # Keep only words that are not stop words
            token_query = [
                token.text
                for token in query
                if not token.is_space and not token.is_stop and not token.is_punct
            ]

            spelling_mistakes_fuzzy_pattern = "FUZZY" + str(spelling_mistakes_max)

            if len(token_query) > 1:
                # pattern_lemma = [{"LEMMA": {"IN": query}}]
                pattern_fuzz = [
                    {"TEXT": {spelling_mistakes_fuzzy_pattern: {"IN": token_query}}}
                ]
            else:
                # pattern_lemma = [{"LEMMA": query[0]}]
                pattern_fuzz = [
                    {"TEXT": {spelling_mistakes_fuzzy_pattern: token_query[0]}}
                ]

            matcher = Matcher(nlp.vocab)
            matcher.add(string_query, [pattern_fuzz])
            # matcher.add(string_query, [pattern_lemma])

        else:
            # If matching a whole phrase, use the spaczz FuzzyMatcher, then filter on similarity using Levenshtein distance.
            matcher = FuzzyMatcher(nlp.vocab)
            patterns = [nlp.make_doc(string_query)]  # Convert query into a Doc object
            matcher.add("PHRASE", patterns, [{"ignore_case": True}])

        batch_size = 256
        docs = nlp.pipe([text], batch_size=batch_size)

        # Get number of matches per doc
        for doc in docs:  # progress.tqdm(docs, desc="Searching text", unit="rows"):
            matches = matcher(doc)
            match_count = len(matches)

            # If considering each sub term individually, append match. If considering together, consider weight of the relevance to that of the whole phrase.
            if search_whole_phrase is False:
                all_matches.append(match_count)

                for match_id, start, end in matches:
                    span = str(doc[start:end]).strip()
                    query_search = str(query).strip()

                    # Convert word positions to character positions
                    start_char = doc[start].idx  # Start character position
                    end_char = doc[end - 1].idx + len(
                        doc[end - 1]
                    )  # End character position

                    all_matches.append(match_count)
                    all_start_positions.append(start_char)
                    all_end_positions.append(end_char)

            else:
                for match_id, start, end, ratio, pattern in matches:
                    span = str(doc[start:end]).strip()
                    query_search = str(query).strip()

                    # Calculate Levenshtein distance. Only keep matches with no more than the specified number of spelling mistakes
                    distance = Levenshtein.distance(query_search.lower(), span.lower())
                    # print("Levenshtein distance:", distance)

                    if distance > spelling_mistakes_max:
                        match_count = match_count - 1
                    else:
                        # Convert word positions to character positions
                        start_char = doc[start].idx  # Start character position
                        end_char = doc[end - 1].idx + len(
                            doc[end - 1]
                        )  # End character position

                        all_matches.append(match_count)
                        all_start_positions.append(start_char)
                        all_end_positions.append(end_char)
                        all_ratios.append(ratio)

    return all_start_positions, all_end_positions
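
# Illustrative usage (assumes the default English model loaded above; exact offsets
# depend on tokenisation):
#   starts, ends = spacy_fuzzy_search(
#       "The defendent lives nearby.",        # note the misspelling of "defendant"
#       custom_query_list=["defendant"],
#       spelling_mistakes_max=1,
#       search_whole_phrase=True,
#   )
#   # -> returns the character offsets of the fuzzy match "defendent"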