Spaces:
Sleeping
Sleeping
File size: 4,574 Bytes
620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf 620b8e8 4bbdecf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
# Load variables from a local .env file (if any) into the process environment
# so the os.getenv() lookups in this module can see them.
load_dotenv()

app = FastAPI()

# Browser origins allowed to call this API, as a comma-separated list in the
# CORS_ALLOW_ORIGINS environment variable, e.g.
#   CORS_ALLOW_ORIGINS=https://app.example.com,https://admin.example.com
# When the variable is unset or empty the previous behaviour (any origin) is
# kept so existing deployments do not change.
# NOTE(review): FastAPI's CORS documentation says allow_origins must be listed
# explicitly when allow_credentials=True; set CORS_ALLOW_ORIGINS in production
# instead of relying on the wildcard default.
_cors_allow_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Module-wide Permit SDK client used by every function below.
# The API token comes from the PERMIT_TOKEN environment variable; the lookup
# yields None when the variable is not set.
permit = Permit(
    token=os.environ.get("PERMIT_TOKEN"),
    pdp="https://cloudpdp.api.permit.io",
)
# Request body accepted by POST /check-permission.
class PermissionRequest(BaseModel):
    # User identifier passed to the Permit role-assignment lookup.
    email: str
    # Required by the schema; the handler visible in this file does not read it.
    key: str
    # Tenant used to scope the role-assignment lookup.
    tenant: str
# Memo of {resource key: [action keys]}; stays None until the first
# successful fetch and is never invalidated afterwards.
RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None


async def get_resources_and_actions() -> Dict[str, List[str]]:
    """Return every resource key mapped to the keys of its actions.

    The first call lists resources through the Permit API and stores the
    result in the module-level ``RESOURCES_CACHE``; later calls return that
    stored mapping without contacting the API again.
    """
    global RESOURCES_CACHE
    if RESOURCES_CACHE is not None:
        return RESOURCES_CACHE

    listed = await permit.api.resources.list()
    # The cache is assigned only after the whole mapping has been built, so a
    # failure part-way through leaves it unset and the next call retries.
    RESOURCES_CACHE = {
        item.key: [act.key for act in item.actions.values()]
        for item in listed
    }
    return RESOURCES_CACHE
async def check_single_permission(user_email: str, action: str, resource_key: str, tenant: str):
    """Check one (user, action, resource type) combination within a tenant.

    Returns a dict with ``title`` and ``link`` (both set to ``resource_key``),
    ``action``, and ``permitted``. If the check raises for any reason the
    error is printed and ``permitted`` is reported as ``False``.
    """
    # Start from the "denied" answer so every failure path is fail-closed.
    verdict = {
        "title": resource_key,
        "link": resource_key,
        "action": action,
        "permitted": False,
    }
    try:
        verdict["permitted"] = await permit.check(
            user=user_email,
            action=action,
            resource={"type": resource_key, "tenant": tenant},
        )
    except Exception as e:
        print(f"Error checking permission for resource {resource_key} and action {action}: {e}")
    return verdict
def _parse_permissions(raw_permissions) -> List[Dict[str, str]]:
    """Convert ``"resource:action"`` strings into menu child entries."""
    # A role whose permissions are not a list contributes no children.
    if not isinstance(raw_permissions, list):
        return []

    children = []
    for permission in raw_permissions:
        try:
            resource, action = permission.split(":")
        except ValueError as e:
            # Not exactly one ':' separator; skip it instead of failing the
            # whole request.
            print(f"Skipping invalid permission format: {permission}. Error: {e}")
            continue
        children.append({
            "title": resource.strip(),
            "link": resource.strip(),
            "action": action.strip(),
        })
    return children


def _merge_roles_by_base_name(roles_json: List[dict]) -> List[dict]:
    """Group roles by suffix-less name and drop duplicate children."""
    merged_roles: Dict[str, List[Dict[str, str]]] = {}
    for role_obj in roles_json:
        # Everything after the last '-' is treated as a suffix and removed.
        # NOTE(review): a role name that contains '-' but carries no suffix is
        # truncated as well - confirm the naming convention.
        base_role = role_obj["role"].rsplit("-", 1)[0].strip()
        merged_roles.setdefault(base_role, []).extend(role_obj["children"])

    final_roles = []
    for role_name, children in merged_roles.items():
        seen = set()
        unique_children = []
        for child in children:
            # Dicts are unhashable; their item tuple serves as the identity.
            child_key = tuple(child.items())
            if child_key not in seen:
                seen.add(child_key)
                unique_children.append(child)
        final_roles.append({
            "title": role_name,
            "value": role_name,
            "icon": "PencilRuler",
            "children": unique_children,
        })
    return final_roles


@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    """Return the caller's roles, merged by base name, with their permissions.

    Looks up the roles assigned to ``request.email`` in ``request.tenant``,
    fetches each role, and turns every ``"resource:action"`` permission into a
    child entry. Roles whose names differ only by the part after the last
    ``-`` are merged into one entry and duplicate children are removed.

    Returns:
        A list of dicts with keys ``title``, ``value``, ``icon`` and
        ``children``; each child has ``title``, ``link`` and ``action``.

    Raises:
        HTTPException: status 500 for any error raised while building the
            response; the original exception is attached as the cause.
    """
    try:
        user_roles = await permit.api.users.get_assigned_roles(
            request.email, tenant=request.tenant
        )

        # Fetch each distinct role once; dict.fromkeys keeps first-seen order,
        # and repeated assignments would be merged and de-duplicated anyway.
        role_keys = list(dict.fromkeys(
            role_assignment.role for role_assignment in user_roles
        ))
        # The lookups are independent of each other, so run them concurrently
        # rather than awaiting one round-trip per role in sequence.
        roles = await asyncio.gather(
            *(permit.api.roles.get(role_key) for role_key in role_keys)
        )

        roles_json = [
            {"role": role.name, "children": _parse_permissions(role.permissions)}
            for role in roles
        ]
        return _merge_roles_by_base_name(roles_json)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {str(e)}") from e
|