Spaces:
Sleeping
Sleeping
| from gamification.objects import PlatformEngagement, UserFeedback, UserLevel, UserPoints,CustomerInfo,Customer,IndividualUserLevel,Points,SimpleIndividualUserLevel | |
| from gamification.levelLogic import create_level_func,get_all_levels_func,edit_level_func,delete_level_func | |
| from gamification.imports import * | |
| from gamification.pointLogic import create_points_func,get_all_simple_points_func,get_all_points_func | |
| # Normal Math | |
def caculate_rate_change_func(c0, c1, days_ago):
    """Describe how a count changed between a baseline and the current value.

    Args:
        c0: Baseline count (the value at the start of the window).
        c1: Current count.
        days_ago: Length of the look-back window; echoed back unchanged.

    Returns:
        A dict with the keys ``daysAgo``, ``totalCustomers`` (always ``c1``),
        ``GrowthRate`` and ``GrowthRateType``.  For a non-zero baseline the
        rate is the size of the change as a percentage of ``c0``; the
        direction is carried only by ``GrowthRateType`` ("positive",
        "negative" or "neutral").  With a zero baseline a percentage is
        undefined, so a very large sentinel magnitude is returned instead.
    """
    if c1 == c0:
        # Covers both "no change" and the 0 -> 0 case.
        growth_rate, growth_type = 0.0, "neutral"
    elif c0 == 0:
        # Division by zero is impossible here, so report an "effectively
        # infinite" sentinel whose sign follows c1.
        if c1 > 0:
            growth_rate = 9999999999999999999999999999999999999999999999999999999
            growth_type = "positive"
        else:
            # c1 < 0: unlikely for a customer count, kept for completeness.
            growth_rate = -999999999999999999999999999999999999999999999999999999
            growth_type = "negative"
    else:
        # The magnitude is computed the same way for growth and shrinkage.
        growth_rate = abs(c1 - c0) / c0 * 100
        growth_type = "positive" if c1 > c0 else "negative"

    return {
        "daysAgo": days_ago,
        "totalCustomers": c1,
        "GrowthRate": growth_rate,
        "GrowthRateType": growth_type,
    }
# Connection string for the MongoDB deployment, read from the environment.
# It is None when the MONGO_URI variable is not set.
MONGO_URI = os.environ.get("MONGO_URI")
| # Levels | |
| # points | |
| # feedback | |
def create_feedback_func(document: UserFeedback) -> bool:
    """Persist a feedback document and award points for providing it.

    Args:
        document: The feedback to store.  ``None`` is tolerated and rejected.

    Returns:
        True once the feedback has been inserted into the "Feedback"
        collection of the "crayonics" database, False when ``document`` is
        None (nothing is written and no connection is opened).
    """
    if document is None:
        return False

    # The context manager closes the client on success and on error; the
    # original only closed it on the None path and leaked it otherwise.
    with MongoClient(MONGO_URI) as client:
        # Points are awarded before the insert, as in the original flow.
        feedback_points = Points(
            userId=document.userId,
            platformEngagement=PlatformEngagement(providing_feedback=15),
        )
        create_points_func(document=feedback_points)

        client["crayonics"]["Feedback"].insert_one(document.model_dump())
    return True
def get_all_feedback_func() -> List[UserFeedback]:
    """Return every document in the "Feedback" collection as a UserFeedback.

    Returns:
        One ``UserFeedback`` per stored document, built by passing the raw
        document's fields as keyword arguments.
    """
    # The cursor is fully consumed inside the ``with`` block, so the client
    # can be closed safely on exit instead of being leaked.
    with MongoClient(MONGO_URI) as client:
        collection = client["crayonics"]["Feedback"]
        return [UserFeedback(**feedback) for feedback in collection.find()]
def get_all_customer_info() -> List[CustomerInfo]:
    """Summarise customer growth over the last 30, 60 and 90 days.

    Reads every document from the "users" collection.  Documents without a
    ``date_Joined`` field are back-filled with the current time, both in
    memory and in the database.  For each window the number of customers who
    had already joined at the start of the window is compared with the
    current total via ``caculate_rate_change_func``.

    Returns:
        Three ``CustomerInfo`` objects, in the order 30, 60 and 90 days.
    """
    # NOTE(review): naive local time is compared with stored date_Joined
    # values; confirm both sides use the same timezone convention.
    now = datetime.now()

    # The cursor is consumed inside the ``with`` block so the client can be
    # closed on exit instead of being leaked.
    with MongoClient(MONGO_URI) as client:
        collection = client["crayonics"]["users"]
        customers = []
        for customer in collection.find():
            if "date_Joined" not in customer:
                print("adding a new date")
                customer["date_Joined"] = now
                # Write only the missing field rather than $set-ing the
                # whole document (which also carried the immutable _id).
                collection.update_one(
                    {"_id": customer["_id"]},
                    {"$set": {"date_Joined": now}},
                )
            customers.append(Customer(**customer))

    total_customers = len(customers)
    customer_info = []
    for days in (30, 60, 90):
        cutoff = now - timedelta(days=days)
        # Baseline: customers who had already joined when the window began.
        baseline = sum(1 for c in customers if c.date_Joined <= cutoff)
        rate = caculate_rate_change_func(
            c0=baseline, c1=total_customers, days_ago=days
        )
        customer_info.append(CustomerInfo(**rate))
    return customer_info
| # Leaderboard Logic | |
def get_top_30():
    """Return the top 30 leaderboard entries with a 1-based ``rank`` added.

    Entries come from the "LeaderBoard" collection, ordered by
    ``totalpoints`` descending and then ``lastName`` ascending.

    Returns:
        A list of at most 30 dicts.  Each is a copy of the stored document
        plus a ``rank`` key; any ``rank`` already in the document is
        overwritten.
    """
    # The cursor is materialised inside the ``with`` block so the client can
    # be closed on exit instead of being leaked.
    with MongoClient(MONGO_URI) as client:
        collection = client["crayonics"]["LeaderBoard"]
        top_documents = (
            collection.find()
            .sort([("totalpoints", -1), ("lastName", 1)])
            .limit(30)
        )
        return [
            {**entry, "rank": rank}
            for rank, entry in enumerate(top_documents, start=1)
        ]