|
from celery import current_app |
|
from celery.schedules import crontab |
|
from datetime import datetime |
|
import logging |
|
from backend.utils.database import init_supabase |
|
from backend.config import Config |
|
from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing |
|
from backend.celery_config import celery_app |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
def get_supabase_config():
    """Return the Supabase connection settings held by ``Config``.

    Returns:
        dict: ``'SUPABASE_URL'`` and ``'SUPABASE_KEY'`` mapped to the values
        of the same-named ``Config`` attributes.
    """
    url = Config.SUPABASE_URL
    key = Config.SUPABASE_KEY
    return dict(SUPABASE_URL=url, SUPABASE_KEY=key)
|
|
|
def parse_schedule_time(schedule_time):
    """
    Parse a "Day HH:MM" schedule string into crontab keyword arguments.

    The day name is matched case-insensitively, so "monday", "MONDAY" and
    "Monday" all resolve to the same value. Days are numbered Sunday=0
    through Saturday=6.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM",
            e.g. "Monday 09:30".

    Returns:
        dict: Crontab parameters under the keys 'minute', 'hour' and
        'day_of_week'. 'minute' and 'hour' are ints; 'day_of_week' is an int
        for a recognised day name and '*' for an unrecognised one. If the
        string cannot be parsed at all, all three values are '*'.
    """
    # Keys are lower-case so the lookup below can ignore the input's casing.
    day_map = {
        'monday': 1,
        'tuesday': 2,
        'wednesday': 3,
        'thursday': 4,
        'friday': 5,
        'saturday': 6,
        'sunday': 0,
    }

    try:
        print(f"[CELERY BEAT] Parsing schedule time: {schedule_time}")
        # Exactly two whitespace-separated parts are expected; anything else
        # raises ValueError on unpacking and takes the fallback path below.
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))

        # Previously the lookup was case-sensitive, so "monday 09:30" fell
        # through to '*' and the job silently ran every day of the week.
        day_of_week = day_map.get(day_name.lower(), '*')

        result = {
            'minute': minute,
            'hour': hour,
            'day_of_week': day_of_week,
        }
        print(f"[CELERY BEAT] Parsed schedule time result: {result}")
        return result
    except Exception as e:
        # Deliberately broad: any malformed or non-string input degrades to
        # the wildcard result instead of raising to the caller.
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")

    # NOTE(review): when these values are handed to crontab() they match
    # every minute of every day. Kept for backward compatibility; confirm
    # that callers really want an unparseable schedule to fire that often.
    return {
        'minute': '*',
        'hour': '*',
        'day_of_week': '*',
    }
