from pymongo import MongoClient from datetime import datetime, timedelta from typing import Dict, Union def get_current_date() -> str: # Get the current date and return it in YYYY-MM-DD format return datetime.now().date().isoformat() def get_another_date(day: int) -> str: # Get the current date and add specified days to it tomorrow = datetime.now() + timedelta(days=day) return tomorrow.date().isoformat() def check_date_streak(start_date: str, end_date: str) -> bool: # Convert ISO strings to datetime objects start_date = datetime.fromisoformat(start_date) end_date = datetime.fromisoformat(end_date) # Compare the dates to check if they are consecutive (1 day apart) return end_date - start_date == timedelta(days=1) def streaks_manager(db_uri: str, document: Dict) -> Union[bool, str]: """ Manage user streaks in MongoDB. Args: db_uri: MongoDB connection string document: Dictionary containing user data with 'user_id' key Returns: Union[bool, str]: True if successful, "User Already Exists" if streak is reset """ client = MongoClient(db_uri) try: db = client["crayonics"] collection = db["Streaks"] # Check for existing user found_user = collection.find_one({"user_id": document.get('user_id')}) current_date = get_current_date() document['streak_dates'] = [current_date] if found_user is None: # New user case collection.insert_one(document) return True else: is_a_streak = check_date_streak( start_date=found_user['streak_dates'][-1], end_date=current_date ) if is_a_streak: print("its a streak guys") # Extend existing streak dates = found_user["streak_dates"] + [current_date] collection.update_one( {"user_id": document.get("user_id")}, {"$set": {"streak_dates": dates}} ) return True else: # Reset streak if not consecutive collection.find_one_and_replace( {"user_id": document.get('user_id')}, document ) return "User Already Exists" except Exception as e: print(f"Error: {str(e)}") raise finally: client.close()