# backend/scheduler/task_scheduler.py.bak
import logging
from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

from services.content_service import ContentService
from services.linkedin_service import LinkedInService

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def init_scheduler(scheduler: BackgroundScheduler, supabase_client):
    """
    Initialize the task scheduler with jobs.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        supabase_client: Supabase client instance
    """
    # Add a job to load schedules from database
    scheduler.add_job(
        func=load_schedules,
        trigger=CronTrigger(minute='*/5'),  # Run every 5 minutes
        id='load_schedules',
        name='Load schedules from database',
        args=[scheduler, supabase_client]
    )

    # Load initial schedules
    load_schedules(scheduler, supabase_client)
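

# Note (added for clarity): the loader job keeps its fixed id ('load_schedules')
# so that load_schedules() below can skip it when it clears and re-creates the
# per-schedule gen_*/pub_* jobs every five minutes.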


def load_schedules(scheduler: BackgroundScheduler, supabase_client):
    """
    Load schedules from the database and create jobs.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        supabase_client: Supabase client instance
    """
    try:
        logger.info("Loading schedules from database...")

        # Fetch all schedules from Supabase
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )
        schedules = response.data if response.data else []
        logger.info(f"Found {len(schedules)} schedules")

        # Remove existing scheduled jobs (except the loader job)
        job_ids = [job.id for job in scheduler.get_jobs() if job.id != 'load_schedules']
        for job_id in job_ids:
            scheduler.remove_job(job_id)
            logger.info(f"Removed job: {job_id}")

        # Create jobs for each schedule
        for schedule in schedules:
            try:
                create_scheduling_jobs(scheduler, schedule, supabase_client)
            except Exception as e:
                logger.error(f"Error creating jobs for schedule {schedule.get('id')}: {str(e)}")
    except Exception as e:
        logger.error(f"Error loading schedules: {str(e)}")


def create_scheduling_jobs(scheduler: BackgroundScheduler, schedule: dict, supabase_client):
    """
    Create jobs for a specific schedule.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    schedule_id = schedule.get('id')
    schedule_time = schedule.get('schedule_time')
    adjusted_time = schedule.get('adjusted_time')

    if not schedule_time or not adjusted_time:
        logger.warning(f"Invalid schedule format for schedule {schedule_id}")
        return

    # Parse schedule times
    try:
        # Parse main schedule time (publishing)
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))

        # Parse adjusted time (content generation)
        adj_day_name, adj_time_str = adjusted_time.split()
        adj_hour, adj_minute = map(int, adj_time_str.split(':'))

        # Map day names to cron format
        day_map = {
            'Monday': 'mon',
            'Tuesday': 'tue',
            'Wednesday': 'wed',
            'Thursday': 'thu',
            'Friday': 'fri',
            'Saturday': 'sat',
            'Sunday': 'sun'
        }

        if day_name not in day_map or adj_day_name not in day_map:
            logger.warning(f"Invalid day name in schedule {schedule_id}")
            return

        day_cron = day_map[day_name]
        adj_day_cron = day_map[adj_day_name]

        # Create content generation job (5 minutes before publishing)
        gen_job_id = f"gen_{schedule_id}"
        scheduler.add_job(
            func=generate_content_job,
            trigger=CronTrigger(
                day_of_week=adj_day_cron,
                hour=adj_hour,
                minute=adj_minute
            ),
            id=gen_job_id,
            name=f"Generate content for schedule {schedule_id}",
            args=[schedule, supabase_client]
        )
        logger.info(f"Created content generation job: {gen_job_id}")

        # Create publishing job
        pub_job_id = f"pub_{schedule_id}"
        scheduler.add_job(
            func=publish_post_job,
            trigger=CronTrigger(
                day_of_week=day_cron,
                hour=hour,
                minute=minute
            ),
            id=pub_job_id,
            name=f"Publish post for schedule {schedule_id}",
            args=[schedule, supabase_client]
        )
        logger.info(f"Created publishing job: {pub_job_id}")
    except Exception as e:
        logger.error(f"Error creating jobs for schedule {schedule_id}: {str(e)}")


def generate_content_job(schedule: dict, supabase_client):
    """
    Job to generate content for a scheduled post.

    Args:
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')
        user_id = schedule.get('Social_network', {}).get('id_utilisateur')

        if not user_id:
            logger.warning(f"No user ID found for schedule {schedule_id}")
            return

        logger.info(f"Generating content for schedule {schedule_id}")

        # Generate content using content service
        content_service = ContentService()
        generated_content = content_service.generate_post_content(user_id)

        # Store generated content in database
        social_account_id = schedule.get('id_social')
        response = (
            supabase_client
            .table("Post_content")
            .insert({
                "social_account_id": social_account_id,
                "Text_content": generated_content,
                "is_published": False,
                "sched": schedule_id
            })
            .execute()
        )

        if response.data:
            logger.info(f"Content generated and stored for schedule {schedule_id}")
        else:
            logger.error(f"Failed to store generated content for schedule {schedule_id}")
    except Exception as e:
        logger.error(f"Error in content generation job for schedule {schedule.get('id')}: {str(e)}")


def publish_post_job(schedule: dict, supabase_client):
    """
    Job to publish a scheduled post.

    Args:
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')
        logger.info(f"Publishing post for schedule {schedule_id}")

        # Fetch the post to publish
        response = (
            supabase_client
            .table("Post_content")
            .select("*")
            .eq("sched", schedule_id)
            .eq("is_published", False)
            .order("created_at", desc=True)
            .limit(1)
            .execute()
        )

        if not response.data:
            logger.info(f"No unpublished posts found for schedule {schedule_id}")
            return

        post = response.data[0]
        post_id = post.get('id')
        text_content = post.get('Text_content')
        image_url = post.get('image_content_url')

        # Get social network credentials
        access_token = schedule.get('Social_network', {}).get('token')
        user_sub = schedule.get('Social_network', {}).get('sub')

        if not access_token or not user_sub:
            logger.error(f"Missing social network credentials for schedule {schedule_id}")
            return

        # Publish to LinkedIn
        linkedin_service = LinkedInService()
        publish_response = linkedin_service.publish_post(
            access_token, user_sub, text_content, image_url
        )

        # Update post status in database
        update_response = (
            supabase_client
            .table("Post_content")
            .update({"is_published": True})
            .eq("id", post_id)
            .execute()
        )

        logger.info(f"Post published successfully for schedule {schedule_id}")
    except Exception as e:
        logger.error(f"Error in publishing job for schedule {schedule.get('id')}: {str(e)}")