# File: Lin/backend/api/posts.py
# Repository page metadata: author Zelyanoth, commit "gg" (c7d5529), size 19.6 kB
import codecs
import uuid
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt_identity
from backend.services.content_service import ContentService
from backend.services.linkedin_service import LinkedInService
# Blueprint that collects the post-related routes declared in this module.
posts_bp = Blueprint('posts', __name__)
def safe_log_message(message):
    """
    Log a message at DEBUG level without raising on logger lookup problems.

    Args:
        message: Text (or any object) to log. Strings are round-tripped
            through UTF-8 with ``errors='replace'`` so characters that cannot
            be encoded (for example lone surrogates) are replaced instead of
            breaking a log handler; other objects are converted with ``str()``.

    Returns:
        None.

    The Flask application logger is used when it can be reached. If looking
    it up fails (for instance when this is called without an application
    context), the standard-library logger for this module is used instead, so
    a caller's error handling is not interrupted by the logging call itself.
    """
    # Local import keeps this function self-contained.
    import logging

    level = logging.DEBUG
    try:
        if isinstance(message, str):
            # Encode/decode with replacement to strip un-encodable characters.
            encoded = message.encode('utf-8', errors='replace')
            safe_message = encoded.decode('utf-8', errors='replace')
        else:
            safe_message = str(message)
    except Exception as e:
        # Conversion itself failed (e.g. a broken __str__): report that instead.
        level = logging.ERROR
        safe_message = f"Failed to log message: {str(e)}"

    try:
        logger = current_app.logger
    except Exception:
        # App logger unreachable; fall back to the stdlib module logger.
        logger = logging.getLogger(__name__)

    logger.log(level, safe_message)
@posts_bp.route('/', methods=['OPTIONS'])
@posts_bp.route('', methods=['OPTIONS'])
def handle_options():
    """
    Answer CORS preflight (OPTIONS) requests on the collection routes.

    Returns:
        tuple: An empty body and HTTP status 200.
    """
    empty_body = ''
    return empty_body, 200
@posts_bp.route('/', methods=['GET'])
@posts_bp.route('', methods=['GET'])
@jwt_required()
def get_posts():
    """
    Get all posts for the current user.

    Query Parameters:
        published (bool): Optional filter on the published status. Accepts
            true/false, 1/0, yes/no, on/off (case-insensitive); any other
            value is ignored and no filter is applied.

    Returns:
        JSON: ``{'success': True, 'posts': [...]}`` with status 200, or
        ``{'success': False, 'message': ...}`` with status 500 when the
        database client is missing or the query fails.
    """
    try:
        user_id = get_jwt_identity()

        # request.args.get(..., type=bool) maps every non-empty string
        # (including "false" and "0") to True, so parse the flag explicitly.
        published = None
        raw_published = request.args.get('published')
        if raw_published is not None:
            flag = raw_published.strip().lower()
            if flag in ('true', '1', 'yes', 'on'):
                published = True
            elif flag in ('false', '0', 'no', 'off'):
                published = False

        # Check if Supabase client is initialized
        if not hasattr(current_app, 'supabase') or current_app.supabase is None:
            # Add CORS headers to error response
            response_data = jsonify({
                'success': False,
                'message': 'Database connection not initialized'
            })
            response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
            response_data.headers.add('Access-Control-Allow-Credentials', 'true')
            return response_data, 500

        # Build query
        query = (
            current_app.supabase
            .table("Post_content")
            .select("*, Social_network(id_utilisateur)")
        )

        # Apply published filter if specified
        if published is not None:
            query = query.eq("is_published", published)

        response = query.execute()

        # Keep only the posts whose linked social account belongs to the
        # caller. The joined value may be None when a post has no linked
        # account, hence the "or {}" guard.
        user_posts = [
            post for post in response.data
            if (post.get('Social_network') or {}).get('id_utilisateur') == user_id
        ] if response.data else []

        # Add CORS headers explicitly
        response_data = jsonify({
            'success': True,
            'posts': user_posts
        })
        response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
        response_data.headers.add('Access-Control-Allow-Credentials', 'true')
        return response_data, 200
    except Exception as e:
        error_message = str(e)
        safe_log_message(f"Get posts error: {error_message}")
        # Add CORS headers to error response
        response_data = jsonify({
            'success': False,
            'message': 'An error occurred while fetching posts'
        })
        response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
        response_data.headers.add('Access-Control-Allow-Credentials', 'true')
        return response_data, 500
