EthanZyh's picture
copied from EthanZyh/DiffusionText2WorldGeneration
8c31d70
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import re
import string
from difflib import SequenceMatcher
from .log import log
import nltk
from better_profanity import profanity
from .guardrail_blocklist_utils import read_keyword_list_from_dir, to_ascii
from .guardrail_core import ContentSafetyGuardrail, GuardrailRunner
from .misc import misc, Color, timer
DEFAULT_CHECKPOINT_DIR = "checkpoints/Cosmos-1.0-Guardrail/blocklist"
CENSOR = Color.red("*")
class Blocklist(ContentSafetyGuardrail):
def __init__(
self,
checkpoint_dir: str = DEFAULT_CHECKPOINT_DIR,
guardrail_partial_match_min_chars: int = 4,
guardrail_partial_match_letter_count: float = 0.5,
) -> None:
nltk.data.path.append(os.path.join(checkpoint_dir, "nltk_data"))
self.lemmatizer = nltk.WordNetLemmatizer()
self.profanity = profanity
self.checkpoint_dir = checkpoint_dir
self.guardrail_partial_match_min_chars = guardrail_partial_match_min_chars
self.guardrail_partial_match_letter_count = guardrail_partial_match_letter_count
# Load blocklist and whitelist keywords
self.blocklist_words = read_keyword_list_from_dir(os.path.join(self.checkpoint_dir, "custom"))
self.whitelist_words = read_keyword_list_from_dir(os.path.join(self.checkpoint_dir, "whitelist"))
self.exact_match_words = read_keyword_list_from_dir(os.path.join(self.checkpoint_dir, "exact_match"))
self.profanity.load_censor_words(custom_words=self.blocklist_words, whitelist_words=self.whitelist_words)
log.debug(f"Loaded {len(self.blocklist_words)} words/phrases from blocklist")
log.debug(f"Whitelisted {len(self.whitelist_words)} words/phrases from whitelist")
log.debug(f"Loaded {len(self.exact_match_words)} exact match words/phrases from blocklist")
def uncensor_whitelist(self, input_prompt: str, censored_prompt: str) -> str:
"""Explicitly uncensor words that are in the whitelist."""
input_words = input_prompt.split()
censored_words = censored_prompt.split()
whitelist_words = set(self.whitelist_words)
for i, token in enumerate(input_words):
if token.strip(string.punctuation).lower() in whitelist_words:
censored_words[i] = token
censored_prompt = " ".join(censored_words)
return censored_prompt
def censor_prompt(self, input_prompt: str) -> tuple[bool, str]:
"""Censor the prompt using the blocklist with better-profanity fuzzy matching.
Args:
input_prompt: input prompt to censor
Returns:
bool: True if the prompt is blocked, False otherwise
str: A message indicating why the prompt was blocked
"""
censored_prompt = self.profanity.censor(input_prompt, censor_char=CENSOR)
# Uncensor whitelisted words that were censored from blocklist fuzzy matching
censored_prompt = self.uncensor_whitelist(input_prompt, censored_prompt)
if CENSOR in censored_prompt:
return True, f"Prompt blocked by censorship: Censored Prompt: {censored_prompt}"
return False, ""
@staticmethod
def check_partial_match(
normalized_prompt: str, normalized_word: str, guardrail_partial_match_letter_count: float
) -> tuple[bool, str]:
"""
Check robustly if normalized word and the matching target have a difference of up to guardrail_partial_match_letter_count characters.
Args:
normalized_prompt: a string with many words
normalized_word: a string with one or multiple words, its length is smaller than normalized_prompt
guardrail_partial_match_letter_count: maximum allowed difference in characters (float to allow partial characters)
Returns:
bool: True if a match is found, False otherwise
str: A message indicating why the prompt was blocked
"""
prompt_words = normalized_prompt.split()
word_length = len(normalized_word.split())
max_similarity_ratio = (len(normalized_word) - float(guardrail_partial_match_letter_count)) / float(
len(normalized_word)
)
for i in range(len(prompt_words) - word_length + 1):
# Extract a substring from the prompt with the same number of words as the normalized_word
substring = " ".join(prompt_words[i : i + word_length])
similarity_ratio = SequenceMatcher(None, substring, normalized_word).ratio()
if similarity_ratio >= max_similarity_ratio:
return (
True,
f"Prompt blocked by partial match blocklist: Prompt: {normalized_prompt}, Partial Match Word: {normalized_word}",
)
return False, ""
@staticmethod
def check_against_whole_word_blocklist(
prompt: str,
blocklist: list[str],
guardrail_partial_match_min_chars: int = 4,
guardrail_partial_match_letter_count: float = 0.5,
) -> bool:
"""
Check if the prompt contains any whole words from the blocklist.
The match is case insensitive and robust to multiple spaces between words.
Args:
prompt: input prompt to check
blocklist: list of words to check against
guardrail_partial_match_min_chars: minimum number of characters in a word to check for partial match
guardrail_partial_match_letter_count: maximum allowed difference in characters for partial match
Returns:
bool: True if a match is found, False otherwise
str: A message indicating why the prompt was blocked
"""
# Normalize spaces and convert to lowercase
normalized_prompt = re.sub(r"\s+", " ", prompt).strip().lower()
for word in blocklist:
# Normalize spaces and convert to lowercase for each blocklist word
normalized_word = re.sub(r"\s+", " ", word).strip().lower()
# Use word boundaries to ensure whole word match
if re.search(r"\b" + re.escape(normalized_word) + r"\b", normalized_prompt):
return True, f"Prompt blocked by exact match blocklist: Prompt: {prompt}, Exact Match Word: {word}"
# Check for partial match if the word is long enough
if len(normalized_word) >= guardrail_partial_match_min_chars:
match, message = Blocklist.check_partial_match(
normalized_prompt, normalized_word, guardrail_partial_match_letter_count
)
if match:
return True, message
return False, ""
def is_safe(self, input_prompt: str = "") -> tuple[bool, str]:
"""Check if the input prompt is safe using the blocklist."""
# Check if the input is empty
if not input_prompt:
return False, "Input is empty"
input_prompt = to_ascii(input_prompt)
# Check full sentence for censored words
censored, message = self.censor_prompt(input_prompt)
if censored:
return False, message
# Check lemmatized words for censored words
tokens = nltk.word_tokenize(input_prompt)
lemmas = [self.lemmatizer.lemmatize(token) for token in tokens]
lemmatized_prompt = " ".join(lemmas)
censored, message = self.censor_prompt(lemmatized_prompt)
if censored:
return False, message
# Check for exact match blocklist words
censored, message = self.check_against_whole_word_blocklist(
input_prompt,
self.exact_match_words,
self.guardrail_partial_match_min_chars,
self.guardrail_partial_match_letter_count,
)
if censored:
return False, message
# If all these checks pass, the input is safe
return True, "Input is safe"
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--prompt", type=str, required=True, help="Input prompt")
parser.add_argument(
"--checkpoint_dir",
type=str,
help="Path to the Blocklist checkpoint folder",
default=DEFAULT_CHECKPOINT_DIR,
)
return parser.parse_args()
def main(args):
blocklist = Blocklist(checkpoint_dir=args.checkpoint_dir)
runner = GuardrailRunner(safety_models=[blocklist])
with timer("blocklist safety check"):
safety, message = runner.run_safety_check(args.prompt)
log.info(f"Input is: {'SAFE' if safety else 'UNSAFE'}")
log.info(f"Message: {message}") if not safety else None
if __name__ == "__main__":
args = parse_args()
main(args)