rajshukla1102 commited on
Commit
620b8e8
·
verified ·
1 Parent(s): 403448e

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +97 -0
main.py CHANGED
@@ -0,0 +1,97 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from fastapi import FastAPI, HTTPException
3
+ from pydantic import BaseModel
4
+ from permit import Permit
5
+ import uvicorn
6
+ import asyncio
7
+ from typing import Dict, List, Optional
8
+ from fastapi.middleware.cors import CORSMiddleware
9
+ from dotenv import load_dotenv
10
+
11
+ load_dotenv()
12
+
13
+ app = FastAPI()
14
+
15
+ app.add_middleware(
16
+ CORSMiddleware,
17
+ allow_origins=["*"],
18
+ allow_credentials=True,
19
+ allow_methods=["*"],
20
+ allow_headers=["*"],
21
+ )
22
+
23
+ permit = Permit(
24
+ pdp="https://cloudpdp.api.permit.io",
25
+ token=os.getenv("PERMIT_TOKEN")
26
+ )
27
+
28
+ class PermissionRequest(BaseModel):
29
+ email: str
30
+ key: str
31
+
32
+ RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None
33
+
34
+ async def get_resources_and_actions() -> Dict[str, List[str]]:
35
+ global RESOURCES_CACHE
36
+ if RESOURCES_CACHE is None:
37
+ resources = await permit.api.resources.list()
38
+ resource_actions = {}
39
+ for resource in resources:
40
+ actions = [action.key for action in resource.actions.values()]
41
+ resource_actions[resource.key] = actions
42
+ RESOURCES_CACHE = resource_actions
43
+ return RESOURCES_CACHE
44
+
45
+ async def check_single_permission(user_email: str, action: str, resource_key: str):
46
+ try:
47
+ permitted = await permit.check(
48
+ user=user_email,
49
+ action=action,
50
+ resource={"type": resource_key}
51
+ )
52
+ return {
53
+ "resource": resource_key,
54
+ "action": action,
55
+ "permitted": permitted
56
+ }
57
+ except Exception:
58
+ return {
59
+ "resource": resource_key,
60
+ "action": action,
61
+ "permitted": False
62
+ }
63
+
64
+ @app.post("/check-permission")
65
+ async def check_permissions(request: PermissionRequest):
66
+ try:
67
+ resource_actions = await get_resources_and_actions()
68
+ tasks = []
69
+ for resource_key, actions in resource_actions.items():
70
+ for action in actions:
71
+ tasks.append(asyncio.create_task(
72
+ check_single_permission(request.email, action, resource_key)
73
+ ))
74
+ results = await asyncio.gather(*tasks)
75
+
76
+ permissions = []
77
+ resource_map = {}
78
+ for result in results:
79
+ if result["permitted"]:
80
+ r_key = result["resource"]
81
+ if r_key not in resource_map:
82
+ resource_map[r_key] = {
83
+ "resource": r_key,
84
+ "actions": []
85
+ }
86
+ permissions.append(resource_map[r_key])
87
+ resource_map[r_key]["actions"].append(result["action"])
88
+
89
+ return permissions
90
+ except Exception as e:
91
+ raise HTTPException(
92
+ status_code=500,
93
+ detail=f"Error checking permissions: {str(e)}"
94
+ )
95
+
96
+ # if __name__ == "__main__":
97
+ # uvicorn.run(app, host="0.0.0.0", port=8000)