TestLLM / litellm /proxy /management_endpoints /team_callback_endpoints.py
Raju2024's picture
Upload 1072 files
e3278e4 verified
raw
history blame
15.6 kB
"""
Endpoints to control callbacks per team
Use this when each team should control its own callbacks
"""
import json
import traceback
from typing import Optional
from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from litellm._logging import verbose_proxy_logger
from litellm.proxy._types import (
AddTeamCallback,
ProxyErrorTypes,
ProxyException,
TeamCallbackMetadata,
UserAPIKeyAuth,
)
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.management_helpers.utils import management_endpoint_wrapper
router = APIRouter()
@router.post(
"/team/{team_id:path}/callback",
tags=["team management"],
dependencies=[Depends(user_api_key_auth)],
)
@management_endpoint_wrapper
async def add_team_callbacks(
data: AddTeamCallback,
http_request: Request,
team_id: str,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
litellm_changed_by: Optional[str] = Header(
None,
description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
),
):
"""
Add a success/failure callback to a team
Use this if if you want different teams to have different success/failure callbacks
Parameters:
- callback_name (Literal["langfuse", "langsmith", "gcs"], required): The name of the callback to add
- callback_type (Literal["success", "failure", "success_and_failure"], required): The type of callback to add. One of:
- "success": Callback for successful LLM calls
- "failure": Callback for failed LLM calls
- "success_and_failure": Callback for both successful and failed LLM calls
- callback_vars (StandardCallbackDynamicParams, required): A dictionary of variables to pass to the callback
- langfuse_public_key: The public key for the Langfuse callback
- langfuse_secret_key: The secret key for the Langfuse callback
- langfuse_secret: The secret for the Langfuse callback
- langfuse_host: The host for the Langfuse callback
- gcs_bucket_name: The name of the GCS bucket
- gcs_path_service_account: The path to the GCS service account
- langsmith_api_key: The API key for the Langsmith callback
- langsmith_project: The project for the Langsmith callback
- langsmith_base_url: The base URL for the Langsmith callback
Example curl:
```
curl -X POST 'http:/localhost:4000/team/dbe2f686-a686-4896-864a-4c3924458709/callback' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"callback_name": "langfuse",
"callback_type": "success",
"callback_vars": {"langfuse_public_key": "pk-lf-xxxx1", "langfuse_secret_key": "sk-xxxxx"}
}'
```
This means for the team where team_id = dbe2f686-a686-4896-864a-4c3924458709, all LLM calls will be logged to langfuse using the public key pk-lf-xxxx1 and the secret key sk-xxxxx
"""
try:
from litellm.proxy.proxy_server import prisma_client
if prisma_client is None:
raise HTTPException(status_code=500, detail={"error": "No db connected"})
# Check if team_id exists already
_existing_team = await prisma_client.get_data(
team_id=team_id, table_name="team", query_type="find_unique"
)
if _existing_team is None:
raise HTTPException(
status_code=400,
detail={
"error": f"Team id = {team_id} does not exist. Please use a different team id."
},
)
# store team callback settings in metadata
team_metadata = _existing_team.metadata
team_callback_settings = team_metadata.get("callback_settings", {})
# expect callback settings to be
team_callback_settings_obj = TeamCallbackMetadata(**team_callback_settings)
if data.callback_type == "success":
if team_callback_settings_obj.success_callback is None:
team_callback_settings_obj.success_callback = []
if data.callback_name in team_callback_settings_obj.success_callback:
raise ProxyException(
message=f"callback_name = {data.callback_name} already exists in failure_callback, for team_id = {team_id}. \n Existing failure_callback = {team_callback_settings_obj.success_callback}",
code=status.HTTP_400_BAD_REQUEST,
type=ProxyErrorTypes.bad_request_error,
param="callback_name",
)
team_callback_settings_obj.success_callback.append(data.callback_name)
elif data.callback_type == "failure":
if team_callback_settings_obj.failure_callback is None:
team_callback_settings_obj.failure_callback = []
if data.callback_name in team_callback_settings_obj.failure_callback:
raise ProxyException(
message=f"callback_name = {data.callback_name} already exists in failure_callback, for team_id = {team_id}. \n Existing failure_callback = {team_callback_settings_obj.failure_callback}",
code=status.HTTP_400_BAD_REQUEST,
type=ProxyErrorTypes.bad_request_error,
param="callback_name",
)
team_callback_settings_obj.failure_callback.append(data.callback_name)
elif data.callback_type == "success_and_failure":
if team_callback_settings_obj.success_callback is None:
team_callback_settings_obj.success_callback = []
if team_callback_settings_obj.failure_callback is None:
team_callback_settings_obj.failure_callback = []
if data.callback_name in team_callback_settings_obj.success_callback:
raise ProxyException(
message=f"callback_name = {data.callback_name} already exists in success_callback, for team_id = {team_id}. \n Existing success_callback = {team_callback_settings_obj.success_callback}",
code=status.HTTP_400_BAD_REQUEST,
type=ProxyErrorTypes.bad_request_error,
param="callback_name",
)
if data.callback_name in team_callback_settings_obj.failure_callback:
raise ProxyException(
message=f"callback_name = {data.callback_name} already exists in failure_callback, for team_id = {team_id}. \n Existing failure_callback = {team_callback_settings_obj.failure_callback}",
code=status.HTTP_400_BAD_REQUEST,
type=ProxyErrorTypes.bad_request_error,
param="callback_name",
)
team_callback_settings_obj.success_callback.append(data.callback_name)
team_callback_settings_obj.failure_callback.append(data.callback_name)
for var, value in data.callback_vars.items():
if team_callback_settings_obj.callback_vars is None:
team_callback_settings_obj.callback_vars = {}
team_callback_settings_obj.callback_vars[var] = value
team_callback_settings_obj_dict = team_callback_settings_obj.model_dump()
team_metadata["callback_settings"] = team_callback_settings_obj_dict
team_metadata_json = json.dumps(team_metadata) # update team_metadata
new_team_row = await prisma_client.db.litellm_teamtable.update(
where={"team_id": team_id}, data={"metadata": team_metadata_json} # type: ignore
)
return {
"status": "success",
"data": new_team_row,
}
except Exception as e:
verbose_proxy_logger.error(
"litellm.proxy.proxy_server.add_team_callbacks(): Exception occured - {}".format(
str(e)
)
)
verbose_proxy_logger.debug(traceback.format_exc())
if isinstance(e, HTTPException):
raise ProxyException(
message=getattr(e, "detail", f"Internal Server Error({str(e)})"),
type=ProxyErrorTypes.internal_server_error.value,
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
)
elif isinstance(e, ProxyException):
raise e
raise ProxyException(
message="Internal Server Error, " + str(e),
type=ProxyErrorTypes.internal_server_error.value,
param=getattr(e, "param", "None"),
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@router.post(
"/team/{team_id}/disable_logging",
tags=["team management"],
dependencies=[Depends(user_api_key_auth)],
)
@management_endpoint_wrapper
async def disable_team_logging(
http_request: Request,
team_id: str,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Disable all logging callbacks for a team
Parameters:
- team_id (str, required): The unique identifier for the team
Example curl:
```
curl -X POST 'http://localhost:4000/team/dbe2f686-a686-4896-864a-4c3924458709/disable_logging' \
-H 'Authorization: Bearer sk-1234'
```
"""
try:
from litellm.proxy.proxy_server import prisma_client
if prisma_client is None:
raise HTTPException(status_code=500, detail={"error": "No db connected"})
# Check if team exists
_existing_team = await prisma_client.get_data(
team_id=team_id, table_name="team", query_type="find_unique"
)
if _existing_team is None:
raise HTTPException(
status_code=404,
detail={"error": f"Team id = {team_id} does not exist."},
)
# Update team metadata to disable logging
team_metadata = _existing_team.metadata
team_callback_settings = team_metadata.get("callback_settings", {})
team_callback_settings_obj = TeamCallbackMetadata(**team_callback_settings)
# Reset callbacks
team_callback_settings_obj.success_callback = []
team_callback_settings_obj.failure_callback = []
# Update metadata
team_metadata["callback_settings"] = team_callback_settings_obj.model_dump()
team_metadata_json = json.dumps(team_metadata)
# Update team in database
updated_team = await prisma_client.db.litellm_teamtable.update(
where={"team_id": team_id}, data={"metadata": team_metadata_json} # type: ignore
)
if updated_team is None:
raise HTTPException(
status_code=404,
detail={
"error": f"Team id = {team_id} does not exist. Error updating team logging"
},
)
return {
"status": "success",
"message": f"Logging disabled for team {team_id}",
"data": {
"team_id": updated_team.team_id,
"success_callbacks": [],
"failure_callbacks": [],
},
}
except Exception as e:
verbose_proxy_logger.error(
f"litellm.proxy.proxy_server.disable_team_logging(): Exception occurred - {str(e)}"
)
verbose_proxy_logger.debug(traceback.format_exc())
if isinstance(e, HTTPException):
raise ProxyException(
message=getattr(e, "detail", f"Internal Server Error({str(e)})"),
type=ProxyErrorTypes.internal_server_error.value,
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
)
elif isinstance(e, ProxyException):
raise e
raise ProxyException(
message="Internal Server Error, " + str(e),
type=ProxyErrorTypes.internal_server_error.value,
param=getattr(e, "param", "None"),
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@router.get(
"/team/{team_id:path}/callback",
tags=["team management"],
dependencies=[Depends(user_api_key_auth)],
)
@management_endpoint_wrapper
async def get_team_callbacks(
http_request: Request,
team_id: str,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Get the success/failure callbacks and variables for a team
Parameters:
- team_id (str, required): The unique identifier for the team
Example curl:
```
curl -X GET 'http://localhost:4000/team/dbe2f686-a686-4896-864a-4c3924458709/callback' \
-H 'Authorization: Bearer sk-1234'
```
This will return the callback settings for the team with id dbe2f686-a686-4896-864a-4c3924458709
Returns {
"status": "success",
"data": {
"team_id": team_id,
"success_callbacks": team_callback_settings_obj.success_callback,
"failure_callbacks": team_callback_settings_obj.failure_callback,
"callback_vars": team_callback_settings_obj.callback_vars,
},
}
"""
try:
from litellm.proxy.proxy_server import prisma_client
if prisma_client is None:
raise HTTPException(status_code=500, detail={"error": "No db connected"})
# Check if team_id exists
_existing_team = await prisma_client.get_data(
team_id=team_id, table_name="team", query_type="find_unique"
)
if _existing_team is None:
raise HTTPException(
status_code=404,
detail={"error": f"Team id = {team_id} does not exist."},
)
# Retrieve team callback settings from metadata
team_metadata = _existing_team.metadata
team_callback_settings = team_metadata.get("callback_settings", {})
# Convert to TeamCallbackMetadata object for consistent structure
team_callback_settings_obj = TeamCallbackMetadata(**team_callback_settings)
return {
"status": "success",
"data": {
"team_id": team_id,
"success_callbacks": team_callback_settings_obj.success_callback,
"failure_callbacks": team_callback_settings_obj.failure_callback,
"callback_vars": team_callback_settings_obj.callback_vars,
},
}
except Exception as e:
verbose_proxy_logger.error(
"litellm.proxy.proxy_server.get_team_callbacks(): Exception occurred - {}".format(
str(e)
)
)
verbose_proxy_logger.debug(traceback.format_exc())
if isinstance(e, HTTPException):
raise ProxyException(
message=getattr(e, "detail", f"Internal Server Error({str(e)})"),
type=ProxyErrorTypes.internal_server_error.value,
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
)
elif isinstance(e, ProxyException):
raise e
raise ProxyException(
message="Internal Server Error, " + str(e),
type=ProxyErrorTypes.internal_server_error.value,
param=getattr(e, "param", "None"),
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)