from flask import current_app, request from flask_jwt_extended import create_access_token, get_jwt import bcrypt from datetime import datetime, timedelta from supabase import Client from backend.models.user import User from backend.utils.database import authenticate_user, create_user def register_user(email: str, password: str) -> dict: """ Register a new user. Args: email (str): User email password (str): User password Returns: dict: Registration result with user data or error message """ try: # Check if user already exists # In Supabase, we'll try to create the user directly response = create_user(current_app.supabase, email, password) if response.user: user = User.from_dict({ 'id': response.user.id, 'email': response.user.email, 'created_at': response.user.created_at, 'email_confirmed_at': response.user.email_confirmed_at }) # Check if email is confirmed if response.user.email_confirmed_at: # Email is confirmed, user can login immediately return { 'success': True, 'message': 'Account created successfully! You can now log in with your email and password.', 'user': user.to_dict(), 'email_confirmed': True } else: # Email confirmation is required return { 'success': True, 'message': 'Check your mail to confirm your account', 'user': user.to_dict(), 'email_confirmed': False, 'requires_confirmation': True } else: return { 'success': False, 'message': 'Failed to register user' } except Exception as e: # Log the full error for debugging current_app.logger.error(f"Registration error for email {email}: {str(e)}") # Check if it's a duplicate user error error_str = str(e).lower() if 'already registered' in error_str or 'already exists' in error_str: return { 'success': False, 'message': 'An account with this email already exists. Please login instead or use a different email.' } elif 'invalid email' in error_str: return { 'success': False, 'message': 'Please enter a valid email address.' } # More specific check for Supabase password policy errors elif 'password' in error_str and ('weak' in error_str or 'policy' in error_str or 'requirement' in error_str): return { 'success': False, 'message': 'Password does not meet requirements. Please use at least 8 characters.' } else: # For other errors, provide a more generic message to the user but log the details return { 'success': False, 'message': 'Registration failed. Please check your information and try again.' } def login_user(email: str, password: str, remember_me: bool = False) -> dict: """ Authenticate and login a user. Args: email (str): User email password (str): User password remember_me (bool): Remember me flag for extended session Returns: dict: Login result with token and user data or error message """ try: # Authenticate user with Supabase response = authenticate_user(current_app.supabase, email, password) if response.user: # Check if email is confirmed if not response.user.email_confirmed_at: return { 'success': False, 'message': 'Check your mail to confirm your account', 'requires_confirmation': True } # Set token expiration based on remember me flag if remember_me: # Extended token expiration (7 days) expires_delta = timedelta(days=7) token_type = "remember" else: # Standard token expiration (1 hour) expires_delta = timedelta(hours=1) token_type = "session" # Create JWT token with proper expiration and claims access_token = create_access_token( identity=response.user.id, additional_claims={ 'email': response.user.email, 'email_confirmed_at': response.user.email_confirmed_at.isoformat() if response.user.email_confirmed_at else None, 'remember_me': remember_me, 'token_type': token_type }, expires_delta=expires_delta ) user = User.from_dict({ 'id': response.user.id, 'email': response.user.email, 'created_at': response.user.created_at, 'email_confirmed_at': response.user.email_confirmed_at }) return { 'success': True, 'token': access_token, 'user': user.to_dict(), 'rememberMe': remember_me, 'expiresAt': (datetime.now() + expires_delta).isoformat(), 'tokenType': token_type } else: return { 'success': False, 'message': 'Invalid email or password. Please check your credentials and try again.' } except Exception as e: current_app.logger.error(f"Login error: {str(e)}") # Provide more specific error messages error_str = str(e).lower() if 'invalid credentials' in error_str or 'unauthorized' in error_str: return { 'success': False, 'message': 'Password/email Incorrect' } elif 'email not confirmed' in error_str or 'email not verified' in error_str: return { 'success': False, 'message': 'Check your mail to confirm your account', 'requires_confirmation': True } elif 'user not found' in error_str: return { 'success': False, 'message': 'No account found with this email. Please check your email or register for a new account.' } else: error_str = str(e).lower() if 'invalid credentials' in error_str or 'unauthorized' in error_str: return { 'success': False, 'message': 'Password/email Incorrect' } elif 'email not confirmed' in error_str or 'email not verified' in error_str: return { 'success': False, 'message': 'Check your mail to confirm your account', 'requires_confirmation': True } elif 'user not found' in error_str: return { 'success': False, 'message': 'No account found with this email. Please check your email or register for a new account.' } else: return { 'success': False, 'message': 'Password/email Incorrect' } def get_user_by_id(user_id: str) -> dict: """ Get user by ID. Args: user_id (str): User ID Returns: dict: User data or None if not found """ try: # Get user from Supabase Auth response = current_app.supabase.auth.get_user(user_id) if response.user: user = User.from_dict({ 'id': response.user.id, 'email': response.user.email, 'created_at': response.user.created_at, 'email_confirmed_at': response.user.email_confirmed_at }) return user.to_dict() else: return None except Exception: return None def request_password_reset(supabase: Client, email: str) -> dict: """ Request password reset for a user. Args: supabase (Client): Supabase client instance email (str): User email Returns: dict: Password reset request result """ try: # Request password reset response = supabase.auth.reset_password_for_email(email) return { 'success': True, 'message': 'Password reset instructions sent to your email. Please check your inbox.' } except Exception as e: error_str = str(e).lower() if 'user not found' in error_str: # We don't want to reveal if a user exists or not for security reasons # But we still return a success message to prevent user enumeration return { 'success': True, 'message': 'If an account exists with this email, password reset instructions have been sent.' } else: return { 'success': False, 'message': f'Failed to process password reset request: {str(e)}' } def reset_user_password(supabase: Client, token: str, new_password: str) -> dict: """ Reset user password with token. Args: supabase (Client): Supabase client instance token (str): Password reset token (not directly used in Supabase v2) new_password (str): New password Returns: dict: Password reset result """ try: # In Supabase v2, we update the user's password directly # The token verification is handled by Supabase when the user clicks the link response = supabase.auth.update_user({ 'password': new_password }) if response.user: return { 'success': True, 'message': 'Password reset successfully! You can now log in with your new password.' } else: return { 'success': False, 'message': 'Failed to reset password. Please try again.' } except Exception as e: error_str = str(e).lower() if 'invalid token' in error_str or 'expired' in error_str: return { 'success': False, 'message': 'Invalid or expired reset token. Please request a new password reset.' } elif 'password' in error_str: return { 'success': False, 'message': 'Password does not meet requirements. Please use at least 8 characters.' } else: return { 'success': False, 'message': f'Failed to reset password: {str(e)}' }