def _generate_post_task(user_id, job_id, job_store, hugging_key):
    """
    Background task to generate post content.

    The job entry in ``job_store`` is replaced as the task progresses:
    'processing' when it starts, then 'completed' (with the generated content
    in ``result``) or 'failed' (with the exception text in ``error``).

    Args:
        user_id (str): User ID for personalization
        job_id (str): Job ID to update status in job store
        job_store (dict): Job store dictionary
        hugging_key (str): Hugging Face API key
    """
    try:
        # Update job status to processing
        job_store[job_id] = {
            'status': 'processing',
            'result': None,
            'error': None
        }

        # Generate content using content service
        # Pass the Hugging Face key directly to the service
        content_service = ContentService(hugging_key=hugging_key)
        generated_content = content_service.generate_post_content(user_id)

        # Update job status to completed with result
        job_store[job_id] = {
            'status': 'completed',
            'result': generated_content,
            'error': None
        }
    except Exception as e:
        error_message = str(e)
        # Record the failure BEFORE logging: if the logging call raises
        # (e.g. the app logger is unavailable in the worker), the job must
        # still leave the 'processing' state so pollers see the failure.
        job_store[job_id] = {
            'status': 'failed',
            'result': None,
            'error': error_message
        }
        safe_log_message(f"Generate post background task error: {error_message}")
@posts_bp.route('/generate', methods=['POST'])
@jwt_required()
def generate_post():
    """
    Generate a new post using AI asynchronously.

    Request Body (optional JSON object):
        user_id (str): User ID (optional, defaults to current user)

    Returns:
        JSON: 202 with the job ID for polling, 403 when ``user_id`` differs
        from the authenticated user, or 500 on unexpected errors.
    """
    try:
        current_user_id = get_jwt_identity()

        # The body is optional: a missing, malformed or non-object JSON
        # payload is treated as "no options" instead of failing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Use provided user_id or default to current user
        user_id = data.get('user_id', current_user_id)

        # Callers may only generate for themselves (no admin override exists here)
        if user_id != current_user_id:
            return jsonify({
                'success': False,
                'message': 'Unauthorized to generate posts for other users'
            }), 403

        # Create a job ID
        job_id = str(uuid.uuid4())

        # Initialize job status
        current_app.job_store[job_id] = {
            'status': 'pending',
            'result': None,
            'error': None
        }

        # Get Hugging Face key
        hugging_key = current_app.config['HUGGING_KEY']

        # Submit the background task, passing all necessary data explicitly
        # rather than having the task read them from current_app.
        current_app.executor.submit(
            _generate_post_task, user_id, job_id, current_app.job_store, hugging_key
        )

        # Return job ID immediately
        return jsonify({
            'success': True,
            'job_id': job_id,
            'message': 'Post generation started'
        }), 202  # 202 Accepted
    except Exception as e:
        error_message = str(e)
        safe_log_message(f"Generate post error: {error_message}")
        return jsonify({
            'success': False,
            'message': f'An error occurred while starting post generation: {error_message}'
        }), 500
@posts_bp.route('/jobs/<job_id>', methods=['GET'])
@jwt_required()
def get_job_status(job_id):
    """
    Report the state of a post generation job.

    Path Parameters:
        job_id (str): Identifier returned when the job was started

    Returns:
        JSON: 200 with the job status (plus ``content`` when completed or
        ``error`` when failed), 404 when the job is unknown, 500 on
        unexpected errors.
    """
    try:
        job_entry = current_app.job_store.get(job_id)

        # Unknown (or empty) entry: nothing to report
        if not job_entry:
            not_found = {'success': False, 'message': 'Job not found'}
            return jsonify(not_found), 404

        state = job_entry['status']
        payload = {'success': True, 'job_id': job_id, 'status': state}

        # Attach the outcome only for terminal states
        if state == 'completed':
            payload['content'] = job_entry['result']
        elif state == 'failed':
            payload['error'] = job_entry['error']

        return jsonify(payload), 200
    except Exception as e:
        error_message = str(e)
        safe_log_message(f"Get job status error: {error_message}")
        failure = {
            'success': False,
            'message': f'An error occurred while fetching job status: {error_message}'
        }
        return jsonify(failure), 500
@posts_bp.route('/', methods=['OPTIONS'])
@posts_bp.route('', methods=['OPTIONS'])
def handle_create_options():
    """
    Answer CORS preflight (OPTIONS) requests for the create-post route.

    Returns:
        tuple: An empty body and HTTP status 200.
    """
    # NOTE(review): handle_options registers these same rules and method
    # earlier in this module; presumably that registration is the one that
    # gets matched, making this view redundant - confirm before removing.
    empty_body = ''
    return empty_body, 200
