|
from flask import current_app, request |
|
from flask_jwt_extended import create_access_token, get_jwt |
|
import bcrypt |
|
from datetime import datetime, timedelta |
|
from models.user import User |
|
from utils.database import authenticate_user, create_user |
|
|
|
def register_user(email: str, password: str) -> dict:
    """
    Register a new user through the Supabase client attached to the app.

    Args:
        email (str): User email
        password (str): User password

    Returns:
        dict: Always contains 'success' (bool) and 'message' (str). On
            success it also contains 'user', the serialized User built
            from the sign-up response.
    """
    try:
        response = create_user(current_app.supabase, email, password)

        # Guard clause: no user in the response means the sign-up failed.
        if not response.user:
            return {
                'success': False,
                'message': 'Failed to register user'
            }

        user = User.from_dict({
            'id': response.user.id,
            'email': response.user.email,
            'created_at': response.user.created_at
        })

        return {
            'success': True,
            'message': 'User registered successfully. Please check your email for confirmation.',
            'user': user.to_dict()
        }
    except Exception as e:
        # Record the failure before translating it into a response dict;
        # otherwise registration errors leave no trace on the server.
        current_app.logger.error("Registration error: %s", e)

        # The duplicate-account case is detected from the error text and
        # mapped to a friendlier message.
        if 'already registered' in str(e).lower():
            return {
                'success': False,
                'message': 'User with this email already exists'
            }

        return {
            'success': False,
            'message': f'Registration failed: {str(e)}'
        }
|
|
|
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
    """
    Authenticate a user and issue a JWT access token.

    Args:
        email (str): User email
        password (str): User password
        remember_me (bool): When True the token lives for 7 days and is
            tagged "remember"; otherwise it lives for 1 hour and is tagged
            "session".

    Returns:
        dict: On success: 'success', 'token', 'user', 'rememberMe',
            'expiresAt' (ISO 8601 timestamp in UTC, with offset) and
            'tokenType'. On failure: 'success' False and a 'message'.
    """
    # Function-scope import: the module-level datetime import does not
    # include timezone, and only this function needs it.
    from datetime import timezone

    try:
        response = authenticate_user(current_app.supabase, email, password)

        if not response.user:
            return {
                'success': False,
                'message': 'Invalid email or password'
            }

        confirmed_at = response.user.email_confirmed_at
        if not confirmed_at:
            return {
                'success': False,
                'message': 'Please confirm your email before logging in'
            }

        # Session policy: long-lived token only when explicitly requested.
        if remember_me:
            expires_delta = timedelta(days=7)
            token_type = "remember"
        else:
            expires_delta = timedelta(hours=1)
            token_type = "session"

        access_token = create_access_token(
            identity=response.user.id,
            additional_claims={
                'email': response.user.email,
                # confirmed_at is guaranteed truthy by the guard above.
                'email_confirmed_at': confirmed_at.isoformat(),
                'remember_me': remember_me,
                'token_type': token_type
            },
            expires_delta=expires_delta
        )

        user = User.from_dict({
            'id': response.user.id,
            'email': response.user.email,
            'created_at': response.user.created_at,
            'email_confirmed_at': confirmed_at
        })

        # Use an aware UTC timestamp so the serialized value carries an
        # explicit offset; a naive local time is ambiguous to clients in
        # other timezones.
        expires_at = datetime.now(timezone.utc) + expires_delta

        return {
            'success': True,
            'token': access_token,
            'user': user.to_dict(),
            'rememberMe': remember_me,
            'expiresAt': expires_at.isoformat(),
            'tokenType': token_type
        }
    except Exception as e:
        current_app.logger.error("Login error: %s", e)
        return {
            'success': False,
            'message': f'Login failed: {str(e)}'
        }
|
|
|
def get_user_by_id(user_id: str) -> dict:
    """
    Get user by ID.

    Args:
        user_id (str): User ID

    Returns:
        dict: Serialized user data, or None if the user is not found or
            the lookup fails (failures are logged, not raised).
    """
    try:
        # auth.get_user() expects an access-token JWT, not a user id, so a
        # lookup by id has to go through the admin API.
        # NOTE(review): the admin API needs a client created with a
        # service-role key - confirm how current_app.supabase is configured.
        response = current_app.supabase.auth.admin.get_user_by_id(user_id)

        if not response.user:
            return None

        user = User.from_dict({
            'id': response.user.id,
            'email': response.user.email,
            'created_at': response.user.created_at,
            'email_confirmed_at': response.user.email_confirmed_at
        })
        return user.to_dict()
    except Exception as e:
        # Still best-effort (callers get None), but no longer silent.
        current_app.logger.error("Get user error: %s", e)
        return None