File size: 4,362 Bytes
620b8e8
 
 
 
 
 
 
 
 
 
 
 
 
 
7646003
620b8e8
 
 
 
 
 
 
 
7646003
620b8e8
 
 
 
 
 
 
 
7646003
620b8e8
7646003
 
 
 
 
620b8e8
7646003
620b8e8
 
 
7646003
 
 
 
 
 
620b8e8
 
7646003
 
 
620b8e8
 
 
 
7646003
 
 
 
 
4bbdecf
 
 
 
7646003
4bbdecf
 
 
 
 
7646003
 
 
 
 
 
 
 
 
 
 
 
 
4bbdecf
7646003
 
 
4bbdecf
 
7646003
4bbdecf
7646003
4bbdecf
 
 
 
 
7646003
 
4bbdecf
7646003
4bbdecf
 
 
 
 
7646003
4bbdecf
 
 
 
 
7646003
4bbdecf
 
 
7646003
4bbdecf
 
 
 
 
 
7646003
4bbdecf
7646003
620b8e8
7646003
620b8e8
 
7646003
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize Permit client
permit = Permit(
    pdp="https://cloudpdp.api.permit.io",
    token=os.getenv("PERMIT_TOKEN")
)

class PermissionRequest(BaseModel):
    email: str
    key: str
    tenant: str = "default"

class ResourceInfo(BaseModel):
    key: str
    title: str

RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None

async def get_resource_info() -> Dict[str, ResourceInfo]:
    global RESOURCES_CACHE
    if RESOURCES_CACHE is None:
        resources = await permit.api.resources.list()
        RESOURCES_CACHE = {
            resource.key: ResourceInfo(
                key=resource.key,
                title=resource.name or resource.key
            ) for resource in resources
        }
    return RESOURCES_CACHE

def modify_title(title: str) -> str:
    parts = title.split(' ')
    return ' '.join(parts[1:]) if len(parts) > 1 else title

@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    try:
        # Get user roles and resource info
        user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant)
        resource_info = await get_resource_info()

        # Collect roles and permissions
        roles_json = []
        for role_assignment in user_roles:
            role_key = role_assignment.role
            role = await permit.api.roles.get(role_key)

            permissions = []
            if isinstance(role.permissions, list):
                for permission in role.permissions:
                    try:
                        resource, action = permission.split(":")
                        resource = resource.strip()
                        action = action.strip()
                        
                        # Get resource display name
                        resource_title = (
                            resource_info[resource].title 
                            if resource in resource_info 
                            else resource
                        )
                        
                        # Modify the title to remove the prefix
                        modified_title = modify_title(resource_title)

                        permissions.append({
                            "title": modified_title,  # Use modified title here
                            "link": resource,
                            "action": action
                        })
                    except ValueError as e:
                        print(f"Invalid permission format: {permission}. Error: {e}")
                        continue

            roles_json.append({
                "role": role.name,
                "children": permissions
            })

        # Merge roles with same base name
        merged_roles = {}
        for role_obj in roles_json:
            base_role = role_obj["role"].rsplit("-", 1)[0].strip()
            if base_role not in merged_roles:
                merged_roles[base_role] = []
            if "children" in role_obj:
                merged_roles[base_role].extend(role_obj["children"])

        # Format final response
        final_roles = []
        for role_name, children in merged_roles.items():
            unique_children = []
            seen = set()
            for child in children:
                child_tuple = (child["link"], child["action"])
                if child_tuple not in seen:
                    seen.add(child_tuple)
                    unique_children.append(child)

            final_roles.append({
                "title": role_name,
                "value": role_name,
                "icon": "PencilRuler",
                "children": unique_children
            })

        return final_roles

    except Exception as e:
        print(f"Error in check_permissions: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {str(e)}"
        )