from celery import current_app
from celery.schedules import crontab
from datetime import datetime
import logging

from backend.utils.database import init_supabase
from backend.config import Config
from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing
from backend.celery_config import celery_app

# Configure logging
logger = logging.getLogger(__name__)

# Day names accepted in "Day HH:MM" schedule strings, mapped to crontab's
# day_of_week numbering (0 = Sunday, 1 = Monday, ... 6 = Saturday).
_DAY_OF_WEEK = {
    'Sunday': 0,
    'Monday': 1,
    'Tuesday': 2,
    'Wednesday': 3,
    'Thursday': 4,
    'Friday': 5,
    'Saturday': 6,
}

# Beat entry that runs the loader itself; it is carried over on every reload.
_LOADER_JOB_NAME = 'load-schedules'


def get_supabase_config():
    """Return the Supabase connection settings taken from ``Config``.

    Returns:
        dict: ``{'SUPABASE_URL': Config.SUPABASE_URL,
        'SUPABASE_KEY': Config.SUPABASE_KEY}``.
    """
    return {
        'SUPABASE_URL': Config.SUPABASE_URL,
        'SUPABASE_KEY': Config.SUPABASE_KEY,
    }


def parse_schedule_time(schedule_time, *, strict=False):
    """Parse a ``"Day HH:MM"`` string into ``crontab`` keyword arguments.

    Args:
        schedule_time (str): Schedule such as ``"Monday 09:30"``. The day
            name is matched case-insensitively.
        strict (bool): When True, raise ``ValueError`` for malformed input,
            an unknown day name, or an out-of-range hour/minute instead of
            falling back to wildcards.

    Returns:
        dict: ``{'minute': ..., 'hour': ..., 'day_of_week': ...}``.
        In non-strict mode (the legacy default) an unknown day name gives
        ``day_of_week='*'`` (every day). Any parse error gives ``'*'`` for
        all three fields, i.e. a job that fires every minute. That fallback
        is kept only for backward compatibility; schedulers should pass
        ``strict=True``.

    Raises:
        ValueError: Only when ``strict`` is True and the input is invalid.
    """
    try:
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))
        # capitalize() lets "monday"/"MONDAY" match instead of silently
        # degrading to "every day".
        day_of_week = _DAY_OF_WEEK.get(day_name.capitalize())
        if strict:
            if day_of_week is None:
                raise ValueError(f"unknown day name {day_name!r}")
            if not (0 <= hour <= 23 and 0 <= minute <= 59):
                raise ValueError(f"time {time_str!r} is out of range")
    except (AttributeError, TypeError, ValueError) as e:
        if strict:
            raise ValueError(f"Invalid schedule time {schedule_time!r}: {e}") from e
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")
        # Legacy fallback: wildcard everything (fires every minute).
        return {
            'minute': '*',
            'hour': '*',
            'day_of_week': '*',
        }

    return {
        'minute': minute,
        'hour': hour,
        # `is None` check: Sunday maps to 0, which is falsy.
        'day_of_week': '*' if day_of_week is None else day_of_week,
    }


