File size: 6,064 Bytes
25f22bf
 
 
 
c7d5529
 
 
1d6d1e6
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d8beb9
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d8beb9
25f22bf
 
 
1cf04ef
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d8beb9
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from celery import current_app
from celery.schedules import crontab
from datetime import datetime
import logging
from backend.utils.database import init_supabase
from backend.config import Config
from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing
from backend.celery_config import celery_app

# Configure logging
logger = logging.getLogger(__name__)

def get_supabase_config():
    """Return the Supabase connection settings held on ``Config``.

    Returns:
        dict: Mapping with the keys 'SUPABASE_URL' and 'SUPABASE_KEY'.
    """
    settings = {}
    settings['SUPABASE_URL'] = Config.SUPABASE_URL
    settings['SUPABASE_KEY'] = Config.SUPABASE_KEY
    return settings

def parse_schedule_time(schedule_time):
    """
    Parse a schedule time string into crontab parameters.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM", for
            example "Monday 09:30". The day name is matched
            case-insensitively.

    Returns:
        dict: Crontab parameters with the keys 'minute', 'hour' and
        'day_of_week'. On success 'minute' and 'hour' are ints and
        'day_of_week' is 0 (Sunday) through 6 (Saturday), or '*' when the
        day name is not recognised. When the string cannot be parsed at
        all, every field is '*' (i.e. "every minute") and the error is
        logged; no exception is raised.
    """
    # Map lower-cased day names to crontab format (0 = Sunday ... 6 = Saturday)
    day_map = {
        'monday': 1,
        'tuesday': 2,
        'wednesday': 3,
        'thursday': 4,
        'friday': 5,
        'saturday': 6,
        'sunday': 0,
    }

    try:
        # Exactly two whitespace-separated tokens are expected; anything else
        # fails the unpacking and takes the fallback path below.
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))
    except (AttributeError, TypeError, ValueError) as e:
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")
        # Default to every minute for error cases
        return {
            'minute': '*',
            'hour': '*',
            'day_of_week': '*'
        }

    day_of_week = day_map.get(day_name.lower(), '*')
    if day_of_week == '*':
        # Kept as "every day" for backward compatibility, but made visible.
        logger.warning(
            f"Unrecognised day name in schedule time {schedule_time}; "
            "using every day"
        )

    return {
        'minute': minute,
        'hour': hour,
        'day_of_week': day_of_week
    }

def load_schedules_task():
    """
    Load schedules from the database and rebuild the Celery Beat schedule.

    Fetches every row of the "Scheduling" table (together with its related
    "Social_network" record) from Supabase. For each usable row two beat
    entries are registered: ``gen_<id>`` (content generation, fired at the
    row's ``adjusted_time``) and ``pub_<id>`` (publishing, fired at the row's
    ``schedule_time``). The existing 'load-schedules' entry is carried over;
    all other previous entries are dropped.

    A row is skipped, with a log message, when either time is missing or
    cannot be parsed, or when building its entries raises. Both entries of a
    row are registered together or not at all.

    NOTE(review): the original description says this runs as a Celery task
    every 5 minutes; the task registration and that interval are not visible
    in this module -- confirm where they are configured.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'schedules_count': n}``
        on success, or ``{'status': 'error', 'message': ...}`` if loading
        failed. Exceptions are logged and not propagated.
    """
    try:
        print(f"[CELERY BEAT] Loading schedules from database at {datetime.now()}...")
        logger.info("Loading schedules from database...")

        # Get Supabase configuration
        supabase_config = get_supabase_config()

        # Initialize Supabase client
        supabase_client = init_supabase(
            supabase_config['SUPABASE_URL'],
            supabase_config['SUPABASE_KEY']
        )

        # Fetch all schedules from Supabase
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )

        schedules = response.data if response.data else []
        print(f"[CELERY BEAT] Found {len(schedules)} schedules in database")
        logger.info(f"Found {len(schedules)} schedules")

        # Get current beat schedule
        current_schedule = celery_app.conf.beat_schedule

        # Start from scratch, keeping only the loader job itself.
        loader_job = current_schedule.get('load-schedules', {})
        new_schedule = {'load-schedules': loader_job}

        # Create jobs for each schedule
        for schedule in schedules:
            try:
                # update() only runs once both entries were built, so a row
                # never ends up with a generation job but no publishing job.
                new_schedule.update(
                    _build_schedule_jobs(schedule, supabase_config)
                )
            except Exception as e:
                # Best effort: one bad row must not block the others.
                logger.exception(
                    f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}"
                )

        # Update the beat schedule
        # NOTE(review): the schedule is read from celery_app but written via
        # current_app -- presumably the same application; confirm. Also
        # confirm that the running beat scheduler picks up a configuration
        # change made at runtime from this process.
        current_app.conf.beat_schedule = new_schedule
        print(f"[CELERY BEAT] Successfully updated Celery Beat schedule with {len(new_schedule)} jobs")
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules)
        }

    except Exception as e:
        logger.exception(f"Error loading schedules: {str(e)}")
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}'
        }


def _build_schedule_jobs(schedule, supabase_config):
    """Build the beat entries (generation + publishing) for one schedule row.

    Args:
        schedule (dict): One row of the "Scheduling" query result.
        supabase_config (dict): Supabase settings forwarded as a task argument.

    Returns:
        dict: ``{'gen_<id>': entry, 'pub_<id>': entry}``, or an empty dict
        when the row has to be skipped.
    """
    schedule_id = schedule.get('id')
    schedule_time = schedule.get('schedule_time')
    adjusted_time = schedule.get('adjusted_time')

    if not schedule_time or not adjusted_time:
        logger.warning(f"Invalid schedule format for schedule {schedule_id}")
        return {}

    # Parse schedule times
    content_gen_time = parse_schedule_time(adjusted_time)
    publish_time = parse_schedule_time(schedule_time)

    # parse_schedule_time reports failure by returning '*' for every field,
    # which as a crontab means "every minute". Registering that would
    # generate/publish once a minute, so such rows are skipped instead.
    if not (
        isinstance(content_gen_time['minute'], int)
        and isinstance(publish_time['minute'], int)
    ):
        logger.warning(
            f"Could not parse schedule times for schedule {schedule_id}; skipping"
        )
        return {}

    # Content generation job, fired at the row's adjusted_time.
    # A null Social_network value raises here; the caller logs it and
    # skips the row.
    gen_job_id = f"gen_{schedule_id}"
    gen_job = {
        'task': 'celery_tasks.content_tasks.generate_content_task',
        'schedule': crontab(
            minute=content_gen_time['minute'],
            hour=content_gen_time['hour'],
            day_of_week=content_gen_time['day_of_week']
        ),
        'args': (
            schedule.get('Social_network', {}).get('id_utilisateur'),
            schedule_id,
            supabase_config
        )
    }

    # Publishing job, fired at the row's schedule_time.
    pub_job_id = f"pub_{schedule_id}"
    pub_job = {
        'task': 'celery_tasks.content_tasks.publish_post_task',
        'schedule': crontab(
            minute=publish_time['minute'],
            hour=publish_time['hour'],
            day_of_week=publish_time['day_of_week']
        ),
        'args': (
            schedule_id,
            supabase_config
        )
    }

    # Log only after both entries were built successfully.
    logger.info(f"Created content generation job: {gen_job_id}")
    logger.info(f"Created publishing job: {pub_job_id}")

    return {gen_job_id: gen_job, pub_job_id: pub_job}