LiKenun commited on
Commit
8ec2c5a
·
1 Parent(s): 9fd6e20

Switch to `async` architecture

Browse files
pyproject.toml CHANGED
@@ -32,6 +32,7 @@ dependencies = [
32
  "pytz>=2025.2",
33
  "apscheduler>=3.11.0",
34
  "slack-sdk>=3.35.0",
 
35
  "pymongo>=4.11.3 ",
36
  "webvtt-py>=0.5.1",
37
  "openai>=1.70.0",
 
32
  "pytz>=2025.2",
33
  "apscheduler>=3.11.0",
34
  "slack-sdk>=3.35.0",
35
+ "slack_bolt>=1.23.0",
36
  "pymongo>=4.11.3 ",
37
  "webvtt-py>=0.5.1",
38
  "openai>=1.70.0",
src/ctp_slack_bot/api/__init__.py CHANGED
@@ -1 +0,0 @@
1
- from ctp_slack_bot.api.main import app, run
 
 
src/ctp_slack_bot/api/main.py CHANGED
@@ -1,41 +1,31 @@
 
 
1
  from contextlib import asynccontextmanager
2
  from dependency_injector.wiring import inject, Provide
3
  from fastapi import FastAPI, HTTPException, Depends
4
  from loguru import logger
5
  from typing import Any, AsyncGenerator
6
- from slack_bolt import App
7
- from slack_bolt.adapter.socket_mode import SocketModeHandler
8
  from starlette.requests import Request
9
  from starlette.responses import Response
10
  from threading import Thread
11
  from typing import Any, Dict, Self
12
 
13
  from ctp_slack_bot.containers import Container
14
- from ctp_slack_bot.api.routes import router
15
  from ctp_slack_bot.core.config import Settings
16
  from ctp_slack_bot.core.logging import setup_logging
17
  from ctp_slack_bot.core.response_rendering import PrettyJSONResponse
18
  from ctp_slack_bot.tasks import start_scheduler, stop_scheduler
19
 
20
- @asynccontextmanager
21
- async def lifespan(app: FastAPI) -> AsyncGenerator:
22
- """
23
- Lifespan context manager for FastAPI application.
24
- Handles startup and shutdown events.
25
- """
26
-
27
- # Initialize container and wire the container to modules that need dependency injection.
28
  container = Container()
29
  container.wire(packages=['ctp_slack_bot'])
30
- app.container = container
31
 
32
  # Setup logging.
33
  setup_logging(container)
34
  logger.info("Starting application")
35
 
36
- # Log route being served.
37
- logger.info("Serving {} route(s):{}", len(app.routes), ''.join(f"\n* {", ".join(route.methods)} {route.path}" for route in app.routes))
38
-
39
  # Start the scheduler.
40
  scheduler = start_scheduler(container)
41
  logger.info("Started scheduler")
@@ -47,59 +37,25 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
47
  bolt_app = container.slack_bolt_app()
48
  slack_service = container.slack_service()
49
  @bolt_app.event("message")
50
- def handle_message(body: Dict[str, Any]) -> None:
51
- logger.debug("Received Slack message event: {}", body)
52
- slack_service.process_message(body)
53
  @bolt_app.event("app_mention")
54
- def handle_message(body: Dict[str, Any]) -> None:
55
- logger.debug("Received Slack app mention event: {}", body)
56
- slack_service.process_message(body)
57
 
58
  # Start Socket Mode handler in a background thread
59
- socket_mode_handler = SocketModeHandler(
60
  app=bolt_app,
61
  app_token=container.settings().SLACK_APP_TOKEN.get_secret_value()
62
  )
63
- socket_thread = Thread(target=socket_mode_handler.start)
64
- socket_thread.daemon = True
65
- socket_thread.start()
66
- logger.info("Started Slack Socket Mode handler")
67
-
68
- yield # control to FastAPI until shutdown.
69
 
70
  # Shutdown.
71
  logger.info("Shutting down application")
72
  stop_scheduler(scheduler)
73
  logger.info("Stopped scheduler")
74
 
75
- app = FastAPI(
76
- title="CTP Slack Bot",
77
- description="A Slack bot for processing and analyzing Zoom transcripts using AI",
78
- version="0.1.0",
79
- lifespan=lifespan,
80
- )
81
-
82
- # Include routers.
83
- app.include_router(router)
84
-
85
- # Provide a minimalist health check endpoint for clients to detect availability.
86
- @app.get("/health")
87
- async def get_health() -> dict[str, str]:
88
- """Health check"""
89
- return {
90
- "status": "healthy"
91
- }
92
-
93
- # Alternate starting path for development
94
- def run() -> None:
95
- import uvicorn
96
- settings = Settings() # type: ignore
97
- uvicorn.run(
98
- "main:app",
99
- host=settings.API_HOST,
100
- port=settings.API_PORT,
101
- reload=settings.DEBUG
102
- )
103
-
104
  if __name__ == "__main__":
105
- run()
 
 
1
+ from apscheduler.schedulers.asyncio import AsyncIOScheduler
2
+ from asyncio import run as run_async
3
  from contextlib import asynccontextmanager
4
  from dependency_injector.wiring import inject, Provide
5
  from fastapi import FastAPI, HTTPException, Depends
6
  from loguru import logger
7
  from typing import Any, AsyncGenerator
8
+ from slack_bolt.async_app import AsyncApp
9
+ from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler
10
  from starlette.requests import Request
11
  from starlette.responses import Response
12
  from threading import Thread
13
  from typing import Any, Dict, Self
14
 
15
  from ctp_slack_bot.containers import Container
 
16
  from ctp_slack_bot.core.config import Settings
17
  from ctp_slack_bot.core.logging import setup_logging
18
  from ctp_slack_bot.core.response_rendering import PrettyJSONResponse
19
  from ctp_slack_bot.tasks import start_scheduler, stop_scheduler
20
 
21
+ async def main() -> None:
 
 
 
 
 
 
 
22
  container = Container()
23
  container.wire(packages=['ctp_slack_bot'])
 
24
 
25
  # Setup logging.
26
  setup_logging(container)
27
  logger.info("Starting application")
28
 
 
 
 
29
  # Start the scheduler.
30
  scheduler = start_scheduler(container)
31
  logger.info("Started scheduler")
 
37
  bolt_app = container.slack_bolt_app()
38
  slack_service = container.slack_service()
39
  @bolt_app.event("message")
40
+ # async def handle_message(body: Dict[str, Any]) -> None:
41
+ # await slack_service.process_message(body)
 
42
  @bolt_app.event("app_mention")
43
+ async def handle_message(body: Dict[str, Any]) -> None:
44
+ await slack_service.process_message(body)
 
45
 
46
  # Start Socket Mode handler in a background thread
47
+ socket_mode_handler = AsyncSocketModeHandler(
48
  app=bolt_app,
49
  app_token=container.settings().SLACK_APP_TOKEN.get_secret_value()
50
  )
51
+ logger.info("Starting Slack Socket Mode handler…")
52
+ await socket_mode_handler.start_async()
 
 
 
 
53
 
54
  # Shutdown.
55
  logger.info("Shutting down application")
56
  stop_scheduler(scheduler)
57
  logger.info("Stopped scheduler")
58
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
  if __name__ == "__main__":
60
+ # run()
61
+ run_async(main())
src/ctp_slack_bot/api/routes.py DELETED
@@ -1,112 +0,0 @@
1
- from fastapi import APIRouter, Depends, HTTPException, status
2
- from dependency_injector.wiring import inject, Provide
3
- from loguru import logger
4
-
5
- from ctp_slack_bot.containers import Container
6
- from ctp_slack_bot.core.config import Settings
7
- from ctp_slack_bot.services import AnswerRetrievalService, ContentIngestionService, ContextRetrievalService, EmbeddingsModelService, LanguageModelService, QuestionDispatchService, SlackService, VectorDatabaseService, VectorizationService
8
-
9
- router = APIRouter(prefix="/api/v1")
10
-
11
- @router.get("/env", response_model=Settings)
12
- @inject
13
- async def get_env(settings: Settings = Depends(Provide[Container.settings])) -> Settings:
14
- if not settings.DEBUG:
15
- raise HTTPException(status_code=404)
16
- return settings
17
-
18
- @router.get("/answer_retrieval_service")
19
- @inject
20
- async def test(answer_retrieval_service: AnswerRetrievalService = Depends(Provide[Container.answer_retrieval_service])) -> None:
21
- pass
22
-
23
- @router.get("/content_ingestion_service")
24
- @inject
25
- async def test(content_ingestion_service: ContentIngestionService = Depends(Provide[Container.content_ingestion_service])) -> None:
26
- pass
27
-
28
- @router.get("/context_retrieval_service")
29
- @inject
30
- async def test(context_retrieval_service: ContextRetrievalService = Depends(Provide[Container.context_retrieval_service])) -> None:
31
- pass
32
-
33
- @router.get("/embeddings_model_service")
34
- @inject
35
- async def test(embeddings_model_service: EmbeddingsModelService = Depends(Provide[Container.embeddings_model_service])) -> None:
36
- pass
37
-
38
- @router.get("/language_model_service")
39
- @inject
40
- async def test(language_model_service: LanguageModelService = Depends(Provide[Container.language_model_service])) -> None:
41
- pass
42
-
43
- @router.get("/question_dispatch_service")
44
- @inject
45
- async def test(question_dispatch_service: QuestionDispatchService = Depends(Provide[Container.question_dispatch_service])) -> None:
46
- pass
47
-
48
- @router.get("/slack_service")
49
- @inject
50
- async def test(slack_service: SlackService = Depends(Provide[Container.slack_service])) -> None:
51
- pass
52
-
53
- @router.get("/vector_database_service")
54
- @inject
55
- async def test(vector_database_service: VectorDatabaseService = Depends(Provide[Container.vector_database_service])) -> None:
56
- pass
57
-
58
- @router.get("/vectorization_service")
59
- @inject
60
- async def test(vectorization_service: VectorizationService = Depends(Provide[Container.vectorization_service])) -> None:
61
- pass
62
-
63
- # @router.post("/transcripts/analyze", response_model=TranscriptResponse)
64
- # async def analyze_transcript(
65
- # request: TranscriptRequest,
66
- # transcript_service: TranscriptService = Depends(get_transcript_service),
67
- # ):
68
- # """
69
- # Analyze a Zoom transcript and return insights.
70
- # """
71
- # logger.info(f"Analyzing transcript: {request.transcript_id}")
72
- # try:
73
- # result = await transcript_service.analyze_transcript(request)
74
- # return result
75
- # except Exception as e:
76
- # logger.error(f"Error analyzing transcript: {e}")
77
- # raise HTTPException(
78
- # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
79
- # detail="Failed to analyze transcript",
80
- # )
81
-
82
-
83
- # @router.post("/slack/message")
84
- # async def send_slack_message(
85
- # channel: str,
86
- # message: str,
87
- # slack_service: SlackService = Depends(get_slack_service),
88
- # ):
89
- # """
90
- # Send a message to a Slack channel.
91
- # """
92
- # logger.info(f"Sending message to Slack channel: {channel}")
93
- # try:
94
- # result = await slack_service.send_message(channel, message)
95
- # return {"status": "success", "message_ts": result.get("ts")}
96
- # except Exception as e:
97
- # logger.error(f"Error sending Slack message: {e}")
98
- # raise HTTPException(
99
- # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
100
- # detail="Failed to send Slack message",
101
- # )
102
-
103
-
104
- # @router.post("/slack/webhook", include_in_schema=False)
105
- # async def slack_webhook(
106
- # slack_service: SlackService = Depends(get_slack_service),
107
- # ):
108
- # """
109
- # Webhook endpoint for Slack events.
110
- # """
111
- # # This would typically handle Slack verification and event processing
112
- # return {"challenge": "challenge_token"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/ctp_slack_bot/containers.py CHANGED
@@ -1,7 +1,6 @@
1
  from dependency_injector.containers import DeclarativeContainer
2
  from dependency_injector.providers import Factory, List, Singleton
3
- from slack_bolt import App
4
- from slack_bolt.adapter.socket_mode import SocketModeHandler
5
 
6
  from ctp_slack_bot.core.config import Settings
7
  from ctp_slack_bot.db.mongo_db import MongoDB
@@ -20,7 +19,7 @@ from ctp_slack_bot.services.vectorization_service import VectorizationService
20
  class Container(DeclarativeContainer):
21
  settings = Singleton(Settings)
22
  event_brokerage_service = Singleton(EventBrokerageService)
23
- slack_bolt_app = Factory(App, token=settings.provided.SLACK_BOT_TOKEN().get_secret_value())
24
  mongo_db = Singleton(MongoDB, settings=settings) # TODO: we could really use less commitment to MongoDB.
25
  # Repositories
26
  # transcript_repository = Factory(
 
1
  from dependency_injector.containers import DeclarativeContainer
2
  from dependency_injector.providers import Factory, List, Singleton
3
+ from slack_bolt.async_app import AsyncApp
 
4
 
5
  from ctp_slack_bot.core.config import Settings
6
  from ctp_slack_bot.db.mongo_db import MongoDB
 
19
  class Container(DeclarativeContainer):
20
  settings = Singleton(Settings)
21
  event_brokerage_service = Singleton(EventBrokerageService)
22
+ slack_bolt_app = Factory(AsyncApp, token=settings.provided.SLACK_BOT_TOKEN().get_secret_value())
23
  mongo_db = Singleton(MongoDB, settings=settings) # TODO: we could really use less commitment to MongoDB.
24
  # Repositories
25
  # transcript_repository = Factory(
src/ctp_slack_bot/models/base.py CHANGED
@@ -20,8 +20,6 @@ class VectorizedChunk(Chunk):
20
 
21
  embedding: Sequence[float] # The vector representation
22
 
23
- model_config = ConfigDict(frozen=True)
24
-
25
 
26
  class Content(ABC, BaseModel):
27
  """An abstract base class for all types of content."""
 
20
 
21
  embedding: Sequence[float] # The vector representation
22
 
 
 
23
 
24
  class Content(ABC, BaseModel):
25
  """An abstract base class for all types of content."""
src/ctp_slack_bot/models/slack.py CHANGED
@@ -69,7 +69,7 @@ class SlackMessage(Content):
69
  self._canonical_json = PrivateAttr(default_factory=lambda: dumps(data, sort_keys=True).encode())
70
 
71
  def get_chunks(self: Self) -> Sequence[Chunk]:
72
- return (Chunk(text=self.text, parent_id=self.id, chunk_id="", metadata=self.metadata), )
73
 
74
  def get_metadata(self: Self) -> Dict[str, Any]:
75
  return {
 
69
  self._canonical_json = PrivateAttr(default_factory=lambda: dumps(data, sort_keys=True).encode())
70
 
71
  def get_chunks(self: Self) -> Sequence[Chunk]:
72
+ return (Chunk(text=self.text, parent_id=self.id, chunk_id="", metadata=self.get_metadata()), )
73
 
74
  def get_metadata(self: Self) -> Dict[str, Any]:
75
  return {
src/ctp_slack_bot/services/answer_retrieval_service.py CHANGED
@@ -23,9 +23,10 @@ class AnswerRetrievalService(BaseModel):
23
  logger.debug("Created {}", self.__class__.__name__)
24
  return self
25
 
26
- def push(self: Self, question: SlackMessage, context: Collection[Chunk]) -> None:
27
  channel_to_respond_to = question.channel
28
  thread_to_respond_to = question.thread_ts if question.thread_ts else question.ts
29
  answer = self.language_model_service.answer_question(question.text, context)
 
30
  slack_response = SlackResponse(text=answer, channel=channel_to_respond_to, thread_ts=thread_to_respond_to)
31
- self.event_brokerage_service.publish(EventType.OUTGOING_SLACK_RESPONSE, slack_response)
 
23
  logger.debug("Created {}", self.__class__.__name__)
24
  return self
25
 
26
+ async def push(self: Self, question: SlackMessage, context: Collection[Chunk]) -> None:
27
  channel_to_respond_to = question.channel
28
  thread_to_respond_to = question.thread_ts if question.thread_ts else question.ts
29
  answer = self.language_model_service.answer_question(question.text, context)
30
+ logger.debug("Pushing response to channel {} and thread {}: {}", channel_to_respond_to, thread_to_respond_to, answer)
31
  slack_response = SlackResponse(text=answer, channel=channel_to_respond_to, thread_ts=thread_to_respond_to)
32
+ await self.event_brokerage_service.publish(EventType.OUTGOING_SLACK_RESPONSE, slack_response)
src/ctp_slack_bot/services/content_ingestion_service.py CHANGED
@@ -26,7 +26,7 @@ class ContentIngestionService(BaseModel):
26
  self.event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, self.process_incoming_slack_message)
27
  return self
28
 
29
- def process_incoming_content(self: Self, content: Content) -> None:
30
  logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
31
  # if self.vector_database_service.has_content(content.id) # TODO
32
  # logger.debug("Ignored content with ID {} because it already exists in the database.", content.id)
@@ -35,7 +35,7 @@ class ContentIngestionService(BaseModel):
35
  self.__vectorize_and_store_chunks_in_database(chunks)
36
  logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
37
 
38
- def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
39
  logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
40
  chunks = slack_message.get_chunks()
41
  self.__vectorize_and_store_chunks_in_database(chunks)
 
26
  self.event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, self.process_incoming_slack_message)
27
  return self
28
 
29
+ async def process_incoming_content(self: Self, content: Content) -> None:
30
  logger.debug("Content ingestion service received content with metadata: {}", content.get_metadata())
31
  # if self.vector_database_service.has_content(content.id) # TODO
32
  # logger.debug("Ignored content with ID {} because it already exists in the database.", content.id)
 
35
  self.__vectorize_and_store_chunks_in_database(chunks)
36
  logger.debug("Stored {} vectorized chunk(s) in the database.", len(chunks))
37
 
38
+ async def process_incoming_slack_message(self: Self, slack_message: SlackMessage) -> None:
39
  logger.debug("Content ingestion service received a Slack message: {}", slack_message.text)
40
  chunks = slack_message.get_chunks()
41
  self.__vectorize_and_store_chunks_in_database(chunks)
src/ctp_slack_bot/services/context_retrieval_service.py CHANGED
@@ -76,5 +76,5 @@ class ContextRetrievalService(BaseModel):
76
  # except Exception as e:
77
  # logger.error(f"Error retrieving context for message {message.key}: {str(e)}")
78
  # return []
79
- return (VectorizedChunk(text="Mock context chunk", parent_id="lol", chunk_id="no", metadata={}),
80
- VectorizedChunk(text="Moar mock context chunk", parent_id="lol", chunk_id="wut", metadata={}))
 
76
  # except Exception as e:
77
  # logger.error(f"Error retrieving context for message {message.key}: {str(e)}")
78
  # return []
79
+ return (VectorizedChunk(text="Mock context chunk", parent_id="lol", chunk_id="no", metadata={}, embedding=tuple()),
80
+ VectorizedChunk(text="Moar mock context chunk", parent_id="lol", chunk_id="wut", metadata={}, embedding=tuple()))
src/ctp_slack_bot/services/event_brokerage_service.py CHANGED
@@ -1,4 +1,3 @@
1
-
2
  from asyncio import create_task, iscoroutinefunction, to_thread
3
  from collections import defaultdict
4
  from loguru import logger
@@ -26,7 +25,7 @@ class EventBrokerageService(BaseModel):
26
  subscribers.append(callback)
27
  logger.debug("Event type {} has {} subscriber(s).", type, len(subscribers))
28
 
29
- def publish(self: Self, type: EventType, data: Any = None) -> None:
30
  """Publish an event with optional data to all subscribers."""
31
  subscribers = self._subscribers[type]
32
  if not subscribers:
@@ -35,8 +34,8 @@ class EventBrokerageService(BaseModel):
35
  logger.debug("Broadcasting event {} to {} subscriber(s): {}", type, len(subscribers), data)
36
  for callback in subscribers:
37
  if iscoroutinefunction(callback):
38
- task: create_task(callback(data))
39
- task.add_done_callback(lambda done_task: logger.error("Error in asynchronous event callback handling event {}: {}", done_task.exception())
40
  if done_task.exception()
41
  else None)
42
  else:
 
 
1
  from asyncio import create_task, iscoroutinefunction, to_thread
2
  from collections import defaultdict
3
  from loguru import logger
 
25
  subscribers.append(callback)
26
  logger.debug("Event type {} has {} subscriber(s).", type, len(subscribers))
27
 
28
+ async def publish(self: Self, type: EventType, data: Any = None) -> None:
29
  """Publish an event with optional data to all subscribers."""
30
  subscribers = self._subscribers[type]
31
  if not subscribers:
 
34
  logger.debug("Broadcasting event {} to {} subscriber(s): {}", type, len(subscribers), data)
35
  for callback in subscribers:
36
  if iscoroutinefunction(callback):
37
+ task = create_task(callback(data))
38
+ task.add_done_callback(lambda done_task: logger.error("Error in asynchronous event callback handling event {}: {}", type, done_task.exception())
39
  if done_task.exception()
40
  else None)
41
  else:
src/ctp_slack_bot/services/question_dispatch_service.py CHANGED
@@ -27,8 +27,8 @@ class QuestionDispatchService(BaseModel):
27
  self.event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, self.__process_incoming_slack_message)
