from celery import current_app
from celery.schedules import crontab
from datetime import datetime
import logging

from backend.utils.database import init_supabase
# Import Config via the backend package so the path resolves on Hugging Face Spaces
from backend.config import Config
from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing

# Configure logging
logger = logging.getLogger(__name__)


def get_supabase_config():
    """Get Supabase configuration from environment."""
    return {
        'SUPABASE_URL': Config.SUPABASE_URL,
        'SUPABASE_KEY': Config.SUPABASE_KEY
    }


def parse_schedule_time(schedule_time):
    """
    Parse schedule time string into crontab format.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM"

    Returns:
        dict: Crontab parameters
    """
    try:
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))

        # Map day names to crontab format
        day_map = {
            'Monday': 1,
            'Tuesday': 2,
            'Wednesday': 3,
            'Thursday': 4,
            'Friday': 5,
            'Saturday': 6,
            'Sunday': 0
        }
        day_of_week = day_map.get(day_name, '*')

        return {
            'minute': minute,
            'hour': hour,
            'day_of_week': day_of_week
        }
    except Exception as e:
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")
        # Default to every minute for error cases
        return {
            'minute': '*',
            'hour': '*',
            'day_of_week': '*'
        }
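
# Example (for illustration only): given an input such as "Monday 14:30",
# parse_schedule_time returns {'minute': 30, 'hour': 14, 'day_of_week': 1},
# which maps directly onto celery.schedules.crontab(minute=30, hour=14,
# day_of_week=1). Unrecognized day names fall through to '*' (any day).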


def load_schedules_task():
    """
    Celery task to load schedules from the database and create periodic tasks.

    This task runs every 5 minutes to check for new or updated schedules.
    """
    try:
        logger.info("Loading schedules from database...")

        # Get Supabase configuration
        supabase_config = get_supabase_config()

        # Initialize Supabase client
        supabase_client = init_supabase(
            supabase_config['SUPABASE_URL'],
            supabase_config['SUPABASE_KEY']
        )

        # Fetch all schedules from Supabase
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )
        schedules = response.data if response.data else []
        logger.info(f"Found {len(schedules)} schedules")

        # Get current beat schedule
        current_schedule = current_app.conf.beat_schedule

        # Remove existing scheduled jobs (except the loader job)
        # In a production environment, you might want to be more selective about this
        loader_job = current_schedule.get('load-schedules', {})
        new_schedule = {'load-schedules': loader_job}

        # Create jobs for each schedule
        for schedule in schedules:
            try:
                schedule_id = schedule.get('id')
                schedule_time = schedule.get('schedule_time')
                adjusted_time = schedule.get('adjusted_time')

                if not schedule_time or not adjusted_time:
                    logger.warning(f"Invalid schedule format for schedule {schedule_id}")
                    continue

                # Parse schedule times
                content_gen_time = parse_schedule_time(adjusted_time)
                publish_time = parse_schedule_time(schedule_time)

                # Create content generation job (5 minutes before publishing)
                gen_job_id = f"gen_{schedule_id}"
                new_schedule[gen_job_id] = {
                    'task': 'celery_tasks.content_tasks.generate_content_task',
                    'schedule': crontab(
                        minute=content_gen_time['minute'],
                        hour=content_gen_time['hour'],
                        day_of_week=content_gen_time['day_of_week']
                    ),
                    'args': (
                        schedule.get('Social_network', {}).get('id_utilisateur'),
                        schedule_id,
                        supabase_config
                    )
                }
                logger.info(f"Created content generation job: {gen_job_id}")

                # Create publishing job
                pub_job_id = f"pub_{schedule_id}"
                new_schedule[pub_job_id] = {
                    'task': 'celery_tasks.content_tasks.publish_post_task',
                    'schedule': crontab(
                        minute=publish_time['minute'],
                        hour=publish_time['hour'],
                        day_of_week=publish_time['day_of_week']
                    ),
                    'args': (
                        schedule_id,
                        supabase_config
                    )
                }
                logger.info(f"Created publishing job: {pub_job_id}")
            except Exception as e:
                logger.error(f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}")

        # Update the beat schedule
        current_app.conf.beat_schedule = new_schedule
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules)
        }
    except Exception as e:
        logger.error(f"Error loading schedules: {str(e)}")
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}'
        }
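

# Illustrative sketch (not part of this module): load_schedules_task expects a
# 'load-schedules' entry to already exist in the beat schedule, and it is
# presumably registered as a Celery task elsewhere (e.g. via a @shared_task
# decorator or app.task()). Assuming the Celery app lives in a module such as
# backend.celery_app (hypothetical path and task name), the loader could be
# seeded roughly like this:
#
#     from celery.schedules import crontab
#
#     app.conf.beat_schedule = {
#         'load-schedules': {
#             'task': 'backend.celery_tasks.schedule_loader.load_schedules_task',
#             'schedule': crontab(minute='*/5'),  # "runs every 5 minutes" per the docstring
#         },
#     }
#
# The task path above is an assumption; use whatever name this function is
# actually registered under in your Celery app.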