import logging
from datetime import datetime, timedelta
from celery import current_app
from celery.schedules import crontab
from backend.services.content_service import ContentService
from backend.services.linkedin_service import LinkedInService
# Use relative import for the Config class to work with Hugging Face Spaces
from backend.config import Config

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Maps weekday names to celery crontab day_of_week numbers (0 = Sunday).
_DAY_MAP = {
    'Monday': 1,
    'Tuesday': 2,
    'Wednesday': 3,
    'Thursday': 4,
    'Friday': 5,
    'Saturday': 6,
    'Sunday': 0,
}


def get_supabase_config():
    """Get Supabase configuration from environment.

    Returns:
        dict: 'SUPABASE_URL' and 'SUPABASE_KEY' values read from Config.
    """
    return {
        'SUPABASE_URL': Config.SUPABASE_URL,
        'SUPABASE_KEY': Config.SUPABASE_KEY
    }


def init_scheduler(supabase_client):
    """
    Initialize the Celery-based task scheduler.

    Args:
        supabase_client: Supabase client instance
    """
    logger.info("Initializing Celery scheduler")
    # In a Celery-based approach, we don't need to initialize a scheduler here
    # Tasks will be scheduled through Celery Beat or called directly


def parse_schedule_time(schedule_time):
    """
    Parse schedule time string into crontab format.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM",
            e.g. "Monday 09:30".

    Returns:
        dict: Crontab parameters ('minute', 'hour', 'day_of_week').

    Raises:
        ValueError: If the string cannot be parsed or the day name is
            unknown. (Previously this fell back to an all-wildcard
            schedule, which made the task fire every minute of every
            day — raising lets callers skip the broken schedule
            instead.)
    """
    try:
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))
        # An unknown day name is an error, not "run every day".
        day_of_week = _DAY_MAP[day_name]
    except (ValueError, KeyError, AttributeError, TypeError) as e:
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")
        raise ValueError(f"Invalid schedule time: {schedule_time!r}") from e

    return {
        'minute': minute,
        'hour': hour,
        'day_of_week': day_of_week
    }


def load_schedules(supabase_client):
    """
    Load schedules from the database and create periodic tasks.

    This function is called by the Celery Beat scheduler. It rebuilds
    the beat schedule from the "Scheduling" table, keeping only the
    'load-schedules' loader job from the previous schedule.

    Args:
        supabase_client: Supabase client instance
    """
    try:
        logger.info("Loading schedules from database...")

        # Get Supabase configuration (passed to tasks so workers can
        # build their own client).
        supabase_config = get_supabase_config()

        # Fetch all schedules from Supabase, joined with the social
        # network credentials needed by the tasks.
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )
        schedules = response.data or []
        logger.info(f"Found {len(schedules)} schedules")

        # Get current beat schedule
        current_schedule = current_app.conf.beat_schedule

        # Remove existing scheduled jobs (except the loader job)
        # In a production environment, you might want to be more selective about this
        loader_job = current_schedule.get('load-schedules', {})
        new_schedule = {'load-schedules': loader_job}

        # Create jobs for each schedule; a bad schedule only skips
        # itself, never the whole reload.
        for schedule in schedules:
            try:
                schedule_id = schedule.get('id')
                schedule_time = schedule.get('schedule_time')
                adjusted_time = schedule.get('adjusted_time')

                if not schedule_time or not adjusted_time:
                    logger.warning(f"Invalid schedule format for schedule {schedule_id}")
                    continue

                # Parse schedule times; raises ValueError on malformed
                # input, caught by the per-schedule handler below.
                content_gen_time = parse_schedule_time(adjusted_time)
                publish_time = parse_schedule_time(schedule_time)

                # 'Social_network' may be None when the join finds no
                # row; 'or {}' avoids AttributeError on .get().
                social_network = schedule.get('Social_network') or {}

                # Create content generation job (5 minutes before publishing)
                gen_job_id = f"gen_{schedule_id}"
                new_schedule[gen_job_id] = {
                    'task': 'celery_tasks.content_tasks.generate_content_task',
                    'schedule': crontab(
                        minute=content_gen_time['minute'],
                        hour=content_gen_time['hour'],
                        day_of_week=content_gen_time['day_of_week']
                    ),
                    'args': (
                        social_network.get('id_utilisateur'),
                        schedule_id,
                        supabase_config
                    )
                }
                logger.info(f"Created content generation job: {gen_job_id}")

                # Create publishing job
                pub_job_id = f"pub_{schedule_id}"
                new_schedule[pub_job_id] = {
                    'task': 'celery_tasks.content_tasks.publish_post_task',
                    'schedule': crontab(
                        minute=publish_time['minute'],
                        hour=publish_time['hour'],
                        day_of_week=publish_time['day_of_week']
                    ),
                    'args': (
                        schedule_id,
                        supabase_config
                    )
                }
                logger.info(f"Created publishing job: {pub_job_id}")
            except Exception as e:
                logger.error(f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}")

        # Update the beat schedule
        current_app.conf.beat_schedule = new_schedule
        logger.info("Updated Celery Beat schedule")
    except Exception as e:
        logger.error(f"Error loading schedules: {str(e)}")


