File size: 2,547 Bytes
bafcf39
c9e23cb
bafcf39
 
 
 
 
 
 
 
cb349ad
bafcf39
 
c9e23cb
 
bafcf39
 
 
c9e23cb
 
 
 
bafcf39
 
 
 
 
 
 
 
bc22fc4
 
 
 
 
 
 
c9e23cb
bc22fc4
 
 
 
 
bafcf39
 
 
bc22fc4
c9e23cb
 
 
bc22fc4
bf7bb79
bafcf39
eafaaed
bafcf39
eafaaed
bafcf39
 
eafaaed
bafcf39
eafaaed
c9e23cb
eafaaed
 
bafcf39
 
 
 
 
 
 
eafaaed
bc22fc4
 
bafcf39
bc22fc4
 
 
 
 
 
 
 
 
52c1a90
 
 
bafcf39
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# import os
import base64
import hashlib

# import gradio as gr
import hmac

import boto3

from tools.config import AWS_CLIENT_ID, AWS_CLIENT_SECRET, AWS_REGION, AWS_USER_POOL_ID


def calculate_secret_hash(client_id: str, client_secret: str, username: str):
    """Return the Base64-encoded HMAC-SHA256 secret hash for a user and client.

    The signed message is ``username`` immediately followed by ``client_id``,
    and the signing key is ``client_secret``. Both are UTF-8 encoded first.

    Args:
        client_id (str): The app client ID, appended to the username.
        client_secret (str): The app client secret used as the HMAC key.
        username (str): The username the hash is computed for.

    Returns:
        str: The Base64 text of the HMAC-SHA256 digest.
    """
    # The username comes first in the signed message, then the client ID.
    payload_bytes = str(username + client_id).encode("utf-8")
    key_bytes = str(client_secret).encode("utf-8")

    mac = hmac.new(key_bytes, payload_bytes, hashlib.sha256)
    return base64.b64encode(mac.digest()).decode()


def authenticate_user(
    username: str,
    password: str,
    user_pool_id: str = AWS_USER_POOL_ID,
    client_id: str = AWS_CLIENT_ID,
    client_secret: str = AWS_CLIENT_SECRET,
):
    """Authenticate a user against an AWS Cognito user pool.

    Starts a ``USER_PASSWORD_AUTH`` flow through the ``cognito-idp`` client in
    ``AWS_REGION``. A ``SECRET_HASH`` is sent only when a client secret is
    configured.

    Args:
        username (str): The username of the user.
        password (str): The password of the user.
        user_pool_id (str): The ID of the Cognito user pool. Accepted for
            interface compatibility; this function does not use it.
        client_id (str): The ID of the Cognito user pool client.
        client_secret (str): The client secret of the app client. An empty
            (or missing) value means no secret hash is sent.

    Returns:
        bool: True if the response contains an ``AuthenticationResult``.
        False if it does not, or if Cognito raises
        ``NotAuthorizedException`` or ``UserNotFoundException``.

    Raises:
        Exception: For any other failure of the authentication request. The
            message is ``"An error occurred: <original error>"`` and the
            original exception is chained as the cause.
    """
    client = boto3.client(
        "cognito-idp", region_name=AWS_REGION
    )  # Cognito Identity Provider client

    auth_parameters = {
        "USERNAME": username,
        "PASSWORD": password,
    }

    # Only compute and send the secret hash when a secret is configured.
    # A falsy secret (empty string or None) is treated as "no secret", so
    # the literal text "None" is never used as an HMAC key.
    if client_secret:
        auth_parameters["SECRET_HASH"] = calculate_secret_hash(
            client_id, client_secret, username
        )

    try:
        response = client.initiate_auth(
            AuthFlow="USER_PASSWORD_AUTH",
            AuthParameters=auth_parameters,
            ClientId=client_id,
        )
    except (
        client.exceptions.NotAuthorizedException,
        client.exceptions.UserNotFoundException,
    ):
        # Bad credentials or an unknown user is a normal "not authenticated"
        # outcome, not an error.
        return False
    except Exception as e:
        out_message = f"An error occurred: {e}"
        print(out_message)
        # Chain the original exception so its traceback is kept.
        raise Exception(out_message) from e

    # A successful sign-in carries an AuthenticationResult; any response
    # without one is reported as not authenticated.
    return bool(response.get("AuthenticationResult"))