# accesscontrol / main.py
# rajshukla1102's picture
# Update main.py
# 4bbdecf verified
# raw
# history blame
# 4.57 kB
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
# Read a local .env file (when present) into the process environment before
# anything below calls os.getenv().
load_dotenv()
# The ASGI application; route handlers further down register on it through
# @app.<method> decorators.
app = FastAPI()
# CORS policy: every origin, method and header is accepted, with credentials
# enabled.
# NOTE(review): FastAPI's CORS documentation advises against wildcard values
# when allow_credentials=True. Confirm whether credentialed cross-origin
# requests are actually needed; if so, list the allowed origins explicitly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Shared Permit.io SDK client used by every function in this module. The PDP
# address is hard-coded; the API token comes from the PERMIT_TOKEN environment
# variable.
# NOTE(review): os.getenv() returns None when PERMIT_TOKEN is unset, so a
# missing token is not detected here — confirm that is intended.
permit = Permit(
pdp="https://cloudpdp.api.permit.io",
token=os.getenv("PERMIT_TOKEN")
)
class PermissionRequest(BaseModel):
    """JSON body accepted by the ``/check-permission`` endpoint.

    All three fields are required strings.
    """

    # Passed to Permit as the user identifier when looking up role assignments.
    email: str
    # Not read by the /check-permission handler; kept so existing clients that
    # send it keep validating.
    key: str
    # Tenant used to scope the role-assignment lookup.
    tenant: str
# Process-wide memo of {resource key: [action keys]}. Stays None until the
# first successful fetch; after that it is never refreshed.
RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None


async def get_resources_and_actions() -> Dict[str, List[str]]:
    """Return a mapping of every resource key to the keys of its actions.

    The first call fetches the resource list through the Permit client and
    stores the result in ``RESOURCES_CACHE``; later calls return that same
    dict object without contacting Permit again.

    Returns:
        ``{resource_key: [action_key, ...]}``. A resource whose ``actions``
        attribute is ``None`` or empty maps to an empty list.

    Raises:
        Whatever the Permit client raises while listing resources. In that
        case the cache is left unset, so the next call retries.
    """
    global RESOURCES_CACHE
    if RESOURCES_CACHE is None:
        resources = await permit.api.resources.list()
        # The mapping is built completely before it is assigned, so a failure
        # part-way through never leaves a half-filled cache behind.
        RESOURCES_CACHE = {
            # `or {}`: a resource without an actions mapping must not abort
            # the whole listing with an AttributeError.
            resource.key: [action.key for action in (resource.actions or {}).values()]
            for resource in resources
        }
    return RESOURCES_CACHE
async def check_single_permission(user_email: str, action: str, resource_key: str, tenant: str):
    """Ask Permit whether a user may perform one action on one resource type.

    Args:
        user_email: Passed to ``permit.check`` as the user.
        action: Action key to check.
        resource_key: Resource type key; also echoed back as ``title`` and
            ``link`` in the result.
        tenant: Tenant the check is scoped to.

    Returns:
        A dict with the keys ``title``, ``link``, ``action`` and
        ``permitted``. This function never raises for a failed check: any
        exception from the Permit call is printed and reported as
        ``permitted=False``.
    """
    target = {"type": resource_key, "tenant": tenant}
    try:
        allowed = await permit.check(user=user_email, action=action, resource=target)
    except Exception as e:
        # Fail closed: an error during the check counts as "not permitted".
        print(f"Error checking permission for resource {resource_key} and action {action}: {e}")
        allowed = False
    return {
        "title": resource_key,
        "link": resource_key,
        "action": action,
        "permitted": allowed,
    }
def _parse_role_permissions(raw_permissions: object) -> List[Dict[str, str]]:
    """Convert ``"resource:action"`` strings into menu-entry dicts, skipping malformed ones."""
    # Anything that is not a list (e.g. None) is treated as "no permissions".
    if not isinstance(raw_permissions, list):
        return []
    entries = []
    for permission in raw_permissions:
        try:
            # Exactly one ':' is expected; any other count raises ValueError.
            resource, action = permission.split(":")
        except ValueError as e:
            print(f"Skipping invalid permission format: {permission}. Error: {e}")
            continue
        entries.append({
            "title": resource.strip(),
            "link": resource.strip(),
            "action": action.strip(),
        })
    return entries


def _merge_roles_by_base_name(roles_json: List[dict]) -> Dict[str, List[Dict[str, str]]]:
    """Group role children under the role name with its last ``-suffix`` removed."""
    merged: Dict[str, List[Dict[str, str]]] = {}
    for role_obj in roles_json:
        # NOTE(review): everything after the LAST '-' is dropped, so a name
        # that contains a hyphen but carries no suffix is shortened as well.
        base_role = role_obj["role"].rsplit("-", 1)[0].strip()
        merged.setdefault(base_role, []).extend(role_obj["children"])
    return merged


def _dedupe_children(children: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Drop repeated child entries, keeping the first occurrence and original order."""
    seen = set()
    unique = []
    for child in children:
        # Dicts are unhashable; their item tuple serves as the identity.
        fingerprint = tuple(child.items())
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(child)
    return unique


@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    """Return the caller's roles and their permissions as a menu tree.

    Looks up the roles assigned to ``request.email`` in ``request.tenant``,
    fetches each role's permission list, merges roles that share a base name
    (the role name up to its last ``-``), and removes duplicate permissions
    inside each merged role.

    Returns:
        A list of ``{"title", "value", "icon", "children"}`` dicts, one per
        base role name, where ``children`` is a list of
        ``{"title", "link", "action"}`` dicts.

    Raises:
        HTTPException: status 500 carrying the underlying error text, for any
            failure while talking to Permit or shaping the result.
    """
    try:
        user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant)

        # Each distinct role is fetched once, and all lookups run concurrently
        # rather than one after another. Skipping repeats cannot change the
        # result because duplicate children are removed below anyway.
        role_keys = list(dict.fromkeys(assignment.role for assignment in user_roles))
        roles = await asyncio.gather(*(permit.api.roles.get(role_key) for role_key in role_keys))

        roles_json = [
            {"role": role.name, "children": _parse_role_permissions(role.permissions)}
            for role in roles
        ]

        final_roles = [
            {
                "title": role_name,
                "value": role_name,
                "icon": "PencilRuler",
                "children": _dedupe_children(children),
            }
            for role_name, children in _merge_roles_by_base_name(roles_json).items()
        ]
        return final_roles
    except Exception as e:
        # Chain the cause so the original traceback is visible in server logs.
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {str(e)}") from e