from celery import current_app
from celery.schedules import crontab
from datetime import datetime
import logging
from backend.utils.database import init_supabase
from backend.config import Config
from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing
from backend.celery_config import celery_app

# Configure logging
logger = logging.getLogger(__name__)


def get_supabase_config():
    """Get Supabase configuration from environment."""
    return {
        'SUPABASE_URL': Config.SUPABASE_URL,
        'SUPABASE_KEY': Config.SUPABASE_KEY
    }
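# The returned dict is passed along as a task argument below (see the 'args'
# entries built in load_schedules_task), presumably so each scheduled task can
# initialise its own Supabase client from the same configuration.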


def parse_schedule_time(schedule_time):
    """
    Parse schedule time string into crontab format.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM"

    Returns:
        dict: Crontab parameters
    """
    try:
        print(f"[CELERY BEAT] Parsing schedule time: {schedule_time}")
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))

        # Map day names to crontab format (Celery uses 0 = Sunday through 6 = Saturday)
        day_map = {
            'Monday': 1,
            'Tuesday': 2,
            'Wednesday': 3,
            'Thursday': 4,
            'Friday': 5,
            'Saturday': 6,
            'Sunday': 0
        }
        day_of_week = day_map.get(day_name, '*')

        result = {
            'minute': minute,
            'hour': hour,
            'day_of_week': day_of_week
        }
        print(f"[CELERY BEAT] Parsed schedule time result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")
        # Default to every minute for error cases
        return {
            'minute': '*',
            'hour': '*',
            'day_of_week': '*'
        }
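# Example: with the mapping above, parse_schedule_time("Monday 14:30") returns
#     {'minute': 30, 'hour': 14, 'day_of_week': 1}
# which corresponds to crontab(minute=30, hour=14, day_of_week=1), i.e. every
# Monday at 14:30. Malformed input falls back to the every-minute schedule
# ('*', '*', '*').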


@celery_app.task(bind=True)
def load_schedules_task(self):
    """
    Celery task to load schedules from the database and create periodic tasks.

    This task runs every 5 minutes to check for new or updated schedules.
    """
    try:
        print(f"[CELERY BEAT] Loading schedules from database at {datetime.now()}...")
        logger.info("Loading schedules from database...")

        # Get Supabase configuration
        supabase_config = get_supabase_config()
        print(f"[CELERY BEAT] Supabase config: URL={supabase_config['SUPABASE_URL'][:50]}...")

        # Initialize Supabase client
        supabase_client = init_supabase(
            supabase_config['SUPABASE_URL'],
            supabase_config['SUPABASE_KEY']
        )

        # Fetch all schedules from Supabase
        print("[CELERY BEAT] Executing database query...")
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )
        print(f"[CELERY BEAT] Database query response: {type(response)}")
        print(f"[CELERY BEAT] Response data: {response.data if response.data else 'None'}")

        schedules = response.data if response.data else []
        print(f"[CELERY BEAT] Found {len(schedules)} schedules in database")
        logger.info(f"Found {len(schedules)} schedules")

        # Log details of each schedule for debugging
        for i, schedule in enumerate(schedules):
            print(f"[CELERY BEAT] Schedule {i}: {schedule}")
            schedule_id = schedule.get('id')
            schedule_time = schedule.get('schedule_time')
            adjusted_time = schedule.get('adjusted_time')
            social_network = schedule.get('Social_network', {})
            print(f"[CELERY BEAT] Schedule {schedule_id} - schedule_time: {schedule_time}, adjusted_time: {adjusted_time}, social_network: {social_network}")

        # Get current beat schedule
        current_schedule = celery_app.conf.beat_schedule

        # Remove existing scheduled jobs (except the loader job)
        # In a production environment, you might want to be more selective about this
        loader_job = current_schedule.get('load-schedules', {})
        new_schedule = {'load-schedules': loader_job}
        # Create jobs for each schedule
        for schedule in schedules:
            try:
                schedule_id = schedule.get('id')
                schedule_time = schedule.get('schedule_time')
                adjusted_time = schedule.get('adjusted_time')
                print(f"[CELERY BEAT] Processing schedule {schedule_id}: schedule_time={schedule_time}, adjusted_time={adjusted_time}")

                if not schedule_time or not adjusted_time:
                    logger.warning(f"Invalid schedule format for schedule {schedule_id}")
                    print(f"[CELERY BEAT] WARNING: Invalid schedule format for schedule {schedule_id}")
                    continue

                # Parse schedule times
                content_gen_time = parse_schedule_time(adjusted_time)
                publish_time = parse_schedule_time(schedule_time)
                print(f"[CELERY BEAT] Parsed times - Content gen: {content_gen_time}, Publish: {publish_time}")

                # Create content generation job (5 minutes before publishing)
                gen_job_id = f"gen_{schedule_id}"
                task_schedule = crontab(
                    minute=content_gen_time['minute'],
                    hour=content_gen_time['hour'],
                    day_of_week=content_gen_time['day_of_week']
                )
                # Build the task arguments once so they can be logged and reused below
                gen_args = (
                    schedule.get('Social_network', {}).get('id_utilisateur'),
                    schedule_id,
                    supabase_config
                )
                print(f"[CELERY BEAT] Creating content task - ID: {gen_job_id}")
                print(f"[CELERY BEAT] Content task schedule: minute={content_gen_time['minute']}, hour={content_gen_time['hour']}, day_of_week={content_gen_time['day_of_week']}")
                print(f"[CELERY BEAT] Content task args: {gen_args}")

                new_schedule[gen_job_id] = {
                    'task': 'backend.celery_tasks.content_tasks.generate_content_task',
                    'schedule': task_schedule,
                    'args': gen_args
                }
                logger.info(f"Created content generation job: {gen_job_id}")
                print(f"[CELERY BEAT] Created content generation job: {gen_job_id}")
                # Create publishing job
                pub_job_id = f"pub_{schedule_id}"
                task_schedule = crontab(
                    minute=publish_time['minute'],
                    hour=publish_time['hour'],
                    day_of_week=publish_time['day_of_week']
                )
                # Build the task arguments once so they can be logged and reused below
                pub_args = (
                    schedule_id,
                    supabase_config
                )
                print(f"[CELERY BEAT] Creating publish task - ID: {pub_job_id}")
                print(f"[CELERY BEAT] Publish task schedule: minute={publish_time['minute']}, hour={publish_time['hour']}, day_of_week={publish_time['day_of_week']}")
                print(f"[CELERY BEAT] Publish task args: {pub_args}")

                new_schedule[pub_job_id] = {
                    'task': 'backend.celery_tasks.content_tasks.publish_post_task',
                    'schedule': task_schedule,
                    'args': pub_args
                }
                logger.info(f"Created publishing job: {pub_job_id}")
                print(f"[CELERY BEAT] Created publishing job: {pub_job_id}")

            except Exception as e:
                logger.error(f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}")
        # Update the beat schedule
        print(f"[CELERY BEAT] Current schedule keys before update: {list(current_app.conf.beat_schedule.keys())}")
        print(f"[CELERY BEAT] New schedule keys: {list(new_schedule.keys())}")
        current_app.conf.beat_schedule = new_schedule
        print(f"[CELERY BEAT] Successfully updated Celery Beat schedule with {len(new_schedule)} jobs")
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules)
        }
    except Exception as e:
        logger.error(f"Error loading schedules: {str(e)}")
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}'
        }
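

# A minimal sketch of how this loader might be registered with Celery Beat,
# assuming the beat schedule is declared in backend.celery_config and that this
# module is importable as backend.celery_tasks.beat_tasks (both module paths are
# assumptions; adjust to the actual project layout). The 'load-schedules' key
# matches the entry preserved by load_schedules_task above, and the "every 5
# minutes" cadence from the docstring is expressed with a crontab:
#
#     celery_app.conf.beat_schedule = {
#         'load-schedules': {
#             'task': 'backend.celery_tasks.beat_tasks.load_schedules_task',
#             'schedule': crontab(minute='*/5'),
#         },
#     }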