rajshukla1102 commited on
Commit
7646003
·
verified ·
1 Parent(s): dc7f9a0

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +53 -51
main.py CHANGED
@@ -12,6 +12,7 @@ load_dotenv()
12
 
13
  app = FastAPI()
14
 
 
15
  app.add_middleware(
16
  CORSMiddleware,
17
  allow_origins=["*"],
@@ -20,6 +21,7 @@ app.add_middleware(
20
  allow_headers=["*"],
21
  )
22
 
 
23
  permit = Permit(
24
  pdp="https://cloudpdp.api.permit.io",
25
  token=os.getenv("PERMIT_TOKEN")
@@ -28,107 +30,107 @@ permit = Permit(
28
  class PermissionRequest(BaseModel):
29
  email: str
30
  key: str
31
- tenant: str # Added tenant
32
 
33
- RESOURCES_CACHE: Optional[Dict[str, List[str]]] = None
 
 
 
 
34
 
35
- async def get_resources_and_actions() -> Dict[str, List[str]]:
36
  global RESOURCES_CACHE
37
  if RESOURCES_CACHE is None:
38
  resources = await permit.api.resources.list()
39
- resource_actions = {}
40
- for resource in resources:
41
- actions = [action.key for action in resource.actions.values()]
42
- resource_actions[resource.key] = actions
43
- RESOURCES_CACHE = resource_actions
 
44
  return RESOURCES_CACHE
45
 
46
- async def check_single_permission(user_email: str, action: str, resource_key: str, tenant: str):
47
- try:
48
- permitted = await permit.check(
49
- user=user_email,
50
- action=action,
51
- resource={"type": resource_key, "tenant": tenant} # Include tenant
52
- )
53
- return {
54
- "title": resource_key,
55
- "link": resource_key,
56
- "action": action,
57
- "permitted": permitted
58
- }
59
- except Exception as e:
60
- print(f"Error checking permission for resource {resource_key} and action {action}: {e}")
61
- return {
62
- "title": resource_key,
63
- "link": resource_key,
64
- "action": action,
65
- "permitted": False
66
- }
67
 
68
  @app.post("/check-permission")
69
  async def check_permissions(request: PermissionRequest):
70
  try:
71
- user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant) # added tenant
72
-
73
- # Collect all roles and their children
 
 
74
  roles_json = []
75
  for role_assignment in user_roles:
76
  role_key = role_assignment.role
77
  role = await permit.api.roles.get(role_key)
78
-
79
  permissions = []
80
- # Check if role.permissions is a list before iterating
81
  if isinstance(role.permissions, list):
82
  for permission in role.permissions:
83
  try:
84
  resource, action = permission.split(":")
 
 
 
 
 
 
 
 
 
 
 
 
 
85
  permissions.append({
86
- "title": resource.strip(),
87
- "link": resource.strip(),
88
- "action": action.strip()
89
  })
90
  except ValueError as e:
91
- print(f"Skipping invalid permission format: {permission}. Error: {e}")
92
  continue
93
-
94
  roles_json.append({
95
  "role": role.name,
96
  "children": permissions
97
  })
98
-
99
- # Process the roles JSON to remove suffixes and merge roles with the same base name
100
- merged_roles = {}
101
 
 
 
102
  for role_obj in roles_json:
103
- # Remove suffixes from role name
104
- base_role = role_obj["role"].rsplit("-", 1)[0].strip() # Split on the last '-' and take the first part
105
  if base_role not in merged_roles:
106
  merged_roles[base_role] = []
107
- # Add children to the merged role
108
  if "children" in role_obj:
109
  merged_roles[base_role].extend(role_obj["children"])
110
 
111
- # Convert merged roles back to the desired format
112
  final_roles = []
113
  for role_name, children in merged_roles.items():
114
- # Remove duplicate children
115
  unique_children = []
116
  seen = set()
117
  for child in children:
118
- child_tuple = tuple(child.items())
119
  if child_tuple not in seen:
120
  seen.add(child_tuple)
121
  unique_children.append(child)
122
-
123
  final_roles.append({
124
  "title": role_name,
125
  "value": role_name,
126
  "icon": "PencilRuler",
127
  "children": unique_children
128
  })
129
-
130
  return final_roles
 
131
  except Exception as e:
 
132
  raise HTTPException(
133
  status_code=500,
134
- detail=f"Error checking permissions: {str(e)}")
 
 
12
 
13
  app = FastAPI()
14
 
15
+ # CORS middleware
16
  app.add_middleware(
17
  CORSMiddleware,
18
  allow_origins=["*"],
 
21
  allow_headers=["*"],
22
  )
23
 
24
+ # Initialize Permit client
25
  permit = Permit(
26
  pdp="https://cloudpdp.api.permit.io",
27
  token=os.getenv("PERMIT_TOKEN")
 
30
  class PermissionRequest(BaseModel):
31
  email: str
32
  key: str
33
+ tenant: str = "default"
34
 
35
+ class ResourceInfo(BaseModel):
36
+ key: str
37
+ title: str
38
+
39
+ RESOURCES_CACHE: Optional[Dict[str, ResourceInfo]] = None
40
 
41
+ async def get_resource_info() -> Dict[str, ResourceInfo]:
42
  global RESOURCES_CACHE
43
  if RESOURCES_CACHE is None:
44
  resources = await permit.api.resources.list()
45
+ RESOURCES_CACHE = {
46
+ resource.key: ResourceInfo(
47
+ key=resource.key,
48
+ title=resource.name or resource.key
49
+ ) for resource in resources
50
+ }
51
  return RESOURCES_CACHE
52
 
53
+ def modify_title(title: str) -> str:
54
+ parts = title.split(' ')
55
+ return ' '.join(parts[1:]) if len(parts) > 1 else title
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
 
57
  @app.post("/check-permission")
58
  async def check_permissions(request: PermissionRequest):
59
  try:
60
+ # Get user roles and resource info
61
+ user_roles = await permit.api.users.get_assigned_roles(request.email, tenant=request.tenant)
62
+ resource_info = await get_resource_info()
63
+
64
+ # Collect roles and permissions
65
  roles_json = []
66
  for role_assignment in user_roles:
67
  role_key = role_assignment.role
68
  role = await permit.api.roles.get(role_key)
69
+
70
  permissions = []
 
71
  if isinstance(role.permissions, list):
72
  for permission in role.permissions:
73
  try:
74
  resource, action = permission.split(":")
75
+ resource = resource.strip()
76
+ action = action.strip()
77
+
78
+ # Get resource display name
79
+ resource_title = (
80
+ resource_info[resource].title
81
+ if resource in resource_info
82
+ else resource
83
+ )
84
+
85
+ # Modify the title to remove the prefix
86
+ modified_title = modify_title(resource_title)
87
+
88
  permissions.append({
89
+ "title": modified_title, # Use modified title here
90
+ "link": resource,
91
+ "action": action
92
  })
93
  except ValueError as e:
94
+ print(f"Invalid permission format: {permission}. Error: {e}")
95
  continue
96
+
97
  roles_json.append({
98
  "role": role.name,
99
  "children": permissions
100
  })
 
 
 
101
 
102
+ # Merge roles with same base name
103
+ merged_roles = {}
104
  for role_obj in roles_json:
105
+ base_role = role_obj["role"].rsplit("-", 1)[0].strip()
 
106
  if base_role not in merged_roles:
107
  merged_roles[base_role] = []
 
108
  if "children" in role_obj:
109
  merged_roles[base_role].extend(role_obj["children"])
110
 
111
+ # Format final response
112
  final_roles = []
113
  for role_name, children in merged_roles.items():
 
114
  unique_children = []
115
  seen = set()
116
  for child in children:
117
+ child_tuple = (child["link"], child["action"])
118
  if child_tuple not in seen:
119
  seen.add(child_tuple)
120
  unique_children.append(child)
121
+
122
  final_roles.append({
123
  "title": role_name,
124
  "value": role_name,
125
  "icon": "PencilRuler",
126
  "children": unique_children
127
  })
128
+
129
  return final_roles
130
+
131
  except Exception as e:
132
+ print(f"Error in check_permissions: {str(e)}")
133
  raise HTTPException(
134
  status_code=500,
135
+ detail=f"Error checking permissions: {str(e)}"
136
+ )