File size: 2,963 Bytes
f0fe0fd
c6a2a56
bb7c9a3
 
c6a2a56
bb7c9a3
f7e11c1
9fd6e20
 
a1a6d79
 
c6a2a56
bb7c9a3
 
c6a2a56
 
 
 
bb7c9a3
 
c6a2a56
f7e11c1
c6a2a56
 
8ec2c5a
9fd6e20
f7e11c1
bb5dde5
 
9fd6e20
f7e11c1
 
9fd6e20
8ec2c5a
9fd6e20
 
f7e11c1
 
9fd6e20
f7e11c1
 
 
bb7c9a3
 
 
 
f0fe0fd
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict
from typing import Any, Self, Sequence, Set

from ctp_slack_bot.core import ApplicationComponentBase, Settings
from ctp_slack_bot.db.repositories import VectorizedChunkRepository
from ctp_slack_bot.enums import EventType
from ctp_slack_bot.models import Chunk, Content, SlackMessage
from .event_brokerage_service import EventBrokerageService
from .vectorization_service import VectorizationService


class ContentIngestionService(ApplicationComponentBase):
    """
    Service for ingesting content.

    Incoming content is split into chunks, the chunks are vectorized through the
    vectorization service, and the vectorized chunks are inserted through the
    vectorized chunk repository.
    """

    model_config = ConfigDict(frozen=True)

    settings: Settings
    vectorized_chunk_repository: VectorizedChunkRepository
    vectorization_service: VectorizationService

    async def process_incoming_content(self: Self, content: Content) -> None:
        """
        Vectorize and store the chunks of ``content`` unless it is already stored.

        Content whose identifier already has a non-zero count in the repository
        is skipped and only logged.
        """
        logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
        content_id = content.get_id()
        # ``count_by_id`` is awaited just like the repository's ``insert_many``: an
        # un-awaited coroutine object is always truthy, which would make every piece
        # of content look like a duplicate and silently skip ingestion.
        if await self.vectorized_chunk_repository.count_by_id(content_id):
            logger.debug("Ignored content with identifier, {}, because it already exists in the database.", content_id)
            return
        chunks = content.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
        """
        Vectorize and store the chunks of a Slack message.

        Unlike ``process_incoming_content``, no duplicate check is performed.
        """
        logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
        chunks = slack_message.get_chunks()
        inserted_ids = await self.__vectorize_and_store_chunks_in_database(chunks)
        logger.debug("Stored {} vectorized chunk(s) in the database.", len(inserted_ids))

    async def __vectorize_and_store_chunks_in_database(self: Self, chunks: Sequence[Chunk]) -> Set[str]:
        """
        Vectorize ``chunks`` and insert the results through the repository.

        Returns whatever ``insert_many`` returns (annotated as a set of identifiers),
        or an empty set when there are no chunks to process.
        """
        if not chunks:
            # Nothing to embed or insert; skip calling the downstream services with empty input.
            return set()
        vectorized_chunks = await self.vectorization_service.vectorize(chunks)
        return await self.vectorized_chunk_repository.insert_many(vectorized_chunks)

    @property
    def name(self: Self) -> str:
        """Component name used to identify this service."""
        return "content_ingestion_service"


class ContentIngestionServiceResource(AsyncResource):
    """Dependency-injector resource that builds a ``ContentIngestionService`` and wires it to events."""

    async def init(self: Self, settings: Settings, event_brokerage_service: EventBrokerageService, vectorized_chunk_repository: VectorizedChunkRepository, vectorization_service: VectorizationService) -> ContentIngestionService:
        """
        Create the ingestion service and subscribe it to incoming-content events.

        Returns the constructed service so the container can provide it.
        """
        service = ContentIngestionService(
            settings=settings,
            vectorized_chunk_repository=vectorized_chunk_repository,
            vectorization_service=vectorization_service,
        )
        handler = service.process_incoming_content
        await event_brokerage_service.subscribe(EventType.INCOMING_CONTENT, handler)
        # NOTE: Slack message ingestion is not subscribed at the moment; to enable it:
        # await event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, service.process_incoming_slack_message)
        return service