File size: 4,464 Bytes
69ab315
 
 
 
 
 
 
 
65eb9f5
b8a8faa
69ab315
 
 
 
 
b8a8faa
69ab315
 
 
 
 
 
65eb9f5
 
 
69ab315
b8a8faa
69ab315
 
 
65eb9f5
69ab315
65eb9f5
 
b8a8faa
69ab315
 
 
 
 
 
 
65eb9f5
69ab315
 
 
65eb9f5
 
69ab315
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65eb9f5
 
 
 
 
 
 
 
 
69ab315
 
 
 
65eb9f5
 
69ab315
65eb9f5
 
 
 
 
 
 
 
 
 
 
69ab315
 
 
 
 
 
 
 
 
 
 
 
65eb9f5
69ab315
 
 
 
 
65eb9f5
 
69ab315
 
 
 
 
 
 
 
 
 
 
65eb9f5
 
69ab315
 
65eb9f5
69ab315
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from datetime import timedelta, datetime
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from starlette import status
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel
import uuid
import os 
# Router collecting the authentication endpoints under the /v1/auth prefix.
router = APIRouter(prefix="/v1/auth", tags=["auth"])

# Signing key and algorithm passed to jwt.encode / jwt.decode in this module.
# NOTE(review): the key comes from the PASSWORD environment variable and is
# None when that variable is unset — confirm every deployment defines it.
SECRET_KEY = os.getenv("PASSWORD")
ALGORITHM = "HS256"

# bcrypt hashing context used for stored passwords, and the bearer-token
# dependency whose tokenUrl points at this router's /token endpoint.
bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_bearer = OAuth2PasswordBearer(tokenUrl="/v1/auth/token")

# In-memory user store. It is seeded with the two accounts below when the
# module is imported, and create_user appends further records at runtime.
# Each seed record gets a fresh uuid4 id and a bcrypt hash computed at import
# time, so ids differ between process starts.
# NOTE(review): the USER_HASH / ADMIN_HASH environment values are passed
# through bcrypt_context.hash() here, i.e. they are treated as plaintext
# passwords despite the variable names — confirm that is intended.
# NOTE(review): os.environ.get returns None when a variable is unset; confirm
# both variables are always defined before this module is imported.
hardcoded_users = [
    {   "id": str(uuid.uuid4()), 
        "first_name": "Ibraaheem",
        "last_name": "Akbar",
        "username": "test",
        "password_hash": bcrypt_context.hash(os.environ.get("USER_HASH")),
        "role": "user"
    },

    {   "id": str(uuid.uuid4()), 
        "username": "admin",
        "first_name": "John",
        "last_name": "Doe",
        "password_hash": bcrypt_context.hash(os.environ.get("ADMIN_HASH")),
        "role": "admin"
    },
  
  
]



class CreateUserRequest(BaseModel):
    """Request body accepted by the create_user endpoint."""

    username: str
    # Plaintext password; create_user stores only its bcrypt hash.
    password: str
    first_name: str
    last_name: str

class Token(BaseModel):
    """Response body returned by the /token endpoint."""

    # Encoded JWT produced by create_access_token.
    access_token: str
    # Set to "bearer" by login_for_access_token.
    token_type: str

def authenticate_user(username: str, password: str, role: str):
    """Check a username/password pair against the in-memory user store.

    Args:
        username: Login name to search for in ``hardcoded_users``.
        password: Plaintext password verified against the stored bcrypt hash.
        role: Accepted for interface compatibility; not consulted here.

    Returns:
        A dict with ``username``, ``id``, ``role``, ``first_name`` and
        ``last_name`` for the first record with this username whose password
        verifies, or ``None`` when no record authenticates.
    """
    for record in hardcoded_users:
        if record["username"] != username:
            continue

        password_hash = record.get("password_hash")
        account_role = record.get("role")
        # A record lacking a hash or a role can never authenticate.
        if not password_hash or not account_role:
            continue
        if not bcrypt_context.verify(password, password_hash):
            continue

        return {
            "username": username,
            "id": record["id"],
            "role": account_role,
            "first_name": record.get("first_name", ""),
            "last_name": record.get("last_name", ""),
        }
    return None




@router.post("/create_user", status_code=status.HTTP_201_CREATED)
async def create_user(create_user_request: CreateUserRequest):
    """Register a new account with the ``user`` role in the in-memory store.

    Args:
        create_user_request: Validated body carrying the username, plaintext
            password, first name and last name.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 409 when a record with the same username already exists.
    """
    username = create_user_request.username
    # authenticate_user matches accounts by username, so a second record with
    # the same username (e.g. another "admin") would make identities ambiguous.
    if any(existing["username"] == username for existing in hardcoded_users):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username already exists.",
        )

    hardcoded_users.append(
        {
            "id": str(uuid.uuid4()),
            "first_name": create_user_request.first_name,
            "last_name": create_user_request.last_name,
            "username": username,
            # Only the bcrypt hash is kept; the plaintext is never stored.
            "password_hash": bcrypt_context.hash(create_user_request.password),
            "role": "user",
        }
    )
    return {"message": "User created successfully"}

@router.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    """Exchange form-encoded credentials for a bearer JWT.

    Args:
        form_data: OAuth2 password form supplying ``username`` and ``password``.

    Returns:
        A ``Token`` whose ``access_token`` is the signed JWT and whose
        ``token_type`` is ``"bearer"``.

    Raises:
        HTTPException: 401 when the credentials do not authenticate.
    """
    account = authenticate_user(form_data.username, form_data.password, role="user")
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate user.",
        )

    token_lifetime = timedelta(minutes=10080)  # 10080 minutes = 7 days
    access_token = create_access_token(
        username=account["username"],
        user_id=account["id"],
        role=account["role"],
        first_name=account["first_name"],
        last_name=account["last_name"],
        expires_delta=token_lifetime,
    )
    return Token(access_token=access_token, token_type="bearer")



def create_access_token(username: str, user_id: str, role: str, first_name: str, last_name: str, expires_delta: timedelta):
    """Build a signed JWT carrying the user's identity claims.

    Args:
        username: Stored in the ``sub`` claim.
        user_id: Stored in the ``id`` claim (the store's ids are uuid4 strings).
        role: Stored in the ``role`` claim.
        first_name: Stored in the ``first_name`` claim.
        last_name: Stored in the ``last_name`` claim.
        expires_delta: Lifetime of the token, added to the current UTC time
            to form the ``exp`` claim.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    claims = {
        'sub': username,
        'id': user_id,
        'role': role,
        'first_name': first_name,
        'last_name': last_name,
        # Aware UTC "now" replaces datetime.utcnow(), which is deprecated
        # since Python 3.12; both denote the same instant.
        # NOTE(review): like the original, this relies on jose accepting a
        # datetime for the exp claim — confirm against the jose version in use.
        'exp': datetime.now(timezone.utc) + expires_delta,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
    """Decode a bearer token and return the identity claims it carries.

    Args:
        token: Bearer token supplied through the ``oauth2_bearer`` dependency.

    Returns:
        A dict with ``username``, ``id``, ``role``, ``first_name`` and
        ``last_name`` read from the token payload.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError`` or when the
            ``sub`` or ``id`` claim is missing.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
    except JWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user.")

    subject = claims.get('sub')
    identifier = claims.get('id')
    # A token lacking either identifying claim is rejected.
    if subject is None or identifier is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Could not validate user.')

    return {
        'username': subject,
        'id': identifier,
        'role': claims.get('role'),
        'first_name': claims.get('first_name'),
        'last_name': claims.get('last_name'),
    }