"""MongoDB persistence helpers for gamification levels, points, feedback and customer growth."""

import os
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Iterator, List

from bson import ObjectId
from dotenv import load_dotenv
from pymongo import MongoClient

from gamification.objects import (
    Customer,
    CustomerInfo,
    UserFeedback,
    UserLevel,
    UserPoints,
)

load_dotenv()

# Connection string is read from the environment (after load_dotenv() has
# populated it). Never hard-code credentials in this file.
MONGO_URI = os.getenv("MONGO_URI")

# Database that every helper in this module talks to.
_DB_NAME = "crayonics"

# Sentinel reported as the growth rate when the baseline count is zero and the
# current count is not, i.e. the percentage change is unbounded. The magnitude
# is carried over from the original implementation; its negation is used for
# the (unlikely) negative-count case.
_UNBOUNDED_GROWTH_RATE = 99999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999


# Normal Math
def caculate_rate_change_func(c0, c1, days_ago):
    """
    Describe the percentage change from a baseline count to a current count.

    :param c0: Baseline count (e.g. customers that existed ``days_ago`` days ago).
    :param c1: Current count.
    :param days_ago: Size of the look-back window; echoed back in the result.
    :return: Dict with keys ``daysAgo``, ``totalCustomers`` (always ``c1``),
        ``GrowthRate`` (a non-negative magnitude in percent for the ordinary
        cases) and ``GrowthRateType`` (``"positive"``, ``"negative"`` or
        ``"neutral"``).
    """
    if c0 == 0:
        # Division by the baseline is impossible; report a sentinel instead.
        if c1 > 0:
            rate, rate_type = _UNBOUNDED_GROWTH_RATE, "positive"
        elif c1 == 0:
            rate, rate_type = 0.0, "neutral"
        else:
            # c1 < 0 is not expected for a customer count, but is still handled.
            rate, rate_type = -_UNBOUNDED_GROWTH_RATE, "negative"
    elif c1 > c0:
        rate, rate_type = (c1 - c0) / c0 * 100, "positive"
    elif c1 < c0:
        # The direction is carried by GrowthRateType, so the rate stays a magnitude.
        rate, rate_type = (c0 - c1) / c0 * 100, "negative"
    else:
        rate, rate_type = 0.0, "neutral"

    return {
        "daysAgo": days_ago,
        "totalCustomers": c1,
        "GrowthRate": rate,
        "GrowthRateType": rate_type,
    }


@contextmanager
def _open_collection(collection_name: str) -> Iterator:
    """Yield ``collection_name`` from the module's database and always close the client."""
    client = MongoClient(MONGO_URI)
    try:
        yield client[_DB_NAME][collection_name]
    finally:
        client.close()


def _insert_document(collection_name: str, document) -> bool:
    """Insert ``document.model_dump()`` into ``collection_name``; return False when document is None."""
    if document is None:
        return False
    with _open_collection(collection_name) as collection:
        collection.insert_one(document.model_dump())
    return True


def _load_all(collection_name: str, model) -> list:
    """Build one ``model`` instance per document stored in ``collection_name``."""
    with _open_collection(collection_name) as collection:
        # The cursor must be consumed before the client is closed.
        return [model(**record) for record in collection.find()]


# levels
def create_level_func(document: UserLevel,) -> bool:
    """
    Create a UserLevel document in the "Level" collection unless an identical one exists.

    :param document: UserLevel instance; it is serialized with ``model_dump()``.
    :return: True when the document was inserted. False when ``document`` is
        None or when a document matching every dumped field already exists.
    """
    if document is None:
        return False

    payload = document.model_dump()
    with _open_collection("Level") as collection:
        if collection.find_one(payload) is not None:
            # The exact document already exists so it can't be created again.
            return False
        collection.insert_one(payload)
    return True


def get_all_levels_func() -> List[UserLevel]:
    """Return every document of the "Level" collection as a UserLevel object."""
    return _load_all("Level", UserLevel)


def delete_level_func(level_id,) -> bool:
    """
    Delete a single level by its ``_id``.

    :param level_id: ObjectId, or a string that is converted to an ObjectId.
    :return: True only when the write was acknowledged and a document was removed.
    """
    if isinstance(level_id, str):
        level_id = ObjectId(level_id)

    with _open_collection("Level") as collection:
        result = collection.delete_one({"_id": level_id})

    # Check `acknowledged` first: the count is only read for acknowledged writes.
    return bool(result.acknowledged and result.deleted_count > 0)


def edit_level_func(level_id, **kwargs):
    """
    Edit a UserLevel document in the "Level" collection.

    :param level_id: ObjectId, or a string that is converted to an ObjectId.
    :param kwargs: Field names mapped to their new values; entries whose value
        is None are ignored.
    :return: Number of modified documents (0 when there is nothing to update).
    """
    if isinstance(level_id, str):
        level_id = ObjectId(level_id)

    update_data = {key: value for key, value in kwargs.items() if value is not None}
    if not update_data:
        # Avoid sending an empty "$set", which some server versions reject.
        return 0

    with _open_collection("Level") as collection:
        result = collection.update_one({"_id": level_id}, {"$set": update_data})
    return result.modified_count


# points
def create_points_func(document: UserPoints) -> bool:
    """
    Insert a UserPoints document into the "Points" collection.

    :param document: UserPoints instance; it is serialized with ``model_dump()``.
    :return: True after the insert, False when ``document`` is None.
    """
    return _insert_document("Points", document)


def get_all_points_func() -> List[UserPoints]:
    """
    Return every document of the "Points" collection as a UserPoints object.

    The original built UserFeedback objects here, which does not match the
    model written by ``create_points_func``.
    """
    return _load_all("Points", UserPoints)


# feedback
def create_feedback_func(document: UserFeedback) -> bool:
    """
    Insert a UserFeedback document into the "Feedback" collection.

    :param document: UserFeedback instance; it is serialized with ``model_dump()``.
    :return: True after the insert, False when ``document`` is None.
    """
    return _insert_document("Feedback", document)


def get_all_feedback_func() -> List[UserFeedback]:
    """Return every document of the "Feedback" collection as a UserFeedback object."""
    return _load_all("Feedback", UserFeedback)


def get_all_customer_info() -> List[CustomerInfo]:
    """
    Compute customer growth over 30-, 60- and 90-day look-back windows.

    Every document of the "users" collection is loaded as a Customer. A
    document without ``date_Joined`` is given the current time, and that value
    is written back to the collection. For each window, the number of customers
    who had joined on or before the cutoff is compared with the current total.

    :return: Three CustomerInfo objects, ordered 30, 60, 90 days.
    """
    # NOTE(review): datetime.now() is naive local time; confirm that stored
    # date_Joined values follow the same convention.
    now = datetime.now()

    customers = []
    with _open_collection("users") as collection:
        for record in collection.find():
            if "date_Joined" not in record:
                record["date_Joined"] = now
                # Persist only the backfilled field rather than re-writing the
                # whole document (including its immutable _id).
                collection.update_one(
                    {"_id": ObjectId(record["_id"])},
                    {"$set": {"date_Joined": record["date_Joined"]}},
                )
            customers.append(Customer(**record))

    total = len(customers)
    customer_info = []
    for days in (30, 60, 90):
        cutoff = now - timedelta(days=days)
        baseline = sum(1 for customer in customers if customer.date_Joined <= cutoff)
        rate = caculate_rate_change_func(c0=baseline, c1=total, days_ago=days)
        customer_info.append(CustomerInfo(**rate))
    return customer_info