File size: 7,373 Bytes
25f22bf c7d5529 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf |
|
from flask import current_app, request
from flask_jwt_extended import create_access_token, get_jwt
import bcrypt
from datetime import datetime, timedelta
from backend.models.user import User
from backend.utils.database import authenticate_user, create_user
def register_user(email: str, password: str) -> dict:
"""
Register a new user.
Args:
email (str): User email
password (str): User password
Returns:
dict: Registration result with user data or error message
"""
try:
# Check if user already exists
# In Supabase, we'll try to create the user directly
response = create_user(current_app.supabase, email, password)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
# Check if email is confirmed
if response.user.email_confirmed_at:
# Email is confirmed, user can login immediately
return {
'success': True,
'message': 'Account created successfully! You can now log in with your email and password.',
'user': user.to_dict(),
'email_confirmed': True
}
else:
# Email confirmation is required
return {
'success': True,
'message': 'Account created successfully! Please check your email for a confirmation link to activate your account.',
'user': user.to_dict(),
'email_confirmed': False,
'requires_confirmation': True
}
else:
return {
'success': False,
'message': 'Failed to register user'
}
except Exception as e:
# Check if it's a duplicate user error
error_str = str(e).lower()
if 'already registered' in error_str or 'already exists' in error_str:
return {
'success': False,
'message': 'An account with this email already exists. Please login instead or use a different email.'
}
elif 'invalid email' in error_str:
return {
'success': False,
'message': 'Please enter a valid email address.'
}
elif 'password' in error_str:
return {
'success': False,
'message': 'Password does not meet requirements. Please use at least 8 characters.'
}
else:
return {
'success': False,
'message': f'Registration failed: {str(e)}'
}
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
"""
Authenticate and login a user.
Args:
email (str): User email
password (str): User password
remember_me (bool): Remember me flag for extended session
Returns:
dict: Login result with token and user data or error message
"""
try:
# Authenticate user with Supabase
response = authenticate_user(current_app.supabase, email, password)
if response.user:
# Check if email is confirmed
if not response.user.email_confirmed_at:
return {
'success': False,
'message': 'Please confirm your email before logging in. Check your inbox for the confirmation email.',
'requires_confirmation': True
}
# Set token expiration based on remember me flag
if remember_me:
# Extended token expiration (7 days)
expires_delta = timedelta(days=7)
token_type = "remember"
else:
# Standard token expiration (1 hour)
expires_delta = timedelta(hours=1)
token_type = "session"
# Create JWT token with proper expiration and claims
access_token = create_access_token(
identity=response.user.id,
additional_claims={
'email': response.user.email,
'email_confirmed_at': response.user.email_confirmed_at.isoformat() if response.user.email_confirmed_at else None,
'remember_me': remember_me,
'token_type': token_type
},
expires_delta=expires_delta
)
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return {
'success': True,
'token': access_token,
'user': user.to_dict(),
'rememberMe': remember_me,
'expiresAt': (datetime.now() + expires_delta).isoformat(),
'tokenType': token_type
}
else:
return {
'success': False,
'message': 'Invalid email or password. Please check your credentials and try again.'
}
except Exception as e:
current_app.logger.error(f"Login error: {str(e)}")
# Provide more specific error messages
error_str = str(e).lower()
if 'invalid credentials' in error_str or 'unauthorized' in error_str:
return {
'success': False,
'message': 'Invalid email or password. Please check your credentials and try again.'
}
elif 'email not confirmed' in error_str or 'email not verified' in error_str:
return {
'success': False,
'message': 'Please confirm your email before logging in. Check your inbox for the confirmation email.',
'requires_confirmation': True
}
elif 'user not found' in error_str:
return {
'success': False,
'message': 'No account found with this email. Please check your email or register for a new account.'
}
else:
return {
'success': False,
'message': f'Login failed: {str(e)}'
}
def get_user_by_id(user_id: str) -> dict:
"""
Get user by ID.
Args:
user_id (str): User ID
Returns:
dict: User data or None if not found
"""
try:
# Get user from Supabase Auth
response = current_app.supabase.auth.get_user(user_id)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return user.to_dict()
else:
return None
except Exception:
return None |