File size: 11,257 Bytes
25f22bf 67b1bef c7d5529 25f22bf b48b9dd 25f22bf b48b9dd e484e19 b48b9dd 25f22bf 0cedd2e 25f22bf b48b9dd 25f22bf b48b9dd 0cedd2e b48b9dd 25f22bf 0cedd2e 25f22bf 0cedd2e 25f22bf e484e19 b48b9dd 25f22bf b48b9dd 25f22bf b48b9dd e484e19 b48b9dd e484e19 b48b9dd 67b1bef 25f22bf 67b1bef |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
from flask import current_app, request
from flask_jwt_extended import create_access_token, get_jwt
import bcrypt
from datetime import datetime, timedelta
from supabase import Client
from backend.models.user import User
from backend.utils.database import authenticate_user, create_user
def register_user(email: str, password: str) -> dict:
"""
Register a new user.
Args:
email (str): User email
password (str): User password
Returns:
dict: Registration result with user data or error message
"""
try:
# Check if user already exists
# In Supabase, we'll try to create the user directly
response = create_user(current_app.supabase, email, password)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
# Check if email is confirmed
if response.user.email_confirmed_at:
# Email is confirmed, user can login immediately
return {
'success': True,
'message': 'Account created successfully! You can now log in with your email and password.',
'user': user.to_dict(),
'email_confirmed': True
}
else:
# Email confirmation is required
return {
'success': True,
'message': 'Check your mail to confirm your account',
'user': user.to_dict(),
'email_confirmed': False,
'requires_confirmation': True
}
else:
return {
'success': False,
'message': 'Failed to register user'
}
except Exception as e:
# Log the full error for debugging
current_app.logger.error(f"Registration error for email {email}: {str(e)}")
# Check if it's a duplicate user error
error_str = str(e).lower()
if 'already registered' in error_str or 'already exists' in error_str:
return {
'success': False,
'message': 'An account with this email already exists. Please login instead or use a different email.'
}
elif 'invalid email' in error_str:
return {
'success': False,
'message': 'Please enter a valid email address.'
}
# More specific check for Supabase password policy errors
elif 'password' in error_str and ('weak' in error_str or 'policy' in error_str or 'requirement' in error_str):
return {
'success': False,
'message': 'Password does not meet requirements. Please use at least 8 characters.'
}
else:
# For other errors, provide a more generic message to the user but log the details
return {
'success': False,
'message': 'Registration failed. Please check your information and try again.'
}
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
"""
Authenticate and login a user.
Args:
email (str): User email
password (str): User password
remember_me (bool): Remember me flag for extended session
Returns:
dict: Login result with token and user data or error message
"""
try:
# Authenticate user with Supabase
response = authenticate_user(current_app.supabase, email, password)
if response.user:
# Check if email is confirmed
if not response.user.email_confirmed_at:
return {
'success': False,
'message': 'Check your mail to confirm your account',
'requires_confirmation': True
}
# Set token expiration based on remember me flag
if remember_me:
# Extended token expiration (7 days)
expires_delta = timedelta(days=7)
token_type = "remember"
else:
# Standard token expiration (1 hour)
expires_delta = timedelta(hours=1)
token_type = "session"
# Create JWT token with proper expiration and claims
access_token = create_access_token(
identity=response.user.id,
additional_claims={
'email': response.user.email,
'email_confirmed_at': response.user.email_confirmed_at.isoformat() if response.user.email_confirmed_at else None,
'remember_me': remember_me,
'token_type': token_type
},
expires_delta=expires_delta
)
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return {
'success': True,
'token': access_token,
'user': user.to_dict(),
'rememberMe': remember_me,
'expiresAt': (datetime.now() + expires_delta).isoformat(),
'tokenType': token_type
}
else:
return {
'success': False,
'message': 'Invalid email or password. Please check your credentials and try again.'
}
except Exception as e:
current_app.logger.error(f"Login error: {str(e)}")
# Provide more specific error messages
error_str = str(e).lower()
if 'invalid credentials' in error_str or 'unauthorized' in error_str:
return {
'success': False,
'message': 'Password/email Incorrect'
}
elif 'email not confirmed' in error_str or 'email not verified' in error_str:
return {
'success': False,
'message': 'Check your mail to confirm your account',
'requires_confirmation': True
}
elif 'user not found' in error_str:
return {
'success': False,
'message': 'No account found with this email. Please check your email or register for a new account.'
}
else:
error_str = str(e).lower()
if 'invalid credentials' in error_str or 'unauthorized' in error_str:
return {
'success': False,
'message': 'Password/email Incorrect'
}
elif 'email not confirmed' in error_str or 'email not verified' in error_str:
return {
'success': False,
'message': 'Check your mail to confirm your account',
'requires_confirmation': True
}
elif 'user not found' in error_str:
return {
'success': False,
'message': 'No account found with this email. Please check your email or register for a new account.'
}
else:
return {
'success': False,
'message': 'Password/email Incorrect'
}
def get_user_by_id(user_id: str) -> dict:
"""
Get user by ID.
Args:
user_id (str): User ID
Returns:
dict: User data or None if not found
"""
try:
# Get user from Supabase Auth
response = current_app.supabase.auth.get_user(user_id)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return user.to_dict()
else:
return None
except Exception:
return None
def request_password_reset(supabase: Client, email: str) -> dict:
"""
Request password reset for a user.
Args:
supabase (Client): Supabase client instance
email (str): User email
Returns:
dict: Password reset request result
"""
try:
# Request password reset
response = supabase.auth.reset_password_for_email(email)
return {
'success': True,
'message': 'Password reset instructions sent to your email. Please check your inbox.'
}
except Exception as e:
error_str = str(e).lower()
if 'user not found' in error_str:
# We don't want to reveal if a user exists or not for security reasons
# But we still return a success message to prevent user enumeration
return {
'success': True,
'message': 'If an account exists with this email, password reset instructions have been sent.'
}
else:
return {
'success': False,
'message': f'Failed to process password reset request: {str(e)}'
}
def reset_user_password(supabase: Client, token: str, new_password: str) -> dict:
"""
Reset user password with token.
Args:
supabase (Client): Supabase client instance
token (str): Password reset token (not directly used in Supabase v2)
new_password (str): New password
Returns:
dict: Password reset result
"""
try:
# In Supabase v2, we update the user's password directly
# The token verification is handled by Supabase when the user clicks the link
response = supabase.auth.update_user({
'password': new_password
})
if response.user:
return {
'success': True,
'message': 'Password reset successfully! You can now log in with your new password.'
}
else:
return {
'success': False,
'message': 'Failed to reset password. Please try again.'
}
except Exception as e:
error_str = str(e).lower()
if 'invalid token' in error_str or 'expired' in error_str:
return {
'success': False,
'message': 'Invalid or expired reset token. Please request a new password reset.'
}
elif 'password' in error_str:
return {
'success': False,
'message': 'Password does not meet requirements. Please use at least 8 characters.'
}
else:
return {
'success': False,
'message': f'Failed to reset password: {str(e)}'
} |