Spaces:
Sleeping
Sleeping
| from Ars.core import db | |
| from bson import ObjectId | |
| from fastapi import HTTPException | |
| from typing import Optional, List | |
| from motor.motor_asyncio import AsyncIOMotorDatabase | |
| from Ars.objects import UserResilienceScoreCreate, UserResilienceScoreOut,BreakDownByDomainCreate,BreakDownByDomainOut,FlaggedRiskAreasCreate,FlaggedRiskAreasOut,BoostSuggestionsCreate,BoostSuggestionsOut,BoostSuggestionsUpdate,UserResilienceScoreUpdate,FlaggedRiskAreasUpdate,BreakDownByDomainUpdate | |
async def create_user_resilience(data: UserResilienceScoreCreate) -> UserResilienceScoreOut:
    """Insert a new UserResilienceScore document and return it as stored.

    The module-level ``db`` handle is used; it is not a parameter.

    Args:
        data: Pydantic object whose dumped fields become the new document.

    Returns:
        A UserResilienceScoreOut built from the document read back from the
        ``user_resilience`` collection after the insert.
    """
    payload = data.model_dump()
    insert_outcome = await db.user_resilience.insert_one(payload)
    # Read the document back by the ID the insert reported.
    stored = await db.user_resilience.find_one({"_id": insert_outcome.inserted_id})
    return UserResilienceScoreOut(**stored)
async def get_user_resilience(object_id: str) -> Optional[UserResilienceScoreOut]:
    """Fetch one UserResilienceScore document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A UserResilienceScoreOut built from the matching document.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    lookup = {"_id": ObjectId(object_id)}
    document = await db.user_resilience.find_one(lookup)
    if document is not None:
        return UserResilienceScoreOut(**document)
    raise HTTPException(status_code=404, detail="Object not found")
async def update_user_resilience(object_id: str, data: UserResilienceScoreUpdate) -> UserResilienceScoreOut:
    """Apply a partial update to a UserResilienceScore and return the result.

    Only fields of ``data`` whose value is not None are written. When no such
    field exists, nothing is written and the current document is returned.

    Args:
        object_id: String form of the document's ObjectId.
        data: Pydantic object carrying the fields to change.

    Returns:
        A UserResilienceScoreOut built from the document as read back after
        the update.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    query = {"_id": ObjectId(object_id)}
    changes = {k: v for k, v in data.model_dump().items() if v is not None}
    # Skip the write when there is nothing to set: older MongoDB servers
    # reject an empty "$set" operand instead of treating it as a no-op.
    if changes:
        result = await db.user_resilience.update_one(query, {"$set": changes})
        if result.matched_count == 0:
            raise HTTPException(status_code=404, detail="Object not found")
    current = await db.user_resilience.find_one(query)
    # The document may be missing (never existed on the no-op path, or was
    # removed between the update and this read); report 404, not TypeError.
    if current is None:
        raise HTTPException(status_code=404, detail="Object not found")
    return UserResilienceScoreOut(**current)
async def delete_user_resilience(object_id: str) -> dict:
    """Remove one UserResilienceScore document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when nothing was deleted.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    target = {"_id": ObjectId(object_id)}
    outcome = await db.user_resilience.delete_one(target)
    if outcome.deleted_count != 0:
        return {"message": "Object deleted successfully"}
    raise HTTPException(status_code=404, detail="Object not found")
async def create_breakdown_by_domain(data: BreakDownByDomainCreate) -> BreakDownByDomainOut:
    """Insert a new BreakDownByDomain document and return it as stored.

    The module-level ``db`` handle is used; it is not a parameter.

    Args:
        data: Pydantic object whose dumped fields become the new document.

    Returns:
        A BreakDownByDomainOut built from the document read back from the
        ``breakdown_by_domain`` collection after the insert.
    """
    # model_dump() matches create_user_resilience; .dict() is the deprecated
    # Pydantic v1 spelling of the same call.
    result = await db.breakdown_by_domain.insert_one(data.model_dump())
    created = await db.breakdown_by_domain.find_one({"_id": result.inserted_id})
    return BreakDownByDomainOut(**created)
async def get_breakdown_by_domain(object_id: str) -> Optional[BreakDownByDomainOut]:
    """Fetch one BreakDownByDomain document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A BreakDownByDomainOut built from the matching document.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    lookup = {"_id": ObjectId(object_id)}
    document = await db.breakdown_by_domain.find_one(lookup)
    if document is not None:
        return BreakDownByDomainOut(**document)
    raise HTTPException(status_code=404, detail="Object not found")
