Lin / backend /services /auth_service.py
Zelyanoth's picture
fff
25f22bf
raw
history blame
5.07 kB
from datetime import datetime, timedelta, timezone

import bcrypt
from flask import current_app, request
from flask_jwt_extended import create_access_token, get_jwt

from models.user import User
from utils.database import authenticate_user, create_user
def register_user(email: str, password: str) -> dict:
    """
    Register a new user.

    No existence check is done up front: the account is created directly
    through ``create_user`` and a duplicate is recognised from the text of
    the exception it raises.

    Args:
        email (str): User email
        password (str): User password

    Returns:
        dict: ``{'success': True, 'message': ..., 'user': {...}}`` when the
        response carries a user, otherwise ``{'success': False,
        'message': ...}`` describing the failure.
    """
    try:
        response = create_user(current_app.supabase, email, password)
        created = response.user
        if not created:
            return {
                'success': False,
                'message': 'Failed to register user'
            }

        user = User.from_dict({
            'id': created.id,
            'email': created.email,
            'created_at': created.created_at
        })
        return {
            'success': True,
            'message': 'User registered successfully. Please check your email for confirmation.',
            'user': user.to_dict()
        }
    except Exception as e:
        # A duplicate e-mail is an expected outcome, so it is reported to the
        # caller without being logged as an error.
        if 'already registered' in str(e).lower():
            return {
                'success': False,
                'message': 'User with this email already exists'
            }

        # Anything else is unexpected: record it (same logger as login_user)
        # before turning it into an error response.
        current_app.logger.error("Registration error: %s", e)
        return {
            'success': False,
            'message': f'Registration failed: {str(e)}'
        }
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
    """
    Authenticate and login a user.

    The credentials are checked with ``authenticate_user``; on success a JWT
    is issued whose lifetime depends on ``remember_me`` (7 days when set,
    1 hour otherwise).

    Args:
        email (str): User email
        password (str): User password
        remember_me (bool): Remember me flag for extended session

    Returns:
        dict: On success ``{'success': True, 'token', 'user', 'rememberMe',
        'expiresAt', 'tokenType'}``, where ``expiresAt`` is an ISO-8601
        timestamp in UTC (with offset). On failure
        ``{'success': False, 'message': ...}``.
    """
    try:
        # Authenticate user with Supabase
        response = authenticate_user(current_app.supabase, email, password)
        account = response.user
        if not account:
            return {
                'success': False,
                'message': 'Invalid email or password'
            }

        # Unconfirmed accounts may not log in
        confirmed_at = account.email_confirmed_at
        if not confirmed_at:
            return {
                'success': False,
                'message': 'Please confirm your email before logging in'
            }

        # Set token expiration based on remember me flag
        if remember_me:
            # Extended token expiration (7 days)
            expires_delta = timedelta(days=7)
            token_type = "remember"
        else:
            # Standard token expiration (1 hour)
            expires_delta = timedelta(hours=1)
            token_type = "session"

        # Create JWT token with proper expiration and claims.
        # confirmed_at is known to be set here because of the guard above.
        access_token = create_access_token(
            identity=account.id,
            additional_claims={
                'email': account.email,
                'email_confirmed_at': confirmed_at.isoformat(),
                'remember_me': remember_me,
                'token_type': token_type
            },
            expires_delta=expires_delta
        )

        user = User.from_dict({
            'id': account.id,
            'email': account.email,
            'created_at': account.created_at,
            'email_confirmed_at': confirmed_at
        })

        # Use an aware UTC timestamp so the ISO string carries an offset; a
        # naive local time would be misread by clients in another timezone.
        expires_at = datetime.now(timezone.utc) + expires_delta

        return {
            'success': True,
            'token': access_token,
            'user': user.to_dict(),
            'rememberMe': remember_me,
            'expiresAt': expires_at.isoformat(),
            'tokenType': token_type
        }
    except Exception as e:
        current_app.logger.error("Login error: %s", e)
        return {
            'success': False,
            'message': f'Login failed: {str(e)}'
        }
def get_user_by_id(user_id: str) -> dict:
    """
    Get user by ID.

    Args:
        user_id (str): User ID

    Returns:
        dict: User data (``User.to_dict()``), or None if the lookup returns
        no user or raises. Failures are logged rather than propagated.
    """
    try:
        # Get user from Supabase Auth
        # NOTE(review): supabase-py documents auth.get_user() as taking an
        # access-token JWT, not a user ID; confirm what callers pass here.
        response = current_app.supabase.auth.get_user(user_id)
        found = response.user
        if not found:
            return None

        user = User.from_dict({
            'id': found.id,
            'email': found.email,
            'created_at': found.created_at,
            'email_confirmed_at': found.email_confirmed_at
        })
        return user.to_dict()
    except Exception as e:
        # Keep the best-effort "None on failure" contract, but leave a trace.
        # The argument itself is not logged in case it is a token.
        current_app.logger.error("Get user error: %s", e)
        return None