Spaces:
Runtime error
Runtime error
from datetime import datetime | |
from json import dumps | |
from pydantic import BaseModel, ConfigDict, PositiveInt | |
from types import MappingProxyType | |
from typing import Any, Literal, Mapping, Optional, Self | |
from .base import Chunk, Content | |
class SlackEventPayload(BaseModel):
    """Generic payload of a Slack event, reduced to its common fields.

    Only ``type`` and ``event_ts`` are declared. The model is configured
    with ``extra="allow"``, so additional keys present in the input are kept
    on the instance rather than rejected, and with ``frozen=True``, so fields
    cannot be reassigned once the instance has been created.
    """

    model_config = ConfigDict(frozen=True, extra="allow")

    # Kind of event; any string is accepted at this level.
    type: str
    # Event timestamp, stored as the string it was received as.
    event_ts: str
class SlackEvent(BaseModel):
    """Represents a general event from Slack (the outer envelope).

    The envelope carries delivery-level fields plus the nested ``event``
    payload, which is validated as a ``SlackEventPayload``. Instances are
    frozen: fields cannot be reassigned after creation.

    ``authed_users`` defaults to an empty tuple. Slack's Events API
    documentation marks that field as deprecated in favour of
    ``authorizations``, so an envelope may arrive without it; requiring it
    would make such envelopes fail validation. Callers that read
    ``authed_users`` still always receive a tuple of strings.
    """

    model_config = ConfigDict(frozen=True)

    token: str
    team_id: str
    api_app_id: str
    # Nested event payload; unknown keys inside it are preserved by
    # SlackEventPayload's own configuration.
    event: SlackEventPayload
    type: str
    event_id: str
    event_time: int
    # Deprecated by Slack and therefore optional here; an immutable empty
    # tuple is used as the default so the frozen model stays hashable-friendly.
    authed_users: tuple[str, ...] = ()
class SlackUserTimestampPair(BaseModel):
    """Immutable pairing of a Slack user with a timestamp.

    Both values are kept as plain strings; the model is frozen, so neither
    can be reassigned after the instance is created.
    """

    model_config = ConfigDict(frozen=True)

    # User half of the pair.
    user: str
    # Timestamp half of the pair, kept in string form.
    ts: str
class SlackReaction(BaseModel):
    """Immutable description of one reaction attached to a Slack message.

    The model is frozen, so its fields cannot be reassigned after creation.
    """

    model_config = ConfigDict(frozen=True)

    # Name of the reaction.
    name: str
    # Number of times the reaction was given; validated to be strictly
    # greater than zero by ``PositiveInt``.
    count: PositiveInt
    # Users associated with the reaction, as an immutable sequence of strings.
    users: tuple[str, ...]
class SlackMessage(Content):
    """A Slack message after adaptation to the ``Content`` interface.

    Besides the message fields themselves, the class provides the
    identifier, chunking and metadata accessors: the whole message text is
    exposed as a single chunk, and the message timestamp ``ts`` doubles as
    its modification time.
    """

    # Only these two event kinds are accepted.
    type: Literal["app_mention", "message"]
    subtype: str | None = None

    # Where the message lives.
    channel: str
    channel_type: str | None = None

    # Who sent it; both are optional at this level.
    user: str | None = None
    bot_id: str | None = None

    thread_ts: str | None = None
    text: str
    # Message timestamp in string form; it must be parseable by ``float``
    # (see ``get_metadata``) and is part of the message identifier.
    ts: str
    edited: SlackUserTimestampPair | None = None
    event_ts: str
    deleted_ts: str | None = None
    hidden: bool = False
    is_starred: bool | None = None
    pinned_to: tuple[str, ...] | None = None
    reactions: tuple[SlackReaction, ...] | None = None

    def get_id(self: Self) -> str:
        """Return the unique identifier of this message.

        The identifier has the form ``slack-message:<channel>:<ts>``.
        """
        channel, timestamp = self.channel, self.ts
        return f"slack-message:{channel}:{timestamp}"

    def get_chunks(self: Self) -> tuple[Chunk]:
        """Return the message as a one-element tuple of chunks.

        The single chunk holds the full text, points back to this message
        through ``parent_id``, uses an empty ``chunk_id`` and carries the
        message metadata.
        """
        sole_chunk = Chunk(
            text=self.text,
            parent_id=self.get_id(),
            chunk_id="",
            metadata=self.get_metadata(),
        )
        return (sole_chunk,)

    def get_metadata(self: Self) -> Mapping[str, Any]:
        """Return a read-only mapping holding the message's modification time.

        Raises:
            ValueError: If ``ts`` cannot be converted to a float.
        """
        epoch_seconds = float(self.ts)
        # NOTE(review): without a ``tz`` argument ``fromtimestamp`` yields a
        # naive datetime in the machine's local time zone, so the value
        # depends on where the code runs -- confirm this is what consumers
        # of "modificationTime" expect.
        modified_at = datetime.fromtimestamp(epoch_seconds)
        return MappingProxyType({"modificationTime": modified_at})
class SlackResponse(BaseModel):
    """A response message that is to be sent to Slack.

    Instances are frozen. ``channel`` has no default, so it must always be
    passed explicitly, although ``None`` is an accepted value; ``thread_ts``
    may be omitted and then defaults to ``None``.
    """

    # TODO: This should also be based on Content as it is a SlackMessage―just not one for which we know the identity yet.

    model_config = ConfigDict(frozen=True)

    # Body of the response.
    text: str
    # Target channel; required argument, but may be None.
    channel: str | None
    # Thread to reply in, if any.
    thread_ts: str | None = None