Lin / backend /services /auth_service.py
Zelyanoth's picture
k
0cedd2e
from flask import current_app, request
from flask_jwt_extended import create_access_token, get_jwt
import bcrypt
from datetime import datetime, timedelta
from supabase import Client
from backend.models.user import User
from backend.utils.database import authenticate_user, create_user
def register_user(email: str, password: str) -> dict:
"""
Register a new user.
Args:
email (str): User email
password (str): User password
Returns:
dict: Registration result with user data or error message
"""
try:
# Check if user already exists
# In Supabase, we'll try to create the user directly
response = create_user(current_app.supabase, email, password)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
# Check if email is confirmed
if response.user.email_confirmed_at:
# Email is confirmed, user can login immediately
return {
'success': True,
'message': 'Account created successfully! You can now log in with your email and password.',
'user': user.to_dict(),
'email_confirmed': True
}
else:
# Email confirmation is required
return {
'success': True,
'message': 'Check your mail to confirm your account',
'user': user.to_dict(),
'email_confirmed': False,
'requires_confirmation': True
}
else:
return {
'success': False,
'message': 'Failed to register user'
}
except Exception as e:
# Log the full error for debugging
current_app.logger.error(f"Registration error for email {email}: {str(e)}")
# Check if it's a duplicate user error
error_str = str(e).lower()
if 'already registered' in error_str or 'already exists' in error_str:
return {
'success': False,
'message': 'An account with this email already exists. Please login instead or use a different email.'
}
elif 'invalid email' in error_str:
return {
'success': False,
'message': 'Please enter a valid email address.'
}
# More specific check for Supabase password policy errors
elif 'password' in error_str and ('weak' in error_str or 'policy' in error_str or 'requirement' in error_str):
return {
'success': False,
'message': 'Password does not meet requirements. Please use at least 8 characters.'
}
else:
# For other errors, provide a more generic message to the user but log the details
return {
'success': False,
'message': 'Registration failed. Please check your information and try again.'
}
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
"""
Authenticate and login a user.
Args:
email (str): User email
password (str): User password
remember_me (bool): Remember me flag for extended session
Returns:
dict: Login result with token and user data or error message
"""
try:
# Authenticate user with Supabase
response = authenticate_user(current_app.supabase, email, password)
if response.user:
# Check if email is confirmed
if not response.user.email_confirmed_at:
return {
'success': False,
'message': 'Check your mail to confirm your account',
'requires_confirmation': True
}
# Set token expiration based on remember me flag
if remember_me:
# Extended token expiration (7 days)
expires_delta = timedelta(days=7)
token_type = "remember"
else:
# Standard token expiration (1 hour)
expires_delta = timedelta(hours=1)
token_type = "session"
# Create JWT token with proper expiration and claims
access_token = create_access_token(
identity=response.user.id,
additional_claims={
'email': response.user.email,
'email_confirmed_at': response.user.email_confirmed_at.isoformat() if response.user.email_confirmed_at else None,
'remember_me': remember_me,
'token_type': token_type
},
expires_delta=expires_delta
)
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return {
'success': True,
'token': access_token,
'user': user.to_dict(),
'rememberMe': remember_me,
'expiresAt': (datetime.now() + expires_delta).isoformat(),
'tokenType': token_type
}
else:
return {
'success': False,
'message': 'Invalid email or password. Please check your credentials and try again.'
}
except Exception as e:
current_app.logger.error(f"Login error: {str(e)}")
# Provide more specific error messages
error_str = str(e).lower()
if 'invalid credentials' in error_str or 'unauthorized' in error_str:
return {
'success': False,
'message': 'Password/email Incorrect'
}
elif 'email not confirmed' in error_str or 'email not verified' in error_str:
return {
'success': False,
'message': 'Check your mail to confirm your account',
'requires_confirmation': True
}
elif 'user not found' in error_str:
return {
'success': False,
'message': 'No account found with this email. Please check your email or register for a new account.'
}
else:
error_str = str(e).lower()
if 'invalid credentials' in error_str or 'unauthorized' in error_str:
return {
'success': False,
'message': 'Password/email Incorrect'
}
elif 'email not confirmed' in error_str or 'email not verified' in error_str:
return {
'success': False,
'message': 'Check your mail to confirm your account',
'requires_confirmation': True
}
elif 'user not found' in error_str:
return {
'success': False,
'message': 'No account found with this email. Please check your email or register for a new account.'
}
else:
return {
'success': False,
'message': 'Password/email Incorrect'
}
def get_user_by_id(user_id: str) -> dict:
"""
Get user by ID.
Args:
user_id (str): User ID
Returns:
dict: User data or None if not found
"""
try:
# Get user from Supabase Auth
response = current_app.supabase.auth.get_user(user_id)
if response.user:
user = User.from_dict({
'id': response.user.id,
'email': response.user.email,
'created_at': response.user.created_at,
'email_confirmed_at': response.user.email_confirmed_at
})
return user.to_dict()
else:
return None
except Exception:
return None
def request_password_reset(supabase: Client, email: str) -> dict:
"""
Request password reset for a user.
Args:
supabase (Client): Supabase client instance
email (str): User email
Returns:
dict: Password reset request result
"""
try:
# Request password reset
response = supabase.auth.reset_password_for_email(email)
return {
'success': True,
'message': 'Password reset instructions sent to your email. Please check your inbox.'
}
except Exception as e:
error_str = str(e).lower()
if 'user not found' in error_str:
# We don't want to reveal if a user exists or not for security reasons
# But we still return a success message to prevent user enumeration
return {
'success': True,
'message': 'If an account exists with this email, password reset instructions have been sent.'
}
else:
return {
'success': False,
'message': f'Failed to process password reset request: {str(e)}'
}
def reset_user_password(supabase: Client, token: str, new_password: str) -> dict:
"""
Reset user password with token.
Args:
supabase (Client): Supabase client instance
token (str): Password reset token (not directly used in Supabase v2)
new_password (str): New password
Returns:
dict: Password reset result
"""
try:
# In Supabase v2, we update the user's password directly
# The token verification is handled by Supabase when the user clicks the link
response = supabase.auth.update_user({
'password': new_password
})
if response.user:
return {
'success': True,
'message': 'Password reset successfully! You can now log in with your new password.'
}
else:
return {
'success': False,
'message': 'Failed to reset password. Please try again.'
}
except Exception as e:
error_str = str(e).lower()
if 'invalid token' in error_str or 'expired' in error_str:
return {
'success': False,
'message': 'Invalid or expired reset token. Please request a new password reset.'
}
elif 'password' in error_str:
return {
'success': False,
'message': 'Password does not meet requirements. Please use at least 8 characters.'
}
else:
return {
'success': False,
'message': f'Failed to reset password: {str(e)}'
}