File size: 2,858 Bytes
620b8e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import asyncio
import logging
import os
from typing import Dict, List, Optional

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from permit import Permit
from pydantic import BaseModel

# Must run before the os.getenv("PERMIT_TOKEN") call below; presumably pulls
# variables from a local .env file into the process environment.
load_dotenv()

# Application object; the route in this file is registered on it via @app.post.
app = FastAPI()

# CORS is configured to accept every origin, method and header, with
# credentials enabled.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive; FastAPI's CORS docs recommend listing explicit origins when
# credentials are needed — confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared Permit client used by the helpers below for resource listing and
# permission checks.
# NOTE(review): os.getenv returns None when PERMIT_TOKEN is unset, so a missing
# token is not detected here — confirm how Permit behaves with token=None.
permit = Permit(
    pdp="https://cloudpdp.api.permit.io",
    token=os.getenv("PERMIT_TOKEN")
)

class PermissionRequest(BaseModel):
    # Request body accepted by POST /check-permission.
    # User identifier; forwarded to permit.check() as the `user` argument.
    email: str
    # Required by the schema, but not read anywhere in the visible code.
    # NOTE(review): confirm whether this field is still needed.
    key: str

# Module-level memo of resource key -> action keys; stays None until the first
# successful fetch and is never invalidated by the code below.
RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None

async def get_resources_and_actions() -> Dict[str, List[str]]:
    """Return a mapping of each resource key to the list of its action keys.

    On the first call the resources are fetched with
    ``permit.api.resources.list()`` and the resulting mapping is stored in the
    module-level ``RESOURCES_CACHE``. Every later call returns that same cached
    dict without contacting the API again.
    """
    global RESOURCES_CACHE
    if RESOURCES_CACHE is not None:
        return RESOURCES_CACHE

    listed = await permit.api.resources.list()
    # Built in full before assignment, so a failure midway leaves the cache unset.
    RESOURCES_CACHE = {
        res.key: [act.key for act in res.actions.values()]
        for res in listed
    }
    return RESOURCES_CACHE

async def check_single_permission(user_email: str, action: str, resource_key: str) -> dict:
    """Check one (user, action, resource type) combination with Permit.

    Args:
        user_email: User identifier, passed to ``permit.check`` as ``user``.
        action: Action key to test.
        resource_key: Resource key, sent as the ``type`` of the resource.

    Returns:
        A dict with the keys ``"resource"``, ``"action"`` and ``"permitted"``.
        ``"permitted"`` is whatever ``permit.check`` returned, or ``False`` if
        the check raised.

    Any ``Exception`` raised by the check is treated as a denial (fail closed),
    so a single failing lookup cannot abort a batch of checks. The failure is
    logged so that an outage or misconfiguration is distinguishable from a
    genuine denial.
    """
    try:
        permitted = await permit.check(
            user=user_email,
            action=action,
            resource={"type": resource_key},
        )
    except Exception:
        # The user identifier is deliberately left out of the log message.
        logging.getLogger(__name__).exception(
            "Permission check failed for action %r on resource %r; denying",
            action,
            resource_key,
        )
        permitted = False
    return {
        "resource": resource_key,
        "action": action,
        "permitted": permitted,
    }

def _group_permitted_actions(results):
    """Collapse per-check results into one entry per resource, keeping only permitted actions."""
    grouped = {}
    for outcome in results:
        if outcome["permitted"]:
            resource = outcome["resource"]
            entry = grouped.setdefault(
                resource, {"resource": resource, "actions": []}
            )
            entry["actions"].append(outcome["action"])
    # Dicts keep insertion order, so resources appear in first-permitted order.
    return list(grouped.values())

@app.post("/check-permission")
async def check_permissions(request: PermissionRequest):
    """Return every resource/action pair the given user is permitted to perform.

    Every action of every known resource is checked concurrently for
    ``request.email``. ``request.key`` is not used here.

    Returns:
        A list of ``{"resource": <key>, "actions": [<action>, ...]}`` dicts,
        one per resource with at least one permitted action. Resources with
        no permitted action are omitted.

    Raises:
        HTTPException: status 500 if anything in the process raises. The
            original exception is logged and chained as the cause.
    """
    try:
        resource_actions = await get_resources_and_actions()
        # gather() schedules each coroutine as a task and keeps input order.
        results = await asyncio.gather(*(
            check_single_permission(request.email, action, resource_key)
            for resource_key, actions in resource_actions.items()
            for action in actions
        ))
        return _group_permitted_actions(results)
    except Exception as e:
        # Log the traceback so the failure is visible to operators.
        logging.getLogger(__name__).exception("Failed to check permissions")
        raise HTTPException(
            status_code=500,
            detail=f"Error checking permissions: {e}"
        ) from e

# if __name__ == "__main__":
#     uvicorn.run(app, host="0.0.0.0", port=8000)