File size: 4,574 Bytes
620b8e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4bbdecf
620b8e8
 
 
 
 
 
 
 
 
 
 
 
 
 
4bbdecf
620b8e8
 
 
 
4bbdecf
620b8e8
 
4bbdecf
 
620b8e8
 
 
4bbdecf
 
620b8e8
4bbdecf
 
620b8e8
 
 
 
 
 
 
4bbdecf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
620b8e8
4bbdecf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
620b8e8
4bbdecf
620b8e8
 
 
4bbdecf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv

# Runs before the Permit client is built so that a PERMIT_TOKEN defined in a
# local .env file (presumably picked up by python-dotenv) is visible to
# os.getenv below.
load_dotenv()

app = FastAPI()

# NOTE(review): every origin, method and header is allowed AND credentials are
# enabled. That combination is presumably only meant for development — confirm
# and restrict allow_origins to the known front-end hosts before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-wide Permit.io client; the functions below call permit.check and
# permit.api.* on it.
# NOTE(review): os.getenv returns None when PERMIT_TOKEN is unset, so a missing
# token is not detected here — it will only surface on the first API call.
permit = Permit(
    pdp="https://cloudpdp.api.permit.io",
    token=os.getenv("PERMIT_TOKEN")
)

class PermissionRequest(BaseModel):
    """Request body accepted by the POST /check-permission endpoint."""

    # User identifier passed to permit.api.users.get_assigned_roles.
    email: str
    # Required by the schema but not read by the handler visible in this
    # file — TODO confirm whether clients still need to send it.
    key: str
    # Tenant used to scope the role-assignment lookup.
    tenant: str

RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None

async def get_resources_and_actions() -> Dict[str, List[str]]:
    """Return a mapping of resource key -> list of action keys.

    The mapping is built from ``permit.api.resources.list()`` on the first
    call and stored in the module-level ``RESOURCES_CACHE``; later calls
    return the stored mapping without contacting the API again. The cache is
    never invalidated by this function.
    """
    global RESOURCES_CACHE

    # Fast path: the mapping has already been built by an earlier call.
    if RESOURCES_CACHE is not None:
        return RESOURCES_CACHE

    listed = await permit.api.resources.list()
    # Assign only after the whole mapping is built, so a failure part-way
    # through leaves the cache unset and the next call retries.
    RESOURCES_CACHE = {
        entry.key: [act.key for act in entry.actions.values()]
        for entry in listed
    }
    return RESOURCES_CACHE

async def check_single_permission(user_email: str, action: str, resource_key: str, tenant: str):
    """Ask Permit whether a user may perform one action on one resource type.

    Args:
        user_email: User identifier passed to ``permit.check`` as ``user``.
        action: Action key to check.
        resource_key: Resource type key; also echoed as ``title`` and ``link``.
        tenant: Tenant the resource is checked in.

    Returns:
        A dict with keys ``title``, ``link``, ``action`` and ``permitted``.
        ``permitted`` is whatever ``permit.check`` returned, or ``False`` when
        the check raised.
    """
    target = {"type": resource_key, "tenant": tenant}  # tenant-scoped resource
    try:
        allowed = await permit.check(
            user=user_email,
            action=action,
            resource=target,
        )
    except Exception as e:
        # Fail closed: any error from the check is reported and treated as a denial.
        print(f"Error checking permission for resource {resource_key} and action {action}: {e}")
        allowed = False

    return {
        "title": resource_key,
        "link": resource_key,
        "action": action,
        "permitted": allowed,
    }

def _parse_permissions(raw_permissions) -> List[Dict[str, str]]:
    """Convert ``"resource:action"`` strings into ``title``/``link``/``action`` dicts.

    Anything that is not a list yields an empty result. Entries that do not
    split into exactly two parts on ``":"`` are reported and skipped.
    """
    if not isinstance(raw_permissions, list):
        return []

    entries = []
    for permission in raw_permissions:
        try:
            resource, action = permission.split(":")
        except ValueError as e:
            print(f"Skipping invalid permission format: {permission}. Error: {e}")
            continue
        entries.append({
            "title": resource.strip(),
            "link": resource.strip(),
            "action": action.strip(),
        })
    return entries


def _merge_roles_by_base_name(roles_json: List[dict]) -> Dict[str, List[dict]]:
    """Group children under each role name with its last ``-suffix`` removed.

    Role names without a ``-`` are kept whole. Groups appear in order of the
    first role that produced them.
    """
    merged: Dict[str, List[dict]] = {}
    for role_obj in roles_json:
        # Split on the last '-' only, so "editor-eu" and "editor-us" both map to "editor".
        base_role = role_obj["role"].rsplit("-", 1)[0].strip()
        merged.setdefault(base_role, []).extend(role_obj.get("children", []))
    return merged


def _dedupe_children(children: List[dict]) -> List[dict]:
    """Drop repeated child dicts, keeping the first occurrence of each."""
    seen = set()
    unique_children = []
    for child in children:
        # Dicts are unhashable; their item tuple serves as the identity key.
        fingerprint = tuple(child.items())
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique_children.append(child)
    return unique_children


@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    """Return the caller's roles and their permissions as a menu-style tree.

    Looks up the roles assigned to ``request.email`` in ``request.tenant``,
    fetches each role's permissions, merges roles that share a base name
    (the name with its last ``-suffix`` removed) and de-duplicates the
    permissions of each merged role.

    Returns:
        A list of dicts with keys ``title``, ``value``, ``icon`` and
        ``children``; each child has ``title``, ``link`` and ``action``.

    Raises:
        HTTPException: status 500 if any step fails; the original error is
            attached as the exception's cause.
    """
    try:
        user_roles = await permit.api.users.get_assigned_roles(
            request.email, tenant=request.tenant
        )

        # Several assignments may name the same role. Fetch each distinct role
        # once, in first-seen order; the result is unchanged because children
        # are de-duplicated after merging anyway.
        role_keys = list(dict.fromkeys(
            role_assignment.role for role_assignment in user_roles
        ))

        roles_json = []
        for role_key in role_keys:
            role = await permit.api.roles.get(role_key)
            roles_json.append({
                "role": role.name,
                "children": _parse_permissions(role.permissions),
            })

        merged_roles = _merge_roles_by_base_name(roles_json)
        return [
            {
                "title": role_name,
                "value": role_name,
                "icon": "PencilRuler",
                "children": _dedupe_children(children),
            }
            for role_name, children in merged_roles.items()
        ]
    except Exception as e:
        # Boundary handler: surface any failure as a 500 while keeping the
        # original exception chained for server-side tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {str(e)}") from e