"""FastAPI service that reports a user's Permit.io roles and permissions.

Exposes ``POST /check-permission``, which looks up the roles assigned to a
user within a tenant and returns them, merged by base role name, together
with the de-duplicated resource/action permissions those roles grant.
"""

# NOTE: ``asyncio``, ``uvicorn`` and ``List`` were imported by the original
# file; they are kept so that any code outside this view keeps working.
import asyncio
import logging
import os
from typing import Dict, List, Optional

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from permit import Permit
from pydantic import BaseModel

# Load variables from a .env file before PERMIT_TOKEN is read below.
load_dotenv()

logger = logging.getLogger(__name__)

app = FastAPI()

# CORS middleware
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True. The FastAPI CORS documentation says these must be
# listed explicitly when credentials are allowed; consider restricting
# allow_origins to the known front-end origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize Permit client
permit = Permit(
    pdp="https://cloudpdp.api.permit.io",
    token=os.getenv("PERMIT_TOKEN"),
)


class PermissionRequest(BaseModel):
    """Request body accepted by ``POST /check-permission``."""

    # Passed as the user identifier when fetching assigned roles.
    email: str
    # Not read by the endpoint in this file.
    key: str
    # Tenant whose role assignments are looked up.
    tenant: str = "default"


class ResourceInfo(BaseModel):
    """Key and display title of a Permit resource."""

    key: str
    title: str


# Filled on the first call to get_resource_info() and reused afterwards;
# nothing in this file invalidates it.
RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None


async def get_resource_info() -> Dict[str, ResourceInfo]:
    """Return the resource-key -> ``ResourceInfo`` mapping.

    The resource list is fetched from the Permit API on the first call only
    and stored in ``RESOURCES_CACHE``. A resource without a name uses its
    key as the title.
    """
    global RESOURCES_CACHE
    if RESOURCES_CACHE is None:
        resources = await permit.api.resources.list()
        RESOURCES_CACHE = {
            resource.key: ResourceInfo(
                key=resource.key,
                title=resource.name or resource.key,
            )
            for resource in resources
        }
    return RESOURCES_CACHE


def modify_title(title: str) -> str:
    """Drop the first space-separated word of *title*.

    The title is split on single spaces; when that yields one part only,
    the title is returned unchanged.
    """
    parts = title.split(' ')
    return ' '.join(parts[1:]) if len(parts) > 1 else title


def _build_permission_entry(
    permission: str,
    resource_info: Dict[str, ResourceInfo],
) -> Optional[Dict[str, str]]:
    """Turn a ``"resource:action"`` string into a response entry.

    Returns ``None`` (after logging a warning) when the string does not
    split into exactly two parts on ``":"``.
    """
    try:
        resource, action = permission.split(":")
    except ValueError as exc:
        logger.warning(
            "Invalid permission format: %s. Error: %s", permission, exc
        )
        return None

    resource = resource.strip()
    action = action.strip()

    # Use the resource's display title when known, otherwise its key.
    info = resource_info.get(resource)
    resource_title = info.title if info is not None else resource

    return {
        "title": modify_title(resource_title),  # title without its prefix
        "link": resource,
        "action": action,
    }


def _merge_by_base_role(roles) -> Dict[str, List[Dict[str, str]]]:
    """Group permission entries under each role's base name.

    *roles* is an iterable of ``(role_name, permissions)`` pairs. The base
    name is the role name with everything after its last ``"-"`` removed,
    then stripped of surrounding whitespace.
    """
    merged: Dict[str, List[Dict[str, str]]] = {}
    for role_name, permissions in roles:
        base_role = role_name.rsplit("-", 1)[0].strip()
        merged.setdefault(base_role, []).extend(permissions)
    return merged


def _unique_children(children: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep the first entry for each ``(link, action)`` pair, in order."""
    seen = set()
    unique = []
    for child in children:
        identity = (child["link"], child["action"])
        if identity not in seen:
            seen.add(identity)
            unique.append(child)
    return unique


@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    """Return the user's roles, merged by base name, with their permissions.

    Each item of the returned list has ``title`` and ``value`` (the base
    role name), a fixed ``icon`` and ``children``: the de-duplicated
    ``{"title", "link", "action"}`` entries built from the permissions of
    every role sharing that base name.

    Raises:
        HTTPException: status 500 when any lookup or processing step fails.
    """
    try:
        # Get user roles and resource info
        user_roles = await permit.api.users.get_assigned_roles(
            request.email, tenant=request.tenant
        )
        resource_info = await get_resource_info()

        # Fetch each distinct role only once per request.
        fetched_roles = {}
        role_permissions = []
        for role_assignment in user_roles:
            role_key = role_assignment.role
            if role_key not in fetched_roles:
                fetched_roles[role_key] = await permit.api.roles.get(role_key)
            role = fetched_roles[role_key]

            permissions = []
            if isinstance(role.permissions, list):
                for permission in role.permissions:
                    entry = _build_permission_entry(permission, resource_info)
                    if entry is not None:
                        permissions.append(entry)

            # Fall back to the key when the role has no display name, as
            # get_resource_info() does for resources.
            role_permissions.append((role.name or role_key, permissions))

        # Merge roles with same base name
        merged_roles = _merge_by_base_role(role_permissions)

        # Format final response
        return [
            {
                "title": role_name,
                "value": role_name,
                "icon": "PencilRuler",
                "children": _unique_children(children),
            }
            for role_name, children in merged_roles.items()
        ]

    except HTTPException:
        # Do not rewrap an HTTP error that already carries its own status.
        raise
    except Exception as e:
        logger.exception("Error in check_permissions: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {str(e)}"
        ) from e