# accesscontrol/main.py
# Hosting-page metadata captured together with this file (not part of the program):
#   author: rajshukla1102 | commit 620b8e8 (verified), "Update main.py" | size: 2.86 kB
import asyncio
import logging
import os
from typing import Dict, List, Optional

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from permit import Permit
from pydantic import BaseModel
# Load variables from a local .env file before anything below reads the
# environment (PERMIT_TOKEN is read with os.getenv when the Permit client is built).
load_dotenv()
# FastAPI application instance; the /check-permission route is registered on it.
app = FastAPI()
# CORS is configured fully open: any origin, any method, any header, with
# credentials allowed.
# NOTE(review): FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers should not be ["*"] when
# allow_credentials=True. Confirm whether credentialed cross-origin calls are
# actually needed; if they are, list the trusted origins explicitly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Module-level Permit client shared by the helper functions in this file.
# The PDP address is hard-coded; the API token comes from the PERMIT_TOKEN
# environment variable.
# NOTE(review): os.getenv returns None when PERMIT_TOKEN is unset -- confirm how
# Permit behaves with a missing token, or fail fast at startup.
permit = Permit(
pdp="https://cloudpdp.api.permit.io",
token=os.getenv("PERMIT_TOKEN")
)
class PermissionRequest(BaseModel):
    """Request body accepted by the ``/check-permission`` endpoint."""

    # Passed to the Permit client as the user whose permissions are checked.
    email: str
    # Required string field; no handler visible in this file reads it.
    # NOTE(review): confirm whether it was meant to authenticate the caller.
    key: str
# Process-wide cache mapping resource key -> list of action keys.
# It stays None until the first successful fetch and is never invalidated in
# this file, so resources added later are only seen after a restart.
RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None


async def get_resources_and_actions() -> Dict[str, List[str]]:
    """Return every known resource key mapped to its action keys.

    The mapping is fetched from the Permit API on the first call and served
    from ``RESOURCES_CACHE`` afterwards. If the fetch raises, the cache is
    left unset so the next call retries.

    Returns:
        A dict of ``resource.key`` to the list of ``action.key`` values
        declared on that resource. A resource with no actions maps to an
        empty list.
    """
    global RESOURCES_CACHE
    if RESOURCES_CACHE is None:
        # NOTE(review): list() is called without paging arguments -- confirm it
        # returns all resources rather than only the first page.
        resources = await permit.api.resources.list()
        RESOURCES_CACHE = {
            # ``actions`` may be missing on a resource; treat that as "no
            # actions" instead of failing the whole lookup.
            resource.key: [
                action.key for action in (resource.actions or {}).values()
            ]
            for resource in resources
        }
    return RESOURCES_CACHE
async def check_single_permission(user_email: str, action: str, resource_key: str):
    """Check whether one user may perform one action on one resource type.

    Args:
        user_email: Value passed to ``permit.check`` as the user.
        action: Action key to check.
        resource_key: Resource key, sent as the resource ``type``.

    Returns:
        A dict with the keys ``"resource"``, ``"action"`` and ``"permitted"``.
        ``"permitted"`` is whatever ``permit.check`` returned, or ``False`` if
        the check raised.
    """
    try:
        permitted = await permit.check(
            user=user_email,
            action=action,
            resource={"type": resource_key},
        )
    except Exception:
        # Fail closed: a failed check must never grant access. Log it so an
        # outage is distinguishable from a genuine denial. The user's email is
        # deliberately left out of the log message.
        logging.getLogger(__name__).exception(
            "Permission check failed for action=%s resource=%s; denying",
            action,
            resource_key,
        )
        permitted = False
    return {
        "resource": resource_key,
        "action": action,
        "permitted": permitted,
    }
def _group_permitted_actions(results):
    """Collapse per-action results into one entry per permitted resource."""
    grouped = {}
    for result in results:
        if result["permitted"]:
            grouped.setdefault(result["resource"], []).append(result["action"])
    # Dicts keep insertion order, so resources appear in first-permitted order.
    return [
        {"resource": resource_key, "actions": actions}
        for resource_key, actions in grouped.items()
    ]


@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    """List every resource/action pair the given user is permitted to use.

    Runs one permission check per (resource, action) pair concurrently, using
    ``request.email`` as the user, then groups the permitted actions by
    resource.

    Args:
        request: Request body; only ``email`` is read here.

    Returns:
        A list of ``{"resource": <key>, "actions": [<action>, ...]}`` dicts
        containing only resources with at least one permitted action.

    Raises:
        HTTPException: With status 500 if any step raises; the original
            exception is attached as the cause.
    """
    try:
        resource_actions = await get_resources_and_actions()
        # gather() schedules the coroutines itself and returns results in
        # input order.
        results = await asyncio.gather(*(
            check_single_permission(request.email, action, resource_key)
            for resource_key, actions in resource_actions.items()
            for action in actions
        ))
        return _group_permitted_actions(results)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {str(e)}"
        ) from e
# Local entry point, currently disabled:
# if __name__ == "__main__":
#     uvicorn.run(app, host="0.0.0.0", port=8000)