LiKenun commited on
Commit
67436c8
·
1 Parent(s): 92e41ba

Simplify models

Browse files
src/ctp_slack_bot/models/__init__.py CHANGED
@@ -1,2 +1,3 @@
1
  from ctp_slack_bot.models.base import Chunk, Content, VectorizedChunk, VectorQuery
2
  from ctp_slack_bot.models.slack import SlackEventPayload, SlackMessage, SlackReaction, SlackResponse, SlackUserTimestampPair
 
 
1
  from ctp_slack_bot.models.base import Chunk, Content, VectorizedChunk, VectorQuery
2
  from ctp_slack_bot.models.slack import SlackEventPayload, SlackMessage, SlackReaction, SlackResponse, SlackUserTimestampPair
3
+ from ctp_slack_bot.models.webvtt import WebVTTContent, WebVTTFrame
src/ctp_slack_bot/models/base.py CHANGED
@@ -1,7 +1,6 @@
1
  from abc import ABC, abstractmethod
2
  from pydantic import BaseModel, ConfigDict, Field
3
- from types import MappingProxyType
4
- from typing import Any, Dict, final, Mapping, Self, Sequence, Optional
5
 
6
 
7
  class Chunk(BaseModel):
@@ -14,6 +13,7 @@ class Chunk(BaseModel):
14
 
15
  model_config = ConfigDict(frozen=True)
16
 
 
17
  @final
18
  class VectorQuery(BaseModel):
19
  """Model for vector database similarity search queries.
@@ -30,12 +30,14 @@ class VectorQuery(BaseModel):
30
  score_threshold: float = Field(default=0.7)
31
  filter_metadata: Optional[Mapping[str, Any]] = None
32
 
 
 
33
 
34
  @final
35
  class VectorizedChunk(Chunk):
36
  """A class representing a vectorized chunk of content."""
37
 
38
- embedding: Sequence[float] # The vector representation
39
 
40
 
41
  class Content(ABC, BaseModel):
@@ -44,22 +46,13 @@ class Content(ABC, BaseModel):
44
  model_config = ConfigDict(frozen=True)
45
 
46
  @abstractmethod
47
- def get_chunks(self: Self) -> Sequence[Chunk]:
48
  pass
49
 
50
  @abstractmethod
51
- def get_metadata(self: Self) -> Mapping[str, Any]:
52
- pass
53
-
54
- @abstractmethod
55
- def get_text(self: Self) -> str:
56
- pass
57
-
58
- @abstractmethod
59
- def get_bytes(self: Self) -> bytes:
60
  pass
61
 
62
- @property
63
  @abstractmethod
64
- def id(self: Self) -> str:
65
  pass
 
1
  from abc import ABC, abstractmethod
2
  from pydantic import BaseModel, ConfigDict, Field
3
+ from typing import Any, final, Mapping, Self, Sequence, Optional
 
4
 
5
 
6
  class Chunk(BaseModel):
 
13
 
14
  model_config = ConfigDict(frozen=True)
15
 
16
+
17
  @final
18
  class VectorQuery(BaseModel):
19
  """Model for vector database similarity search queries.
 
30
  score_threshold: float = Field(default=0.7)
31
  filter_metadata: Optional[Mapping[str, Any]] = None
32
 
33
+ model_config = ConfigDict(frozen=True)
34
+
35
 
36
  @final
37
  class VectorizedChunk(Chunk):
38
  """A class representing a vectorized chunk of content."""
39
 
40
+ embedding: Sequence[float] # The vector representation
41
 
42
 
43
  class Content(ABC, BaseModel):
 
46
  model_config = ConfigDict(frozen=True)
47
 
48
  @abstractmethod
49
+ def get_id(self: Self) -> str:
50
  pass
51
 
52
  @abstractmethod
53
+ def get_chunks(self: Self) -> Sequence[Chunk]:
 
 
 
 
 
 
 
 
54
  pass
55
 
 
56
  @abstractmethod
57
+ def get_metadata(self: Self) -> Mapping[str, Any]:
58
  pass
src/ctp_slack_bot/models/slack.py CHANGED
@@ -63,31 +63,19 @@ class SlackMessage(Content):
63
  is_starred: Optional[bool] = None
64
  pinned_to: Optional[Sequence[str]] = None
65
  reactions: Optional[Sequence[SlackReaction]] = None
66
- _canonical_json: PrivateAttr
67
 
68
- def __init__(self: Self, **data: Dict[str, Any]) -> None:
69
- super().__init__(**data)
70
- self._canonical_json = PrivateAttr(default_factory=lambda: dumps(data, sort_keys=True).encode())
71
 
