import os from fastapi import FastAPI, HTTPException from pydantic import BaseModel from permit import Permit import uvicorn import asyncio from typing import Dict, List, Optional from fastapi.middleware.cors import CORSMiddleware from dotenv import load_dotenv load_dotenv() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) permit = Permit( pdp="https://cloudpdp.api.permit.io", token=os.getenv("PERMIT_TOKEN") ) class PermissionRequest(BaseModel): email: str key: str tenant: str # Added tenant RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None async def get_resources_and_actions() -> Dict[str, List[str]]: global RESOURCES_CACHE if RESOURCES_CACHE is None: resources = await permit.api.resources.list() resource_actions = {} for resource in resources: actions = [action.key for action in resource.actions.values()] resource_actions[resource.key] = actions RESOURCES_CACHE = resource_actions return RESOURCES_CACHE async def check_single_permission(user_email: str, action: str, resource_key: str, tenant: str): try: permitted = await permit.check( user=user_email, action=action, resource={"type": resource_key, "tenant": tenant} # Include tenant ) return { "title": resource_key, "link": resource_key, "action": action, "permitted": permitted } except Exception as e: print(f"Error checking permission for resource {resource_key} and action {action}: {e}") return { "title": resource_key, "link": resource_key, "action": action, "permitted": False } @app.post("/check-permission") async def check_permissions(request: PermissionRequest): try: user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant) # added tenant # Collect all roles and their children roles_json = [] for role_assignment in user_roles: role_key = role_assignment.role role = await permit.api.roles.get(role_key) permissions = [] # Check if role.permissions is a list before iterating if isinstance(role.permissions, list): for permission in role.permissions: try: resource, action = permission.split(":") permissions.append({ "title": resource.strip(), "link": resource.strip(), "action": action.strip() }) except ValueError as e: print(f"Skipping invalid permission format: {permission}. Error: {e}") continue roles_json.append({ "role": role.name, "children": permissions }) # Process the roles JSON to remove suffixes and merge roles with the same base name merged_roles = {} for role_obj in roles_json: # Remove suffixes from role name base_role = role_obj["role"].rsplit("-", 1)[0].strip() # Split on the last '-' and take the first part if base_role not in merged_roles: merged_roles[base_role] = [] # Add children to the merged role if "children" in role_obj: merged_roles[base_role].extend(role_obj["children"]) # Convert merged roles back to the desired format final_roles = [] for role_name, children in merged_roles.items(): # Remove duplicate children unique_children = [] seen = set() for child in children: child_tuple = tuple(child.items()) if child_tuple not in seen: seen.add(child_tuple) unique_children.append(child) final_roles.append({ "title": role_name, "value": role_name, "icon": "PencilRuler", "children": unique_children }) return final_roles except Exception as e: raise HTTPException( status_code=500, detail=f"Error checking permissions: {str(e)}")