# File: Lin/backend/celery_tasks/scheduler.py
# Page residue from the repository file viewer (not code):
#   Zelyanoth's picture · fff · 25f22bf · raw · history blame · 3.45 kB
from datetime import datetime, timedelta
from celery import chain
import logging
from celery_app import celery
from celery_tasks.content_tasks import generate_content_task, publish_post_task
# Configure logging: basicConfig sets the root logger level to INFO when this
# module is imported.
# NOTE(review): this is an import-time side effect on process-wide logging;
# confirm that configuring it here (rather than at the application entry
# point) is intended.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the scheduling helpers in this module.
logger = logging.getLogger(__name__)
def init_celery_scheduler(supabase_client):
    """
    Announce that the Celery-based scheduler is being set up.

    This is a placeholder entry point: it only emits an INFO log line and
    creates no scheduler object.

    Args:
        supabase_client: Supabase client instance. Accepted for interface
            compatibility; not used by this function.
    """
    # No scheduler is built here: with Celery, tasks are either registered
    # with Celery Beat or triggered directly by the callers.
    logger.info("Initializing Celery scheduler")
def schedule_content_generation(schedule: dict, supabase_client_config: dict):
    """
    Schedule content generation task using Celery.

    Args:
        schedule (dict): Schedule data. The keys read here are 'id' and the
            nested 'Social_network' -> 'id_utilisateur' (the user ID).
        supabase_client_config (dict): Supabase client configuration,
            forwarded unchanged to the task.

    Returns:
        dict | None: A dict with 'task_id', 'status' and 'message' once the
        task has been queued, or None when no user ID can be found in the
        schedule (a warning is logged in that case).
    """
    schedule_id = schedule.get('id')

    # dict.get() only applies its default when the key is absent. A schedule
    # whose 'Social_network' is present but None (or otherwise empty) must
    # take the "no user ID" path below instead of raising AttributeError.
    social_network = schedule.get('Social_network') or {}
    user_id = social_network.get('id_utilisateur')

    if not user_id:
        logger.warning(f"No user ID found for schedule {schedule_id}")
        return None

    logger.info(f"Scheduling content generation for schedule {schedule_id}")

    # Queue the content generation task
    task = generate_content_task.delay(user_id, schedule_id, supabase_client_config)

    return {
        'task_id': task.id,
        'status': 'scheduled',
        'message': f'Content generation scheduled for schedule {schedule_id}'
    }
def schedule_post_publishing(schedule: dict, supabase_client_config: dict):
    """
    Queue the post publishing task for a schedule entry.

    Args:
        schedule (dict): Schedule data; only its 'id' key is read here.
        supabase_client_config (dict): Supabase client configuration,
            handed to the task as-is.

    Returns:
        dict: 'task_id' of the queued task, a 'status' of 'scheduled' and a
        human-readable 'message'.
    """
    sched_id = schedule.get('id')
    logger.info(f"Scheduling post publishing for schedule {sched_id}")

    # Hand the work over to the publishing task
    async_result = publish_post_task.delay(sched_id, supabase_client_config)

    summary = {'task_id': async_result.id}
    summary['status'] = 'scheduled'
    summary['message'] = f'Post publishing scheduled for schedule {sched_id}'
    return summary
def schedule_content_and_publish(schedule: dict, supabase_client_config: dict):
    """
    Schedule both content generation and post publishing as a chain.

    Content generation runs first; publishing is only started after it.

    Args:
        schedule (dict): Schedule data. The keys read here are 'id' and the
            nested 'Social_network' -> 'id_utilisateur' (the user ID).
        supabase_client_config (dict): Supabase client configuration,
            forwarded unchanged to both tasks.

    Returns:
        dict | None: A dict with 'task_id', 'status' and 'message' once the
        chain has been queued, or None when no user ID can be found in the
        schedule (a warning is logged in that case).
    """
    schedule_id = schedule.get('id')

    # dict.get() only applies its default when the key is absent. A schedule
    # whose 'Social_network' is present but None (or otherwise empty) must
    # take the "no user ID" path below instead of raising AttributeError.
    social_network = schedule.get('Social_network') or {}
    user_id = social_network.get('id_utilisateur')

    if not user_id:
        logger.warning(f"No user ID found for schedule {schedule_id}")
        return None

    logger.info(f"Scheduling content generation and publishing chain for schedule {schedule_id}")

    # Create a chain of tasks: generate content first, then publish.
    # In a chain, Celery prepends the previous task's return value to the
    # next signature's arguments. A partial `.s(supabase_client_config)` would
    # therefore run publish_post_task(<generate result>, config), whereas the
    # direct call in this module passes (schedule_id, config). The immutable
    # signature `.si(...)` ignores the parent result and passes exactly the
    # arguments given here.
    task_chain = chain(
        generate_content_task.s(user_id, schedule_id, supabase_client_config),
        publish_post_task.si(schedule_id, supabase_client_config)
    )

    # Apply the chain asynchronously
    result = task_chain.apply_async()

    return {
        'task_id': result.id,
        'status': 'scheduled',
        'message': f'Content generation and publishing chain scheduled for schedule {schedule_id}'
    }