Spaces:
Sleeping
Sleeping
File size: 5,628 Bytes
620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 7646003 620b8e8 f92543a 620b8e8 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf f92543a 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 4bbdecf 7646003 f92543a 4bbdecf f92543a 4bbdecf 7646003 f92543a 4bbdecf 7646003 620b8e8 7646003 620b8e8 7646003 f92543a 2a61450 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize Permit client
permit = Permit(
pdp="https://cloudpdp.api.permit.io",
token=os.getenv("PERMIT_TOKEN")
)
class PermissionRequest(BaseModel):
email: str
key: str
tenant: str = "default"
class ResourceInfo(BaseModel):
key: str
title: str
RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None
async def get_resource_info() -> Dict[str, ResourceInfo]:
global RESOURCES_CACHE
if RESOURCES_CACHE is None:
resources = await permit.api.resources.list()
RESOURCES_CACHE = {
resource.key: ResourceInfo(
key=resource.key,
title=resource.name or resource.key
) for resource in resources
}
return RESOURCES_CACHE
def modify_title(title: str) -> str:
parts = title.split(' ')
return ' '.join(parts[1:]) if len(parts) > 1 else title
def get_sequence_number(role_title: str) -> int:
"""Extract sequence number from role title for sorting"""
try:
return int(role_title.split()[0])
except (IndexError, ValueError):
return float('inf') # Put roles without numbers at the end
def clean_role_title(role_title: str) -> str:
"""Remove sequence number from role title"""
try:
parts = role_title.split()
# Check if first part is a number
int(parts[0])
# If it is, return everything after the first part
return ' '.join(parts[1:])
except (IndexError, ValueError):
# If there's no number or invalid format, return original title
return role_title
@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
try:
# Get user roles and resource info
user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant)
resource_info = await get_resource_info()
# Collect roles and permissions
roles_json = []
for role_assignment in user_roles:
role_key = role_assignment.role
role = await permit.api.roles.get(role_key)
permissions = []
if isinstance(role.permissions, list):
for permission in role.permissions:
try:
resource, action = permission.split(":")
resource = resource.strip()
action = action.strip()
# Get resource display name
resource_title = (
resource_info[resource].title
if resource in resource_info
else resource
)
# Modify the title to remove the prefix
modified_title = modify_title(resource_title)
permissions.append({
"title": modified_title,
"link": resource,
"action": action
})
except ValueError as e:
print(f"Invalid permission format: {permission}. Error: {e}")
continue
roles_json.append({
"role": role.name,
"children": permissions
})
# Merge roles with same base name
merged_roles = {}
for role_obj in roles_json:
base_role = role_obj["role"].rsplit("-", 1)[0].strip()
if base_role not in merged_roles:
merged_roles[base_role] = []
if "children" in role_obj:
merged_roles[base_role].extend(role_obj["children"])
# Format final response
final_roles = []
for role_name, children in merged_roles.items():
unique_children = []
seen = set()
for child in children:
child_tuple = (child["link"], child["action"])
if child_tuple not in seen:
seen.add(child_tuple)
unique_children.append(child)
# Store original title for sorting
final_roles.append({
"title": role_name,
"value": role_name,
"icon": "PencilRuler",
"children": unique_children,
"original_title": role_name # Store original title for sorting
})
# Sort final_roles based on sequence number
final_roles.sort(key=lambda x: get_sequence_number(x["original_title"]))
# Clean up titles and remove original_title field
for role in final_roles:
role["title"] = clean_role_title(role["title"])
role["value"] = clean_role_title(role["value"])
del role["original_title"]
return final_roles
except Exception as e:
print(f"Error in check_permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Error checking permissions: {str(e)}"
)
# if __name__ == "__main__":
# uvicorn.run(app, host="0.0.0.0", port=8000) |