File size: 2,088 Bytes
8ab2445
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from endpoints.api_models import OutputGuardrailsConfig , LLMResponse
from model_inference.groundedness_checker import GroundednessChecker
import re

# Module-level GroundednessChecker instance, constructed once when this module
# is imported and used by ContextualGroundednessCheck.check() below.
# NOTE(review): "./grounding_detector" is a relative path, so it presumably
# resolves against the process working directory — confirm against deployment.
groundedness_checker = GroundednessChecker(model_path="./grounding_detector")

# A simple result class to hold individual check outcomes.
class Result:
    """Collects the outcome of each guardrail rule, keyed by rule name."""

    def __init__(self):
        # Maps rule name -> outcome recorded through add().
        self.details = dict()

    def add(self, rule_name: str, passed: bool):
        """Record the outcome for ``rule_name``, replacing any earlier entry."""
        self.details.update({rule_name: passed})

    def grounded(self) -> bool:
        """Return True when every recorded outcome is truthy.

        With no recorded outcomes the result is True.
        """
        for outcome in self.details.values():
            if not outcome:
                return False
        return True

class ContextualGroundednessCheck:
    """Guardrail rule that delegates to the module-level ``groundedness_checker``."""

    name = "Contextual Groundedness"

    def check(self, llm_response: LLMResponse) -> bool:
        """Run the groundedness checker on a response.

        The response's ``question``, ``answer`` and ``context`` are passed
        positionally to ``groundedness_checker.check``. The checker's raw
        result is printed to stdout, and the value stored under its
        ``'is_grounded'`` key is returned.
        """
        verdict = groundedness_checker.check(
            llm_response.question,
            llm_response.answer,
            llm_response.context,
        )
        # Echo the checker's full output before extracting the flag.
        print(verdict)
        is_grounded = verdict['is_grounded']
        return is_grounded


class ToxicityRule:
    """Keyword-based toxicity guardrail.

    An answer fails this rule when it contains one of a small set of blocked
    terms (hate, kill, suicide, self-harm), matched case-insensitively.
    """

    name = "Toxicity"

    # The leading word boundary keeps the terms from matching inside unrelated
    # words ("whatever" and "chateau" contain "hate", "skill" contains "kill")
    # while still catching inflected forms such as "killing" or "hateful".
    # "self-harm" is accepted with a hyphen, a space, or written as one word.
    # Compiled once at class creation rather than on every check() call.
    _BLOCKED_TERMS = re.compile(
        r"\b(hate|kill|suicide|self[-\s]?harm)", re.IGNORECASE
    )

    def check(self, llm_response: "LLMResponse") -> bool:
        """Return True when the answer contains no blocked term.

        Args:
            llm_response: Object whose ``answer`` attribute holds the text to
                scan. Only ``answer`` is read.

        Returns:
            True if no blocked term is found (the rule passes), False if at
            least one is found.
        """
        # A missing answer has nothing to flag; treat it as empty text instead
        # of letting the regex engine raise TypeError on None.
        answer = llm_response.answer or ""
        return self._BLOCKED_TERMS.search(answer) is None
    

# Manager class to load and execute the enabled guardrail rules.
class GuardrailsManager:
    """Builds the enabled guardrail rules from a config and runs them."""

    def __init__(self, config: OutputGuardrailsConfig):
        self.config = config
        self.rules = self.load_rules()

    def load_rules(self):
        """Return rule instances for every flag enabled on ``self.config``.

        Rules are instantiated in a fixed order: contextual grounding first,
        then toxicity.
        """
        # (config flag name, rule class) pairs, in evaluation order.
        # Add additional rules based on configuration here.
        available = (
            ("contextual_grounding", ContextualGroundednessCheck),
            ("toxicity", ToxicityRule),
        )
        enabled = []
        for flag_name, rule_cls in available:
            if getattr(self.config, flag_name):
                enabled.append(rule_cls())
        return enabled

    def check(self, llm_response: LLMResponse) -> Result:
        """Run every loaded rule on ``llm_response`` and collect the outcomes.

        Each rule's outcome is stored in the returned ``Result`` under the
        rule's ``name``.
        """
        outcome = Result()
        for active_rule in self.rules:
            passed = active_rule.check(llm_response)
            outcome.add(active_rule.name, passed)
        return outcome