rajshukla1102 commited on
Commit
4bbdecf
·
verified ·
1 Parent(s): f46b3cb

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +68 -31
main.py CHANGED
@@ -28,6 +28,7 @@ permit = Permit(
28
  class PermissionRequest(BaseModel):
29
  email: str
30
  key: str
 
31
 
32
  RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None
33
 
@@ -42,21 +43,24 @@ async def get_resources_and_actions() -> Dict[str, List[str]]:
42
  RESOURCES_CACHE = resource_actions
43
  return RESOURCES_CACHE
44
 
45
- async def check_single_permission(user_email: str, action: str, resource_key: str):
46
  try:
47
  permitted = await permit.check(
48
  user=user_email,
49
  action=action,
50
- resource={"type": resource_key}
51
  )
52
  return {
53
- "resource": resource_key,
 
54
  "action": action,
55
  "permitted": permitted
56
  }
57
- except Exception:
 
58
  return {
59
- "resource": resource_key,
 
60
  "action": action,
61
  "permitted": False
62
  }
@@ -64,34 +68,67 @@ async def check_single_permission(user_email: str, action: str, resource_key: st
64
  @app.post("/check-permission")
65
  async def check_permissions(request: PermissionRequest):
66
  try:
67
- resource_actions = await get_resources_and_actions()
68
- tasks = []
69
- for resource_key, actions in resource_actions.items():
70
- for action in actions:
71
- tasks.append(asyncio.create_task(
72
- check_single_permission(request.email, action, resource_key)
73
- ))
74
- results = await asyncio.gather(*tasks)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
 
76
- permissions = []
77
- resource_map = {}
78
- for result in results:
79
- if result["permitted"]:
80
- r_key = result["resource"]
81
- if r_key not in resource_map:
82
- resource_map[r_key] = {
83
- "resource": r_key,
84
- "actions": []
85
- }
86
- permissions.append(resource_map[r_key])
87
- resource_map[r_key]["actions"].append(result["action"])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
 
89
- return permissions
90
  except Exception as e:
91
  raise HTTPException(
92
  status_code=500,
93
- detail=f"Error checking permissions: {str(e)}"
94
- )
95
-
96
- # if __name__ == "__main__":
97
- # uvicorn.run(app, host="0.0.0.0", port=8000)
 
28
  class PermissionRequest(BaseModel):
29
  email: str
30
  key: str
31
+ tenant: str # Added tenant
32
 
33
  RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None
34
 
 
43
  RESOURCES_CACHE = resource_actions
44
  return RESOURCES_CACHE
45
 
46
+ async def check_single_permission(user_email: str, action: str, resource_key: str, tenant: str):
47
  try:
48
  permitted = await permit.check(
49
  user=user_email,
50
  action=action,
51
+ resource={"type": resource_key, "tenant": tenant} # Include tenant
52
  )
53
  return {
54
+ "title": resource_key,
55
+ "link": resource_key,
56
  "action": action,
57
  "permitted": permitted
58
  }
59
+ except Exception as e:
60
+ print(f"Error checking permission for resource {resource_key} and action {action}: {e}")
61
  return {
62
+ "title": resource_key,
63
+ "link": resource_key,
64
  "action": action,
65
  "permitted": False
66
  }
 
68
  @app.post("/check-permission")
69
  async def check_permissions(request: PermissionRequest):
70
  try:
71
+ user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant) # added tenant
72
+
73
+ # Collect all roles and their children
74
+ roles_json = []
75
+ for role_assignment in user_roles:
76
+ role_key = role_assignment.role
77
+ role = await permit.api.roles.get(role_key)
78
+
79
+ permissions = []
80
+ # Check if role.permissions is a list before iterating
81
+ if isinstance(role.permissions, list):
82
+ for permission in role.permissions:
83
+ try:
84
+ resource, action = permission.split(":")
85
+ permissions.append({
86
+ "title": resource.strip(),
87
+ "link": resource.strip(),
88
+ "action": action.strip()
89
+ })
90
+ except ValueError as e:
91
+ print(f"Skipping invalid permission format: {permission}. Error: {e}")
92
+ continue
93
+
94
+ roles_json.append({
95
+ "role": role.name,
96
+ "children": permissions
97
+ })
98
 
99
+ # Process the roles JSON to remove suffixes and merge roles with the same base name
100
+ merged_roles = {}
101
+
102
+ for role_obj in roles_json:
103
+ # Remove suffixes from role name
104
+ base_role = role_obj["role"].rsplit("-", 1)[0].strip() # Split on the last '-' and take the first part
105
+ if base_role not in merged_roles:
106
+ merged_roles[base_role] = []
107
+ # Add children to the merged role
108
+ if "children" in role_obj:
109
+ merged_roles[base_role].extend(role_obj["children"])
110
+
111
+ # Convert merged roles back to the desired format
112
+ final_roles = []
113
+ for role_name, children in merged_roles.items():
114
+ # Remove duplicate children
115
+ unique_children = []
116
+ seen = set()
117
+ for child in children:
118
+ child_tuple = tuple(child.items())
119
+ if child_tuple not in seen:
120
+ seen.add(child_tuple)
121
+ unique_children.append(child)
122
+
123
+ final_roles.append({
124
+ "title": role_name,
125
+ "value": role_name,
126
+ "icon": "PencilRuler",
127
+ "children": unique_children
128
+ })
129
 
130
+ return final_roles
131
  except Exception as e:
132
  raise HTTPException(
133
  status_code=500,
134
+ detail=f"Error checking permissions: {str(e)}")