Spaces:
Runtime error
Runtime error
File size: 2,963 Bytes
f0fe0fd c6a2a56 bb7c9a3 c6a2a56 bb7c9a3 f7e11c1 9fd6e20 a1a6d79 c6a2a56 bb7c9a3 c6a2a56 bb7c9a3 c6a2a56 f7e11c1 c6a2a56 8ec2c5a 9fd6e20 f7e11c1 bb5dde5 9fd6e20 f7e11c1 9fd6e20 8ec2c5a 9fd6e20 f7e11c1 9fd6e20 f7e11c1 bb7c9a3 f0fe0fd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict
from typing import Any, Self, Sequence, Set
from ctp_slack_bot.core import ApplicationComponentBase, Settings
from ctp_slack_bot.db.repositories import VectorizedChunkRepository
from ctp_slack_bot.enums import EventType
from ctp_slack_bot.models import Chunk, Content, SlackMessage
from .event_brokerage_service import EventBrokerageService
from .vectorization_service import VectorizationService
class ContentIngestionService(ApplicationComponentBase):
    """
    Service for ingesting content.

    Chunks obtained from incoming content or Slack messages are vectorized by
    the vectorization service and then inserted through the vectorized chunk
    repository.
    """

    model_config = ConfigDict(frozen=True)

    settings: Settings
    vectorized_chunk_repository: VectorizedChunkRepository
    vectorization_service: VectorizationService

    async def process_incoming_content(self: Self, content: Content) -> None:
        """
        Vectorize and store the chunks of ``content`` unless it is already stored.

        Args:
            content: The content to ingest. Its identifier is looked up in the
                repository first; a non-zero count short-circuits ingestion.
        """
        from inspect import isawaitable

        logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
        content_id = content.get_id()
        existing_count = self.vectorized_chunk_repository.count_by_id(content_id)
        # The repository's insert_many is awaited below, so count_by_id may be a
        # coroutine function as well. A bare coroutine object is always truthy,
        # which would make every content item look like a duplicate; resolve it
        # before testing. A plain (synchronous) result is used as-is.
        if isawaitable(existing_count):
            existing_count = await existing_count
        if existing_count:
            logger.debug("Ignored content with identifier, {}, because it already exists in the database.", content_id)
            return
        chunks = content.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
        """
        Vectorize and store the chunks of a Slack message.

        Unlike ``process_incoming_content``, no existence check is performed
        before inserting.

        Args:
            slack_message: The Slack message whose chunks are ingested.
        """
        logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
        chunks = slack_message.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def __vectorize_and_store_chunks_in_database(self: Self, chunks: Sequence[Chunk]) -> Set[str]:
        """
        Vectorize ``chunks`` and insert the result into the repository.

        Args:
            chunks: The chunks to vectorize.

        Returns:
            Whatever ``insert_many`` returns; annotated as the set of inserted
            identifiers.
        """
        vectorized_chunks = await self.vectorization_service.vectorize(chunks)
        return await self.vectorized_chunk_repository.insert_many(vectorized_chunks)

    @property
    def name(self: Self) -> str:
        """Return the fixed component name of this service."""
        return "content_ingestion_service"
class ContentIngestionServiceResource(AsyncResource):
    """
    Asynchronous resource that creates a ``ContentIngestionService`` and
    subscribes it to incoming-content events.
    """

    async def init(
        self: Self,
        settings: Settings,
        event_brokerage_service: EventBrokerageService,
        vectorized_chunk_repository: VectorizedChunkRepository,
        vectorization_service: VectorizationService,
    ) -> ContentIngestionService:
        """
        Build the service and register its content handler with the event broker.

        Args:
            settings: Application settings passed through to the service.
            event_brokerage_service: Broker on which the service's handler is subscribed.
            vectorized_chunk_repository: Repository passed through to the service.
            vectorization_service: Vectorization service passed through to the service.

        Returns:
            The newly created ``ContentIngestionService``.
        """
        service = ContentIngestionService(
            settings=settings,
            vectorized_chunk_repository=vectorized_chunk_repository,
            vectorization_service=vectorization_service,
        )
        await event_brokerage_service.subscribe(
            EventType.INCOMING_CONTENT,
            service.process_incoming_content,
        )
        # NOTE: the subscription for incoming Slack messages is currently disabled:
        # await event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, service.process_incoming_slack_message)
        return service
|