File size: 8,896 Bytes
25f22bf
 
 
 
c7d5529
 
 
1d6d1e6
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
026b511
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
026b511
25f22bf
 
 
 
026b511
 
25f22bf
 
 
 
 
 
 
 
 
85054d5
 
25f22bf
 
 
 
 
3d8beb9
25f22bf
 
 
 
85054d5
25f22bf
 
 
 
 
 
 
 
85054d5
25f22bf
 
 
 
 
 
 
85054d5
 
 
25f22bf
3d8beb9
25f22bf
 
85054d5
 
 
 
 
 
 
 
 
25f22bf
1cf04ef
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
85054d5
 
25f22bf
 
85054d5
25f22bf
 
 
 
 
 
85054d5
 
25f22bf
 
026b511
 
 
 
 
 
 
78b0b47
026b511
 
 
78b0b47
 
25f22bf
85054d5
026b511
25f22bf
 
 
 
 
 
 
85054d5
25f22bf
 
 
026b511
 
 
 
 
 
 
78b0b47
026b511
 
78b0b47
 
25f22bf
85054d5
026b511
25f22bf
 
 
 
 
 
85054d5
25f22bf
 
 
 
 
026b511
 
25f22bf
3d8beb9
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from celery import current_app
from celery.schedules import crontab
from datetime import datetime
import logging
from backend.utils.database import init_supabase
from backend.config import Config
from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing
from backend.celery_config import celery_app

# Configure logging
logger = logging.getLogger(__name__)

def get_supabase_config():
    """Return the Supabase connection settings held on the application ``Config``.

    Returns:
        dict: ``{'SUPABASE_URL': Config.SUPABASE_URL,
        'SUPABASE_KEY': Config.SUPABASE_KEY}``, in that key order.
    """
    wanted_settings = ('SUPABASE_URL', 'SUPABASE_KEY')
    return {setting: getattr(Config, setting) for setting in wanted_settings}

def parse_schedule_time(schedule_time):
    """
    Parse a schedule time string into crontab keyword arguments.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM", e.g.
            "Monday 09:30". The day name is matched case-insensitively, so
            "monday", "MONDAY" and "Monday" are equivalent.

    Returns:
        dict: Keys ``minute``, ``hour`` and ``day_of_week`` suitable for
        ``crontab``. ``day_of_week`` is 0 for Sunday and 1-6 for
        Monday-Saturday. An unrecognized day name yields ``'*'`` (every day)
        and logs a warning. If the string cannot be parsed at all, every
        field is ``'*'`` (every minute) and an error is logged.
    """
    # Crontab numbering: Sunday is 0, Monday..Saturday are 1..6.
    # Keys are lowercase so lookups can be case-insensitive.
    day_map = {
        'sunday': 0,
        'monday': 1,
        'tuesday': 2,
        'wednesday': 3,
        'thursday': 4,
        'friday': 5,
        'saturday': 6,
    }
    try:
        print(f"[CELERY BEAT] Parsing schedule time: {schedule_time}")
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))

        normalized_day = day_name.lower()
        if normalized_day in day_map:
            day_of_week = day_map[normalized_day]
        else:
            # '*' makes the job run every day; surface it instead of doing it silently.
            logger.warning(
                f"Unrecognized day name {day_name!r} in schedule time "
                f"{schedule_time!r}; scheduling for every day"
            )
            day_of_week = '*'

        result = {
            'minute': minute,
            'hour': hour,
            'day_of_week': day_of_week
        }
        print(f"[CELERY BEAT] Parsed schedule time result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")
        # Default to every minute for error cases.
        # NOTE(review): a malformed schedule therefore fires its task every
        # minute; consider skipping such schedules in the caller instead.
        return {
            'minute': '*',
            'hour': '*',
            'day_of_week': '*'
        }

def _social_network_of(schedule):
    """Return the joined ``Social_network`` record of a schedule row, or ``{}``.

    The embedded value may be absent or ``None``; both are normalised to an
    empty dict so callers can safely use ``.get`` on the result.
    """
    return schedule.get('Social_network') or {}


def _fetch_schedules(supabase_config):
    """Fetch every ``Scheduling`` row together with its joined ``Social_network`` fields.

    Args:
        supabase_config (dict): Mapping with ``SUPABASE_URL`` and ``SUPABASE_KEY``.

    Returns:
        list: The rows from ``response.data``, or ``[]`` when it is empty or None.
    """
    supabase_client = init_supabase(
        supabase_config['SUPABASE_URL'],
        supabase_config['SUPABASE_KEY']
    )
    print("[CELERY BEAT] Executing database query...")
    response = (
        supabase_client
        .table("Scheduling")
        .select("*, Social_network(id_utilisateur, token, sub)")
        .execute()
    )
    return response.data or []


