Lin / backend /celery_tasks /schedule_loader.py
Zelyanoth's picture
l
026b511
raw
history blame
8.84 kB
from celery import current_app
from celery.schedules import crontab
from datetime import datetime
import logging
from backend.utils.database import init_supabase
from backend.config import Config
from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing
from backend.celery_config import celery_app
# Module-level logger named after this module; handlers and levels are
# configured elsewhere (nothing in this file sets them up).
logger = logging.getLogger(__name__)
def get_supabase_config():
    """Collect the Supabase connection settings exposed by ``Config``.

    Returns:
        dict: ``{'SUPABASE_URL': ..., 'SUPABASE_KEY': ...}`` with the values
        read from the attributes of the same name on ``Config``.
    """
    setting_names = ('SUPABASE_URL', 'SUPABASE_KEY')
    return {name: getattr(Config, name) for name in setting_names}
def parse_schedule_time(schedule_time):
    """
    Parse a schedule time string into crontab keyword values.

    The day name is matched case-insensitively ("monday", "MONDAY" and
    "Monday" are equivalent). A day name that is not a weekday at all maps
    to '*' (every day), as before.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM"

    Returns:
        dict: Crontab parameters with keys 'minute', 'hour' and
        'day_of_week'. On a malformed input (wrong number of parts,
        non-numeric time, non-string value) the error is logged and an
        all-wildcard dict is returned instead of raising.
    """
    # Day names mapped to crontab numbering (Sunday is 0).
    day_map = {
        'Monday': 1,
        'Tuesday': 2,
        'Wednesday': 3,
        'Thursday': 4,
        'Friday': 5,
        'Saturday': 6,
        'Sunday': 0,
    }

    print(f"[CELERY BEAT] Parsing schedule time: {schedule_time}")
    try:
        # Only the actual parsing can fail, so only it is guarded.
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))
    except (AttributeError, TypeError, ValueError) as e:
        logger.error("Error parsing schedule time %s: %s", schedule_time, e)
        # NOTE: this wildcard fallback means "every minute of every day" if it
        # is handed to crontab unchanged; callers should treat minute == '*'
        # as a parse failure rather than as a real schedule.
        return {
            'minute': '*',
            'hour': '*',
            'day_of_week': '*',
        }

    # Normalise capitalisation so the lookup does not silently fall through
    # to '*' for inputs such as "monday".
    day_of_week = day_map.get(day_name.capitalize(), '*')
    result = {
        'minute': minute,
        'hour': hour,
        'day_of_week': day_of_week,
    }
    print(f"[CELERY BEAT] Parsed schedule time result: {result}")
    return result
def _redact_schedule_row(schedule):
    """Return a copy of a schedule row with the social-network token masked for logging."""
    safe_row = dict(schedule)
    network = safe_row.get('Social_network')
    if isinstance(network, dict):
        safe_row['Social_network'] = {
            key: ('***' if key == 'token' else value)
            for key, value in network.items()
        }
    elif network is not None:
        # Unexpected shape: hide it entirely rather than risk printing a token.
        safe_row['Social_network'] = '<redacted>'
    return safe_row


def _is_fallback_time(parsed_time):
    """Tell whether parse_schedule_time returned its all-wildcard error fallback."""
    # A successful parse always yields an int minute, so '*' marks a failure.
    return parsed_time.get('minute') == '*'


