Spaces:
Runtime error
Runtime error
from dependency_injector.resources import AsyncResource | |
from loguru import logger | |
from pydantic import ConfigDict | |
from typing import Any, Self, Sequence, Set | |
from ctp_slack_bot.core import ApplicationComponentBase, Settings | |
from ctp_slack_bot.db.repositories import VectorizedChunkRepository | |
from ctp_slack_bot.enums import EventType | |
from ctp_slack_bot.models import Chunk, Content, SlackMessage | |
from .event_brokerage_service import EventBrokerageService | |
from .vectorization_service import VectorizationService | |
class ContentIngestionService(ApplicationComponentBase):
    """
    Service for ingesting content.

    Splits incoming content and Slack messages into chunks, vectorizes the
    chunks through the vectorization service, and stores the result through
    the vectorized chunk repository.
    """

    # Instances are immutable once constructed.
    model_config = ConfigDict(frozen=True)

    settings: Settings
    vectorized_chunk_repository: VectorizedChunkRepository
    vectorization_service: VectorizationService

    async def process_incoming_content(self: Self, content: Content) -> None:
        """
        Vectorize and store the chunks of ``content`` unless it is already stored.

        Content whose identifier already has a non-zero count in the
        repository is skipped.
        """
        from inspect import isawaitable

        logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
        content_id = content.get_id()
        existing_count = self.vectorized_chunk_repository.count_by_id(content_id)
        # NOTE(review): the repository's insert_many is awaited below, so
        # count_by_id is presumably a coroutine function as well. An un-awaited
        # coroutine object is always truthy, which would make every piece of
        # content look like a duplicate. Await only when needed so this works
        # with either a synchronous or an asynchronous repository method.
        if isawaitable(existing_count):
            existing_count = await existing_count
        if existing_count:
            logger.debug("Ignored content with identifier, {}, because it already exists in the database.", content_id)
            return
        chunks = content.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
        """
        Vectorize and store the chunks of ``slack_message``.

        Unlike ``process_incoming_content``, no duplicate check is performed.
        """
        logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
        chunks = slack_message.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def __vectorize_and_store_chunks_in_database(self: Self, chunks: Sequence[Chunk]) -> Set[str]:
        """
        Vectorize ``chunks`` and insert them into the repository.

        Returns whatever the repository's ``insert_many`` returns (annotated
        as a set of string identifiers).
        """
        vectorized_chunks = await self.vectorization_service.vectorize(chunks)
        return await self.vectorized_chunk_repository.insert_many(vectorized_chunks)

    def name(self: Self) -> str:
        """Return the fixed name of this component."""
        return "content_ingestion_service"
class ContentIngestionServiceResource(AsyncResource):
    """
    Asynchronous resource that builds the content ingestion service and
    subscribes it to incoming-content events.
    """

    async def init(
        self: Self,
        settings: Settings,
        event_brokerage_service: EventBrokerageService,
        vectorized_chunk_repository: VectorizedChunkRepository,
        vectorization_service: VectorizationService,
    ) -> ContentIngestionService:
        """
        Create a ``ContentIngestionService`` from the given collaborators,
        subscribe its ``process_incoming_content`` handler to
        ``EventType.INCOMING_CONTENT``, and return the service.
        """
        service = ContentIngestionService(
            settings=settings,
            vectorized_chunk_repository=vectorized_chunk_repository,
            vectorization_service=vectorization_service,
        )
        await event_brokerage_service.subscribe(EventType.INCOMING_CONTENT, service.process_incoming_content)
        # The Slack-message subscription is currently disabled:
        # await event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, service.process_incoming_slack_message)
        return service