Spaces:
Runtime error
Runtime error
File size: 2,152 Bytes
c21d29c f0fe0fd c21d29c bb7c9a3 c21d29c f0fe0fd c21d29c bb7c9a3 c21d29c bb7c9a3 f0fe0fd c21d29c bb7c9a3 c21d29c bb7c9a3 c21d29c bb7c9a3 c21d29c f0fe0fd c21d29c f0fe0fd c21d29c 92e41ba f0fe0fd c21d29c 92e41ba bb7c9a3 f0fe0fd bb7c9a3 f0fe0fd 92e41ba f0fe0fd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict
from pytz import timezone
from typing import Any, Self
from ctp_slack_bot.core import ApplicationComponentBase, Settings
class TaskService(ApplicationComponentBase):
    """
    Service for running scheduled tasks.

    Owns an ``AsyncIOScheduler`` whose timezone is read from
    ``settings.scheduler_timezone``. Jobs are registered by
    ``_configure_jobs`` each time the scheduler is started.
    """

    model_config = ConfigDict(frozen=True)

    settings: Settings
    # Underscore-prefixed attribute; assigned in model_post_init rather than
    # supplied by the caller.
    _scheduler: AsyncIOScheduler

    def model_post_init(self: Self, context: Any, /) -> None:
        """Create the scheduler after the model's fields have been set.

        Args:
            context: Passed through unchanged to the parent implementation.
        """
        super().model_post_init(context)
        self._scheduler = AsyncIOScheduler(timezone=timezone(self.settings.scheduler_timezone))

    def _configure_jobs(self: Self) -> None:
        """Register scheduled jobs on the scheduler.

        No jobs are registered at present; the commented examples below show
        the intended shape. ``replace_existing=True`` keeps registration safe
        to repeat, which matters because this runs on every ``start()``.
        """
        # Example jobs (uncomment and implement as needed)
        # self._scheduler.add_job(
        #     send_error_report,
        #     CronTrigger(hour=7, minute=0),
        #     id="daily_error_report",
        #     name="Daily Error Report",
        #     replace_existing=True,
        # )
        # self._scheduler.add_job(
        #     cleanup_old_transcripts,
        #     CronTrigger(day_of_week="sun", hour=1, minute=0),
        #     id="weekly_transcript_cleanup",
        #     name="Weekly Transcript Cleanup",
        #     replace_existing=True,
        # )

    async def start(self: Self) -> None:
        """Register the jobs and start the scheduler.

        Does nothing (beyond a debug log) when the scheduler is already
        running, mirroring the guard in ``stop()``.
        """
        if self._scheduler.running:
            logger.debug("The scheduler is already running. There is no scheduler to start.")
            return
        logger.info("Starting scheduler…")
        # Register jobs before starting so they are scheduled from the outset;
        # previously this hook was never invoked.
        self._configure_jobs()
        self._scheduler.start()

    async def stop(self: Self) -> None:
        """Shut the scheduler down if it is running; otherwise only log."""
        if self._scheduler.running:
            self._scheduler.shutdown()
            logger.info("Stopped scheduler.")
        else:
            logger.debug("The scheduler is not running. There is no scheduler to shut down.")

    @property
    def name(self: Self) -> str:
        """Return the fixed identifier of this component, ``"task_service"``."""
        return "task_service"
class TaskServiceResource(AsyncResource):
    """Asynchronous resource that builds a ``TaskService`` and stops it on shutdown."""

    async def init(self: Self, settings: Settings) -> TaskService:
        """Build a ``TaskService`` from the given settings and return it."""
        service = TaskService(settings=settings)
        return service

    async def shutdown(self: Self, task_service: TaskService) -> None:
        """Stop the given ``TaskService`` by awaiting its ``stop()`` coroutine."""
        pending_stop = task_service.stop()
        await pending_stop
|