Spaces:
Sleeping
Sleeping
import os | |
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel | |
from permit import Permit | |
import uvicorn | |
import asyncio | |
from typing import Dict, List, Optional | |
from fastapi.middleware.cors import CORSMiddleware | |
from dotenv import load_dotenv | |
load_dotenv() | |
app = FastAPI() | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
permit = Permit( | |
pdp="https://cloudpdp.api.permit.io", | |
token=os.getenv("PERMIT_TOKEN") | |
) | |
class PermissionRequest(BaseModel): | |
email: str | |
key: str | |
tenant: str # Added tenant | |
RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None | |
async def get_resources_and_actions() -> Dict[str, List[str]]: | |
global RESOURCES_CACHE | |
if RESOURCES_CACHE is None: | |
resources = await permit.api.resources.list() | |
resource_actions = {} | |
for resource in resources: | |
actions = [action.key for action in resource.actions.values()] | |
resource_actions[resource.key] = actions | |
RESOURCES_CACHE = resource_actions | |
return RESOURCES_CACHE | |
async def check_single_permission(user_email: str, action: str, resource_key: str, tenant: str): | |
try: | |
permitted = await permit.check( | |
user=user_email, | |
action=action, | |
resource={"type": resource_key, "tenant": tenant} # Include tenant | |
) | |
return { | |
"title": resource_key, | |
"link": resource_key, | |
"action": action, | |
"permitted": permitted | |
} | |
except Exception as e: | |
print(f"Error checking permission for resource {resource_key} and action {action}: {e}") | |
return { | |
"title": resource_key, | |
"link": resource_key, | |
"action": action, | |
"permitted": False | |
} | |
async def check_permissions(request: PermissionRequest): | |
try: | |
user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant) # added tenant | |
# Collect all roles and their children | |
roles_json = [] | |
for role_assignment in user_roles: | |
role_key = role_assignment.role | |
role = await permit.api.roles.get(role_key) | |
permissions = [] | |
# Check if role.permissions is a list before iterating | |
if isinstance(role.permissions, list): | |
for permission in role.permissions: | |
try: | |
resource, action = permission.split(":") | |
permissions.append({ | |
"title": resource.strip(), | |
"link": resource.strip(), | |
"action": action.strip() | |
}) | |
except ValueError as e: | |
print(f"Skipping invalid permission format: {permission}. Error: {e}") | |
continue | |
roles_json.append({ | |
"role": role.name, | |
"children": permissions | |
}) | |
# Process the roles JSON to remove suffixes and merge roles with the same base name | |
merged_roles = {} | |
for role_obj in roles_json: | |
# Remove suffixes from role name | |
base_role = role_obj["role"].rsplit("-", 1)[0].strip() # Split on the last '-' and take the first part | |
if base_role not in merged_roles: | |
merged_roles[base_role] = [] | |
# Add children to the merged role | |
if "children" in role_obj: | |
merged_roles[base_role].extend(role_obj["children"]) | |
# Convert merged roles back to the desired format | |
final_roles = [] | |
for role_name, children in merged_roles.items(): | |
# Remove duplicate children | |
unique_children = [] | |
seen = set() | |
for child in children: | |
child_tuple = tuple(child.items()) | |
if child_tuple not in seen: | |
seen.add(child_tuple) | |
unique_children.append(child) | |
final_roles.append({ | |
"title": role_name, | |
"value": role_name, | |
"icon": "PencilRuler", | |
"children": unique_children | |
}) | |
return final_roles | |
except Exception as e: | |
raise HTTPException( | |
status_code=500, | |
detail=f"Error checking permissions: {str(e)}") | |