|
|
|
@celery_app.task(bind=True)
def load_schedules_task(self):
    """
    Rebuild the Celery Beat schedule from the rows of the "Scheduling" table.

    Every row that has both ``schedule_time`` and ``adjusted_time`` and a
    joined ``Social_network`` record produces two beat entries:

    * ``gen_<id>`` runs ``generate_content_task`` at ``adjusted_time``.
    * ``pub_<id>`` runs ``publish_post_task`` at ``schedule_time``.

    The existing ``load-schedules`` entry, when there is one, is carried over
    so that replacing the schedule does not drop the loader itself. The
    interval at which this task runs is configured outside this function.

    Secrets are deliberately kept out of the print/log output: the Supabase
    key (part of the task arguments) and the social-network tokens (part of
    every fetched row) are never printed.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'schedules_count': n}``
        when the schedule was rebuilt (rows that were skipped or failed
        individually are still counted in ``n``), or
        ``{'status': 'error', 'message': ...}`` when loading failed as a whole.
    """
    try:
        print(f"[CELERY BEAT] Loading schedules from database at {datetime.now()}...")
        logger.info("Loading schedules from database...")

        supabase_config = get_supabase_config()
        print(f"[CELERY BEAT] Supabase config: URL={supabase_config['SUPABASE_URL'][:50]}...")

        supabase_client = init_supabase(
            supabase_config['SUPABASE_URL'],
            supabase_config['SUPABASE_KEY'],
        )

        print("[CELERY BEAT] Executing database query...")
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )
        print(f"[CELERY BEAT] Database query response: {type(response)}")

        # The rows embed Social_network.token, so only the row count is
        # reported here; the raw payload is never printed.
        schedules = response.data if response.data else []
        print(f"[CELERY BEAT] Found {len(schedules)} schedules in database")
        logger.info(f"Found {len(schedules)} schedules")

        # Start from a clean mapping and keep the loader's own entry only if
        # it is actually configured; an empty placeholder dict is not a
        # usable beat entry.
        new_schedule = {}
        loader_job = celery_app.conf.beat_schedule.get('load-schedules')
        if loader_job:
            new_schedule['load-schedules'] = loader_job

        for schedule in schedules:
            # One bad row must not prevent the remaining rows from loading.
            try:
                schedule_id = schedule.get('id')
                schedule_time = schedule.get('schedule_time')
                adjusted_time = schedule.get('adjusted_time')

                print(f"[CELERY BEAT] Processing schedule {schedule_id}: schedule_time={schedule_time}, adjusted_time={adjusted_time}")

                if not schedule_time or not adjusted_time:
                    logger.warning(f"Invalid schedule format for schedule {schedule_id}")
                    print(f"[CELERY BEAT] WARNING: Invalid schedule format for schedule {schedule_id}")
                    continue

                # The join yields None when no Social_network row is linked.
                # Skip explicitly instead of relying on an AttributeError.
                social_network = schedule.get('Social_network')
                if not social_network:
                    logger.warning(f"No social network linked to schedule {schedule_id}")
                    print(f"[CELERY BEAT] WARNING: No social network linked to schedule {schedule_id}")
                    continue
                user_id = social_network.get('id_utilisateur')

                content_gen_time = parse_schedule_time(adjusted_time)
                publish_time = parse_schedule_time(schedule_time)

                print(f"[CELERY BEAT] Parsed times - Content gen: {content_gen_time}, Publish: {publish_time}")

                # --- content generation job -------------------------------
                gen_job_id = f"gen_{schedule_id}"
                gen_crontab = crontab(
                    minute=content_gen_time['minute'],
                    hour=content_gen_time['hour'],
                    day_of_week=content_gen_time['day_of_week'],
                )
                print(f"[CELERY BEAT] Creating content task - ID: {gen_job_id}")
                print(f"[CELERY BEAT] Content task schedule: minute={content_gen_time['minute']}, hour={content_gen_time['hour']}, day_of_week={content_gen_time['day_of_week']}")
                print(f"[CELERY BEAT] Content task args: user_id={user_id}, schedule_id={schedule_id} (Supabase config omitted)")
                # NOTE(review): supabase_config contains the Supabase key and
                # is sent along as a task argument; changing that requires
                # changing the receiving tasks as well.
                new_schedule[gen_job_id] = {
                    'task': 'backend.celery_tasks.content_tasks.generate_content_task',
                    'schedule': gen_crontab,
                    'args': (user_id, schedule_id, supabase_config),
                }
                logger.info(f"Created content generation job: {gen_job_id}")
                print(f"[CELERY BEAT] Created content generation job: {gen_job_id}")

                # --- publishing job ---------------------------------------
                pub_job_id = f"pub_{schedule_id}"
                pub_crontab = crontab(
                    minute=publish_time['minute'],
                    hour=publish_time['hour'],
                    day_of_week=publish_time['day_of_week'],
                )
                print(f"[CELERY BEAT] Creating publish task - ID: {pub_job_id}")
                print(f"[CELERY BEAT] Publish task schedule: minute={publish_time['minute']}, hour={publish_time['hour']}, day_of_week={publish_time['day_of_week']}")
                print(f"[CELERY BEAT] Publish task args: schedule_id={schedule_id} (Supabase config omitted)")
                new_schedule[pub_job_id] = {
                    'task': 'backend.celery_tasks.content_tasks.publish_post_task',
                    'schedule': pub_crontab,
                    'args': (schedule_id, supabase_config),
                }
                logger.info(f"Created publishing job: {pub_job_id}")
                print(f"[CELERY BEAT] Created publishing job: {pub_job_id}")

            except Exception as e:
                # logger.exception keeps the traceback alongside the message.
                logger.exception(f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}")

        print(f"[CELERY BEAT] Current schedule keys before update: {list(current_app.conf.beat_schedule.keys())}")
        print(f"[CELERY BEAT] New schedule keys: {list(new_schedule.keys())}")
        # NOTE(review): this replaces beat_schedule on the app configuration
        # of the process executing the task; whether an already running beat
        # scheduler picks the change up depends on the deployment - confirm.
        current_app.conf.beat_schedule = new_schedule
        print(f"[CELERY BEAT] Successfully updated Celery Beat schedule with {len(new_schedule)} jobs")
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules),
        }

    except Exception as e:
        # Top-level boundary of the task: report the failure as a result
        # payload instead of letting the task raise.
        logger.exception(f"Error loading schedules: {str(e)}")
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}',
        }