File size: 2,152 Bytes
c21d29c
 
 
f0fe0fd
c21d29c
bb7c9a3
c21d29c
f0fe0fd
c21d29c
bb7c9a3
c21d29c
bb7c9a3
f0fe0fd
c21d29c
 
 
 
bb7c9a3
c21d29c
bb7c9a3
 
c21d29c
bb7c9a3
 
 
c21d29c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f0fe0fd
 
c21d29c
 
f0fe0fd
c21d29c
92e41ba
f0fe0fd
c21d29c
 
92e41ba
bb7c9a3
 
f0fe0fd
bb7c9a3
 
f0fe0fd
 
 
92e41ba
f0fe0fd
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict
from pytz import timezone
from typing import Any, Self

from ctp_slack_bot.core import ApplicationComponentBase, Settings


class TaskService(ApplicationComponentBase):
    """
    Service for running scheduled tasks.
    """

    model_config = ConfigDict(frozen=True)

    settings: Settings
    _scheduler: AsyncIOScheduler

    def model_post_init(self: Self, context: Any, /) -> None:
        super().model_post_init(context)
        self._scheduler = AsyncIOScheduler(timezone=timezone(self.settings.scheduler_timezone))

    def _configure_jobs(self: Self) -> None:
        # Example jobs (uncomment and implement as needed)
        # self._scheduler.add_job(
        #     send_error_report,
        #     CronTrigger(hour=7, minute=0),
        #     id="daily_error_report",
        #     name="Daily Error Report",
        #     replace_existing=True,
        # )
        # self._scheduler.add_job(
        #     cleanup_old_transcripts,
        #     CronTrigger(day_of_week="sun", hour=1, minute=0),
        #     id="weekly_transcript_cleanup",
        #     name="Weekly Transcript Cleanup",
        #     replace_existing=True,
        # )
        pass

    async def start(self: Self) -> None:
        logger.info("Starting scheduler…")
        self._scheduler.start()

    async def stop(self: Self) -> None:
        if self._scheduler.running:
            self._scheduler.shutdown()
            logger.info("Stopped scheduler.")
        else:
            logger.debug("The scheduler is not running. There is no scheduler to shut down.")

    @property
    def name(self: Self) -> str:
        return "task_service"


class TaskServiceResource(AsyncResource):
    async def init(self: Self, settings: Settings) -> TaskService:
        return TaskService(settings=settings)

    async def shutdown(self: Self, task_service: TaskService) -> None:
        await task_service.stop()