|
from flask import current_app, request |
|
from flask_jwt_extended import create_access_token, get_jwt |
|
import bcrypt |
|
from datetime import datetime, timedelta |
|
from supabase import Client |
|
from backend.models.user import User |
|
from backend.utils.database import authenticate_user, create_user |
|
|
|
def register_user(email: str, password: str) -> dict:
    """
    Create a new account and report the outcome as a plain dict.

    The sign-up itself is delegated to ``create_user`` using the Supabase
    client attached to the current Flask app. Any exception raised along
    the way is logged and translated into a user-facing failure message.

    Args:
        email (str): Address to register.
        password (str): Password for the new account.

    Returns:
        dict: Always contains ``success`` and ``message``. A successful
        registration also carries ``user`` and ``email_confirmed``, plus
        ``requires_confirmation`` when the address is not confirmed yet.
    """
    try:
        signup = create_user(current_app.supabase, email, password)
        account = signup.user

        # Guard clause: no user object in the response means nothing was created.
        if not account:
            return {
                'success': False,
                'message': 'Failed to register user'
            }

        profile = User.from_dict({
            'id': account.id,
            'email': account.email,
            'created_at': account.created_at,
            'email_confirmed_at': account.email_confirmed_at
        })

        # Already confirmed: the caller can send the user straight to login.
        if account.email_confirmed_at:
            return {
                'success': True,
                'message': 'Account created successfully! You can now log in with your email and password.',
                'user': profile.to_dict(),
                'email_confirmed': True
            }

        # Not confirmed yet: the account exists but the user must check their mail.
        return {
            'success': True,
            'message': 'Check your mail to confirm your account',
            'user': profile.to_dict(),
            'email_confirmed': False,
            'requires_confirmation': True
        }
    except Exception as exc:
        current_app.logger.error(f"Registration error for email {email}: {str(exc)}")

        # Classify the failure by substrings of the lower-cased exception text;
        # the first matching rule wins.
        detail = str(exc).lower()
        password_hints = ('weak', 'policy', 'requirement')

        if 'already registered' in detail or 'already exists' in detail:
            reason = 'An account with this email already exists. Please login instead or use a different email.'
        elif 'invalid email' in detail:
            reason = 'Please enter a valid email address.'
        elif 'password' in detail and any(hint in detail for hint in password_hints):
            reason = 'Password does not meet requirements. Please use at least 8 characters.'
        else:
            reason = 'Registration failed. Please check your information and try again.'

        return {
            'success': False,
            'message': reason
        }
|
|
|
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
    """
    Authenticate a user and issue a JWT access token.

    Credentials are checked through ``authenticate_user`` with the Supabase
    client attached to the current Flask app. Users whose email is not
    confirmed are rejected before any token is created.

    Args:
        email (str): User email
        password (str): User password
        remember_me (bool): When True the token is valid for 7 days and is
            tagged ``"remember"``; otherwise it is valid for 1 hour and is
            tagged ``"session"``.

    Returns:
        dict: On success ``success``, ``token``, ``user``, ``rememberMe``,
        ``expiresAt`` (ISO-8601 string in UTC, with offset) and ``tokenType``.
        On failure ``success`` is False and ``message`` explains why;
        ``requires_confirmation`` is set when the email is unconfirmed.
    """
    # Imported locally because the module header only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    try:
        response = authenticate_user(current_app.supabase, email, password)

        if not response.user:
            return {
                'success': False,
                'message': 'Invalid email or password. Please check your credentials and try again.'
            }

        if not response.user.email_confirmed_at:
            return {
                'success': False,
                'message': 'Check your mail to confirm your account',
                'requires_confirmation': True
            }

        if remember_me:
            expires_delta = timedelta(days=7)
            token_type = "remember"
        else:
            expires_delta = timedelta(hours=1)
            token_type = "session"

        access_token = create_access_token(
            identity=response.user.id,
            additional_claims={
                'email': response.user.email,
                # The guard above guarantees email_confirmed_at is set here.
                'email_confirmed_at': response.user.email_confirmed_at.isoformat(),
                'remember_me': remember_me,
                'token_type': token_type
            },
            expires_delta=expires_delta
        )

        user = User.from_dict({
            'id': response.user.id,
            'email': response.user.email,
            'created_at': response.user.created_at,
            'email_confirmed_at': response.user.email_confirmed_at
        })

        return {
            'success': True,
            'token': access_token,
            'user': user.to_dict(),
            'rememberMe': remember_me,
            # Timezone-aware UTC so the string carries an explicit offset; a
            # naive local timestamp is ambiguous for clients in another zone.
            'expiresAt': (datetime.now(timezone.utc) + expires_delta).isoformat(),
            'tokenType': token_type
        }
    except Exception as e:
        current_app.logger.error(f"Login error: {str(e)}")

        # Classify by substrings of the lower-cased exception text. The
        # credential check stays first so it keeps precedence over the others.
        error_str = str(e).lower()

        if 'invalid credentials' in error_str or 'unauthorized' in error_str:
            return {
                'success': False,
                'message': 'Password/email Incorrect'
            }

        if 'email not confirmed' in error_str or 'email not verified' in error_str:
            return {
                'success': False,
                'message': 'Check your mail to confirm your account',
                'requires_confirmation': True
            }

        if 'user not found' in error_str:
            # NOTE(review): this message tells the caller whether an address
            # is registered — confirm that disclosure is intended.
            return {
                'success': False,
                'message': 'No account found with this email. Please check your email or register for a new account.'
            }

        # Anything unrecognised is reported as a credential failure.
        return {
            'success': False,
            'message': 'Password/email Incorrect'
        }
