|
from celery import current_task |
|
from backend.services.content_service import ContentService |
|
from backend.services.linkedin_service import LinkedInService |
|
from backend.utils.database import init_supabase |
|
from backend.celery_config import celery_app |
|
|
|
|
|
import logging |
|
logger = logging.getLogger(__name__) |
|
|
|
@celery_app.task(bind=True)
def generate_content_task(self, user_id: str, schedule_id: str, supabase_client_config: dict) -> dict:
    """
    Celery task to generate content for a scheduled post.

    Resolves the social account linked to the schedule, generates the post
    text through ``ContentService`` and stores it as an unpublished row in
    the ``Post_content`` table. Any exception is caught, logged with its
    traceback and reported through the returned dict instead of being raised.

    Args:
        user_id (str): User ID passed to ``ContentService.generate_post_content``.
        schedule_id (str): ID of the ``Scheduling`` row the content is generated for.
        supabase_client_config (dict): Supabase client configuration; must
            contain the keys ``SUPABASE_URL`` and ``SUPABASE_KEY``.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'post_id': ...}`` when the
        content was stored, otherwise ``{'status': 'failure', 'message': ...}``.
    """
    try:
        logger.info(f"Starting content generation for schedule {schedule_id}")

        self.update_state(state='PROGRESS', meta={'status': 'Generating content...'})

        supabase_client = init_supabase(
            supabase_client_config['SUPABASE_URL'],
            supabase_client_config['SUPABASE_KEY'],
        )

        # Resolve the schedule before generating anything: content generation
        # does not depend on it, so a missing schedule should fail fast rather
        # than after the generation call has already been made.
        schedule_response = (
            supabase_client
            .table("Scheduling")
            .select("id_social")
            .eq("id", schedule_id)
            .execute()
        )
        if not schedule_response.data:
            raise Exception(f"Schedule {schedule_id} not found")

        social_account_id = schedule_response.data[0]['id_social']

        generated_content = ContentService().generate_post_content(user_id)

        # Store the generated text as an unpublished post tied to this schedule.
        response = (
            supabase_client
            .table("Post_content")
            .insert({
                "social_account_id": social_account_id,
                "Text_content": generated_content,
                "is_published": False,
                "sched": schedule_id,
            })
            .execute()
        )

        if not response.data:
            logger.error(f"Failed to store generated content for schedule {schedule_id}")
            return {
                'status': 'failure',
                'message': f'Failed to store generated content for schedule {schedule_id}',
            }

        logger.info(f"Content generated and stored for schedule {schedule_id}")
        return {
            'status': 'success',
            'message': f'Content generated for schedule {schedule_id}',
            'post_id': response.data[0]['id'],
        }

    except Exception as e:
        # Task boundary: report the failure in the result payload, but keep
        # the traceback in the logs so the root cause stays diagnosable.
        logger.exception(f"Error in content generation task for schedule {schedule_id}: {str(e)}")
        return {
            'status': 'failure',
            'message': f'Error in content generation: {str(e)}',
        }
|
|
|
@celery_app.task(bind=True)
def publish_post_task(self, schedule_id: str, supabase_client_config: dict) -> dict:
    """
    Celery task to publish a scheduled post.

    Fetches the most recently created unpublished ``Post_content`` row for the
    schedule, reads the ``token`` and ``sub`` of the linked ``Social_network``
    record, publishes the post through ``LinkedInService`` and then marks the
    row as published. Any exception is caught, logged with its traceback and
    reported through the returned dict instead of being raised.

    Args:
        schedule_id (str): ID of the ``Scheduling`` row whose post is published.
        supabase_client_config (dict): Supabase client configuration; must
            contain the keys ``SUPABASE_URL`` and ``SUPABASE_KEY``.

    Returns:
        dict: One of
            - ``{'status': 'success', 'message': ..., 'linkedin_response': ...}``
              after the post was published,
            - ``{'status': 'info', 'message': ...}`` when there is no
              unpublished post for the schedule,
            - ``{'status': 'failure', 'message': ...}`` when credentials are
              missing or an error occurred.
    """
    try:
        logger.info(f"Starting post publishing for schedule {schedule_id}")

        self.update_state(state='PROGRESS', meta={'status': 'Publishing post...'})

        supabase_client = init_supabase(
            supabase_client_config['SUPABASE_URL'],
            supabase_client_config['SUPABASE_KEY'],
        )

        # Newest unpublished post for this schedule.
        response = (
            supabase_client
            .table("Post_content")
            .select("*")
            .eq("sched", schedule_id)
            .eq("is_published", False)
            .order("created_at", desc=True)
            .limit(1)
            .execute()
        )

        if not response.data:
            logger.info(f"No unpublished posts found for schedule {schedule_id}")
            return {
                'status': 'info',
                'message': f'No unpublished posts found for schedule {schedule_id}',
            }

        post = response.data[0]
        post_id = post.get('id')
        text_content = post.get('Text_content')
        image_url = post.get('image_content_url')

        # Credentials of the social account linked to the schedule.
        schedule_response = (
            supabase_client
            .table("Scheduling")
            .select("Social_network(token, sub)")
            .eq("id", schedule_id)
            .execute()
        )

        if not schedule_response.data:
            raise Exception(f"Schedule {schedule_id} not found")

        # The embedded relation may come back as an explicit null; a plain
        # `.get(key, {})` would then return None and the lookups below would
        # raise AttributeError instead of reporting missing credentials.
        social_network = schedule_response.data[0].get('Social_network') or {}
        access_token = social_network.get('token')
        user_sub = social_network.get('sub')

        if not access_token or not user_sub:
            logger.error(f"Missing social network credentials for schedule {schedule_id}")
            return {
                'status': 'failure',
                'message': f'Missing social network credentials for schedule {schedule_id}',
            }

        linkedin_service = LinkedInService()
        publish_response = linkedin_service.publish_post(
            access_token, user_sub, text_content, image_url
        )

        # Flag the post as published so it is not picked up again.
        update_response = (
            supabase_client
            .table("Post_content")
            .update({"is_published": True})
            .eq("id", post_id)
            .execute()
        )

        if not update_response.data:
            # NOTE(review): assumes the update call returns the modified rows
            # (the client's default) - confirm. The post is already live at
            # this point, so only warn: the row may be selected again later.
            logger.warning(
                f"Post {post_id} was published but could not be marked as published "
                f"for schedule {schedule_id}"
            )

        logger.info(f"Post published successfully for schedule {schedule_id}")
        return {
            'status': 'success',
            'message': f'Post published successfully for schedule {schedule_id}',
            'linkedin_response': publish_response,
        }

    except Exception as e:
        # Task boundary: report the failure in the result payload, but keep
        # the traceback in the logs so the root cause stays diagnosable.
        logger.exception(f"Error in publishing task for schedule {schedule_id}: {str(e)}")
        return {
            'status': 'failure',
            'message': f'Error in publishing post: {str(e)}',
        }
|
|