Spaces:
Runtime error
Runtime error
File size: 1,975 Bytes
f0fe0fd c6a2a56 bb7c9a3 c6a2a56 bb7c9a3 9fd6e20 a1a6d79 c6a2a56 bb7c9a3 c6a2a56 bb7c9a3 c6a2a56 f0fe0fd 9fd6e20 3300601 8ec2c5a bb7c9a3 f0fe0fd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict
from typing import Any, Self
from ctp_slack_bot.core import ApplicationComponentBase, Settings
from ctp_slack_bot.enums import EventType
from ctp_slack_bot.models import Chunk, SlackMessage
from .answer_retrieval_service import AnswerRetrievalService
from .context_retrieval_service import ContextRetrievalService
from .event_brokerage_service import EventBrokerageService
class QuestionDispatchService(ApplicationComponentBase):
"""
Service for determining whether a Slack message constitutes a question.
"""
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
settings: Settings
context_retrieval_service: ContextRetrievalService
answer_retrieval_service: AnswerRetrievalService
async def process_incoming_slack_message(self: Self, message: SlackMessage) -> None:
if message.subtype != 'bot_message':
logger.debug("Question dispatch service received an answerable question: {}", message.text)
context = await self.context_retrieval_service.get_context(message)
await self.answer_retrieval_service.push(message, context)
@property
def name(self: Self) -> str:
return "question_dispatch_service"
class QuestionDispatchServiceResource(AsyncResource):
async def init(self: Self, settings: Settings, event_brokerage_service: EventBrokerageService, context_retrieval_service: ContextRetrievalService, answer_retrieval_service: AnswerRetrievalService) -> QuestionDispatchService:
question_dispatch_service = QuestionDispatchService(settings=settings, context_retrieval_service=context_retrieval_service, answer_retrieval_service=answer_retrieval_service)
await event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, question_dispatch_service.process_incoming_slack_message)
return question_dispatch_service
|