File size: 5,624 Bytes
620b8e8
 
 
 
 
 
 
 
 
 
 
 
 
 
7646003
620b8e8
 
 
 
 
 
 
 
7646003
620b8e8
 
 
 
 
 
 
 
7646003
620b8e8
7646003
 
 
 
 
620b8e8
7646003
620b8e8
 
 
7646003
 
 
 
 
 
620b8e8
 
7646003
 
 
620b8e8
f92543a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
620b8e8
 
 
7646003
 
 
 
 
4bbdecf
 
 
 
7646003
4bbdecf
 
 
 
 
7646003
 
 
 
 
 
 
 
 
 
 
 
 
4bbdecf
f92543a
7646003
 
4bbdecf
 
7646003
4bbdecf
7646003
4bbdecf
 
 
 
 
7646003
 
4bbdecf
7646003
4bbdecf
 
 
 
 
7646003
4bbdecf
 
 
 
 
7646003
4bbdecf
 
 
7646003
f92543a
4bbdecf
 
 
 
f92543a
 
4bbdecf
7646003
f92543a
 
 
 
 
 
 
 
 
4bbdecf
7646003
620b8e8
7646003
620b8e8
 
7646003
 
f92543a
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from permit import Permit
import uvicorn
import asyncio
from typing import Dict, List, Optional
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv

# Load variables from a local .env file (if present) into the process
# environment. This must run before os.getenv("PERMIT_TOKEN") is evaluated below.
load_dotenv()

# The ASGI application; route handlers further down register on it.
app = FastAPI()

# CORS middleware: every origin, method and header is accepted, and
# credentials are allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize Permit client against the cloud-hosted PDP endpoint.
# NOTE(review): os.getenv returns None when PERMIT_TOKEN is unset; confirm how
# the Permit client behaves with a missing token.
permit = Permit(
    pdp="https://cloudpdp.api.permit.io",
    token=os.getenv("PERMIT_TOKEN")
)

class PermissionRequest(BaseModel):
    # Request body accepted by the POST /check-permission endpoint.

    # Passed as the first argument to permit.api.users.get_assigned_roles.
    email: str
    # Required by the schema; not read by any code visible in this file.
    key: str
    # Tenant forwarded to the role-assignment lookup; "default" when omitted.
    tenant: str = "default"

class ResourceInfo(BaseModel):
    # Display metadata kept for one Permit resource.

    # Resource key; also the dictionary key under which the entry is cached.
    key: str
    # Display title: the resource's name, or its key when the name is falsy.
    title: str

# Module-level memo of resource metadata keyed by resource key.
# It stays None until the first successful fetch and is never refreshed afterwards.
RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None


async def get_resource_info() -> Dict[str, ResourceInfo]:
    """Return the resource-key -> ResourceInfo mapping, fetching it at most once.

    The first call lists resources through the Permit API and stores the
    result in ``RESOURCES_CACHE``; later calls return that same dictionary
    without contacting the API again. If the fetch raises, nothing is cached.
    """
    global RESOURCES_CACHE
    if RESOURCES_CACHE is not None:
        return RESOURCES_CACHE

    # NOTE(review): list() is called without paging arguments -- confirm the
    # SDK default returns every resource rather than only a first page.
    listing = await permit.api.resources.list()

    by_key: Dict[str, ResourceInfo] = {}
    for entry in listing:
        # Fall back to the key when the resource has no (or an empty) name.
        by_key[entry.key] = ResourceInfo(
            key=entry.key,
            title=entry.name or entry.key,
        )

    # Publish only after the whole mapping was built successfully.
    RESOURCES_CACHE = by_key
    return RESOURCES_CACHE

def modify_title(title: str) -> str:
    """Drop the leading word of ``title``.

    Everything up to and including the first space character is removed.
    A title containing no space is returned unchanged.
    """
    _prefix, separator, remainder = title.partition(' ')
    # An empty separator means no space was found, so there is no prefix to strip.
    return remainder if separator else title