72
  def get_chunks(self: Self) -> Sequence[Chunk]:
73
- return (Chunk(text=self.text, parent_id=self.id, chunk_id="", metadata=self.get_metadata()), )
74
 
75
  def get_metadata(self: Self) -> Mapping[str, Any]:
76
  return MappingProxyType({
77
  "modificationTime": datetime.fromtimestamp(float(self.ts))
78
  })
79
 
80
- def get_text(self: Self) -> str:
81
- return self.text
82
-
83
- def get_bytes(self: Self) -> bytes:
84
- return self._canonical_json
85
-
86
- @property
87
- def id(self: Self) -> str:
88
- """Unique identifier for this message."""
89
- return f"slack-message:{self.channel}:{self.ts}"
90
-
91
  class SlackResponse(BaseModel): # TODO: This should also be based on Content as it is a SlackMessage―just not one for which we know the identity yet.
92
  """Represents a response message to be sent to Slack."""
93
 
 
63
  is_starred: Optional[bool] = None
64
  pinned_to: Optional[Sequence[str]] = None
65
  reactions: Optional[Sequence[SlackReaction]] = None
 
66
 
67
+ def get_id(self: Self) -> str:
68
+ """Unique identifier for this message."""
69
+ return f"slack-message:{self.channel}:{self.ts}"
70
 
71
  def get_chunks(self: Self) -> Sequence[Chunk]:
72
+ return (Chunk(text=self.text, parent_id=self.get_id(), chunk_id="", metadata=self.get_metadata()), )
73
 
74
  def get_metadata(self: Self) -> Mapping[str, Any]:
75
  return MappingProxyType({
76
  "modificationTime": datetime.fromtimestamp(float(self.ts))
77
  })
78
 
 
 
 
 
 
 
 
 
 
 
 
79
  class SlackResponse(BaseModel): # TODO: This should also be based on Content as it is a SlackMessage―just not one for which we know the identity yet.
80
  """Represents a response message to be sent to Slack."""
81
 
src/ctp_slack_bot/models/webvtt.py CHANGED
@@ -1,15 +1,17 @@
1
  from datetime import datetime, timedelta
2
  from io import BytesIO
3
  from json import dumps
4
- from pydantic import BaseModel, ConfigDict, PositiveInt, PrivateAttr
5
- import re
6
  from types import MappingProxyType
7
  from typing import Any, Dict, Literal, Mapping, Optional, Self, Sequence
8
  from webvtt import Caption, WebVTT
9
 
10
  from ctp_slack_bot.models.base import Chunk, Content
11
 
12
- SPEAKER_SPEECH_CAPTION_TEXT_PATTERN = re.compile('(?:([^:]+): )?(.*)')
 
 
13
 
14
  class WebVTTFrame(BaseModel):
15
  """Represents a WebVTT frame"""
@@ -27,50 +29,44 @@ class WebVTTFrame(BaseModel):
27
  identifier = caption.identifier
28
  start = timedelta(**caption.start_time.__dict__)
29
  end = timedelta(**caption.end_time.__dict__)
30
- speech = caption.text
31
- match SPEAKER_SPEECH_CAPTION_TEXT_PATTERN.search(speech).groups():
32
- case (speaker, speech):
33
  return cls(identifier=identifier, start=start, end=end, speaker=speaker, speech=speech)
34
- case _:
35
  return cls(identifier=identifier, start=start, end=end, speech=speech)
36
 
37
 
38
- class WebVTTFile(Content): # TODO: insert a FileContent class in the object inheritance hierarchy.
39
- """Represents a message from Slack after adaptation."""
40
 
41
- filename: str
42
- modification_time: datetime
43
- bytes: bytes
44
 
 
 
45
 
46
  def get_chunks(self: Self) -> Sequence[Chunk]:
47
- return tuple(Chunk(text=frame.speech,
48
- parent_id=self.id,
49
- chunk_id=frame.identifier,
 
 
 
 
 
50
  metadata={
51
- "filename": self.filename,
52
- "start": self.modification_time + frame.start,
53
- "end": self.modification_time + frame.end,
54
- "user": frame.speaker
55
  })
56
- for frame
57
- in self.get_frames())
58
 
59
  def get_metadata(self: Self) -> Mapping[str, Any]:
60
- return MappingProxyType({
61
- "filename": self.filename,
62
- "modificationTime": self.modification_time
63
- })
64
 
