File size: 3,452 Bytes
25f22bf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
from datetime import datetime, timedelta
from celery import chain
import logging
from celery_app import celery
from celery_tasks.content_tasks import generate_content_task, publish_post_task
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def init_celery_scheduler(supabase_client):
"""
Initialize the Celery-based task scheduler.
Args:
supabase_client: Supabase client instance
"""
logger.info("Initializing Celery scheduler")
# In a Celery-based approach, we don't need to initialize a scheduler here
# Tasks will be scheduled through Celery Beat or called directly
def schedule_content_generation(schedule: dict, supabase_client_config: dict):
"""
Schedule content generation task using Celery.
Args:
schedule (dict): Schedule data
supabase_client_config (dict): Supabase client configuration
Returns:
dict: Celery task result
"""
schedule_id = schedule.get('id')
user_id = schedule.get('Social_network', {}).get('id_utilisateur')
if not user_id:
logger.warning(f"No user ID found for schedule {schedule_id}")
return None
logger.info(f"Scheduling content generation for schedule {schedule_id}")
# Schedule the content generation task
task = generate_content_task.delay(user_id, schedule_id, supabase_client_config)
return {
'task_id': task.id,
'status': 'scheduled',
'message': f'Content generation scheduled for schedule {schedule_id}'
}
def schedule_post_publishing(schedule: dict, supabase_client_config: dict):
"""
Schedule post publishing task using Celery.
Args:
schedule (dict): Schedule data
supabase_client_config (dict): Supabase client configuration
Returns:
dict: Celery task result
"""
schedule_id = schedule.get('id')
logger.info(f"Scheduling post publishing for schedule {schedule_id}")
# Schedule the post publishing task
task = publish_post_task.delay(schedule_id, supabase_client_config)
return {
'task_id': task.id,
'status': 'scheduled',
'message': f'Post publishing scheduled for schedule {schedule_id}'
}
def schedule_content_and_publish(schedule: dict, supabase_client_config: dict):
"""
Schedule both content generation and post publishing as a chain.
Args:
schedule (dict): Schedule data
supabase_client_config (dict): Supabase client configuration
Returns:
dict: Celery task result
"""
schedule_id = schedule.get('id')
user_id = schedule.get('Social_network', {}).get('id_utilisateur')
if not user_id:
logger.warning(f"No user ID found for schedule {schedule_id}")
return None
logger.info(f"Scheduling content generation and publishing chain for schedule {schedule_id}")
# Create a chain of tasks: generate content first, then publish
task_chain = chain(
generate_content_task.s(user_id, schedule_id, supabase_client_config),
publish_post_task.s(supabase_client_config)
)
# Apply the chain asynchronously
result = task_chain.apply_async()
return {
'task_id': result.id,
'status': 'scheduled',
'message': f'Content generation and publishing chain scheduled for schedule {schedule_id}'
} |