"""Schedule content-generation and LinkedIn publishing jobs from Supabase rows."""

import logging
from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

from services.content_service import ContentService
from services.linkedin_service import LinkedInService

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ID of the periodic reload job; load_schedules() must never remove it.
_LOADER_JOB_ID = 'load_schedules'

# Day names as stored in the schedule strings -> APScheduler cron abbreviations.
_DAY_TO_CRON = {
    'Monday': 'mon',
    'Tuesday': 'tue',
    'Wednesday': 'wed',
    'Thursday': 'thu',
    'Friday': 'fri',
    'Saturday': 'sat',
    'Sunday': 'sun',
}


def _parse_day_time(value: str):
    """Split a ``"<DayName> <HH>:<MM>"`` string into ``(day_name, hour, minute)``.

    Raises:
        ValueError: If the string does not have exactly two whitespace-separated
            parts, or the time part is not two ``:``-separated integers.
    """
    day_name, time_str = value.split()
    hour, minute = map(int, time_str.split(':'))
    return day_name, hour, minute


def _social_network(schedule: dict) -> dict:
    """Return the joined ``Social_network`` record of a schedule, or ``{}``.

    ``dict.get(key, {})`` only falls back when the key is missing; a key that
    is present with a ``None`` value would otherwise break the chained
    ``.get`` calls, so ``None`` is normalised to an empty dict here.
    """
    return schedule.get('Social_network') or {}


def init_scheduler(scheduler: BackgroundScheduler, supabase_client) -> None:
    """
    Initialize the task scheduler with jobs.

    Registers a recurring job that reloads the schedules every 5 minutes and
    then performs one immediate load.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        supabase_client: Supabase client instance
    """
    # Add a job to load schedules from database
    scheduler.add_job(
        func=load_schedules,
        trigger=CronTrigger(minute='*/5'),  # Run every 5 minutes
        id=_LOADER_JOB_ID,
        name='Load schedules from database',
        args=[scheduler, supabase_client],
        # Calling init_scheduler twice must not fail on a duplicate job ID.
        replace_existing=True,
    )

    # Load initial schedules
    load_schedules(scheduler, supabase_client)


def load_schedules(scheduler: BackgroundScheduler, supabase_client) -> None:
    """
    Load schedules from the database and create jobs.

    All jobs except the loader job are removed and then recreated from the
    rows of the "Scheduling" table. Errors are logged, never raised.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        supabase_client: Supabase client instance
    """
    try:
        logger.info("Loading schedules from database...")

        # Fetch all schedules from Supabase
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )

        schedules = response.data or []
        logger.info(f"Found {len(schedules)} schedules")

        # Remove existing scheduled jobs (except the loader job). The IDs are
        # collected first so the job list is not mutated while iterating it.
        job_ids = [
            job.id for job in scheduler.get_jobs() if job.id != _LOADER_JOB_ID
        ]
        for job_id in job_ids:
            scheduler.remove_job(job_id)
            logger.info(f"Removed job: {job_id}")

        # Create jobs for each schedule; one bad row must not block the rest.
        for schedule in schedules:
            try:
                create_scheduling_jobs(scheduler, schedule, supabase_client)
            except Exception as e:
                logger.exception(
                    f"Error creating jobs for schedule {schedule.get('id')}: {e}"
                )

    except Exception as e:
        logger.exception(f"Error loading schedules: {e}")


