Spaces:
Sleeping
Sleeping
from gamification.objects import UserFeedback, UserLevel, UserPoints,CustomerInfo,Customer | |
from bson import ObjectId | |
from pymongo import MongoClient | |
import os | |
from typing import List | |
from dotenv import load_dotenv | |
from datetime import datetime,timedelta | |
load_dotenv() | |
# Normal Math | |
def caculate_rate_change_func(c0,c1,days_ago): | |
if c0 == 0: | |
if c1 > 0: | |
# If there are customers now, but no customers initially (c0 is 0), | |
# we consider it as infinite growth or 100% growth | |
print("here") | |
return {"daysAgo":days_ago,"totalCustomers": c1, "GrowthRate": 99999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999, "GrowthRateType": "positive"} | |
elif c1 == 0: | |
# If both c0 and c1 are zero, there is no change | |
return {"daysAgo":days_ago,"totalCustomers": c1, "GrowthRate": 0.0, "GrowthRateType": "neutral"} | |
else: | |
# This case is for when c1 < 0, but it's unlikely in a customer count scenario. | |
return {"daysAgo":days_ago,"totalCustomers": c1, "GrowthRate": -99999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999, "GrowthRateType": "negative"} | |
elif c1 > c0: | |
# Positive growth rate: c1 > c0 | |
e = c1 - c0 | |
b = e / c0 | |
d = b * 100 | |
return {"daysAgo":days_ago,"totalCustomers": c1, "GrowthRate": d, "GrowthRateType": "positive"} | |
elif c1 < c0: | |
# Negative growth rate: c1 < c0 | |
e = c0 - c1 | |
b = e / c0 | |
d = b * 100 | |
return {"daysAgo":days_ago,"totalCustomers": c1, "GrowthRate": d, "GrowthRateType": "negative"} | |
elif c1 == c0: | |
# No change: c1 == c0 | |
return {"daysAgo":days_ago,"totalCustomers": c1, "GrowthRate": 0.0, "GrowthRateType": "neutral"} | |
MONGO_URI = os.getenv("MONGO_URI") | |
# levels | |
def create_level_func(document:UserLevel,)->bool: | |
""" | |
Creates a UserLevel document in the MongoDB collection. | |
:param UserLevel: Actually accepts an Instance of UserLevel and deserializes it here | |
:param maxPoints: Maximum Amount of points for this level. | |
:param minPoints: Minimum Amount of points for this level. | |
:param levelNumber: Number for this level. | |
:param careerPath: Career Path Special for this level. | |
""" | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Level" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
should_proceed= (collection.find_one(document.model_dump())) | |
# Insert the document | |
if should_proceed==None: | |
if document!=None: | |
result = collection.insert_one(document.model_dump()) | |
return True | |
else: | |
client.close() | |
return False | |
else: | |
# The exact document already exists so it can't be created again | |
return False | |
def get_all_levels_func() -> List[UserLevel]: | |
# MongoDB URI and configuration | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Level" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
# Fetch all documents from the collection | |
levels_cursor = collection.find() # This returns a cursor to the documents | |
# Convert the cursor to a list of UserLevel objects | |
levels = [UserLevel(**level) for level in levels_cursor] | |
return levels | |
def delete_level_func(level_id,)->bool: | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Level" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
if isinstance(level_id, str): | |
level_id = ObjectId(level_id) | |
result = collection.delete_one(filter={"_id":level_id}) | |
if result.acknowledged==True: | |
if result.deleted_count ==0: | |
return False | |
else: | |
return True | |
else: | |
return False | |
def edit_level_func(level_id, **kwargs): | |
""" | |
Edit a UserLevel document in the MongoDB collection. | |
:param level_id: The ObjectId or unique identifier of the UserLevel to edit. | |
:param maxPoints: Maximum Amount of points for this level. | |
:param minPoints: Minimum Amount of points for this level. | |
:param levelNumber: Number for this level. | |
:param careerPath: Career Path Special for this level. | |
""" | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Level" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
# Ensure that `level_id` is an ObjectId if using ObjectId as the identifier | |
if isinstance(level_id, str): | |
level_id = ObjectId(level_id) | |
# Prepare the update data | |
update_data = {key: value for key, value in kwargs.items() if value is not None} | |
# Perform the update operation in the collection | |
result = collection.update_one( | |
{"_id": level_id}, # Find the document by its unique identifier (ObjectId or other) | |
{"$set": update_data} # Update the document with the new values | |
) | |
return result.modified_count | |
# points | |
def create_points_func(document:UserPoints)->bool: | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Points" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
# Insert the document | |
if document!=None: | |
result = collection.insert_one(document.model_dump()) | |
return True | |
else: | |
client.close() | |
return False | |
def get_all_points_func() -> List[UserFeedback]: | |
# MongoDB URI and configuration | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Points" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
# Fetch all documents from the collection | |
point_cursor = collection.find() # This returns a cursor to the documents | |
# Convert the cursor to a list of UserLevel objects | |
points = [UserFeedback(**point) for point in point_cursor] | |
return points | |
# feedback | |
def create_feedback_func(document:UserFeedback)->bool: | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Feedback" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
# Insert the document | |
if document!=None: | |
result = collection.insert_one(document.model_dump()) | |
return True | |
else: | |
client.close() | |
return False | |
def get_all_feedback_func() -> List[UserFeedback]: | |
# MongoDB URI and configuration | |
db_uri = MONGO_URI | |
db_name = "crayonics" | |
collection_name="Feedback" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
# Fetch all documents from the collection | |
feedback_cursor = collection.find() # This returns a cursor to the documents | |
# Convert the cursor to a list of UserLevel objects | |
feedbacks = [UserFeedback(**feedback) for feedback in feedback_cursor] | |
return feedbacks | |
def get_all_customer_info()->List[CustomerInfo]: | |
db_uri=MONGO_URI | |
# db_uri = "mongodb+srv://groupcresearchseminar:[email protected]/?retryWrites=true&w=majority&appName=Cluster0" | |
db_name = "crayonics" | |
collection_name = "users" | |
client = MongoClient(db_uri) | |
db = client[db_name] | |
collection = db[collection_name] | |
# Fetch all documents from the collection | |
customer_cursor = collection.find() # This returns a cursor to the documents | |
# Convert the cursor to a list of Customer objects, setting a default date_Joined if it's missing | |
customers = [] | |
customer_info = [] | |
for customer in customer_cursor: | |
# If date_Joined is missing, add the default value (current datetime) | |
if 'date_Joined' not in customer: | |
print("adding a new date") | |
customer['date_Joined'] = datetime.now() | |
collection.update_one(filter={"_id":ObjectId(customer['_id'])},update={"$set":customer}) | |
customers.append(Customer(**customer)) # Create Customer model with the filled data | |
dt_now = datetime.now() | |
thirty_days_ago = dt_now - timedelta(days=30) | |
sixty_days_ago = dt_now - timedelta(days=60) | |
ninty_days_ago = dt_now - timedelta(days=90) | |
all_customer_from_30_days_ago = [customer for customer in customers if customer.date_Joined <=thirty_days_ago] | |
all_customer_from_60_days_ago = [customer for customer in customers if customer.date_Joined <=sixty_days_ago] | |
all_customer_from_90_days_ago = [customer for customer in customers if customer.date_Joined <=ninty_days_ago] | |
rate_30_days = caculate_rate_change_func(c0=len(all_customer_from_30_days_ago),c1=len(customers),days_ago=30) | |
rate_60_days = caculate_rate_change_func(c0=len(all_customer_from_60_days_ago),c1=len(customers),days_ago=60) | |
rate_90_days = caculate_rate_change_func(c0=len(all_customer_from_90_days_ago),c1=len(customers),days_ago=90) | |
list_of_rates= [rate_30_days,rate_60_days,rate_90_days] | |
for rate in list_of_rates: | |
customer_info.append(CustomerInfo(**rate)) | |
return customer_info | |