Lin / backend /celery_tasks /content_tasks.py
Zelyanoth's picture
tt
1cf04ef
raw
history blame
6.63 kB
from celery import current_task
from backend.services.content_service import ContentService
from backend.services.linkedin_service import LinkedInService
from backend.utils.database import init_supabase
from backend.celery_config import celery_app
# Configure logging
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True)
def generate_content_task(self, user_id: str, schedule_id: str, supabase_client_config: dict) -> dict:
    """
    Celery task to generate content for a scheduled post.

    Generates post text for the user, looks up the social account referenced
    by the schedule, and inserts the text into the "Post_content" table as an
    unpublished post linked to that schedule.

    Any exception raised along the way is caught, logged with its traceback,
    and reported through the returned dict instead of being re-raised, so
    callers must inspect the 'status' key to detect a failure.

    Args:
        user_id (str): User ID
        schedule_id (str): Schedule ID
        supabase_client_config (dict): Supabase client configuration; the
            'SUPABASE_URL' and 'SUPABASE_KEY' keys are read from it.

    Returns:
        dict: Result of content generation. Always contains 'status'
        ('success' or 'failure') and 'message'; on success it also contains
        'post_id', the id of the inserted "Post_content" row.
    """
    try:
        logger.info("Starting content generation for schedule %s", schedule_id)

        # Update task state
        self.update_state(state='PROGRESS', meta={'status': 'Generating content...'})

        # Generate content using content service
        content_service = ContentService()
        generated_content = content_service.generate_post_content(user_id)

        # Initialize Supabase client from config (init_supabase is imported
        # at module level, so no function-local import is needed)
        supabase_client = init_supabase(
            supabase_client_config['SUPABASE_URL'],
            supabase_client_config['SUPABASE_KEY'],
        )

        # The stored post must reference a social account, which is only
        # reachable through the schedule row.
        schedule_response = (
            supabase_client
            .table("Scheduling")
            .select("id_social")
            .eq("id", schedule_id)
            .execute()
        )
        if not schedule_response.data:
            raise LookupError(f"Schedule {schedule_id} not found")
        social_account_id = schedule_response.data[0]['id_social']

        # Store the generated content
        response = (
            supabase_client
            .table("Post_content")
            .insert({
                "social_account_id": social_account_id,
                "Text_content": generated_content,
                "is_published": False,
                "sched": schedule_id,
            })
            .execute()
        )

        # Guard clause: an insert that returns no rows is treated as a failure.
        if not response.data:
            logger.error("Failed to store generated content for schedule %s", schedule_id)
            return {
                'status': 'failure',
                'message': f'Failed to store generated content for schedule {schedule_id}',
            }

        logger.info("Content generated and stored for schedule %s", schedule_id)
        return {
            'status': 'success',
            'message': f'Content generated for schedule {schedule_id}',
            'post_id': response.data[0]['id'],
        }
    except Exception as e:
        # Task boundary: log with the full traceback (logger.exception) and
        # convert the error into a failure result rather than propagating it.
        logger.exception(
            "Error in content generation task for schedule %s: %s", schedule_id, e
        )
        return {
            'status': 'failure',
            'message': f'Error in content generation: {str(e)}',
        }
@celery_app.task(bind=True)
def publish_post_task(self, schedule_id: str, supabase_client_config: dict) -> dict:
    """
    Celery task to publish a scheduled post.

    Fetches the most recently created unpublished "Post_content" row for the
    schedule, reads the token and sub of the social network linked to the
    schedule, publishes the post through LinkedInService, and then marks the
    row as published.

    Any exception raised along the way is caught, logged with its traceback,
    and reported through the returned dict instead of being re-raised, so
    callers must inspect the 'status' key to detect a failure.

    Args:
        schedule_id (str): Schedule ID
        supabase_client_config (dict): Supabase client configuration; the
            'SUPABASE_URL' and 'SUPABASE_KEY' keys are read from it.

    Returns:
        dict: Result of post publishing. Always contains 'status' and
        'message'. 'status' is 'success' (with 'linkedin_response' holding
        the value returned by the publish call), 'info' when there is no
        unpublished post for the schedule, or 'failure'.
    """
    try:
        logger.info("Starting post publishing for schedule %s", schedule_id)

        # Update task state
        self.update_state(state='PROGRESS', meta={'status': 'Publishing post...'})

        # Initialize Supabase client from config (init_supabase is imported
        # at module level, so no function-local import is needed)
        supabase_client = init_supabase(
            supabase_client_config['SUPABASE_URL'],
            supabase_client_config['SUPABASE_KEY'],
        )

        # Fetch the post to publish: newest unpublished row for this schedule
        response = (
            supabase_client
            .table("Post_content")
            .select("*")
            .eq("sched", schedule_id)
            .eq("is_published", False)
            .order("created_at", desc=True)
            .limit(1)
            .execute()
        )
        if not response.data:
            logger.info("No unpublished posts found for schedule %s", schedule_id)
            return {
                'status': 'info',
                'message': f'No unpublished posts found for schedule {schedule_id}',
            }

        post = response.data[0]
        post_id = post.get('id')
        text_content = post.get('Text_content')
        image_url = post.get('image_content_url')

        # Get social network credentials
        schedule_response = (
            supabase_client
            .table("Scheduling")
            .select("Social_network(token, sub)")
            .eq("id", schedule_id)
            .execute()
        )
        if not schedule_response.data:
            raise LookupError(f"Schedule {schedule_id} not found")

        # dict.get's default only applies when the key is missing. If the
        # key is present with a None value, the old `.get(..., {})` returned
        # None and the next `.get` raised AttributeError. `or {}` routes that
        # case to the "missing credentials" failure below instead.
        social_network = schedule_response.data[0].get('Social_network') or {}
        access_token = social_network.get('token')
        user_sub = social_network.get('sub')
        if not access_token or not user_sub:
            logger.error("Missing social network credentials for schedule %s", schedule_id)
            return {
                'status': 'failure',
                'message': f'Missing social network credentials for schedule {schedule_id}',
            }

        # Publish to LinkedIn
        linkedin_service = LinkedInService()
        publish_response = linkedin_service.publish_post(
            access_token, user_sub, text_content, image_url
        )

        # Update post status in database
        update_response = (
            supabase_client
            .table("Post_content")
            .update({"is_published": True})
            .eq("id", post_id)
            .execute()
        )
        # NOTE(review): presumably the client returns the updated rows in
        # `.data`; an empty result would mean the flag was not persisted and
        # the same post could be picked up again on a later run. Only a
        # warning is logged because the post has already been published.
        if not update_response.data:
            logger.warning(
                "Post %s was published for schedule %s but the update of "
                "is_published returned no rows",
                post_id,
                schedule_id,
            )

        logger.info("Post published successfully for schedule %s", schedule_id)
        return {
            'status': 'success',
            'message': f'Post published successfully for schedule {schedule_id}',
            'linkedin_response': publish_response,
        }
    except Exception as e:
        # Task boundary: log with the full traceback (logger.exception) and
        # convert the error into a failure result rather than propagating it.
        logger.exception(
            "Error in publishing task for schedule %s: %s", schedule_id, e
        )
        return {
            'status': 'failure',
            'message': f'Error in publishing post: {str(e)}',
        }