async def update_breakdown_by_domain(object_id: str, data: BreakDownByDomainUpdate) -> BreakDownByDomainOut:
    """Apply a partial update to a BreakDownByDomain and return the result.

    Only fields of ``data`` whose value is not None are written. When no such
    field exists, nothing is written and the current document is returned.

    Args:
        object_id: String form of the document's ObjectId.
        data: Pydantic object carrying the fields to change.

    Returns:
        A BreakDownByDomainOut built from the document as read back after
        the update.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    query = {"_id": ObjectId(object_id)}
    changes = {k: v for k, v in data.model_dump().items() if v is not None}
    # Skip the write when there is nothing to set: older MongoDB servers
    # reject an empty "$set" operand instead of treating it as a no-op.
    if changes:
        result = await db.breakdown_by_domain.update_one(query, {"$set": changes})
        if result.matched_count == 0:
            raise HTTPException(status_code=404, detail="Object not found")
    current = await db.breakdown_by_domain.find_one(query)
    # The document may be missing (never existed on the no-op path, or was
    # removed between the update and this read); report 404, not TypeError.
    if current is None:
        raise HTTPException(status_code=404, detail="Object not found")
    return BreakDownByDomainOut(**current)
async def delete_breakdown_by_domain(object_id: str) -> dict:
    """Remove one BreakDownByDomain document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when nothing was deleted.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    target = {"_id": ObjectId(object_id)}
    outcome = await db.breakdown_by_domain.delete_one(target)
    if outcome.deleted_count != 0:
        return {"message": "Object deleted successfully"}
    raise HTTPException(status_code=404, detail="Object not found")
async def create_flagged_risk_areas(data: FlaggedRiskAreasCreate) -> FlaggedRiskAreasOut:
    """Insert a new FlaggedRiskAreas document and return it as stored.

    The module-level ``db`` handle is used; it is not a parameter.

    Args:
        data: Pydantic object whose dumped fields become the new document.

    Returns:
        A FlaggedRiskAreasOut built from the document read back from the
        ``flagged_risk_areas`` collection after the insert.
    """
    # model_dump() matches create_user_resilience; .dict() is the deprecated
    # Pydantic v1 spelling of the same call.
    result = await db.flagged_risk_areas.insert_one(data.model_dump())
    created = await db.flagged_risk_areas.find_one({"_id": result.inserted_id})
    return FlaggedRiskAreasOut(**created)
async def get_flagged_risk_areas(object_id: str) -> Optional[FlaggedRiskAreasOut]:
    """Fetch one FlaggedRiskAreas document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A FlaggedRiskAreasOut built from the matching document.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    lookup = {"_id": ObjectId(object_id)}
    document = await db.flagged_risk_areas.find_one(lookup)
    if document is not None:
        return FlaggedRiskAreasOut(**document)
    raise HTTPException(status_code=404, detail="Object not found")
async def update_flagged_risk_areas(object_id: str, data: FlaggedRiskAreasUpdate) -> FlaggedRiskAreasOut:
    """Apply a partial update to a FlaggedRiskAreas and return the result.

    Only fields of ``data`` whose value is not None are written. When no such
    field exists, nothing is written and the current document is returned.

    Args:
        object_id: String form of the document's ObjectId.
        data: Pydantic object carrying the fields to change.

    Returns:
        A FlaggedRiskAreasOut built from the document as read back after
        the update.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    query = {"_id": ObjectId(object_id)}
    changes = {k: v for k, v in data.model_dump().items() if v is not None}
    # Skip the write when there is nothing to set: older MongoDB servers
    # reject an empty "$set" operand instead of treating it as a no-op.
    if changes:
        result = await db.flagged_risk_areas.update_one(query, {"$set": changes})
        if result.matched_count == 0:
            raise HTTPException(status_code=404, detail="Object not found")
    current = await db.flagged_risk_areas.find_one(query)
    # The document may be missing (never existed on the no-op path, or was
    # removed between the update and this read); report 404, not TypeError.
    if current is None:
        raise HTTPException(status_code=404, detail="Object not found")
    return FlaggedRiskAreasOut(**current)
