Spaces:
Runtime error
Runtime error
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| from apscheduler.triggers.cron import CronTrigger | |
| from datetime import datetime | |
| from dependency_injector.resources import AsyncResource | |
| from loguru import logger | |
| from pydantic import ConfigDict | |
| from pytz import timezone | |
| from typing import Any, Self | |
| from ctp_slack_bot.core import ApplicationComponentBase, Settings | |
| class TaskService(ApplicationComponentBase): | |
| """ | |
| Service for running scheduled tasks. | |
| """ | |
| model_config = ConfigDict(frozen=True) | |
| settings: Settings | |
| _scheduler: AsyncIOScheduler | |
| def model_post_init(self: Self, context: Any, /) -> None: | |
| super().model_post_init(context) | |
| self._scheduler = AsyncIOScheduler(timezone=timezone(self.settings.scheduler_timezone)) | |
| def _configure_jobs(self: Self) -> None: | |
| # Example jobs (uncomment and implement as needed) | |
| # self._scheduler.add_job( | |
| # send_error_report, | |
| # CronTrigger(hour=7, minute=0), | |
| # id="daily_error_report", | |
| # name="Daily Error Report", | |
| # replace_existing=True, | |
| # ) | |
| # self._scheduler.add_job( | |
| # cleanup_old_transcripts, | |
| # CronTrigger(day_of_week="sun", hour=1, minute=0), | |
| # id="weekly_transcript_cleanup", | |
| # name="Weekly Transcript Cleanup", | |
| # replace_existing=True, | |
| # ) | |
| pass | |
| async def start(self: Self) -> None: | |
| logger.info("Starting scheduler…") | |
| self._scheduler.start() | |
| async def stop(self: Self) -> None: | |
| if self._scheduler.running: | |
| self._scheduler.shutdown() | |
| logger.info("Stopped scheduler.") | |
| else: | |
| logger.debug("The scheduler is not running. There is no scheduler to shut down.") | |
| def name(self: Self) -> str: | |
| return "task_service" | |
| class TaskServiceResource(AsyncResource): | |
| async def init(self: Self, settings: Settings) -> TaskService: | |
| return TaskService(settings=settings) | |
| async def shutdown(self: Self, task_service: TaskService) -> None: | |
| await task_service.stop() | |