65
- def get_text(self: Self) -> str: # TODO
66
- raise NotImplemented()
67
-
68
- def get_bytes(self: Self) -> bytes:
69
- return self.bytes
70
-
71
- def get_frames(self: Self) -> Sequence[WebVTTFrame]:
72
- return tuple(map(WebVTTFrame.from_webvtt_caption, WebVTT.from_buffer(BytesIO(buffer)).captions))
73
-
74
- @property
75
- def id(self: Self) -> str:
76
- return f"file:{self.filename}"
 
1
  from datetime import datetime, timedelta
2
  from io import BytesIO
3
  from json import dumps
4
+ from more_itertools import windowed
5
+ from pydantic import BaseModel, ConfigDict, Field, PositiveInt, PrivateAttr
6
  from types import MappingProxyType
7
  from typing import Any, Dict, Literal, Mapping, Optional, Self, Sequence
8
  from webvtt import Caption, WebVTT
9
 
10
  from ctp_slack_bot.models.base import Chunk, Content
11
 
12
+ CHUNK_FRAMES_OVERLAP = 1
13
+ CHUNK_FRAMES_WINDOW = 5
14
+ SPEAKER_SPEECH_TEXT_SEPARATOR = ": "
15
 
16
  class WebVTTFrame(BaseModel):
17
  """Represents a WebVTT frame"""
 
29
  identifier = caption.identifier
30
  start = timedelta(**caption.start_time.__dict__)
31
  end = timedelta(**caption.end_time.__dict__)
32
+ match caption.text.split(SPEAKER_SPEECH_TEXT_SEPARATOR, 1):
33
+ case [speaker, speech]:
 
34
  return cls(identifier=identifier, start=start, end=end, speaker=speaker, speech=speech)
35
+ case [speech]:
36
  return cls(identifier=identifier, start=start, end=end, speech=speech)
37
 
38
 
39
+ class WebVTTContent(Content):
40
+ """Represents parsed WebVTT content."""
41
 
42
+ id: str
43
+ metadata: Mapping[str, Any] = Field(default_factory=dict)
44
+ frames: Sequence[WebVTTFrame]
45
 
46
+ def get_id(self: Self) -> str:
47
+ return self.id
48
 
49
  def get_chunks(self: Self) -> Sequence[Chunk]:
50
+ windows = (tuple(filter(None, window))
51
+ for window
52
+ in windowed(self.frames, CHUNK_FRAMES_WINDOW, step=CHUNK_FRAMES_WINDOW-CHUNK_FRAMES_OVERLAP))
53
+ return tuple(Chunk(text="\n\n".join(": ".join(filter(None, (frame.speaker, frame.speech)))
54
+ for frame
55
+ in frames),
56
+ parent_id=self.get_id(),
57
+ chunk_id=f"{frames[0].identifier}-{frames[-1].identifier}",
58
  metadata={
59
+ "start": frames[0].start,
60
+ "end": frames[-1].end,
61
+ "speakers": frozenset(frame.speaker for frame in frames)
 
62
  })
63
+ for frames
64
+ in windows)
65
 
66
  def get_metadata(self: Self) -> Mapping[str, Any]:
67
+ return MappingProxyType(self.metadata)
 
 
 
68
 
69
+ @classmethod
70
+ def from_bytes(cls: type["WebVTTContent"], id: str, metadata: Mapping[str, Any], buffer: bytes) -> Self:
71
+ frames = tuple(map(WebVTTFrame.from_webvtt_caption, WebVTT.from_buffer(BytesIO(buffer)).captions))
72
+ return WebVTTContent(id=id, metadata=MappingProxyType(metadata), frames=frames)
 
 
 
 
 
 
 
 
src/ctp_slack_bot/services/content_ingestion_service.py CHANGED
@@ -30,8 +30,8 @@ class ContentIngestionService(BaseModel):
30
 
31
  async def process_incoming_content(self: Self, content: Content) -> None:
32
  logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
33
- # if self.vector_database_service.has_content(content.id) # TODO
34
- # logger.debug("Ignored content with ID {} because it already exists in the database.", content.id)
35
  # return
36
  chunks = content.get_chunks()
37
  await self.__vectorize_and_store_chunks_in_database(chunks)
 
30
 
31
  async def process_incoming_content(self: Self, content: Content) -> None:
32
  logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
33
+ # if self.vector_database_service.has_content(content.get_id()) # TODO
34
+ # logger.debug("Ignored content with ID {} because it already exists in the database.", content.get_id())
35
  # return
36
  chunks = content.get_chunks()
37
  await self.__vectorize_and_store_chunks_in_database(chunks)