resume-api / controller /tokenManagement.py
Nattyboi's picture
updated codebase to make it faster
bbd997e
import datetime
from bson import ObjectId
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=5)
from controller.streaksManagement import streaks_manager
from pymongo import MongoClient
from cryptography.fernet import Fernet
from dotenv import load_dotenv
import redis
import os
# Pull settings from a local .env file (if present) into the environment
# before the os.getenv calls below read them.
load_dotenv()
# Redis connection settings; host and port fall back to a local default.
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
# No default is supplied, so this is None when the variable is unset.
FERNET_SECRET_KEY = os.getenv('FERNET_SECRET_KEY')
REDIS_USERNAME=os.getenv('REDIS_USERNAME')
# NOTE(review): constructed at import time, but none of the functions visible
# in this section use it -- confirm whether it is still needed.
fernet = Fernet(FERNET_SECRET_KEY)
# Shared Redis client used by the access-token cache helpers below.
r = redis.StrictRedis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
username=REDIS_USERNAME,
db=0,
decode_responses=True  # return str values instead of bytes
)
# Function to store an access token in Redis with an expiration time
def store_access_token(user_id: str, access_token: str, expiration_time_seconds: int = 4400):
    """Cache an access token in Redis under a per-user key with a TTL.

    The key is ``"<user_id>.<access_token>"`` and the stored value is the
    token itself, unmodified. The entry is written with ``setex`` so it
    expires after ``expiration_time_seconds``.
    """
    cache_key = f"{user_id}.{access_token}"
    r.setex(cache_key, expiration_time_seconds, value=access_token)
# Function to check if an access token is valid
def check_cache_for_accesstokens(user_id: str, access_token: str):
    """Return True when Redis holds a non-empty entry for this user/token pair."""
    cached_token = r.get(f"{user_id}.{access_token}")
    # A missing key comes back as None, which is falsy.
    return bool(cached_token)
def isexpired(previous_date):
    """Return True when ``previous_date`` lies strictly before the current time.

    Parameters:
        previous_date (datetime.datetime | str): The moment to test. A string
            is parsed with ``datetime.datetime.fromisoformat`` first.

    Returns:
        bool: True if the current time is later than ``previous_date``.

    Raises:
        ValueError: If a string argument is not a valid ISO 8601 timestamp.
        TypeError: If the argument cannot be compared with a datetime.
    """
    # The original comments promised this conversion but never performed it,
    # so string input used to fail with TypeError.
    if isinstance(previous_date, str):
        previous_date = datetime.datetime.fromisoformat(previous_date)

    # Compare like with like: a timezone-aware value needs an aware "now";
    # naive values keep being compared against naive local time as before.
    current_date = datetime.datetime.now(tz=getattr(previous_date, "tzinfo", None))
    return current_date > previous_date
def create_accessToken(db_uri: str, user_id: str, refresh_token: str) -> str:
    """Issue a new access token for a refresh token and cache it in Redis.

    One existing ``AccessToken`` document tied to ``refresh_token`` (if any)
    is deleted, a new document is inserted, and the new document's id is
    cached through ``store_access_token``.

    Parameters:
        db_uri (str): MongoDB connection URI.
        user_id (str): ID of the user the token belongs to.
        refresh_token (str): Refresh token the access token is issued under.

    Returns:
        str: The ID of the inserted document, which serves as the access token.
    """
    from pymongo import MongoClient

    current_time = datetime.datetime.now()
    expire_at = current_time + datetime.timedelta(minutes=130)

    client = MongoClient(db_uri)
    try:
        collection = client["crayonics"]["AccessToken"]
        collection.find_one_and_delete({"refresh_token": refresh_token})
        result = collection.insert_one(
            {
                "user_id": user_id,
                "refresh_token": refresh_token,
                "current_time": current_time,
                "expire_at": expire_at,
            }
        )
        access_token = str(result.inserted_id)
        # NOTE(review): the cached entry uses store_access_token's default TTL,
        # not expire_at -- confirm the two lifetimes are meant to differ.
        store_access_token(user_id=user_id, access_token=access_token)
    finally:
        # Release the connection even when a database or cache call raises.
        client.close()
    return access_token