@celery_app.task(bind=True)
def load_schedules_task(self):
    """
    Celery task to load schedules from the database and create periodic tasks.

    Reads every row of the "Scheduling" table (joined with its
    Social_network record), builds one content-generation entry and one
    publishing entry per usable row, and replaces the app's ``beat_schedule``
    with the result. An existing 'load-schedules' entry is carried over.

    Intended to be run periodically to pick up new or updated schedules; the
    interval is configured outside this function.

    Rows are skipped (with a warning) when either time field is missing or
    cannot be parsed. Credentials (the Supabase key and social-network
    tokens) are never written to the debug output.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'schedules_count': n}``
        on success, or ``{'status': 'error', 'message': ...}`` if loading
        failed; exceptions are logged, not raised.
    """
    try:
        print(f"[CELERY BEAT] Loading schedules from database at {datetime.now()}...")
        logger.info("Loading schedules from database...")

        # Get Supabase configuration
        supabase_config = get_supabase_config()
        print(f"[CELERY BEAT] Supabase config: URL={supabase_config['SUPABASE_URL'][:50]}...")

        # Initialize Supabase client
        supabase_client = init_supabase(
            supabase_config['SUPABASE_URL'],
            supabase_config['SUPABASE_KEY'],
        )

        # Fetch all schedules from Supabase
        print("[CELERY BEAT] Executing database query...")
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )
        print(f"[CELERY BEAT] Database query response: {type(response)}")

        schedules = response.data if response.data else []
        # Rows carry access tokens, so only redacted copies are ever printed.
        safe_rows = [_redact_schedule_row(row) for row in schedules]
        print(f"[CELERY BEAT] Response data: {safe_rows if safe_rows else 'None'}")
        print(f"[CELERY BEAT] Found {len(schedules)} schedules in database")
        logger.info(f"Found {len(schedules)} schedules")

        # Log details of each schedule for debugging
        for index, safe_row in enumerate(safe_rows):
            print(f"[CELERY BEAT] Schedule {index}: {safe_row}")
            row_id = safe_row.get('id')
            row_time = safe_row.get('schedule_time')
            row_adjusted = safe_row.get('adjusted_time')
            row_network = safe_row.get('Social_network', {})
            print(
                f"[CELERY BEAT] Schedule {row_id} - schedule_time: {row_time}, "
                f"adjusted_time: {row_adjusted}, social_network: {row_network}"
            )

        # Start from an empty schedule and keep only the loader's own entry.
        # It is copied only when it exists: an empty placeholder dict is not a
        # valid beat entry.
        current_schedule = celery_app.conf.beat_schedule or {}
        new_schedule = {}
        if 'load-schedules' in current_schedule:
            new_schedule['load-schedules'] = current_schedule['load-schedules']

        # Create jobs for each schedule
        for schedule in schedules:
            try:
                schedule_id = schedule.get('id')
                schedule_time = schedule.get('schedule_time')
                adjusted_time = schedule.get('adjusted_time')
                print(
                    f"[CELERY BEAT] Processing schedule {schedule_id}: "
                    f"schedule_time={schedule_time}, adjusted_time={adjusted_time}"
                )

                if not schedule_time or not adjusted_time:
                    logger.warning(f"Invalid schedule format for schedule {schedule_id}")
                    print(f"[CELERY BEAT] WARNING: Invalid schedule format for schedule {schedule_id}")
                    continue

                # Parse schedule times
                content_gen_time = parse_schedule_time(adjusted_time)
                publish_time = parse_schedule_time(schedule_time)
                print(f"[CELERY BEAT] Parsed times - Content gen: {content_gen_time}, Publish: {publish_time}")

                # parse_schedule_time reports failure as an all-wildcard dict;
                # registering that would fire the job every minute, so skip.
                if _is_fallback_time(content_gen_time) or _is_fallback_time(publish_time):
                    logger.warning(f"Unparseable schedule time for schedule {schedule_id}; skipping")
                    print(f"[CELERY BEAT] WARNING: Unparseable schedule time for schedule {schedule_id}; skipping")
                    continue

                # Raises AttributeError when the joined Social_network value
                # is None; the handler below then skips this row.
                user_id = schedule.get('Social_network', {}).get('id_utilisateur')

                # Content generation job, fired at adjusted_time (presumably
                # shortly before schedule_time - confirm with the code that
                # writes adjusted_time).
                gen_job_id = f"gen_{schedule_id}"
                gen_schedule = crontab(
                    minute=content_gen_time['minute'],
                    hour=content_gen_time['hour'],
                    day_of_week=content_gen_time['day_of_week'],
                )
                # The real args include the Supabase key; print a stand-in.
                gen_args_for_log = (user_id, schedule_id, '<supabase_config>')
                print(f"[CELERY BEAT] Creating content task - ID: {gen_job_id}")
                print(
                    f"[CELERY BEAT] Content task schedule: minute={content_gen_time['minute']}, "
                    f"hour={content_gen_time['hour']}, day_of_week={content_gen_time['day_of_week']}"
                )
                print(f"[CELERY BEAT] Content task args: {gen_args_for_log}")
                new_schedule[gen_job_id] = {
                    'task': 'backend.celery_tasks.content_tasks.generate_content_task',
                    'schedule': gen_schedule,
                    'args': (user_id, schedule_id, supabase_config),
                }
                logger.info(f"Created content generation job: {gen_job_id}")
                print(f"[CELERY BEAT] Created content generation job: {gen_job_id}")

                # Create publishing job
                pub_job_id = f"pub_{schedule_id}"
                pub_schedule = crontab(
                    minute=publish_time['minute'],
                    hour=publish_time['hour'],
                    day_of_week=publish_time['day_of_week'],
                )
                pub_args_for_log = (schedule_id, '<supabase_config>')
                print(f"[CELERY BEAT] Creating publish task - ID: {pub_job_id}")
                print(
                    f"[CELERY BEAT] Publish task schedule: minute={publish_time['minute']}, "
                    f"hour={publish_time['hour']}, day_of_week={publish_time['day_of_week']}"
                )
                print(f"[CELERY BEAT] Publish task args: {pub_args_for_log}")
                new_schedule[pub_job_id] = {
                    'task': 'backend.celery_tasks.content_tasks.publish_post_task',
                    'schedule': pub_schedule,
                    'args': (schedule_id, supabase_config),
                }
                logger.info(f"Created publishing job: {pub_job_id}")
                print(f"[CELERY BEAT] Created publishing job: {pub_job_id}")
            except Exception as e:
                # One bad row must not stop the remaining rows from loading.
                logger.exception("Error creating jobs for schedule %s: %s", schedule.get('id'), e)

        # Update the beat schedule
        # NOTE(review): the read above goes through celery_app while this
        # write goes through current_app; inside this task they are presumably
        # the same app - confirm. Also confirm that the running beat process
        # actually observes this in-process assignment in your deployment.
        previous_keys = list(current_app.conf.beat_schedule or {})
        print(f"[CELERY BEAT] Current schedule keys before update: {previous_keys}")
        print(f"[CELERY BEAT] New schedule keys: {list(new_schedule.keys())}")
        current_app.conf.beat_schedule = new_schedule
        print(f"[CELERY BEAT] Successfully updated Celery Beat schedule with {len(new_schedule)} jobs")
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules),
        }
    except Exception as e:
        # Task boundary: record the traceback and report failure to the caller.
        logger.exception("Error loading schedules: %s", e)
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}',
        }