from celery import current_task from celery_app import celery from services.content_service import ContentService from services.linkedin_service import LinkedInService import logging # Configure logging logger = logging.getLogger(__name__) @celery.task(bind=True) def generate_content_task(self, user_id: str, schedule_id: str, supabase_client_config: dict): """ Celery task to generate content for a scheduled post. Args: user_id (str): User ID schedule_id (str): Schedule ID supabase_client_config (dict): Supabase client configuration Returns: dict: Result of content generation """ try: logger.info(f"Starting content generation for schedule {schedule_id}") # Update task state self.update_state(state='PROGRESS', meta={'status': 'Generating content...'}) # Initialize content service content_service = ContentService() # Generate content using content service generated_content = content_service.generate_post_content(user_id) # Initialize Supabase client from config from utils.database import init_supabase supabase_client = init_supabase( supabase_client_config['SUPABASE_URL'], supabase_client_config['SUPABASE_KEY'] ) # Store generated content in database # We need to get the social account ID from the schedule schedule_response = ( supabase_client .table("Scheduling") .select("id_social") .eq("id", schedule_id) .execute() ) if not schedule_response.data: raise Exception(f"Schedule {schedule_id} not found") social_account_id = schedule_response.data[0]['id_social'] # Store the generated content response = ( supabase_client .table("Post_content") .insert({ "social_account_id": social_account_id, "Text_content": generated_content, "is_published": False, "sched": schedule_id }) .execute() ) if response.data: logger.info(f"Content generated and stored for schedule {schedule_id}") return { 'status': 'success', 'message': f'Content generated for schedule {schedule_id}', 'post_id': response.data[0]['id'] } else: logger.error(f"Failed to store generated content for schedule {schedule_id}") return { 'status': 'failure', 'message': f'Failed to store generated content for schedule {schedule_id}' } except Exception as e: logger.error(f"Error in content generation task for schedule {schedule_id}: {str(e)}") return { 'status': 'failure', 'message': f'Error in content generation: {str(e)}' } @celery.task(bind=True) def publish_post_task(self, schedule_id: str, supabase_client_config: dict): """ Celery task to publish a scheduled post. Args: schedule_id (str): Schedule ID supabase_client_config (dict): Supabase client configuration Returns: dict: Result of post publishing """ try: logger.info(f"Starting post publishing for schedule {schedule_id}") # Update task state self.update_state(state='PROGRESS', meta={'status': 'Publishing post...'}) # Initialize Supabase client from config from utils.database import init_supabase supabase_client = init_supabase( supabase_client_config['SUPABASE_URL'], supabase_client_config['SUPABASE_KEY'] ) # Fetch the post to publish response = ( supabase_client .table("Post_content") .select("*") .eq("sched", schedule_id) .eq("is_published", False) .order("created_at", desc=True) .limit(1) .execute() ) if not response.data: logger.info(f"No unpublished posts found for schedule {schedule_id}") return { 'status': 'info', 'message': f'No unpublished posts found for schedule {schedule_id}' } post = response.data[0] post_id = post.get('id') text_content = post.get('Text_content') image_url = post.get('image_content_url') # Get social network credentials schedule_response = ( supabase_client .table("Scheduling") .select("Social_network(token, sub)") .eq("id", schedule_id) .execute() ) if not schedule_response.data: raise Exception(f"Schedule {schedule_id} not found") social_network = schedule_response.data[0].get('Social_network', {}) access_token = social_network.get('token') user_sub = social_network.get('sub') if not access_token or not user_sub: logger.error(f"Missing social network credentials for schedule {schedule_id}") return { 'status': 'failure', 'message': f'Missing social network credentials for schedule {schedule_id}' } # Publish to LinkedIn linkedin_service = LinkedInService() publish_response = linkedin_service.publish_post( access_token, user_sub, text_content, image_url ) # Update post status in database update_response = ( supabase_client .table("Post_content") .update({"is_published": True}) .eq("id", post_id) .execute() ) logger.info(f"Post published successfully for schedule {schedule_id}") return { 'status': 'success', 'message': f'Post published successfully for schedule {schedule_id}', 'linkedin_response': publish_response } except Exception as e: logger.error(f"Error in publishing task for schedule {schedule_id}: {str(e)}") return { 'status': 'failure', 'message': f'Error in publishing post: {str(e)}' }