|
import logging |
|
import threading |
|
import time |
|
from typing import Any, Optional |
|
|
|
from flask import Flask, current_app |
|
from pydantic import BaseModel, ConfigDict |
|
|
|
from configs import dify_config |
|
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom |
|
from core.app.entities.queue_entities import QueueMessageReplaceEvent |
|
from core.moderation.base import ModerationAction, ModerationOutputsResult |
|
from core.moderation.factory import ModerationFactory |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class ModerationRule(BaseModel): |
|
type: str |
|
config: dict[str, Any] |
|
|
|
|
|
class OutputModeration(BaseModel): |
|
tenant_id: str |
|
app_id: str |
|
|
|
rule: ModerationRule |
|
queue_manager: AppQueueManager |
|
|
|
thread: Optional[threading.Thread] = None |
|
thread_running: bool = True |
|
buffer: str = "" |
|
is_final_chunk: bool = False |
|
final_output: Optional[str] = None |
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
|
def should_direct_output(self) -> bool: |
|
return self.final_output is not None |
|
|
|
def get_final_output(self) -> str: |
|
return self.final_output or "" |
|
|
|
def append_new_token(self, token: str) -> None: |
|
self.buffer += token |
|
|
|
if not self.thread: |
|
self.thread = self.start_thread() |
|
|
|
def moderation_completion(self, completion: str, public_event: bool = False) -> str: |
|
self.buffer = completion |
|
self.is_final_chunk = True |
|
|
|
result = self.moderation(tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=completion) |
|
|
|
if not result or not result.flagged: |
|
return completion |
|
|
|
if result.action == ModerationAction.DIRECT_OUTPUT: |
|
final_output = result.preset_response |
|
else: |
|
final_output = result.text |
|
|
|
if public_event: |
|
self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE) |
|
|
|
return final_output |
|
|
|
def start_thread(self) -> threading.Thread: |
|
buffer_size = dify_config.MODERATION_BUFFER_SIZE |
|
thread = threading.Thread( |
|
target=self.worker, |
|
kwargs={ |
|
"flask_app": current_app._get_current_object(), |
|
"buffer_size": buffer_size if buffer_size > 0 else dify_config.MODERATION_BUFFER_SIZE, |
|
}, |
|
) |
|
|
|
thread.start() |
|
|
|
return thread |
|
|
|
def stop_thread(self): |
|
if self.thread and self.thread.is_alive(): |
|
self.thread_running = False |
|
|
|
def worker(self, flask_app: Flask, buffer_size: int): |
|
with flask_app.app_context(): |
|
current_length = 0 |
|
while self.thread_running: |
|
moderation_buffer = self.buffer |
|
buffer_length = len(moderation_buffer) |
|
if not self.is_final_chunk: |
|
chunk_length = buffer_length - current_length |
|
if 0 <= chunk_length < buffer_size: |
|
time.sleep(1) |
|
continue |
|
|
|
current_length = buffer_length |
|
|
|
result = self.moderation( |
|
tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=moderation_buffer |
|
) |
|
|
|
if not result or not result.flagged: |
|
continue |
|
|
|
if result.action == ModerationAction.DIRECT_OUTPUT: |
|
final_output = result.preset_response |
|
self.final_output = final_output |
|
else: |
|
final_output = result.text + self.buffer[len(moderation_buffer) :] |
|
|
|
|
|
if self.thread_running: |
|
self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE) |
|
|
|
if result.action == ModerationAction.DIRECT_OUTPUT: |
|
break |
|
|
|
def moderation(self, tenant_id: str, app_id: str, moderation_buffer: str) -> Optional[ModerationOutputsResult]: |
|
try: |
|
moderation_factory = ModerationFactory( |
|
name=self.rule.type, app_id=app_id, tenant_id=tenant_id, config=self.rule.config |
|
) |
|
|
|
result: ModerationOutputsResult = moderation_factory.moderation_for_outputs(moderation_buffer) |
|
return result |
|
except Exception as e: |
|
logger.error("Moderation Output error: %s", e) |
|
|
|
return None |
|
|