def _build_schedule_jobs(schedule, supabase_config):
    """Build the content-generation and publishing beat entries for one schedule row.

    Args:
        schedule (dict): A ``Scheduling`` row as returned by ``_fetch_schedules``.
        supabase_config (dict): Forwarded unchanged as a task argument.

    Returns:
        dict: ``{'gen_<id>': entry, 'pub_<id>': entry}``, or ``{}`` when the row
        lacks ``schedule_time`` or ``adjusted_time``.

    Raises:
        Exception: Anything raised while building the entries (e.g. by
        ``crontab`` for out-of-range values). Both entries are built before
        returning, so a row never ends up with only one of its two jobs.
    """
    schedule_id = schedule.get('id')
    schedule_time = schedule.get('schedule_time')
    adjusted_time = schedule.get('adjusted_time')
    print(f"[CELERY BEAT] Processing schedule {schedule_id}: schedule_time={schedule_time}, adjusted_time={adjusted_time}")

    if not schedule_time or not adjusted_time:
        logger.warning(f"Invalid schedule format for schedule {schedule_id}")
        print(f"[CELERY BEAT] WARNING: Invalid schedule format for schedule {schedule_id}")
        return {}

    # adjusted_time drives content generation and schedule_time drives publishing.
    # Presumably adjusted_time is schedule_time minus a lead time; confirm with the writer.
    content_gen_time = parse_schedule_time(adjusted_time)
    publish_time = parse_schedule_time(schedule_time)
    print(f"[CELERY BEAT] Parsed times - Content gen: {content_gen_time}, Publish: {publish_time}")

    user_id = _social_network_of(schedule).get('id_utilisateur')
    gen_job_id = f"gen_{schedule_id}"
    pub_job_id = f"pub_{schedule_id}"

    # NOTE(review): supabase_config (including SUPABASE_KEY) is embedded in the
    # task args and so is serialized into broker messages; consider letting the
    # tasks read Config themselves.
    jobs = {
        gen_job_id: {
            'task': 'backend.celery_tasks.content_tasks.generate_content_task',
            'schedule': crontab(
                minute=content_gen_time['minute'],
                hour=content_gen_time['hour'],
                day_of_week=content_gen_time['day_of_week']
            ),
            'args': (user_id, schedule_id, supabase_config),
        },
        pub_job_id: {
            'task': 'backend.celery_tasks.content_tasks.publish_post_task',
            'schedule': crontab(
                minute=publish_time['minute'],
                hour=publish_time['hour'],
                day_of_week=publish_time['day_of_week']
            ),
            'args': (schedule_id, supabase_config),
        },
    }

    # Only non-secret identifiers are printed; the args carry SUPABASE_KEY.
    print(f"[CELERY BEAT] Content task {gen_job_id}: user={user_id}, schedule={schedule_id}")
    logger.info(f"Created content generation job: {gen_job_id}")
    print(f"[CELERY BEAT] Created content generation job: {gen_job_id}")
    print(f"[CELERY BEAT] Publish task {pub_job_id}: schedule={schedule_id}")
    logger.info(f"Created publishing job: {pub_job_id}")
    print(f"[CELERY BEAT] Created publishing job: {pub_job_id}")
    return jobs


@celery_app.task(bind=True)
def load_schedules_task(self):
    """
    Celery task that rebuilds the Celery Beat schedule from the database.

    Each ``Scheduling`` row yields two beat entries: ``gen_<id>`` (content
    generation at the row's ``adjusted_time``) and ``pub_<id>`` (publishing at
    its ``schedule_time``). The existing ``load-schedules`` entry, if present,
    is kept so this loader stays scheduled; every other entry is replaced.
    Rows that cannot be turned into jobs are logged and skipped.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'schedules_count': n}`` on
        success, or ``{'status': 'error', 'message': ...}`` if loading fails.
    """
    try:
        print(f"[CELERY BEAT] Loading schedules from database at {datetime.now()}...")
        logger.info("Loading schedules from database...")

        supabase_config = get_supabase_config()
        print(f"[CELERY BEAT] Supabase config: URL={supabase_config['SUPABASE_URL'][:50]}...")

        schedules = _fetch_schedules(supabase_config)
        # Row contents are deliberately not printed: the joined Social_network
        # record includes an access token.
        print(f"[CELERY BEAT] Found {len(schedules)} schedules in database")
        logger.info(f"Found {len(schedules)} schedules")

        # Read and write the same app's configuration.
        current_schedule = celery_app.conf.beat_schedule
        new_schedule = {}
        loader_job = current_schedule.get('load-schedules')
        if loader_job:
            new_schedule['load-schedules'] = loader_job
        else:
            # An empty placeholder would be a beat entry without a 'task' key.
            logger.warning("No 'load-schedules' entry found in beat_schedule; not preserving it")

        for schedule in schedules:
            try:
                new_schedule.update(_build_schedule_jobs(schedule, supabase_config))
            except Exception as e:
                # Skip only the offending row so the remaining rows still get scheduled.
                logger.exception(f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}")

        print(f"[CELERY BEAT] Current schedule keys before update: {list(current_schedule.keys())}")
        print(f"[CELERY BEAT] New schedule keys: {list(new_schedule.keys())}")
        # NOTE(review): this mutates configuration in the current process only.
        # If Celery Beat runs in a separate process, confirm how it picks this up.
        celery_app.conf.beat_schedule = new_schedule
        print(f"[CELERY BEAT] Successfully updated Celery Beat schedule with {len(new_schedule)} jobs")
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules)
        }

    except Exception as e:
        logger.exception(f"Error loading schedules: {str(e)}")
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}'
        }