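"""Benchmark secrets-detection guardrails with W&B Weave.

Wraps the LLM Guard `Secrets` scanner and the Guardrails AI `SecretsPresent`
validator as `Guardrail` subclasses, then evaluates them alongside
guardrails_genie's `SecretsDetectionGuardrail` on the
`secrets-detection-benchmark` dataset using an accuracy scorer.
"""
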
import asyncio
from typing import Any

import weave
from guardrails import Guard
from guardrails.hub import SecretsPresent
from llm_guard.input_scanners import Secrets
from llm_guard.util import configure_logger

from guardrails_genie.guardrails import GuardrailManager
from guardrails_genie.guardrails.base import Guardrail
from guardrails_genie.guardrails.secrets_detection import (
    SecretsDetectionResponse,
    SecretsDetectionSimpleResponse,
    SecretsDetectionGuardrail,
)
from guardrails_genie.metrics import AccuracyMetric

logger = configure_logger(log_level="ERROR")
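

# Guardrails AI baseline: wraps the hub's SecretsPresent validator with
# on_fail="fix" and maps its validation summaries into the shared result dict.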
class GuardrailsAISecretsDetector(Guardrail):
    validator: Any

    def __init__(self):
        validator = Guard().use(SecretsPresent, on_fail="fix")
        super().__init__(validator=validator)

    def scan(self, text: str) -> dict:
        response = self.validator.validate(text)
        if response.validation_summaries:
            summary = response.validation_summaries[0]
            return {
                "has_secret": True,
                # Enumerate the failure-reason lines (skipping the header line)
                # into an indexed mapping of detected secrets.
                "detected_secrets": {
                    str(k): v
                    for k, v in enumerate(
                        summary.failure_reason.splitlines()[1:], start=1
                    )
                },
                "explanation": summary.failure_reason,
                "modified_prompt": response.validated_output,
                "risk_score": 1.0,
            }
        else:
            return {
                "has_secret": False,
                "detected_secrets": None,
                "explanation": "No secrets detected in the text.",
                "modified_prompt": response.validated_output,
                "risk_score": 0.0,
            }

    @weave.op
    def guard(
        self,
        prompt: str,
        return_detected_secrets: bool = True,
        **kwargs,
    ) -> SecretsDetectionResponse | SecretsDetectionSimpleResponse:
        results = self.scan(prompt)
        if return_detected_secrets:
            return SecretsDetectionResponse(
                contains_secrets=results["has_secret"],
                detected_secrets=results["detected_secrets"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )
        else:
            return SecretsDetectionSimpleResponse(
                contains_secrets=not results["has_secret"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )
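

# LLM Guard baseline: wraps the Secrets input scanner with redact_mode="all".
# The scanner only reports a validity flag and a risk score; it does not
# expose the individual secrets it found.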
class LLMGuardSecretsDetector(Guardrail):
    validator: Any

    def __init__(self):
        validator = Secrets(redact_mode="all")
        super().__init__(validator=validator)

    def scan(self, text: str) -> dict:
        # The scanner returns the sanitized prompt, a validity flag, and a risk score.
        sanitized_prompt, is_valid, risk_score = self.validator.scan(text)
        if is_valid:
            return {
                "has_secret": not is_valid,
                "detected_secrets": None,
                "explanation": "No secrets detected in the text.",
                "modified_prompt": sanitized_prompt,
                "risk_score": risk_score,
            }
        else:
            return {
                "has_secret": not is_valid,
                "detected_secrets": {},
                "explanation": "This library does not return detected secrets.",
                "modified_prompt": sanitized_prompt,
                "risk_score": risk_score,
            }

    @weave.op
    def guard(
        self,
        prompt: str,
        return_detected_secrets: bool = True,
        **kwargs,
    ) -> SecretsDetectionResponse | SecretsDetectionSimpleResponse:
        results = self.scan(prompt)
        if return_detected_secrets:
            return SecretsDetectionResponse(
                contains_secrets=results["has_secret"],
                detected_secrets=results["detected_secrets"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )
        else:
            return SecretsDetectionSimpleResponse(
                contains_secrets=not results["has_secret"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )
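

# Run each guardrail against the secrets-detection benchmark and log a
# separately named, accuracy-scored Weave evaluation per guardrail.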
def main():
    client = weave.init("parambharat/secrets-detection")
    dataset = weave.ref("secrets-detection-benchmark:latest").get()

    llm_guard_guardrail = LLMGuardSecretsDetector()
    guardrails_ai_guardrail = GuardrailsAISecretsDetector()
    guardrails_genie_guardrail = SecretsDetectionGuardrail()
    all_guards = [
        llm_guard_guardrail,
        guardrails_ai_guardrail,
        guardrails_genie_guardrail,
    ]

    evaluation = weave.Evaluation(
        dataset=dataset.rows,
        scorers=[AccuracyMetric()],
    )
    for guard in all_guards:
        name = guard.__class__.__name__
        guardrail_manager = GuardrailManager(
            guardrails=[
                guard,
            ]
        )
        results = asyncio.run(
            evaluation.evaluate(
                guardrail_manager,
                __weave={"display_name": f"{name}"},
            )
        )
        print(results)


if __name__ == "__main__":
    main()