async def delete_flagged_risk_areas(object_id: str) -> dict:
    """Remove one FlaggedRiskAreas document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when nothing was deleted.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    target = {"_id": ObjectId(object_id)}
    outcome = await db.flagged_risk_areas.delete_one(target)
    if outcome.deleted_count != 0:
        return {"message": "Object deleted successfully"}
    raise HTTPException(status_code=404, detail="Object not found")
async def create_boost_suggestions(data: BoostSuggestionsCreate) -> BoostSuggestionsOut:
    """Insert a new BoostSuggestions document and return it as stored.

    The module-level ``db`` handle is used; it is not a parameter.

    Args:
        data: Pydantic object whose dumped fields become the new document.

    Returns:
        A BoostSuggestionsOut built from the document read back from the
        ``boost_suggestions`` collection after the insert.
    """
    # model_dump() matches create_user_resilience; .dict() is the deprecated
    # Pydantic v1 spelling of the same call.
    result = await db.boost_suggestions.insert_one(data.model_dump())
    created = await db.boost_suggestions.find_one({"_id": result.inserted_id})
    return BoostSuggestionsOut(**created)
async def get_boost_suggestions(object_id: str) -> Optional[BoostSuggestionsOut]:
    """Fetch one BoostSuggestions document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A BoostSuggestionsOut built from the matching document.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    lookup = {"_id": ObjectId(object_id)}
    document = await db.boost_suggestions.find_one(lookup)
    if document is not None:
        return BoostSuggestionsOut(**document)
    raise HTTPException(status_code=404, detail="Object not found")
async def update_boost_suggestions(object_id: str, data: BoostSuggestionsUpdate) -> BoostSuggestionsOut:
    """Apply a partial update to a BoostSuggestions and return the result.

    Only fields of ``data`` whose value is not None are written. When no such
    field exists, nothing is written and the current document is returned.

    Args:
        object_id: String form of the document's ObjectId.
        data: Pydantic object carrying the fields to change.

    Returns:
        A BoostSuggestionsOut built from the document as read back after
        the update.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when no document has that ID.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    query = {"_id": ObjectId(object_id)}
    changes = {k: v for k, v in data.model_dump().items() if v is not None}
    # Skip the write when there is nothing to set: older MongoDB servers
    # reject an empty "$set" operand instead of treating it as a no-op.
    if changes:
        result = await db.boost_suggestions.update_one(query, {"$set": changes})
        if result.matched_count == 0:
            raise HTTPException(status_code=404, detail="Object not found")
    current = await db.boost_suggestions.find_one(query)
    # The document may be missing (never existed on the no-op path, or was
    # removed between the update and this read); report 404, not TypeError.
    if current is None:
        raise HTTPException(status_code=404, detail="Object not found")
    return BoostSuggestionsOut(**current)
async def delete_boost_suggestions(object_id: str) -> dict:
    """Remove one BoostSuggestions document by its ID.

    Args:
        object_id: String form of the document's ObjectId.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 400 when ``object_id`` is not a valid ObjectId,
            404 when nothing was deleted.
    """
    if not ObjectId.is_valid(object_id):
        raise HTTPException(status_code=400, detail="Invalid ID format")
    target = {"_id": ObjectId(object_id)}
    outcome = await db.boost_suggestions.delete_one(target)
    if outcome.deleted_count != 0:
        return {"message": "Object deleted successfully"}
    raise HTTPException(status_code=404, detail="Object not found")