# ctp-slack-bot/src/ctp_slack_bot/services/content_ingestion_service.py
# Last change: "Refactor #6" by LiKenun (commit f0fe0fd)
from inspect import isawaitable
from typing import Any, Self, Sequence, Set

from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict

from ctp_slack_bot.core import ApplicationComponentBase, Settings
from ctp_slack_bot.db.repositories import VectorizedChunkRepository
from ctp_slack_bot.enums import EventType
from ctp_slack_bot.models import Chunk, Content, SlackMessage
from .event_brokerage_service import EventBrokerageService
from .vectorization_service import VectorizationService
class ContentIngestionService(ApplicationComponentBase):
    """
    Service for ingesting content.

    Chunks obtained from incoming content or Slack messages are passed to the
    vectorization service, and the resulting vectorized chunks are inserted
    through the vectorized chunk repository.
    """

    model_config = ConfigDict(frozen=True)

    settings: Settings
    vectorized_chunk_repository: VectorizedChunkRepository
    vectorization_service: VectorizationService

    async def process_incoming_content(self: Self, content: Content) -> None:
        """
        Vectorize and store the chunks of ``content`` unless it is already stored.

        The content is skipped when the repository reports a non-zero count for
        its identifier.

        Args:
            content: The content whose chunks should be ingested.
        """
        logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
        content_id = content.get_id()

        # ``insert_many`` on this same repository is awaited, so ``count_by_id``
        # may return an awaitable as well. An un-awaited coroutine object is
        # always truthy, which would make every piece of content look like a
        # duplicate. Resolve the value first; a plain (synchronous) result is
        # used as-is.
        existing_count = self.vectorized_chunk_repository.count_by_id(content_id)
        if isawaitable(existing_count):
            existing_count = await existing_count

        if existing_count:
            logger.debug("Ignored content with identifier, {}, because it already exists in the database.", content_id)
            return

        chunks = content.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
        """
        Vectorize and store the chunks of a Slack message.

        Unlike ``process_incoming_content``, no existence check is performed
        before inserting.

        Args:
            slack_message: The Slack message whose chunks should be ingested.
        """
        logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
        chunks = slack_message.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def __vectorize_and_store_chunks_in_database(self: Self, chunks: Sequence[Chunk]) -> Set[str]:
        """
        Vectorize ``chunks`` and insert the results through the repository.

        Args:
            chunks: The chunks to vectorize.

        Returns:
            Whatever the repository's ``insert_many`` returns (annotated as the
            set of inserted identifiers).
        """
        vectorized_chunks = await self.vectorization_service.vectorize(chunks)
        return await self.vectorized_chunk_repository.insert_many(vectorized_chunks)

    @property
    def name(self: Self) -> str:
        """Return the component name of this service."""
        return "content_ingestion_service"
class ContentIngestionServiceResource(AsyncResource):
    """
    Asynchronous resource that builds a ``ContentIngestionService`` and
    subscribes it to incoming content events.
    """

    async def init(
        self: Self,
        settings: Settings,
        event_brokerage_service: EventBrokerageService,
        vectorized_chunk_repository: VectorizedChunkRepository,
        vectorization_service: VectorizationService,
    ) -> ContentIngestionService:
        """
        Create the service and register its content handler with the event broker.

        Args:
            settings: Application settings handed to the service.
            event_brokerage_service: Broker on which the handler is subscribed.
            vectorized_chunk_repository: Repository handed to the service.
            vectorization_service: Vectorization service handed to the service.

        Returns:
            The newly constructed ``ContentIngestionService``.
        """
        service = ContentIngestionService(
            settings=settings,
            vectorized_chunk_repository=vectorized_chunk_repository,
            vectorization_service=vectorization_service,
        )
        await event_brokerage_service.subscribe(
            EventType.INCOMING_CONTENT,
            service.process_incoming_content,
        )
        # The Slack message subscription is currently disabled:
        # await event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, service.process_incoming_slack_message)
        return service