from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict
from pytz import timezone
from typing import Any, Self

from ctp_slack_bot.core import ApplicationComponentBase, Settings


class TaskService(ApplicationComponentBase):
    """
    Service for running scheduled tasks.

    Owns a single ``AsyncIOScheduler`` whose timezone is taken from
    ``settings.scheduler_timezone``. Jobs are registered by
    ``_configure_jobs`` each time the scheduler is started via ``start``.
    """

    model_config = ConfigDict(frozen=True)

    # Application settings; ``scheduler_timezone`` is the only field read here.
    settings: Settings
    # Scheduler instance, created in ``model_post_init``.
    _scheduler: AsyncIOScheduler

    def model_post_init(self: Self, context: Any, /) -> None:
        """
        Create the scheduler after the base class has finished its own
        post-initialization.

        Args:
            context: Passed through unchanged to the parent implementation.
        """
        super().model_post_init(context)
        # ``scheduler_timezone`` is handed to ``pytz.timezone``, so it is
        # presumably a timezone name such as "UTC" -- confirm against Settings.
        self._scheduler = AsyncIOScheduler(
            timezone=timezone(self.settings.scheduler_timezone)
        )

    def _configure_jobs(self: Self) -> None:
        """
        Register the jobs the scheduler should run.

        Invoked by ``start`` right before the scheduler is started. No jobs
        are registered at the moment; the commented-out calls below are
        templates. Keep ``replace_existing=True`` on every job so that
        registering the same job ID again replaces the job instead of
        conflicting with it.
        """
        # Example jobs (uncomment and implement as needed)
        # self._scheduler.add_job(
        #     send_error_report,
        #     CronTrigger(hour=7, minute=0),
        #     id="daily_error_report",
        #     name="Daily Error Report",
        #     replace_existing=True,
        # )
        # self._scheduler.add_job(
        #     cleanup_old_transcripts,
        #     CronTrigger(day_of_week="sun", hour=1, minute=0),
        #     id="weekly_transcript_cleanup",
        #     name="Weekly Transcript Cleanup",
        #     replace_existing=True,
        # )
        pass

    async def start(self: Self) -> None:
        """
        Register the configured jobs and start the scheduler.

        Does nothing (beyond a debug log entry) when the scheduler is already
        running, mirroring the guard in ``stop``.
        """
        if self._scheduler.running:
            # Starting an already-running APScheduler scheduler raises
            # SchedulerAlreadyRunningError; treat a repeated start as a no-op.
            logger.debug("The scheduler is already running. There is no scheduler to start.")
            return
        # Without this call, jobs declared in _configure_jobs would never be
        # handed to the scheduler.
        self._configure_jobs()
        logger.info("Starting scheduler…")
        self._scheduler.start()

    async def stop(self: Self) -> None:
        """
        Shut the scheduler down if it is running.

        Does nothing (beyond a debug log entry) when the scheduler is not
        running.
        """
        if self._scheduler.running:
            self._scheduler.shutdown()
            logger.info("Stopped scheduler.")
        else:
            logger.debug("The scheduler is not running. There is no scheduler to shut down.")

    @property
    def name(self: Self) -> str:
        """Return the fixed component name, ``"task_service"``."""
        return "task_service"


class TaskServiceResource(AsyncResource):
    """
    Resource wrapper that builds a ``TaskService`` and stops it on shutdown.
    """

    async def init(self: Self, settings: Settings) -> TaskService:
        """
        Build the task service.

        The service is only constructed here; this method does not start it.

        Args:
            settings: Settings passed to the ``TaskService`` constructor.

        Returns:
            The newly constructed ``TaskService``.
        """
        return TaskService(settings=settings)

    async def shutdown(self: Self, task_service: TaskService) -> None:
        """
        Stop the given task service.

        Args:
            task_service: The service to stop.
        """
        await task_service.stop()