File size: 6,434 Bytes
e3278e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Auth Checks for Organizations
"""

from typing import Dict, List, Optional, Tuple

from fastapi import status

from litellm.proxy._types import *


def organization_role_based_access_check(
    request_body: dict,
    user_object: Optional[LiteLLM_UserTable],
    route: str,
):
    """
    Role based access control checks only run if a user is part of an Organization

    Organization Checks:
    ONLY RUN IF user_object.organization_memberships is not None

    1. Only Proxy Admins can access /organization/new
    2. IF route is a LiteLLMRoutes.org_admin_only_routes, then check if user is an Org Admin for that organization

    """

    if user_object is None:
        return

    passed_organization_id: Optional[str] = request_body.get("organization_id", None)

    if route == "/organization/new":
        if user_object.user_role != LitellmUserRoles.PROXY_ADMIN.value:
            raise ProxyException(
                message=f"Only proxy admins can create new organizations. You are {user_object.user_role}",
                type=ProxyErrorTypes.auth_error.value,
                param="user_role",
                code=status.HTTP_401_UNAUTHORIZED,
            )

    if user_object.user_role == LitellmUserRoles.PROXY_ADMIN.value:
        return

    # Checks if route is an Org Admin Only Route
    if route in LiteLLMRoutes.org_admin_only_routes.value:
        _user_organizations, _user_organization_role_mapping = (
            get_user_organization_info(user_object)
        )

        if user_object.organization_memberships is None:
            raise ProxyException(
                message=f"Tried to access route={route} but you are not a member of any organization. Please contact the proxy admin to request access.",
                type=ProxyErrorTypes.auth_error.value,
                param="organization_id",
                code=status.HTTP_401_UNAUTHORIZED,
            )

        if passed_organization_id is None:
            raise ProxyException(
                message="Passed organization_id is None, please pass an organization_id in your request",
                type=ProxyErrorTypes.auth_error.value,
                param="organization_id",
                code=status.HTTP_401_UNAUTHORIZED,
            )

        user_role: Optional[LitellmUserRoles] = _user_organization_role_mapping.get(
            passed_organization_id
        )
        if user_role is None:
            raise ProxyException(
                message=f"You do not have a role within the selected organization. Passed organization_id: {passed_organization_id}. Please contact the organization admin to request access.",
                type=ProxyErrorTypes.auth_error.value,
                param="organization_id",
                code=status.HTTP_401_UNAUTHORIZED,
            )

        if user_role != LitellmUserRoles.ORG_ADMIN.value:
            raise ProxyException(
                message=f"You do not have the required role to perform {route} in Organization {passed_organization_id}. Your role is {user_role} in Organization {passed_organization_id}",
                type=ProxyErrorTypes.auth_error.value,
                param="user_role",
                code=status.HTTP_401_UNAUTHORIZED,
            )
    elif route == "/team/new":
        # if user is part of multiple teams, then they need to specify the organization_id
        _user_organizations, _user_organization_role_mapping = (
            get_user_organization_info(user_object)
        )
        if (
            user_object.organization_memberships is not None
            and len(user_object.organization_memberships) > 0
        ):
            if passed_organization_id is None:
                raise ProxyException(
                    message=f"Passed organization_id is None, please specify the organization_id in your request. You are part of multiple organizations: {_user_organizations}",
                    type=ProxyErrorTypes.auth_error.value,
                    param="organization_id",
                    code=status.HTTP_401_UNAUTHORIZED,
                )

            _user_role_in_passed_org = _user_organization_role_mapping.get(
                passed_organization_id
            )
            if _user_role_in_passed_org != LitellmUserRoles.ORG_ADMIN.value:
                raise ProxyException(
                    message=f"You do not have the required role to call {route}. Your role is {_user_role_in_passed_org} in Organization {passed_organization_id}",
                    type=ProxyErrorTypes.auth_error.value,
                    param="user_role",
                    code=status.HTTP_401_UNAUTHORIZED,
                )


def get_user_organization_info(
    user_object: LiteLLM_UserTable,
) -> Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]:
    """
    Helper function to extract user organization information.

    Args:
        user_object (LiteLLM_UserTable): The user object containing organization memberships.

    Returns:
        Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]: A tuple containing:
            - List of organization IDs the user is a member of
            - Dictionary mapping organization IDs to user roles
    """
    _user_organizations: List[str] = []
    _user_organization_role_mapping: Dict[str, Optional[LitellmUserRoles]] = {}

    if user_object.organization_memberships is not None:
        for _membership in user_object.organization_memberships:
            if _membership.organization_id is not None:
                _user_organizations.append(_membership.organization_id)
                _user_organization_role_mapping[_membership.organization_id] = _membership.user_role  # type: ignore

    return _user_organizations, _user_organization_role_mapping


def _user_is_org_admin(
    request_data: dict,
    user_object: Optional[LiteLLM_UserTable] = None,
) -> bool:
    """
    Helper function to check if user is an org admin for the passed organization_id
    """
    if request_data.get("organization_id", None) is None:
        return False

    if user_object is None:
        return False

    if user_object.organization_memberships is None:
        return False

    for _membership in user_object.organization_memberships:
        if _membership.organization_id == request_data.get("organization_id", None):
            if _membership.user_role == LitellmUserRoles.ORG_ADMIN.value:
                return True

    return False