Spaces:
Running
Running
| import os | |
| from pymongo import MongoClient | |
| from pymongo.database import Database | |
| from utils import get_logger, Vote | |
| from datetime import datetime, timedelta, timezone | |
| from typing import List | |
| logger = get_logger() | |
| def create_db_connection() -> Database: | |
| print(os.getenv("MONGO_URI")) | |
| print(os.getenv("MONGO_DB")) | |
| db = MongoClient(os.getenv("MONGO_URI")).get_database(os.getenv("MONGO_DB")) | |
| return db | |
| def add_vote(vote: Vote, db: Database) -> None: | |
| try: | |
| db.get_collection("votes").insert_one(vote.__dict__) | |
| logger.info("Vote added to database") | |
| except Exception as e: | |
| logger.error("Error adding vote to database") | |
| logger.error(e) | |
| def get_votes(db: Database) -> List[Vote]: | |
| now = datetime.now(timezone.utc) | |
| current_hour = now.replace(minute=0, second=0, microsecond=0) | |
| votes = list( | |
| db.get_collection("votes").find({"timestamp": {"$lte": current_hour.isoformat()}}) | |
| ) | |
| return votes | |