accesscontrol / main.py
rajshukla1102's picture
Update main.py
7646003 verified
raw
history blame
4.36 kB
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize Permit client
permit = Permit(
pdp="https://cloudpdp.api.permit.io",
token=os.getenv("PERMIT_TOKEN")
)
class PermissionRequest(BaseModel):
email: str
key: str
tenant: str = "default"
class ResourceInfo(BaseModel):
key: str
title: str
RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None
async def get_resource_info() -> Dict[str, ResourceInfo]:
global RESOURCES_CACHE
if RESOURCES_CACHE is None:
resources = await permit.api.resources.list()
RESOURCES_CACHE = {
resource.key: ResourceInfo(
key=resource.key,
title=resource.name or resource.key
) for resource in resources
}
return RESOURCES_CACHE
def modify_title(title: str) -> str:
parts = title.split(' ')
return ' '.join(parts[1:]) if len(parts) > 1 else title
@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
try:
# Get user roles and resource info
user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant)
resource_info = await get_resource_info()
# Collect roles and permissions
roles_json = []
for role_assignment in user_roles:
role_key = role_assignment.role
role = await permit.api.roles.get(role_key)
permissions = []
if isinstance(role.permissions, list):
for permission in role.permissions:
try:
resource, action = permission.split(":")
resource = resource.strip()
action = action.strip()
# Get resource display name
resource_title = (
resource_info[resource].title
if resource in resource_info
else resource
)
# Modify the title to remove the prefix
modified_title = modify_title(resource_title)
permissions.append({
"title": modified_title, # Use modified title here
"link": resource,
"action": action
})
except ValueError as e:
print(f"Invalid permission format: {permission}. Error: {e}")
continue
roles_json.append({
"role": role.name,
"children": permissions
})
# Merge roles with same base name
merged_roles = {}
for role_obj in roles_json:
base_role = role_obj["role"].rsplit("-", 1)[0].strip()
if base_role not in merged_roles:
merged_roles[base_role] = []
if "children" in role_obj:
merged_roles[base_role].extend(role_obj["children"])
# Format final response
final_roles = []
for role_name, children in merged_roles.items():
unique_children = []
seen = set()
for child in children:
child_tuple = (child["link"], child["action"])
if child_tuple not in seen:
seen.add(child_tuple)
unique_children.append(child)
final_roles.append({
"title": role_name,
"value": role_name,
"icon": "PencilRuler",
"children": unique_children
})
return final_roles
except Exception as e:
print(f"Error in check_permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Error checking permissions: {str(e)}"
)