def create_refreshToken(db_uri: str, user_id: str) -> str:
    """Create a refresh token for a user and update the user's streak.

    Inserts a ``RefreshToken`` document with an ``expire_at`` 30 days from now
    and ``previous_access_token`` set to the string ``"None"``, then calls
    ``streaks_manager`` for the user.

    Parameters:
        db_uri (str): MongoDB connection URI.
        user_id (str): ID of the user.

    Returns:
        str: The ID of the inserted document, which serves as the refresh token.
    """
    from pymongo import MongoClient

    current_time = datetime.datetime.now()
    expire_at = current_time + datetime.timedelta(days=30)

    client = MongoClient(db_uri)
    try:
        result = client["crayonics"]["RefreshToken"].insert_one(
            {
                "user_id": user_id,
                "current_time": current_time,
                "expire_at": expire_at,
                "previous_access_token": "None",
            }
        )
    finally:
        # Release the connection even when the insert raises.
        client.close()

    # Runs synchronously; streaks_manager receives the URI and opens its own
    # connection as needed.
    streaks_manager(db_uri=db_uri, document={"user_id": str(user_id)})
    return str(result.inserted_id)
def update_refreshTokenWithPreviouslyUsedAccessToken(db_uri: str, refresh_token: str,access_token:str) -> bool:
    """Record ``access_token`` as the last access token used with a refresh token.

    Parameters:
        db_uri (str): MongoDB connection URI.
        refresh_token (str): ID of the ``RefreshToken`` document to update.
        access_token (str): Value stored in ``previous_access_token``.

    Returns:
        bool: True when the update ran without error (whether or not a
        document matched), False when building the id or the update failed.
    """
    from pymongo import MongoClient

    client = MongoClient(db_uri)
    try:
        collection = client["crayonics"]["RefreshToken"]
        collection.update_one(
            {"_id": ObjectId(oid=refresh_token)},  # match the token document by its id
            {"$set": {"previous_access_token": access_token}},  # add or overwrite the field
        )
        return True
    except Exception:
        # Deliberate best-effort: callers get False instead of an exception.
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit propagate.
        return False
    finally:
        # Close on both the success and the failure path.
        client.close()
def verify_access_token(db_uri: str, user_id: str, access_token: str) -> bool:
    """Check an access token against the Redis cache.

    When the token is cached, a streak update for the user is queued on the
    module-level executor and True is returned; otherwise False is returned
    and nothing is queued.
    """
    if not check_cache_for_accesstokens(user_id=user_id, access_token=access_token):
        return False

    # Queued on the thread pool so the caller does not wait for it.
    executor.submit(streaks_manager, db_uri=db_uri, document={"user_id": str(user_id)})
    return True
def verify_refresh_access_token(db_uri: str, user_id: str, access_token: str,refresh_token:str) -> bool:
    """Verify that a refresh token belongs to a user and pairs with an access token.

    Looks up the ``RefreshToken`` document whose ``_id``, ``user_id`` and
    ``previous_access_token`` all match. When a document is found,
    ``streaks_manager`` is called for the user.

    Parameters:
        db_uri (str): MongoDB connection URI.
        user_id (str): ID of the user presenting the tokens.
        access_token (str): Access token expected in ``previous_access_token``.
        refresh_token (str): ID of the ``RefreshToken`` document.

    Returns:
        bool: True when a matching document exists and its stored
        ``previous_access_token`` equals ``access_token``, otherwise False.

    Raises:
        bson.errors.InvalidId: If ``refresh_token`` is not a valid ObjectId.
    """
    # Build the id before connecting so a malformed token never opens a client.
    token_id = ObjectId(refresh_token)

    client = MongoClient(db_uri)
    try:
        # _id is unique, so at most one document can match.
        doc = client["crayonics"]["RefreshToken"].find_one(
            {"_id": token_id, "user_id": user_id, "previous_access_token": access_token}
        )
    finally:
        # The original never closed this client on any path.
        client.close()

    if doc is None:
        return False

    # The streak is updated for any matched document, as before.
    streaks_manager(db_uri=db_uri, document={"user_id": str(user_id)})
    return str(doc["previous_access_token"]) == access_token
def logout_func(db_uri: str, refresh_token: str) -> "bool | None":
    """Log a session out by deleting its refresh token document.

    Parameters:
        db_uri (str): MongoDB connection URI.
        refresh_token (str): ID of the ``RefreshToken`` document to delete.

    Returns:
        bool | None: True when a document was deleted, None when no document
        with that id existed.

    Raises:
        bson.errors.InvalidId: If ``refresh_token`` is not a valid ObjectId.
    """
    from pymongo import MongoClient

    # Build the id before connecting so a malformed token never opens a client.
    token_id = ObjectId(refresh_token)

    client = MongoClient(db_uri)
    try:
        deleted = client["crayonics"]["RefreshToken"].find_one_and_delete(
            filter={"_id": token_id}
        )
    finally:
        # The original never closed this client.
        client.close()

    # The deleted token document is deliberately not printed to stdout.
    if deleted is None:
        return None
    return True