from gamification.objects import (
    PlatformEngagement,
    UserFeedback,
    UserLevel,
    UserPoints,
    CustomerInfo,
    Customer,
    IndividualUserLevel,
    Points,
    SimpleIndividualUserLevel,
)
from gamification.levelLogic import (
    create_level_func,
    get_all_levels_func,
    edit_level_func,
    delete_level_func,
)
from gamification.imports import *
from gamification.pointLogic import (
    create_points_func,
    get_all_simple_points_func,
    get_all_points_func,
)
from concurrent.futures import ThreadPoolExecutor

# Shared pool used to award points in the background without blocking callers.
executor = ThreadPoolExecutor(max_workers=5)


# Normal Math
def caculate_rate_change_func(c0, c1, days_ago):
    """Describe how a count changed from ``c0`` (baseline) to ``c1`` (current).

    Args:
        c0: Baseline count at the start of the window.
        c1: Current count.
        days_ago: Size of the window in days; echoed back in the result.

    Returns:
        A dict with the keys ``daysAgo``, ``totalCustomers`` (always ``c1``),
        ``GrowthRate`` and ``GrowthRateType`` (``"positive"``, ``"negative"``
        or ``"neutral"``). ``GrowthRate`` is the percentage change relative to
        ``c0`` and is reported as a magnitude: a shrinking count yields a
        positive number together with ``GrowthRateType == "negative"``.

        When ``c0`` is zero a percentage cannot be computed, so a very large
        sentinel integer is returned instead (positive when ``c1 > 0``,
        negative when ``c1 < 0``).
    """
    if c0 == 0:
        if c1 > 0:
            # Growth from nothing: percentage is undefined, report a huge
            # sentinel value to signal "infinite" growth.
            growth_rate = 9999999999999999999999999999999999999999999999999999999
            growth_type = "positive"
        elif c1 == 0:
            # Nothing before, nothing now: no change.
            growth_rate = 0.0
            growth_type = "neutral"
        else:
            # c1 < 0 is not expected for a customer count; kept for parity
            # with the positive sentinel above.
            growth_rate = -999999999999999999999999999999999999999999999999999999
            growth_type = "negative"
    elif c1 > c0:
        growth_rate = ((c1 - c0) / c0) * 100
        growth_type = "positive"
    elif c1 < c0:
        # Magnitude of the decline; the direction is carried by the type.
        growth_rate = ((c0 - c1) / c0) * 100
        growth_type = "negative"
    else:
        growth_rate = 0.0
        growth_type = "neutral"

    return {
        "daysAgo": days_ago,
        "totalCustomers": c1,
        "GrowthRate": growth_rate,
        "GrowthRateType": growth_type,
    }


MONGO_URI = os.getenv("MONGO_URI")


# Levels


# points


# feedback
def create_feedback_func(document: UserFeedback) -> bool:
    """Store a feedback document and queue the points awarded for it.

    The document is inserted into the ``Feedback`` collection of the
    ``crayonics`` database. After a successful insert, a ``Points`` record
    with ``providing_feedback=15`` for ``document.userId`` is handed to
    ``create_points_func`` on the shared background executor.

    Args:
        document: The feedback to persist, or ``None``.

    Returns:
        ``True`` when the document was inserted, ``False`` when ``document``
        is ``None`` (nothing is written and no points are queued).
    """
    if document is None:
        return False

    # The context manager closes the client even if the insert raises.
    with MongoClient(MONGO_URI) as client:
        collection = client["crayonics"]["Feedback"]
        collection.insert_one(document.model_dump())

    # Queue the reward only once the feedback is actually stored, so a failed
    # insert does not hand out points.
    feedback_points = Points(
        userId=document.userId,
        platformEngagement=PlatformEngagement(providing_feedback=15),
    )
    executor.submit(create_points_func, document=feedback_points)
    return True


def get_all_feedback_func() -> List[UserFeedback]:
    """Return every document of the ``Feedback`` collection as ``UserFeedback``.

    Returns:
        One ``UserFeedback`` per stored document, in cursor order.
    """
    with MongoClient(MONGO_URI) as client:
        collection = client["crayonics"]["Feedback"]
        # Materialise the cursor before the client is closed.
        return [UserFeedback(**feedback) for feedback in collection.find()]


def get_all_customer_info() -> List[CustomerInfo]:
    """Compute customer growth over the last 30, 60 and 90 days.

    All documents of the ``users`` collection are loaded as ``Customer``
    objects. Documents without a ``date_Joined`` field are backfilled with the
    current time, both in memory and in the database.

    For each window the baseline is the number of customers whose
    ``date_Joined`` is on or before the window start, compared against the
    total number of customers now.

    Returns:
        Three ``CustomerInfo`` objects, for the 30, 60 and 90 day windows, in
        that order.
    """
    customers = []
    with MongoClient(MONGO_URI) as client:
        collection = client["crayonics"]["users"]
        for customer in collection.find():
            if 'date_Joined' not in customer:
                print("adding a new date")
                # NOTE(review): naive local time; confirm this matches how
                # date_Joined is written elsewhere.
                customer['date_Joined'] = datetime.now()
                # Persist only the backfilled field so unrelated fields that
                # changed since the read are not overwritten.
                collection.update_one(
                    {"_id": customer["_id"]},
                    {"$set": {"date_Joined": customer["date_Joined"]}},
                )
            customers.append(Customer(**customer))

    dt_now = datetime.now()
    total_customers = len(customers)

    customer_info = []
    for days in (30, 60, 90):
        window_start = dt_now - timedelta(days=days)
        baseline = sum(
            1 for customer in customers if customer.date_Joined <= window_start
        )
        rate = caculate_rate_change_func(
            c0=baseline, c1=total_customers, days_ago=days
        )
        customer_info.append(CustomerInfo(**rate))
    return customer_info


# Leaderboard Logic
def get_top_30():
    """Return the top 30 ``LeaderBoard`` documents with a 1-based ``rank``.

    Documents are sorted by ``totalpoints`` descending, then ``lastName``
    ascending.

    Returns:
        A list of at most 30 dicts, each the stored document plus a ``rank``
        key (1 for the first entry).
    """
    with MongoClient(MONGO_URI) as client:
        collection = client["crayonics"]["LeaderBoard"]
        sorted_documents = (
            collection.find()
            .sort([("totalpoints", -1), ("lastName", 1)])
            .limit(30)
        )
        # Consume the cursor while the client is still open.
        return [
            {**document, 'rank': rank}
            for rank, document in enumerate(sorted_documents, start=1)
        ]