def _build_jobs_for_schedule(schedule, supabase_config):
    """Build the generation and publishing beat entries for one Scheduling row.

    Both entries are built before anything is returned, so a failure on one
    never leaves the other half registered.

    Args:
        schedule (dict): Row from the ``Scheduling`` table, with the embedded
            ``Social_network`` relation selected by the loader.
        supabase_config (dict): Connection settings forwarded as task args.

    Returns:
        dict: Beat entries keyed ``gen_<id>`` and ``pub_<id>``. The dict is
        empty when the row lacks ``schedule_time`` or ``adjusted_time``.

    Raises:
        ValueError: If either time string cannot be parsed strictly, or if
            ``crontab`` rejects the parsed values.
    """
    schedule_id = schedule.get('id')
    schedule_time = schedule.get('schedule_time')
    adjusted_time = schedule.get('adjusted_time')

    if not schedule_time or not adjusted_time:
        logger.warning(f"Invalid schedule format for schedule {schedule_id}")
        return {}

    # Strict parsing: a malformed time must skip the row, not schedule a job
    # that fires every minute.
    # NOTE(review): adjusted_time is presumably the (earlier) content
    # generation time; an earlier comment said "5 minutes before
    # publishing" -- confirm where adjusted_time is computed.
    content_gen_time = parse_schedule_time(adjusted_time, strict=True)
    publish_time = parse_schedule_time(schedule_time, strict=True)

    # The embedded relation may be present but None (e.g. no linked row);
    # `or {}` avoids an AttributeError in that case.
    social_network = schedule.get('Social_network') or {}

    # NOTE(review): supabase_config (including SUPABASE_KEY) is passed as a
    # task argument, so it travels through the broker and result backend.
    gen_job_id = f"gen_{schedule_id}"
    pub_job_id = f"pub_{schedule_id}"
    jobs = {
        gen_job_id: {
            'task': 'celery_tasks.content_tasks.generate_content_task',
            'schedule': crontab(**content_gen_time),
            'args': (
                social_network.get('id_utilisateur'),
                schedule_id,
                supabase_config,
            ),
        },
        pub_job_id: {
            'task': 'celery_tasks.content_tasks.publish_post_task',
            'schedule': crontab(**publish_time),
            'args': (
                schedule_id,
                supabase_config,
            ),
        },
    }
    logger.info(f"Created content generation job: {gen_job_id}")
    logger.info(f"Created publishing job: {pub_job_id}")
    return jobs


def load_schedules_task():
    """
    Load schedules from the database and rebuild the Celery Beat schedule.

    Intended to run periodically as the ``'load-schedules'`` beat entry (the
    original docstring states every 5 minutes) to pick up new or updated
    schedules. For every ``Scheduling`` row it registers a ``gen_<id>``
    content-generation job and a ``pub_<id>`` publishing job. Rows that are
    malformed are logged and skipped. The loader's own entry is preserved;
    all other entries are replaced.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'schedules_count': n}``
        on success, or ``{'status': 'error', 'message': ...}`` on failure.
        No exceptions escape.
    """
    try:
        print(f"[CELERY BEAT] Loading schedules from database at {datetime.now()}...")
        logger.info("Loading schedules from database...")

        supabase_config = get_supabase_config()
        supabase_client = init_supabase(
            supabase_config['SUPABASE_URL'],
            supabase_config['SUPABASE_KEY']
        )

        # Fetch all schedules together with the linked social network row.
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )
        schedules = response.data if response.data else []
        print(f"[CELERY BEAT] Found {len(schedules)} schedules in database")
        logger.info(f"Found {len(schedules)} schedules")

        # Read and write the same app's config; mixing celery_app and
        # current_app could update a different app instance.
        current_schedule = celery_app.conf.beat_schedule or {}

        # Keep only the loader entry. Never insert an empty placeholder
        # entry: beat entries require a 'task' key.
        new_schedule = {}
        if _LOADER_JOB_NAME in current_schedule:
            new_schedule[_LOADER_JOB_NAME] = current_schedule[_LOADER_JOB_NAME]
        else:
            logger.warning(f"No '{_LOADER_JOB_NAME}' entry in the current beat schedule")

        for schedule in schedules:
            try:
                new_schedule.update(_build_jobs_for_schedule(schedule, supabase_config))
            except Exception as e:
                logger.exception(f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}")

        # NOTE(review): this mutates the config of the process running this
        # function; a separately running beat process only sees it if its
        # scheduler re-reads app.conf.beat_schedule -- confirm deployment.
        celery_app.conf.beat_schedule = new_schedule
        print(f"[CELERY BEAT] Successfully updated Celery Beat schedule with {len(new_schedule)} jobs")
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules)
        }

    except Exception as e:
        logger.exception(f"Error loading schedules: {str(e)}")
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}'
        }