resume-api / streaksManagement.py
Nattyboi's picture
fixed streaks issue
335b256
raw
history blame
4.3 kB
from pymongo import MongoClient
from datetime import datetime, timedelta
from typing import Dict, List, Union
def get_current_date() -> str:
# Get the current date and return it in YYYY-MM-DD format
return datetime.now().date().isoformat()
def get_another_date(day: int) -> str:
# Get the current date and add specified days to it
tomorrow = datetime.now() + timedelta(days=day)
return tomorrow.date().isoformat()
def check_date_streak(start_date: str, end_date: str) -> bool:
# Convert ISO strings to datetime objects
start_date = datetime.fromisoformat(start_date)
end_date = datetime.fromisoformat(end_date)
# Compare the dates to check if they are consecutive (1 day apart)
if end_date - start_date == timedelta(days=0) or end_date - start_date == timedelta(days=1):
return True
else:
return False
def save_in_streaks_history(db_uri,document:dict={"user_id":"67c9b68678fbf39f4ed94e01","streaks_date":["2025-03-07"]}):
client = MongoClient(db_uri)
document= document.pop("_id",None)
try:
db = client["crayonics"]
collection = db["StreaksHistory"]
# Check for existing user
found_user = collection.find_one({"user_id": document.get('user_id')})
current_date = get_current_date()
if found_user is None:
# New user case
print("added new streak reset record")
document["streaks_records"] = {"date_of_streaks_reset":current_date,"streaks_dates":document["streaks_date"]}
document.pop("streaks_date")
collection.insert_one(document)
return True
else:
print("added another streak reset record")
current_record={"date_of_streaks_reset":current_date,"streaks_dates":document["streaks_date"]}
dates = found_user["streaks_records"] + [current_record]
collection.update_one(
{"user_id": document.get("user_id")},
{"$set": {"streaks_records": dates}}
)
return True
except Exception as e:
print(f"Error in creating a streaks reset record: {str(e)}")
return False
finally:
client.close()
def streaks_manager(db_uri: str, document: Dict) -> Union[bool, str]:
"""
Manage user streaks in MongoDB.
Args:
db_uri: MongoDB connection string
document: Dictionary containing user data with 'user_id' key
Returns:
Union[bool, str]: True if successful, "User Already Exists" if streak is reset
"""
client = MongoClient(db_uri)
try:
db = client["crayonics"]
collection = db["Streaks"]
# Check for existing user
found_user = collection.find_one({"user_id": document.get('user_id')})
current_date = get_current_date()
document['streak_dates'] = [current_date]
if found_user is None:
# New user case
collection.insert_one(document)
return True
else:
is_a_streak = check_date_streak(
start_date=found_user['streak_dates'][-1],
end_date=current_date
)
if is_a_streak:
print("its a streak guys")
# Extend existing streak
if not (current_date== found_user["streak_dates"][-1]):
dates = found_user["streak_dates"] + [current_date]
else:
dates=found_user["streak_dates"]
collection.update_one(
{"user_id": document.get("user_id")},
{"$set": {"streak_dates": dates}}
)
return True
else:
print(found_user,type(found_user))
# save_in_streaks_history(db_uri=db_uri,document=found_user)
# # Reset streak if not consecutive
# collection.find_one_and_replace(
# {"user_id": document.get('user_id')},
# document
# )
return "User Already Exists"
except Exception as e:
print(f"Error: {str(e)}")
raise
finally:
client.close()