# Copyright DataStax, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations import logging from typing import Any, cast, Dict, Optional, TypedDict import httpx from astrapy.core.api import ( APIRequestError, api_request, async_api_request, raw_api_request, async_raw_api_request, ) from astrapy.core.utils import ( http_methods, to_httpx_timeout, TimeoutInfoWideType, ) from astrapy.core.defaults import ( DEFAULT_DEV_OPS_AUTH_HEADER, DEFAULT_DEV_OPS_API_VERSION, DEFAULT_DEV_OPS_URL, ) from astrapy.core.core_types import API_RESPONSE, OPS_API_RESPONSE logger = logging.getLogger(__name__) class AstraDBOpsConstructorParams(TypedDict): token: str dev_ops_url: Optional[str] dev_ops_api_version: Optional[str] caller_name: Optional[str] caller_version: Optional[str] class AstraDBOps: # Initialize the shared httpx clients as class attributes client = httpx.Client() async_client = httpx.AsyncClient() def __init__( self, token: str, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: self.caller_name = caller_name self.caller_version = caller_version # constructor params (for the copy() method): self.constructor_params: AstraDBOpsConstructorParams = { "token": token, "dev_ops_url": dev_ops_url, "dev_ops_api_version": dev_ops_api_version, "caller_name": caller_name, "caller_version": caller_version, } # dev_ops_url = (dev_ops_url or DEFAULT_DEV_OPS_URL).strip("/") dev_ops_api_version = ( dev_ops_api_version or DEFAULT_DEV_OPS_API_VERSION ).strip("/") self.token = "Bearer " + token self.base_url = f"{dev_ops_url}/{dev_ops_api_version}" def __eq__(self, other: Any) -> bool: if isinstance(other, AstraDBOps): # work on the "normalized" quantities (stripped, etc) return all( [ self.token == other.token, self.base_url == other.base_url, self.caller_name == other.caller_name, self.caller_version == other.caller_version, ] ) else: return False def copy( self, *, token: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> AstraDBOps: return AstraDBOps( token=token or self.constructor_params["token"], dev_ops_url=dev_ops_url or self.constructor_params["dev_ops_url"], dev_ops_api_version=dev_ops_api_version or self.constructor_params["dev_ops_api_version"], caller_name=caller_name or self.caller_name, caller_version=caller_version or self.caller_version, ) def set_caller( self, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: self.caller_name = caller_name self.caller_version = caller_version def _ops_request( self, method: str, path: str, options: Optional[Dict[str, Any]] = None, json_data: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> httpx.Response: _options = {} if options is None else options raw_response = raw_api_request( client=self.client, base_url=self.base_url, auth_header=DEFAULT_DEV_OPS_AUTH_HEADER, token=self.token, method=method, json_data=json_data, url_params=_options, path=path, caller_name=self.caller_name, caller_version=self.caller_version, timeout=to_httpx_timeout(timeout_info), ) return raw_response async def _async_ops_request( self, method: str, path: str, options: Optional[Dict[str, Any]] = None, json_data: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> httpx.Response: _options = {} if options is None else options raw_response = await async_raw_api_request( client=self.async_client, base_url=self.base_url, auth_header=DEFAULT_DEV_OPS_AUTH_HEADER, token=self.token, method=method, json_data=json_data, url_params=_options, path=path, caller_name=self.caller_name, caller_version=self.caller_version, timeout=to_httpx_timeout(timeout_info), ) return raw_response def _json_ops_request( self, method: str, path: str, options: Optional[Dict[str, Any]] = None, json_data: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: _options = {} if options is None else options response = api_request( client=self.client, base_url=self.base_url, auth_header="Authorization", token=self.token, method=method, json_data=json_data, url_params=_options, path=path, skip_error_check=False, caller_name=None, caller_version=None, timeout=to_httpx_timeout(timeout_info), ) return response async def _async_json_ops_request( self, method: str, path: str, options: Optional[Dict[str, Any]] = None, json_data: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: _options = {} if options is None else options response = await async_api_request( client=self.async_client, base_url=self.base_url, auth_header="Authorization", token=self.token, method=method, json_data=json_data, url_params=_options, path=path, skip_error_check=False, caller_name=None, caller_version=None, timeout=to_httpx_timeout(timeout_info), ) return response def get_databases( self, options: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Retrieve a list of databases. Args: options (dict, optional): Additional options for the request. Returns: list: a JSON list of dictionaries, one per database. """ response = self._json_ops_request( method=http_methods.GET, path="/databases", options=options, timeout_info=timeout_info, ) return response async def async_get_databases( self, options: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Retrieve a list of databases - async version of the method. Args: options (dict, optional): Additional options for the request. Returns: list: a JSON list of dictionaries, one per database. """ response = await self._async_json_ops_request( method=http_methods.GET, path="/databases", options=options, timeout_info=timeout_info, ) return response def create_database( self, database_definition: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> Dict[str, str]: """ Create a new database. Args: database_definition (dict, optional): A dictionary defining the properties of the database to be created. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: dict: A dictionary such as: {"id": the ID of the created database} Raises an error if not successful. """ r = self._ops_request( method=http_methods.POST, path="/databases", json_data=database_definition, timeout_info=timeout_info, ) if r.status_code == 201: return {"id": r.headers["Location"]} elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=database_definition) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") async def async_create_database( self, database_definition: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> Dict[str, str]: """ Create a new database - async version of the method. Args: database_definition (dict, optional): A dictionary defining the properties of the database to be created. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: dict: A dictionary such as: {"id": the ID of the created database} Raises an error if not successful. """ r = await self._async_ops_request( method=http_methods.POST, path="/databases", json_data=database_definition, timeout_info=timeout_info, ) if r.status_code == 201: return {"id": r.headers["Location"]} elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=database_definition) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") def terminate_database( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> str: """ Terminate an existing database. Args: database (str): The identifier of the database to terminate. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: str: The identifier of the terminated database, or None if termination was unsuccessful. """ r = self._ops_request( method=http_methods.POST, path=f"/databases/{database}/terminate", timeout_info=timeout_info, ) if r.status_code == 202: return database elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=None) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") return None async def async_terminate_database( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> str: """ Terminate an existing database - async version of the method. Args: database (str): The identifier of the database to terminate. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: str: The identifier of the terminated database, or None if termination was unsuccessful. """ r = await self._async_ops_request( method=http_methods.POST, path=f"/databases/{database}/terminate", timeout_info=timeout_info, ) if r.status_code == 202: return database elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=None) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") return None def get_database( self, database: str = "", options: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> API_RESPONSE: """ Retrieve details of a specific database. Args: database (str): The identifier of the database to retrieve. options (dict, optional): Additional options for the request. Returns: dict: A JSON response containing the details of the specified database. """ return cast( API_RESPONSE, self._json_ops_request( method=http_methods.GET, path=f"/databases/{database}", options=options, timeout_info=timeout_info, ), ) async def async_get_database( self, database: str = "", options: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> API_RESPONSE: """ Retrieve details of a specific database - async version of the method. Args: database (str): The identifier of the database to retrieve. options (dict, optional): Additional options for the request. Returns: dict: A JSON response containing the details of the specified database. """ return cast( API_RESPONSE, await self._async_json_ops_request( method=http_methods.GET, path=f"/databases/{database}", options=options, timeout_info=timeout_info, ), ) def create_keyspace( self, database: str = "", keyspace: str = "", timeout_info: TimeoutInfoWideType = None, ) -> Dict[str, str]: """ Create a keyspace in a specified database. Args: database (str): The identifier of the database where the keyspace will be created. keyspace (str): The name of the keyspace to create. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: {"ok": 1} if successful. Raises errors otherwise. """ r = self._ops_request( method=http_methods.POST, path=f"/databases/{database}/keyspaces/{keyspace}", timeout_info=timeout_info, ) if r.status_code == 201: return {"name": keyspace} elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=None) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") async def async_create_keyspace( self, database: str = "", keyspace: str = "", timeout_info: TimeoutInfoWideType = None, ) -> Dict[str, str]: """ Create a keyspace in a specified database - async version of the method. Args: database (str): The identifier of the database where the keyspace will be created. keyspace (str): The name of the keyspace to create. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: {"ok": 1} if successful. Raises errors otherwise. """ r = await self._async_ops_request( method=http_methods.POST, path=f"/databases/{database}/keyspaces/{keyspace}", timeout_info=timeout_info, ) if r.status_code == 201: return {"name": keyspace} elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=None) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") def delete_keyspace( self, database: str = "", keyspace: str = "", timeout_info: TimeoutInfoWideType = None, ) -> str: """ Delete a keyspace from a database Args: database (str): The identifier of the database to terminate. keyspace (str): The name of the keyspace to create. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: str: The identifier of the deleted keyspace. Otherwise raises an error. """ r = self._ops_request( method=http_methods.DELETE, path=f"/databases/{database}/keyspaces/{keyspace}", timeout_info=timeout_info, ) if r.status_code == 202: return keyspace elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=None) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") async def async_delete_keyspace( self, database: str = "", keyspace: str = "", timeout_info: TimeoutInfoWideType = None, ) -> str: """ Delete a keyspace from a database - async version of the method. Args: database (str): The identifier of the database to terminate. keyspace (str): The name of the keyspace to create. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: str: The identifier of the deleted keyspace. Otherwise raises an error. """ r = await self._async_ops_request( method=http_methods.DELETE, path=f"/databases/{database}/keyspaces/{keyspace}", timeout_info=timeout_info, ) if r.status_code == 202: return keyspace elif r.status_code >= 400 and r.status_code < 500: raise APIRequestError(r, payload=None) else: raise ValueError(f"[HTTP {r.status_code}] {r.text}") def park_database( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Park a specific database, making it inactive. Args: database (str): The identifier of the database to park. Returns: dict: The response from the server after parking the database. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/park", timeout_info=timeout_info, ) def unpark_database( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Unpark a specific database, making it active again. Args: database (str): The identifier of the database to unpark. Returns: dict: The response from the server after unparking the database. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/unpark", timeout_info=timeout_info, ) def resize_database( self, database: str = "", options: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Resize a specific database according to provided options. Args: database (str): The identifier of the database to resize. options (dict, optional): The specifications for the resize operation. Returns: dict: The response from the server after the resize operation. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/resize", json_data=options, timeout_info=timeout_info, ) def reset_database_password( self, database: str = "", options: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Reset the password for a specific database. Args: database (str): The identifier of the database for which to reset the password. options (dict, optional): Additional options for the password reset. Returns: dict: The response from the server after resetting the password. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/resetPassword", json_data=options, timeout_info=timeout_info, ) def get_secure_bundle( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a secure bundle URL for a specific database. Args: database (str): The identifier of the database for which to get the secure bundle. Returns: dict: The secure bundle URL and related information. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/secureBundleURL", timeout_info=timeout_info, ) def get_datacenters( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Get a list of datacenters associated with a specific database. Args: database (str): The identifier of the database for which to list datacenters. Returns: dict: A list of datacenters and their details. """ return self._json_ops_request( method=http_methods.GET, path=f"/databases/{database}/datacenters", timeout_info=timeout_info, ) def create_datacenter( self, database: str = "", options: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Create a new datacenter for a specific database. Args: database (str): The identifier of the database for which to create the datacenter. options (dict, optional): Specifications for the new datacenter. Returns: dict: The response from the server after creating the datacenter. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/datacenters", json_data=options, timeout_info=timeout_info, ) def terminate_datacenter( self, database: str = "", datacenter: str = "", timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Terminate a specific datacenter in a database. Args: database (str): The identifier of the database containing the datacenter. datacenter (str): The identifier of the datacenter to terminate. Returns: dict: The response from the server after terminating the datacenter. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/datacenters/{datacenter}/terminate", timeout_info=timeout_info, ) def get_access_list( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve the access list for a specific database. Args: database (str): The identifier of the database for which to get the access list. Returns: dict: The current access list for the database. """ return self._json_ops_request( method=http_methods.GET, path=f"/databases/{database}/access-list", timeout_info=timeout_info, ) def replace_access_list( self, database: str = "", access_list: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Replace the entire access list for a specific database. Args: database (str): The identifier of the database for which to replace the access list. access_list (dict): The new access list to be set. Returns: dict: The response from the server after replacing the access list. """ return self._json_ops_request( method=http_methods.PUT, path=f"/databases/{database}/access-list", json_data=access_list, timeout_info=timeout_info, ) def update_access_list( self, database: str = "", access_list: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Update the access list for a specific database. Args: database (str): The identifier of the database for which to update the access list. access_list (dict): The updates to be applied to the access list. Returns: dict: The response from the server after updating the access list. """ return self._json_ops_request( method=http_methods.PATCH, path=f"/databases/{database}/access-list", json_data=access_list, timeout_info=timeout_info, ) def add_access_list_address( self, database: str = "", address: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Add a new address to the access list for a specific database. Args: database (str): The identifier of the database for which to add the address. address (dict): The address details to add to the access list. Returns: dict: The response from the server after adding the address. """ return self._json_ops_request( method=http_methods.POST, path=f"/databases/{database}/access-list", json_data=address, timeout_info=timeout_info, ) def delete_access_list( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Delete the access list for a specific database. Args: database (str): The identifier of the database for which to delete the access list. Returns: dict: The response from the server after deleting the access list. """ return self._json_ops_request( method=http_methods.DELETE, path=f"/databases/{database}/access-list", timeout_info=timeout_info, ) def get_private_link( self, database: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve the private link information for a specified database. Args: database (str): The identifier of the database. Returns: dict: The private link information for the database. """ return self._json_ops_request( method=http_methods.GET, path=f"/organizations/clusters/{database}/private-link", timeout_info=timeout_info, ) def get_datacenter_private_link( self, database: str = "", datacenter: str = "", timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Retrieve the private link information for a specific datacenter in a database. Args: database (str): The identifier of the database. datacenter (str): The identifier of the datacenter. Returns: dict: The private link information for the specified datacenter. """ return self._json_ops_request( method=http_methods.GET, path=f"/organizations/clusters/{database}/datacenters/{datacenter}/private-link", timeout_info=timeout_info, ) def create_datacenter_private_link( self, database: str = "", datacenter: str = "", private_link: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Create a private link for a specific datacenter in a database. Args: database (str): The identifier of the database. datacenter (str): The identifier of the datacenter. private_link (dict): The private link configuration details. Returns: dict: The response from the server after creating the private link. """ return self._json_ops_request( method=http_methods.POST, path=f"/organizations/clusters/{database}/datacenters/{datacenter}/private-link", json_data=private_link, timeout_info=timeout_info, ) def create_datacenter_endpoint( self, database: str = "", datacenter: str = "", endpoint: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Create an endpoint for a specific datacenter in a database. Args: database (str): The identifier of the database. datacenter (str): The identifier of the datacenter. endpoint (dict): The endpoint configuration details. Returns: dict: The response from the server after creating the endpoint. """ return self._json_ops_request( method=http_methods.POST, path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoint", json_data=endpoint, timeout_info=timeout_info, ) def update_datacenter_endpoint( self, database: str = "", datacenter: str = "", endpoint: Dict[str, Any] = {}, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Update an existing endpoint for a specific datacenter in a database. Args: database (str): The identifier of the database. datacenter (str): The identifier of the datacenter. endpoint (dict): The updated endpoint configuration details. Returns: dict: The response from the server after updating the endpoint. """ return self._json_ops_request( method=http_methods.PUT, path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint['id']}", json_data=endpoint, timeout_info=timeout_info, ) def get_datacenter_endpoint( self, database: str = "", datacenter: str = "", endpoint: str = "", timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Retrieve information about a specific endpoint in a datacenter of a database. Args: database (str): The identifier of the database. datacenter (str): The identifier of the datacenter. endpoint (str): The identifier of the endpoint. Returns: dict: The endpoint information for the specified datacenter. """ return self._json_ops_request( method=http_methods.GET, path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint}", timeout_info=timeout_info, ) def delete_datacenter_endpoint( self, database: str = "", datacenter: str = "", endpoint: str = "", timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Delete a specific endpoint in a datacenter of a database. Args: database (str): The identifier of the database. datacenter (str): The identifier of the datacenter. endpoint (str): The identifier of the endpoint to delete. Returns: dict: The response from the server after deleting the endpoint. """ return self._json_ops_request( method=http_methods.DELETE, path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint}", timeout_info=timeout_info, ) def get_available_classic_regions( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a list of available classic regions. Returns: dict: A list of available classic regions. """ return self._json_ops_request( method=http_methods.GET, path="/availableRegions", timeout_info=timeout_info ) def get_available_regions( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a list of available regions for serverless deployment. Returns: dict: A list of available regions for serverless deployment. """ return self._json_ops_request( method=http_methods.GET, path="/regions/serverless", timeout_info=timeout_info, ) def get_roles(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE: """ Retrieve a list of roles within the organization. Returns: dict: A list of roles within the organization. """ return self._json_ops_request( method=http_methods.GET, path="/organizations/roles", timeout_info=timeout_info, ) def create_role( self, role_definition: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Create a new role within the organization. Args: role_definition (dict, optional): The definition of the role to be created. Returns: dict: The response from the server after creating the role. """ return self._json_ops_request( method=http_methods.POST, path="/organizations/roles", json_data=role_definition, timeout_info=timeout_info, ) def get_role( self, role: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve details of a specific role within the organization. Args: role (str): The identifier of the role. Returns: dict: The details of the specified role. """ return self._json_ops_request( method=http_methods.GET, path=f"/organizations/roles/{role}", timeout_info=timeout_info, ) def update_role( self, role: str = "", role_definition: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Update the definition of an existing role within the organization. Args: role (str): The identifier of the role to update. role_definition (dict, optional): The new definition of the role. Returns: dict: The response from the server after updating the role. """ return self._json_ops_request( method=http_methods.PUT, path=f"/organizations/roles/{role}", json_data=role_definition, timeout_info=timeout_info, ) def delete_role( self, role: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Delete a specific role from the organization. Args: role (str): The identifier of the role to delete. Returns: dict: The response from the server after deleting the role. """ return self._json_ops_request( method=http_methods.DELETE, path=f"/organizations/roles/{role}", timeout_info=timeout_info, ) def invite_user( self, user_definition: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Invite a new user to the organization. Args: user_definition (dict, optional): The definition of the user to be invited. Returns: dict: The response from the server after inviting the user. """ return self._json_ops_request( method=http_methods.PUT, path="/organizations/users", json_data=user_definition, timeout_info=timeout_info, ) def get_users(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE: """ Retrieve a list of users within the organization. Returns: dict: A list of users within the organization. """ return self._json_ops_request( method=http_methods.GET, path="/organizations/users", timeout_info=timeout_info, ) def get_user( self, user: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve details of a specific user within the organization. Args: user (str): The identifier of the user. Returns: dict: The details of the specified user. """ return self._json_ops_request( method=http_methods.GET, path=f"/organizations/users/{user}", timeout_info=timeout_info, ) def remove_user( self, user: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Remove a user from the organization. Args: user (str): The identifier of the user to remove. Returns: dict: The response from the server after removing the user. """ return self._json_ops_request( method=http_methods.DELETE, path=f"/organizations/users/{user}", timeout_info=timeout_info, ) def update_user_roles( self, user: str = "", roles: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Update the roles assigned to a specific user within the organization. Args: user (str): The identifier of the user. roles (list, optional): The list of new roles to assign to the user. Returns: dict: The response from the server after updating the user's roles. """ return self._json_ops_request( method=http_methods.PUT, path=f"/organizations/users/{user}/roles", json_data=roles, timeout_info=timeout_info, ) def get_clients(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE: """ Retrieve a list of client IDs and secrets associated with the organization. Returns: dict: A list of client IDs and their associated secrets. """ return self._json_ops_request( method=http_methods.GET, path="/clientIdSecrets", timeout_info=timeout_info ) def create_token( self, roles: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Create a new token with specific roles. Args: roles (dict, optional): The roles to associate with the token: {"roles": [""]} Returns: dict: The response from the server after creating the token. """ return self._json_ops_request( method=http_methods.POST, path="/clientIdSecrets", json_data=roles, timeout_info=timeout_info, ) def delete_token( self, token: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Delete a specific token. Args: token (str): The identifier of the token to delete. Returns: dict: The response from the server after deleting the token. """ return self._json_ops_request( method=http_methods.DELETE, path=f"/clientIdSecret/{token}", timeout_info=timeout_info, ) def get_organization( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve details of the current organization. Returns: dict: The details of the organization. """ return self._json_ops_request( method=http_methods.GET, path="/currentOrg", timeout_info=timeout_info ) def get_access_lists( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a list of access lists for the organization. Returns: dict: A list of access lists. """ return self._json_ops_request( method=http_methods.GET, path="/access-lists", timeout_info=timeout_info ) def get_access_list_template( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a template for creating an access list. Returns: dict: An access list template. """ return self._json_ops_request( method=http_methods.GET, path="/access-list/template", timeout_info=timeout_info, ) def validate_access_list( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Validate the configuration of the access list. Returns: dict: The validation result of the access list configuration. """ return self._json_ops_request( method=http_methods.POST, path="/access-list/validate", timeout_info=timeout_info, ) def get_private_links( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a list of private link connections for the organization. Returns: dict: A list of private link connections. """ return self._json_ops_request( method=http_methods.GET, path="/organizations/private-link", timeout_info=timeout_info, ) def get_streaming_providers( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a list of streaming service providers. Returns: dict: A list of available streaming service providers. """ return self._json_ops_request( method=http_methods.GET, path="/streaming/providers", timeout_info=timeout_info, ) def get_streaming_tenants( self, timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve a list of streaming tenants. Returns: dict: A list of streaming tenants and their details. """ return self._json_ops_request( method=http_methods.GET, path="/streaming/tenants", timeout_info=timeout_info, ) def create_streaming_tenant( self, tenant: Optional[Dict[str, Any]] = None, timeout_info: TimeoutInfoWideType = None, ) -> OPS_API_RESPONSE: """ Create a new streaming tenant. Args: tenant (dict, optional): The configuration details for the new streaming tenant. Returns: dict: The response from the server after creating the streaming tenant. """ return self._json_ops_request( method=http_methods.POST, path="/streaming/tenants", json_data=tenant, timeout_info=timeout_info, ) def delete_streaming_tenant( self, tenant: str = "", cluster: str = "", timeout_info: TimeoutInfoWideType = None, ) -> None: """ Delete a specific streaming tenant from a cluster. Args: tenant (str): The identifier of the tenant to delete. cluster (str): The identifier of the cluster from which the tenant is to be deleted. timeout_info: either a float (seconds) or a TimeoutInfo dict (see) Returns: dict: The response from the server after deleting the streaming tenant. """ r = self._ops_request( method=http_methods.DELETE, path=f"/streaming/tenants/{tenant}/clusters/{cluster}", timeout_info=timeout_info, ) if r.status_code == 202: # 'Accepted' return None else: raise ValueError(r.text) def get_streaming_tenant( self, tenant: str = "", timeout_info: TimeoutInfoWideType = None ) -> OPS_API_RESPONSE: """ Retrieve information about the limits and usage of a specific streaming tenant. Args: tenant (str): The identifier of the streaming tenant. Returns: dict: Details of the specified streaming tenant, including limits and current usage. """ return self._json_ops_request( method=http_methods.GET, path=f"/streaming/tenants/{tenant}/limits", timeout_info=timeout_info, )