|
|
|
def get_user_by_id(user_id: str) -> dict:
    """
    Look up a user through the app's Supabase auth client.

    Args:
        user_id (str): Identifier passed to ``supabase.auth.get_user``.

    Returns:
        dict: The user's data as produced by ``User.to_dict()``, or None when
        the lookup returns no user or raises. Failures are logged, never
        propagated.
    """
    try:
        # NOTE(review): supabase-py documents auth.get_user() as taking an
        # access-token JWT rather than a user id — confirm what callers pass.
        response = current_app.supabase.auth.get_user(user_id)

        if not response.user:
            return None

        user = User.from_dict({
            'id': response.user.id,
            'email': response.user.email,
            'created_at': response.user.created_at,
            'email_confirmed_at': response.user.email_confirmed_at
        })
        return user.to_dict()
    except Exception as e:
        # Still best-effort (callers get None), but leave a trace in the log.
        # The identifier is deliberately not logged in case it is a token.
        current_app.logger.error(f"Get user error: {str(e)}")
        return None
|
|
|
|
|
def request_password_reset(supabase: Client, email: str) -> dict:
    """
    Ask Supabase to send password reset instructions to an address.

    The reply is identical whether the mail was sent or the provider
    reported that no such user exists, so the response cannot be used to
    probe which addresses are registered.

    Args:
        supabase (Client): Supabase client instance
        email (str): User email

    Returns:
        dict: ``success`` and ``message``. ``success`` is False only when the
        request fails for a reason other than "user not found"; in that case
        the message includes the exception text.
    """
    try:
        supabase.auth.reset_password_for_email(email)
    except Exception as e:
        # An unknown address falls through to the same neutral reply as a
        # successful send; every other failure is reported to the caller.
        if 'user not found' not in str(e).lower():
            return {
                'success': False,
                'message': f'Failed to process password reset request: {str(e)}'
            }

    # Same wording for "sent" and "no such account": differing messages would
    # reveal account existence despite both reporting success.
    return {
        'success': True,
        'message': 'If an account exists with this email, password reset instructions have been sent.'
    }
|
|
|
|
|
def reset_user_password(supabase: Client, token: str, new_password: str) -> dict:
    """
    Set a new password through the Supabase auth client.

    The change is made with ``supabase.auth.update_user``; exceptions are
    translated into failure messages based on their text.

    NOTE(review): ``token`` is accepted but never read here, so the update
    applies to whatever session the client already holds — confirm the
    caller establishes the recovery session beforehand.

    Args:
        supabase (Client): Supabase client instance
        token (str): Password reset token (unused by this function)
        new_password (str): New password

    Returns:
        dict: ``success`` flag and a user-facing ``message``.
    """
    try:
        outcome = supabase.auth.update_user({'password': new_password})
        changed = bool(outcome.user)
        if changed:
            note = 'Password reset successfully! You can now log in with your new password.'
        else:
            note = 'Failed to reset password. Please try again.'
        return {
            'success': changed,
            'message': note
        }
    except Exception as exc:
        # First matching rule wins; token problems take precedence over
        # anything that merely mentions "password".
        reason = str(exc).lower()
        if any(marker in reason for marker in ('invalid token', 'expired')):
            note = 'Invalid or expired reset token. Please request a new password reset.'
        elif 'password' in reason:
            note = 'Password does not meet requirements. Please use at least 8 characters.'
        else:
            note = f'Failed to reset password: {str(exc)}'
        return {
            'success': False,
            'message': note
        }