Spaces:
Sleeping
Sleeping
import os | |
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel | |
from permit import Permit | |
import uvicorn | |
import asyncio | |
from typing import Dict, List, Optional | |
from fastapi.middleware.cors import CORSMiddleware | |
from dotenv import load_dotenv | |
load_dotenv() | |
app = FastAPI() | |
# CORS middleware | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Initialize Permit client | |
permit = Permit( | |
pdp="https://cloudpdp.api.permit.io", | |
token=os.getenv("PERMIT_TOKEN") | |
) | |
class PermissionRequest(BaseModel): | |
email: str | |
key: str | |
tenant: str = "default" | |
class ResourceInfo(BaseModel): | |
key: str | |
title: str | |
RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None | |
async def get_resource_info() -> Dict[str, ResourceInfo]: | |
global RESOURCES_CACHE | |
if RESOURCES_CACHE is None: | |
resources = await permit.api.resources.list() | |
RESOURCES_CACHE = { | |
resource.key: ResourceInfo( | |
key=resource.key, | |
title=resource.name or resource.key | |
) for resource in resources | |
} | |
return RESOURCES_CACHE | |
def modify_title(title: str) -> str: | |
parts = title.split(' ') | |
return ' '.join(parts[1:]) if len(parts) > 1 else title | |
def get_sequence_number(role_title: str) -> int: | |
"""Extract sequence number from role title for sorting""" | |
try: | |
return int(role_title.split()[0]) | |
except (IndexError, ValueError): | |
return float('inf') # Put roles without numbers at the end | |
def clean_role_title(role_title: str) -> str: | |
"""Remove sequence number from role title""" | |
try: | |
parts = role_title.split() | |
# Check if first part is a number | |
int(parts[0]) | |
# If it is, return everything after the first part | |
return ' '.join(parts[1:]) | |
except (IndexError, ValueError): | |
# If there's no number or invalid format, return original title | |
return role_title | |
async def check_permissions(request: PermissionRequest): | |
try: | |
# Get user roles and resource info | |
user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant) | |
resource_info = await get_resource_info() | |
# Collect roles and permissions | |
roles_json = [] | |
for role_assignment in user_roles: | |
role_key = role_assignment.role | |
role = await permit.api.roles.get(role_key) | |
permissions = [] | |
if isinstance(role.permissions, list): | |
for permission in role.permissions: | |
try: | |
resource, action = permission.split(":") | |
resource = resource.strip() | |
action = action.strip() | |
# Get resource display name | |
resource_title = ( | |
resource_info[resource].title | |
if resource in resource_info | |
else resource | |
) | |
# Modify the title to remove the prefix | |
modified_title = modify_title(resource_title) | |
permissions.append({ | |
"title": modified_title, | |
"link": resource, | |
"action": action | |
}) | |
except ValueError as e: | |
print(f"Invalid permission format: {permission}. Error: {e}") | |
continue | |
roles_json.append({ | |
"role": role.name, | |
"children": permissions | |
}) | |
# Merge roles with same base name | |
merged_roles = {} | |
for role_obj in roles_json: | |
base_role = role_obj["role"].rsplit("-", 1)[0].strip() | |
if base_role not in merged_roles: | |
merged_roles[base_role] = [] | |
if "children" in role_obj: | |
merged_roles[base_role].extend(role_obj["children"]) | |
# Format final response | |
final_roles = [] | |
for role_name, children in merged_roles.items(): | |
unique_children = [] | |
seen = set() | |
for child in children: | |
child_tuple = (child["link"], child["action"]) | |
if child_tuple not in seen: | |
seen.add(child_tuple) | |
unique_children.append(child) | |
# Store original title for sorting | |
final_roles.append({ | |
"title": role_name, | |
"value": role_name, | |
"icon": "PencilRuler", | |
"children": unique_children, | |
"original_title": role_name # Store original title for sorting | |
}) | |
# Sort final_roles based on sequence number | |
final_roles.sort(key=lambda x: get_sequence_number(x["original_title"])) | |
# Clean up titles and remove original_title field | |
for role in final_roles: | |
role["title"] = clean_role_title(role["title"]) | |
role["value"] = clean_role_title(role["value"]) | |
del role["original_title"] | |
return final_roles | |
except Exception as e: | |
print(f"Error in check_permissions: {str(e)}") | |
raise HTTPException( | |
status_code=500, | |
detail=f"Error checking permissions: {str(e)}" | |
) | |
# if __name__ == "__main__": | |
# uvicorn.run(app, host="0.0.0.0", port=8000) |