def get_sequence_number(role_title: str) -> float:
    """Extract the leading sequence number of a role title for sorting.

    Args:
        role_title: Title such as ``"2 Sales"`` whose first
            whitespace-separated token may be an integer.

    Returns:
        The first token as an ``int`` when it parses as one; otherwise
        ``float('inf')`` so that unnumbered titles sort after numbered ones.
        The return annotation is ``float`` because the fallback is not an int.
    """
    # Only the first token matters, so stop splitting after it.
    tokens = role_title.split(maxsplit=1)
    if not tokens:
        # Empty or whitespace-only title: no number to extract.
        return float('inf')
    try:
        return int(tokens[0])
    except ValueError:
        return float('inf')  # Put roles without numbers at the end

def clean_role_title(role_title: str) -> str:
    """Strip a leading sequence number from a role title.

    When the first whitespace-separated token parses as an integer, the
    remaining tokens are returned joined by single spaces. Otherwise
    (no tokens, or a non-numeric first token) the title is returned as-is.
    """
    tokens = role_title.split()
    if not tokens:
        # Empty or whitespace-only input: nothing to strip.
        return role_title

    leading, *remainder = tokens
    try:
        int(leading)
    except ValueError:
        # First token is not a number, so the title carries no sequence prefix.
        return role_title
    return ' '.join(remainder)

def _describe_permission(permission: str, resource_info: Dict[str, "ResourceInfo"]) -> dict:
    """Turn one ``"resource:action"`` permission string into a menu entry.

    Raises ValueError when the string does not split into exactly two parts
    on ``":"``.
    """
    resource_key, action_name = permission.split(":")
    resource_key = resource_key.strip()
    action_name = action_name.strip()

    # Use the cached display title when the resource is known, else its key.
    if resource_key in resource_info:
        display_name = resource_info[resource_key].title
    else:
        display_name = resource_key

    return {
        # The first word of the display title is dropped before it is shown.
        "title": modify_title(display_name),
        "link": resource_key,
        "action": action_name,
    }


@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    """Build the role/permission menu for the user named in ``request``.

    Steps:
      1. Fetch the user's role assignments and the resource metadata.
      2. For each assigned role, fetch the role and convert its permission
         strings into ``{"title", "link", "action"}`` entries; malformed
         strings are reported with ``print`` and skipped.
      3. Group roles by the part of their name before the last ``"-"``.
      4. Per group, keep the first entry for each (link, action) pair, order
         the groups by leading sequence number, and strip that number from
         the emitted title/value.

    Returns:
        A list of dicts with keys ``title``, ``value``, ``icon``, ``children``.

    Raises:
        HTTPException: status 500 wrapping any exception raised along the way.
    """
    try:
        assignments = await permit.api.users.get_assigned_roles(
            request.email, tenant=request.tenant
        )
        resource_info = await get_resource_info()

        # Phase 1: one (role name, entries) pair per assignment, in order.
        collected = []
        for assignment in assignments:
            role = await permit.api.roles.get(assignment.role)

            entries = []
            # Anything other than a list of permissions is treated as "none".
            raw_permissions = role.permissions if isinstance(role.permissions, list) else []
            for permission in raw_permissions:
                try:
                    entries.append(_describe_permission(permission, resource_info))
                except ValueError as e:
                    print(f"Invalid permission format: {permission}. Error: {e}")

            collected.append((role.name, entries))

        # Phase 2: merge roles that share the same base name.
        grouped = {}
        for role_name, entries in collected:
            base_name = role_name.rsplit("-", 1)[0].strip()
            grouped.setdefault(base_name, []).extend(entries)

        # Phase 3: order groups by sequence number (stable for ties) and emit.
        menu = []
        for base_name in sorted(grouped, key=get_sequence_number):
            # setdefault keeps the first entry seen for each (link, action).
            first_seen = {}
            for entry in grouped[base_name]:
                first_seen.setdefault((entry["link"], entry["action"]), entry)

            label = clean_role_title(base_name)
            menu.append({
                "title": label,
                "value": label,
                "icon": "PencilRuler",
                "children": list(first_seen.values()),
            })

        return menu

    except Exception as e:
        # NOTE(review): the exception text is echoed to the client in the
        # response detail -- confirm that is acceptable.
        print(f"Error in check_permissions: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {str(e)}"
        )

# When this file is executed directly (not imported), serve the app with
# uvicorn, listening on all network interfaces on port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)