@posts_bp.route('/publish-direct', methods=['OPTIONS'])
def handle_publish_direct_options():
    """
    Answer CORS preflight (OPTIONS) requests for the publish-direct route.

    Returns:
        tuple: An empty body and HTTP status 200.
    """
    empty_body = ''
    return empty_body, 200
@posts_bp.route('/publish-direct', methods=['POST'])
@jwt_required()
def publish_post_direct():
    """
    Publish a post directly to social media and save to database.

    Request Body:
        social_account_id (str): Social account ID
        text_content (str): Post text content
        image_content_url (str, optional): Image URL
        scheduled_at (str, optional): Scheduled time in ISO format

    Returns:
        JSON: 201 with the saved post and the publish response; 400 when
        required fields are missing or the account lacks a token/sub; 403
        when the account belongs to another user; 404 when the account does
        not exist; 500 when saving fails or an unexpected error occurs.
    """
    try:
        user_id = get_jwt_identity()

        # A missing, malformed or non-object JSON body is treated as empty so
        # the required-field validation below answers 400 instead of the
        # request failing with a 500 on ``None.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Validate required fields
        social_account_id = data.get('social_account_id')
        text_content = data.get('text_content')

        if not social_account_id or not text_content:
            return jsonify({
                'success': False,
                'message': 'social_account_id and text_content are required'
            }), 400

        # Verify the social account belongs to the user
        account_response = (
            current_app.supabase
            .table("Social_network")
            .select("id_utilisateur, token, sub")
            .eq("id", social_account_id)
            .execute()
        )

        if not account_response.data:
            return jsonify({
                'success': False,
                'message': 'Social account not found'
            }), 404

        account = account_response.data[0]
        if account.get('id_utilisateur') != user_id:
            return jsonify({
                'success': False,
                'message': 'Unauthorized to use this social account'
            }), 403

        # Get account details
        access_token = account.get('token')
        user_sub = account.get('sub')

        if not access_token or not user_sub:
            return jsonify({
                'success': False,
                'message': 'Social account not properly configured'
            }), 400

        # Get optional fields
        image_url = data.get('image_content_url')

        # Publish to LinkedIn
        # NOTE: publishing happens before the database insert, so a failed
        # insert below leaves the post already published.
        linkedin_service = LinkedInService()
        publish_response = linkedin_service.publish_post(
            access_token, user_sub, text_content, image_url
        )

        # Save to database as published
        post_data = {
            'id_social': social_account_id,
            'Text_content': text_content,
            'is_published': True
        }

        # Add optional fields if provided
        if image_url:
            post_data['image_content_url'] = image_url

        if 'scheduled_at' in data:
            post_data['scheduled_at'] = data['scheduled_at']

        # Insert post into database
        response = (
            current_app.supabase
            .table("Post_content")
            .insert(post_data)
            .execute()
        )

        if response.data:
            # Add CORS headers explicitly
            response_data = jsonify({
                'success': True,
                'message': 'Post published and saved successfully',
                'post': response.data[0],
                'linkedin_response': publish_response
            })
            response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
            response_data.headers.add('Access-Control-Allow-Credentials', 'true')
            return response_data, 201
        else:
            # Add CORS headers to error response
            response_data = jsonify({
                'success': False,
                'message': 'Failed to save post to database'
            })
            response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
            response_data.headers.add('Access-Control-Allow-Credentials', 'true')
            return response_data, 500
    except Exception as e:
        error_message = str(e)
        safe_log_message(f"[Post] Publish post directly error: {error_message}")
        # Add CORS headers to error response
        response_data = jsonify({
            'success': False,
            'message': f'An error occurred while publishing post: {error_message}'
        })
        response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
        response_data.headers.add('Access-Control-Allow-Credentials', 'true')
        return response_data, 500
@posts_bp.route('/<post_id>', methods=['OPTIONS'])
def handle_post_options(post_id):
    """
    Answer CORS preflight (OPTIONS) requests for a single-post route.

    Path Parameters:
        post_id (str): Post ID taken from the URL; not used by the handler

    Returns:
        tuple: An empty body and HTTP status 200.
    """
    empty_body = ''
    return empty_body, 200