def generate_content_job(schedule: dict, supabase_client):
    """
    Job to generate content for a scheduled post.

    This function is kept for backward compatibility but should be
    replaced with Celery tasks.

    Args:
        schedule (dict): Schedule data (expects 'id', 'id_social' and a
            joined 'Social_network' row with 'id_utilisateur')
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')
        # 'Social_network' may be None when the join finds no row.
        user_id = (schedule.get('Social_network') or {}).get('id_utilisateur')

        if not user_id:
            logger.warning(f"No user ID found for schedule {schedule_id}")
            return

        logger.info(f"Generating content for schedule {schedule_id}")

        # Generate content using content service
        content_service = ContentService()
        generated_content = content_service.generate_post_content(user_id)

        # Store generated content in database as an unpublished post
        # linked back to this schedule.
        social_account_id = schedule.get('id_social')
        response = (
            supabase_client
            .table("Post_content")
            .insert({
                "social_account_id": social_account_id,
                "Text_content": generated_content,
                "is_published": False,
                "sched": schedule_id
            })
            .execute()
        )

        if response.data:
            logger.info(f"Content generated and stored for schedule {schedule_id}")
        else:
            logger.error(f"Failed to store generated content for schedule {schedule_id}")
    except Exception as e:
        logger.error(f"Error in content generation job for schedule {schedule.get('id')}: {str(e)}")


def publish_post_job(schedule: dict, supabase_client):
    """
    Job to publish a scheduled post.

    This function is kept for backward compatibility but should be
    replaced with Celery tasks.

    Args:
        schedule (dict): Schedule data (expects 'id' and a joined
            'Social_network' row with 'token' and 'sub')
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')
        logger.info(f"Publishing post for schedule {schedule_id}")

        # Fetch the most recent unpublished post for this schedule
        response = (
            supabase_client
            .table("Post_content")
            .select("*")
            .eq("sched", schedule_id)
            .eq("is_published", False)
            .order("created_at", desc=True)
            .limit(1)
            .execute()
        )

        if not response.data:
            logger.info(f"No unpublished posts found for schedule {schedule_id}")
            return

        post = response.data[0]
        post_id = post.get('id')
        text_content = post.get('Text_content')
        image_url = post.get('image_content_url')

        # Get social network credentials; the joined row may be None,
        # so guard with 'or {}' before .get().
        social_network = schedule.get('Social_network') or {}
        access_token = social_network.get('token')
        user_sub = social_network.get('sub')

        if not access_token or not user_sub:
            logger.error(f"Missing social network credentials for schedule {schedule_id}")
            return

        # Publish to LinkedIn; any failure raises and is logged below,
        # leaving the post unpublished for the next run.
        linkedin_service = LinkedInService()
        linkedin_service.publish_post(
            access_token,
            user_sub,
            text_content,
            image_url
        )

        # Mark the post as published only after a successful publish
        (
            supabase_client
            .table("Post_content")
            .update({"is_published": True})
            .eq("id", post_id)
            .execute()
        )

        logger.info(f"Post published successfully for schedule {schedule_id}")
    except Exception as e:
        logger.error(f"Error in publishing job for schedule {schedule.get('id')}: {str(e)}")