def create_scheduling_jobs(
    scheduler: BackgroundScheduler, schedule: dict, supabase_client
) -> None:
    """
    Create jobs for a specific schedule.

    Two weekly cron jobs are registered: ``gen_<id>`` at ``adjusted_time``
    (content generation) and ``pub_<id>`` at ``schedule_time`` (publishing).
    Both time strings are expected in the form ``"<DayName> <HH>:<MM>"``.
    Invalid schedules are logged and skipped; nothing is raised.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    schedule_id = schedule.get('id')
    schedule_time = schedule.get('schedule_time')
    adjusted_time = schedule.get('adjusted_time')

    if not schedule_time or not adjusted_time:
        logger.warning(f"Invalid schedule format for schedule {schedule_id}")
        return

    try:
        # Parse main schedule time (publishing) and adjusted time (generation)
        day_name, hour, minute = _parse_day_time(schedule_time)
        adj_day_name, adj_hour, adj_minute = _parse_day_time(adjusted_time)

        if day_name not in _DAY_TO_CRON or adj_day_name not in _DAY_TO_CRON:
            logger.warning(f"Invalid day name in schedule {schedule_id}")
            return

        # Build both triggers before registering anything so that an invalid
        # time in either field cannot leave a half-registered schedule
        # (generation job without its publishing job).
        gen_trigger = CronTrigger(
            day_of_week=_DAY_TO_CRON[adj_day_name],
            hour=adj_hour,
            minute=adj_minute,
        )
        pub_trigger = CronTrigger(
            day_of_week=_DAY_TO_CRON[day_name],
            hour=hour,
            minute=minute,
        )

        # Create content generation job (runs at the schedule's adjusted_time)
        gen_job_id = f"gen_{schedule_id}"
        scheduler.add_job(
            func=generate_content_job,
            trigger=gen_trigger,
            id=gen_job_id,
            name=f"Generate content for schedule {schedule_id}",
            args=[schedule, supabase_client],
            replace_existing=True,
        )
        logger.info(f"Created content generation job: {gen_job_id}")

        # Create publishing job
        pub_job_id = f"pub_{schedule_id}"
        scheduler.add_job(
            func=publish_post_job,
            trigger=pub_trigger,
            id=pub_job_id,
            name=f"Publish post for schedule {schedule_id}",
            args=[schedule, supabase_client],
            replace_existing=True,
        )
        logger.info(f"Created publishing job: {pub_job_id}")

    except Exception as e:
        logger.exception(f"Error creating jobs for schedule {schedule_id}: {e}")


def generate_content_job(schedule: dict, supabase_client) -> None:
    """
    Job to generate content for a scheduled post.

    Generates text through ContentService for the schedule's user and inserts
    it into the "Post_content" table as an unpublished post linked to the
    schedule. Errors are logged, never raised.

    Args:
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')
        user_id = _social_network(schedule).get('id_utilisateur')

        if not user_id:
            logger.warning(f"No user ID found for schedule {schedule_id}")
            return

        logger.info(f"Generating content for schedule {schedule_id}")

        # Generate content using content service
        content_service = ContentService()
        generated_content = content_service.generate_post_content(user_id)

        # Store generated content in database
        social_account_id = schedule.get('id_social')
        response = (
            supabase_client
            .table("Post_content")
            .insert({
                "social_account_id": social_account_id,
                "Text_content": generated_content,
                "is_published": False,
                "sched": schedule_id,
            })
            .execute()
        )

        if response.data:
            logger.info(f"Content generated and stored for schedule {schedule_id}")
        else:
            logger.error(
                f"Failed to store generated content for schedule {schedule_id}"
            )

    except Exception as e:
        logger.exception(
            f"Error in content generation job for schedule {schedule.get('id')}: {e}"
        )


def publish_post_job(schedule: dict, supabase_client) -> None:
    """
    Job to publish a scheduled post.

    Picks the most recently created unpublished "Post_content" row of the
    schedule, publishes it through LinkedInService and then flags the row as
    published. Errors are logged, never raised.

    Args:
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')
        logger.info(f"Publishing post for schedule {schedule_id}")

        # Fetch the post to publish
        response = (
            supabase_client
            .table("Post_content")
            .select("*")
            .eq("sched", schedule_id)
            .eq("is_published", False)
            .order("created_at", desc=True)
            .limit(1)
            .execute()
        )

        if not response.data:
            logger.info(f"No unpublished posts found for schedule {schedule_id}")
            return

        post = response.data[0]
        post_id = post.get('id')
        text_content = post.get('Text_content')
        image_url = post.get('image_content_url')

        # Get social network credentials
        social_network = _social_network(schedule)
        access_token = social_network.get('token')
        user_sub = social_network.get('sub')

        if not access_token or not user_sub:
            logger.error(
                f"Missing social network credentials for schedule {schedule_id}"
            )
            return

        # Publish to LinkedIn
        # NOTE(review): the return value of publish_post is not inspected;
        # presumably it raises on failure -- confirm, otherwise a failed
        # publish would still be flagged as published below.
        linkedin_service = LinkedInService()
        linkedin_service.publish_post(
            access_token, user_sub, text_content, image_url
        )

        # Update post status in database
        (
            supabase_client
            .table("Post_content")
            .update({"is_published": True})
            .eq("id", post_id)
            .execute()
        )

        logger.info(f"Post published successfully for schedule {schedule_id}")

    except Exception as e:
        logger.exception(
            f"Error in publishing job for schedule {schedule.get('id')}: {e}"
        )