@posts_bp.route('/', methods=['POST'])
@posts_bp.route('', methods=['POST'])
@jwt_required()
def create_post():
    """
    Create a new post.

    Request Body:
        social_account_id (str): Social account ID
        text_content (str): Post text content
        image_content_url (str, optional): Image URL
        scheduled_at (str, optional): Scheduled time in ISO format
        is_published (bool, optional): Whether the post is published (defaults to True)

    Returns:
        JSON: 201 with the created post; 400 when required fields are
        missing; 403 when the account belongs to another user; 404 when the
        account does not exist; 500 when the insert returns no data or an
        unexpected error occurs.
    """
    try:
        user_id = get_jwt_identity()

        # A missing, malformed or non-object JSON body is treated as empty so
        # the required-field validation below answers 400 instead of the
        # request failing with a 500 on ``None.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Validate required fields
        social_account_id = data.get('social_account_id')
        text_content = data.get('text_content')

        if not social_account_id or not text_content:
            return jsonify({
                'success': False,
                'message': 'social_account_id and text_content are required'
            }), 400

        # Verify the social account belongs to the user
        account_response = (
            current_app.supabase
            .table("Social_network")
            .select("id_utilisateur")
            .eq("id", social_account_id)
            .execute()
        )

        if not account_response.data:
            return jsonify({
                'success': False,
                'message': 'Social account not found'
            }), 404

        if account_response.data[0].get('id_utilisateur') != user_id:
            return jsonify({
                'success': False,
                'message': 'Unauthorized to use this social account'
            }), 403

        # Prepare post data - published unless the caller says otherwise
        post_data = {
            'id_social': social_account_id,
            'Text_content': text_content,
            'is_published': data.get('is_published', True)  # Default to True
        }

        # Add optional fields if provided
        if 'image_content_url' in data:
            post_data['image_content_url'] = data['image_content_url']

        if 'scheduled_at' in data:
            post_data['scheduled_at'] = data['scheduled_at']

        # Insert post into database
        response = (
            current_app.supabase
            .table("Post_content")
            .insert(post_data)
            .execute()
        )

        if response.data:
            # Add CORS headers explicitly
            response_data = jsonify({
                'success': True,
                'post': response.data[0]
            })
            response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
            response_data.headers.add('Access-Control-Allow-Credentials', 'true')
            return response_data, 201
        else:
            # Add CORS headers to error response
            response_data = jsonify({
                'success': False,
                'message': 'Failed to create post'
            })
            response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
            response_data.headers.add('Access-Control-Allow-Credentials', 'true')
            return response_data, 500
    except Exception as e:
        error_message = str(e)
        safe_log_message(f"[Post] Create post error: {error_message}")
        # Add CORS headers to error response
        response_data = jsonify({
            'success': False,
            'message': f'An error occurred while creating post: {error_message}'
        })
        response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
        response_data.headers.add('Access-Control-Allow-Credentials', 'true')
        return response_data, 500
@posts_bp.route('/<post_id>', methods=['DELETE'])
@jwt_required()
def delete_post(post_id):
    """
    Delete a post.

    Path Parameters:
        post_id (str): Post ID

    Returns:
        JSON: 200 when the post was deleted; 403 when the post is not linked
        to an account owned by the caller; 404 when the post does not exist;
        500 when the delete returns no data or an unexpected error occurs.
    """
    try:
        user_id = get_jwt_identity()

        # Verify the post belongs to the user
        response = (
            current_app.supabase
            .table("Post_content")
            .select("Social_network(id_utilisateur)")
            .eq("id", post_id)
            .execute()
        )

        if not response.data:
            return jsonify({
                'success': False,
                'message': 'Post not found'
            }), 404

        post = response.data[0]
        # The joined value may be None when the post has no linked account;
        # treat that as "not owned by the caller" (403) rather than letting
        # ``None.get`` raise and turn the request into a 500.
        owner_info = post.get('Social_network') or {}
        if owner_info.get('id_utilisateur') != user_id:
            return jsonify({
                'success': False,
                'message': 'Unauthorized to delete this post'
            }), 403

        # Delete post from Supabase
        delete_response = (
            current_app.supabase
            .table("Post_content")
            .delete()
            .eq("id", post_id)
            .execute()
        )

        if delete_response.data:
            return jsonify({
                'success': True,
                'message': 'Post deleted successfully'
            }), 200
        else:
            return jsonify({
                'success': False,
                'message': 'Failed to delete post'
            }), 500
    except Exception as e:
        error_message = str(e)
        safe_log_message(f"Delete post error: {error_message}")
        return jsonify({
            'success': False,
            'message': 'An error occurred while deleting post'
        }), 500