28
  return self
29
 
30
- def __process_incoming_slack_message(self: Self, message: SlackMessage) -> None:
31
  if message.subtype != 'bot_message':
32
  logger.debug("Question dispatch service received an answerable question: {}", message.text)
33
  context = self.context_retrieval_service.get_context(message)
34
- self.answer_retrieval_service.push(message, context)
 
27
  self.event_brokerage_service.subscribe(EventType.INCOMING_SLACK_MESSAGE, self.__process_incoming_slack_message)
28
  return self
29
 
30
+ async def __process_incoming_slack_message(self: Self, message: SlackMessage) -> None:
31
  if message.subtype != 'bot_message':
32
  logger.debug("Question dispatch service received an answerable question: {}", message.text)
33
  context = self.context_retrieval_service.get_context(message)
34
+ await self.answer_retrieval_service.push(message, context)
src/ctp_slack_bot/services/slack_service.py CHANGED
@@ -1,7 +1,7 @@
1
  from loguru import logger
2
  from openai import OpenAI
3
  from pydantic import BaseModel, model_validator
4
- from slack_bolt import App
5
  from typing import Any, Dict, Self
6
 
7
  from ctp_slack_bot.enums import EventType
@@ -15,7 +15,7 @@ class SlackService(BaseModel):
15
  """
16
 
17
  event_brokerage_service: EventBrokerageService
18
- slack_bolt_app: App
19
 
20
  class Config:
21
  arbitrary_types_allowed = True
@@ -40,10 +40,10 @@ class SlackService(BaseModel):
40
  event_ts=event.get("event_ts")
41
  )
42
 
43
- def process_message(self: Self, event: Dict[str, Any]) -> None:
44
  slack_message = self.adapt_event_payload(event.get("event", {}))
45
  logger.debug("Received message from Slack: {}", slack_message)
46
- self.event_brokerage_service.publish(EventType.INCOMING_SLACK_MESSAGE, slack_message)
47
 
48
- def send_message(self: Self, message: SlackResponse) -> None:
49
- self.slack_bolt_app.client.chat_postMessage(channel=message.channel, text=message.text, thread_ts=message.thread_ts)
 
1
  from loguru import logger
2
  from openai import OpenAI
3
  from pydantic import BaseModel, model_validator
4
+ from slack_bolt.async_app import AsyncApp
5
  from typing import Any, Dict, Self
6
 
7
  from ctp_slack_bot.enums import EventType
 
15
  """
16
 
17
  event_brokerage_service: EventBrokerageService
18
+ slack_bolt_app: AsyncApp
19
 
20
  class Config:
21
  arbitrary_types_allowed = True
 
40
  event_ts=event.get("event_ts")
41
  )
42
 
43
+ async def process_message(self: Self, event: Dict[str, Any]) -> None:
44
  slack_message = self.adapt_event_payload(event.get("event", {}))
45
  logger.debug("Received message from Slack: {}", slack_message)
46
+ await self.event_brokerage_service.publish(EventType.INCOMING_SLACK_MESSAGE, slack_message)
47
 
48
+ async def send_message(self: Self, message: SlackResponse) -> None:
49
+ await self.slack_bolt_app.client.chat_postMessage(channel=message.channel, text=message.text, thread_ts=message.thread_ts)