Spaces:
Sleeping
Sleeping
File size: 4,362 Bytes
620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 620b8e8 7646003 620b8e8 7646003 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize Permit client
permit = Permit(
pdp="https://cloudpdp.api.permit.io",
token=os.getenv("PERMIT_TOKEN")
)
class PermissionRequest(BaseModel):
email: str
key: str
tenant: str = "default"
class ResourceInfo(BaseModel):
key: str
title: str
RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None
async def get_resource_info() -> Dict[str, ResourceInfo]:
global RESOURCES_CACHE
if RESOURCES_CACHE is None:
resources = await permit.api.resources.list()
RESOURCES_CACHE = {
resource.key: ResourceInfo(
key=resource.key,
title=resource.name or resource.key
) for resource in resources
}
return RESOURCES_CACHE
def modify_title(title: str) -> str:
parts = title.split(' ')
return ' '.join(parts[1:]) if len(parts) > 1 else title
@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
try:
# Get user roles and resource info
user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant)
resource_info = await get_resource_info()
# Collect roles and permissions
roles_json = []
for role_assignment in user_roles:
role_key = role_assignment.role
role = await permit.api.roles.get(role_key)
permissions = []
if isinstance(role.permissions, list):
for permission in role.permissions:
try:
resource, action = permission.split(":")
resource = resource.strip()
action = action.strip()
# Get resource display name
resource_title = (
resource_info[resource].title
if resource in resource_info
else resource
)
# Modify the title to remove the prefix
modified_title = modify_title(resource_title)
permissions.append({
"title": modified_title, # Use modified title here
"link": resource,
"action": action
})
except ValueError as e:
print(f"Invalid permission format: {permission}. Error: {e}")
continue
roles_json.append({
"role": role.name,
"children": permissions
})
# Merge roles with same base name
merged_roles = {}
for role_obj in roles_json:
base_role = role_obj["role"].rsplit("-", 1)[0].strip()
if base_role not in merged_roles:
merged_roles[base_role] = []
if "children" in role_obj:
merged_roles[base_role].extend(role_obj["children"])
# Format final response
final_roles = []
for role_name, children in merged_roles.items():
unique_children = []
seen = set()
for child in children:
child_tuple = (child["link"], child["action"])
if child_tuple not in seen:
seen.add(child_tuple)
unique_children.append(child)
final_roles.append({
"title": role_name,
"value": role_name,
"icon": "PencilRuler",
"children": unique_children
})
return final_roles
except Exception as e:
print(f"Error in check_permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Error checking permissions: {str(e)}"
)
|