File size: 7,373 Bytes
25f22bf c7d5529 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd 25f22bf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
from flask import current_app, request
from flask_jwt_extended import create_access_token, get_jwt
import bcrypt
from datetime import datetime, timedelta
from backend.models.user import User
from backend.utils.database import authenticate_user, create_user
def register_user(email: str, password: str) -> dict:
"""
Register a new user.
Args:
email (str): User email
password (str): User password
Returns:
dict: Registration result with user data or error message
"""
try:
# Check if user already exists
# In Supabase, we'll try to create the user directly
response = create_user(current_app.supabase, email, password)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
# Check if email is confirmed
if response.user.email_confirmed_at:
# Email is confirmed, user can login immediately
return {
'success': True,
'message': 'Account created successfully! You can now log in with your email and password.',
'user': user.to_dict(),
'email_confirmed': True
}
else:
# Email confirmation is required
return {
'success': True,
'message': 'Account created successfully! Please check your email for a confirmation link to activate your account.',
'user': user.to_dict(),
'email_confirmed': False,
'requires_confirmation': True
}
else:
return {
'success': False,
'message': 'Failed to register user'
}
except Exception as e:
# Check if it's a duplicate user error
error_str = str(e).lower()
if 'already registered' in error_str or 'already exists' in error_str:
return {
'success': False,
'message': 'An account with this email already exists. Please login instead or use a different email.'
}
elif 'invalid email' in error_str:
return {
'success': False,
'message': 'Please enter a valid email address.'
}
elif 'password' in error_str:
return {
'success': False,
'message': 'Password does not meet requirements. Please use at least 8 characters.'
}
else:
return {
'success': False,
'message': f'Registration failed: {str(e)}'
}
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
"""
Authenticate and login a user.
Args:
email (str): User email
password (str): User password
remember_me (bool): Remember me flag for extended session
Returns:
dict: Login result with token and user data or error message
"""
try:
# Authenticate user with Supabase
response = authenticate_user(current_app.supabase, email, password)
if response.user:
# Check if email is confirmed
if not response.user.email_confirmed_at:
return {
'success': False,
'message': 'Please confirm your email before logging in. Check your inbox for the confirmation email.',
'requires_confirmation': True
}
# Set token expiration based on remember me flag
if remember_me:
# Extended token expiration (7 days)
expires_delta = timedelta(days=7)
token_type = "remember"
else:
# Standard token expiration (1 hour)
expires_delta = timedelta(hours=1)
token_type = "session"
# Create JWT token with proper expiration and claims
access_token = create_access_token(
identity=response.user.id,
additional_claims={
'email': response.user.email,
'email_confirmed_at': response.user.email_confirmed_at.isoformat() if response.user.email_confirmed_at else None,
'remember_me': remember_me,
'token_type': token_type
},
expires_delta=expires_delta
)
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return {
'success': True,
'token': access_token,
'user': user.to_dict(),
'rememberMe': remember_me,
'expiresAt': (datetime.now() + expires_delta).isoformat(),
'tokenType': token_type
}
else:
return {
'success': False,
'message': 'Invalid email or password. Please check your credentials and try again.'
}
except Exception as e:
current_app.logger.error(f"Login error: {str(e)}")
# Provide more specific error messages
error_str = str(e).lower()
if 'invalid credentials' in error_str or 'unauthorized' in error_str:
return {
'success': False,
'message': 'Invalid email or password. Please check your credentials and try again.'
}
elif 'email not confirmed' in error_str or 'email not verified' in error_str:
return {
'success': False,
'message': 'Please confirm your email before logging in. Check your inbox for the confirmation email.',
'requires_confirmation': True
}
elif 'user not found' in error_str:
return {
'success': False,
'message': 'No account found with this email. Please check your email or register for a new account.'
}
else:
return {
'success': False,
'message': f'Login failed: {str(e)}'
}
def get_user_by_id(user_id: str) -> dict:
"""
Get user by ID.
Args:
user_id (str): User ID
Returns:
dict: User data or None if not found
"""
try:
# Get user from Supabase Auth
response = current_app.supabase.auth.get_user(user_id)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return user.to_dict()
else:
return None
except Exception:
return None |