Draken007's picture
Upload 7228 files
2a0bc63 verified
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from typing import Any, cast, Dict, Optional, TypedDict
import httpx
from astrapy.core.api import (
APIRequestError,
api_request,
async_api_request,
raw_api_request,
async_raw_api_request,
)
from astrapy.core.utils import (
http_methods,
to_httpx_timeout,
TimeoutInfoWideType,
)
from astrapy.core.defaults import (
DEFAULT_DEV_OPS_AUTH_HEADER,
DEFAULT_DEV_OPS_API_VERSION,
DEFAULT_DEV_OPS_URL,
)
from astrapy.core.core_types import API_RESPONSE, OPS_API_RESPONSE
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class AstraDBOpsConstructorParams(TypedDict):
    """
    Typed record of the arguments an AstraDBOps instance was constructed with.

    AstraDBOps stores one of these (as `constructor_params`) so that its
    copy() method can rebuild an instance from the original, un-normalized
    values.
    """

    # the token exactly as passed to the constructor (no "Bearer " prefix)
    token: str
    # DevOps API base URL as passed in; None means "use the default"
    dev_ops_url: Optional[str]
    # DevOps API version as passed in; None means "use the default"
    dev_ops_api_version: Optional[str]
    # caller identity passed at construction time, if any
    caller_name: Optional[str]
    caller_version: Optional[str]
class AstraDBOps:
    """
    Thin client for the Astra DevOps API.

    Each public method maps to one HTTP request against
    `<dev_ops_url>/<dev_ops_api_version>/<path>`. Most methods go through
    `_json_ops_request` and return the response produced by `api_request`;
    a few inspect the raw `httpx.Response` status code themselves
    (create/terminate database, create/delete keyspace, delete streaming
    tenant) and raise on anything other than the expected status.

    Every request method accepts `timeout_info`: either a float (seconds)
    or a TimeoutInfo dict; it is converted with `to_httpx_timeout`.
    """

    # Initialize the shared httpx clients as class attributes
    client = httpx.Client()
    async_client = httpx.AsyncClient()

    def __init__(
        self,
        token: str,
        dev_ops_url: Optional[str] = None,
        dev_ops_api_version: Optional[str] = None,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> None:
        """
        Create a DevOps API client.

        Args:
            token (str): the access token; it is sent as "Bearer <token>".
            dev_ops_url (str, optional): base URL of the DevOps API.
                Defaults to DEFAULT_DEV_OPS_URL.
            dev_ops_api_version (str, optional): API version path segment.
                Defaults to DEFAULT_DEV_OPS_API_VERSION.
            caller_name (str, optional): name of the calling application,
                forwarded to the request helpers.
            caller_version (str, optional): version of the calling
                application, forwarded to the request helpers.
        """
        self.caller_name = caller_name
        self.caller_version = caller_version
        # constructor params (for the copy() method):
        self.constructor_params: AstraDBOpsConstructorParams = {
            "token": token,
            "dev_ops_url": dev_ops_url,
            "dev_ops_api_version": dev_ops_api_version,
            "caller_name": caller_name,
            "caller_version": caller_version,
        }
        # Normalize: strip slashes so the pieces can be joined with one "/".
        dev_ops_url = (dev_ops_url or DEFAULT_DEV_OPS_URL).strip("/")
        dev_ops_api_version = (
            dev_ops_api_version or DEFAULT_DEV_OPS_API_VERSION
        ).strip("/")

        self.token = "Bearer " + token
        self.base_url = f"{dev_ops_url}/{dev_ops_api_version}"

    def __eq__(self, other: Any) -> bool:
        """
        Compare two clients on their normalized token, base URL and caller.

        Any object that is not an AstraDBOps compares unequal.
        """
        if not isinstance(other, AstraDBOps):
            return False
        # work on the "normalized" quantities (stripped, etc)
        return (
            self.token == other.token
            and self.base_url == other.base_url
            and self.caller_name == other.caller_name
            and self.caller_version == other.caller_version
        )

    def copy(
        self,
        *,
        token: Optional[str] = None,
        dev_ops_url: Optional[str] = None,
        dev_ops_api_version: Optional[str] = None,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> AstraDBOps:
        """
        Return a new AstraDBOps, optionally overriding some settings.

        Any argument left as None (or otherwise falsy) falls back to this
        instance's value: the original constructor arguments for token, URL
        and API version, and the *current* caller name/version (which
        set_caller may have changed since construction).
        """
        return AstraDBOps(
            token=token or self.constructor_params["token"],
            dev_ops_url=dev_ops_url or self.constructor_params["dev_ops_url"],
            dev_ops_api_version=dev_ops_api_version
            or self.constructor_params["dev_ops_api_version"],
            caller_name=caller_name or self.caller_name,
            caller_version=caller_version or self.caller_version,
        )

    def set_caller(
        self,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> None:
        """
        Replace the caller name and version used for subsequent requests.

        Both values are overwritten; omitting one resets it to None.
        """
        self.caller_name = caller_name
        self.caller_version = caller_version

    def _ops_request(
        self,
        method: str,
        path: str,
        options: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> httpx.Response:
        """
        Issue a request and return the raw httpx response.

        Args:
            method (str): HTTP method.
            path (str): path relative to the base URL.
            options (dict, optional): URL query parameters.
            json_data (dict, optional): JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            httpx.Response: the response as returned by raw_api_request.
        """
        _options = {} if options is None else options

        raw_response = raw_api_request(
            client=self.client,
            base_url=self.base_url,
            auth_header=DEFAULT_DEV_OPS_AUTH_HEADER,
            token=self.token,
            method=method,
            json_data=json_data,
            url_params=_options,
            path=path,
            caller_name=self.caller_name,
            caller_version=self.caller_version,
            timeout=to_httpx_timeout(timeout_info),
        )
        return raw_response

    async def _async_ops_request(
        self,
        method: str,
        path: str,
        options: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> httpx.Response:
        """
        Issue a request and return the raw httpx response - async version.

        Args:
            method (str): HTTP method.
            path (str): path relative to the base URL.
            options (dict, optional): URL query parameters.
            json_data (dict, optional): JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            httpx.Response: the response as returned by async_raw_api_request.
        """
        _options = {} if options is None else options

        raw_response = await async_raw_api_request(
            client=self.async_client,
            base_url=self.base_url,
            auth_header=DEFAULT_DEV_OPS_AUTH_HEADER,
            token=self.token,
            method=method,
            json_data=json_data,
            url_params=_options,
            path=path,
            caller_name=self.caller_name,
            caller_version=self.caller_version,
            timeout=to_httpx_timeout(timeout_info),
        )
        return raw_response

    def _json_ops_request(
        self,
        method: str,
        path: str,
        options: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Issue a request through api_request, with error checking enabled.

        Args:
            method (str): HTTP method.
            path (str): path relative to the base URL.
            options (dict, optional): URL query parameters.
            json_data (dict, optional): JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            The response as returned by api_request.
        """
        _options = {} if options is None else options

        # Forward the auth header and caller identity the same way
        # _ops_request does, so set_caller() applies to every request.
        response = api_request(
            client=self.client,
            base_url=self.base_url,
            auth_header=DEFAULT_DEV_OPS_AUTH_HEADER,
            token=self.token,
            method=method,
            json_data=json_data,
            url_params=_options,
            path=path,
            skip_error_check=False,
            caller_name=self.caller_name,
            caller_version=self.caller_version,
            timeout=to_httpx_timeout(timeout_info),
        )
        return response

    async def _async_json_ops_request(
        self,
        method: str,
        path: str,
        options: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Issue a request through async_api_request, with error checking enabled.

        Args:
            method (str): HTTP method.
            path (str): path relative to the base URL.
            options (dict, optional): URL query parameters.
            json_data (dict, optional): JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            The response as returned by async_api_request.
        """
        _options = {} if options is None else options

        # Forward the auth header and caller identity the same way
        # _async_ops_request does, so set_caller() applies to every request.
        response = await async_api_request(
            client=self.async_client,
            base_url=self.base_url,
            auth_header=DEFAULT_DEV_OPS_AUTH_HEADER,
            token=self.token,
            method=method,
            json_data=json_data,
            url_params=_options,
            path=path,
            skip_error_check=False,
            caller_name=self.caller_name,
            caller_version=self.caller_version,
            timeout=to_httpx_timeout(timeout_info),
        )
        return response

    @staticmethod
    def _unexpected_status_error(
        response: httpx.Response,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Exception:
        """
        Build the exception to raise for a response with an unexpected status.

        A 4xx status yields an APIRequestError carrying the request payload;
        any other status yields a ValueError with the status code and body.
        The exception is returned (not raised) so call sites can `raise` it.
        """
        if 400 <= response.status_code < 500:
            return APIRequestError(response, payload=payload)
        return ValueError(f"[HTTP {response.status_code}] {response.text}")

    def get_databases(
        self,
        options: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of databases.

        Args:
            options (dict, optional): Additional options for the request.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            list: a JSON list of dictionaries, one per database.
        """
        response = self._json_ops_request(
            method=http_methods.GET,
            path="/databases",
            options=options,
            timeout_info=timeout_info,
        )
        return response

    async def async_get_databases(
        self,
        options: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of databases - async version of the method.

        Args:
            options (dict, optional): Additional options for the request.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            list: a JSON list of dictionaries, one per database.
        """
        response = await self._async_json_ops_request(
            method=http_methods.GET,
            path="/databases",
            options=options,
            timeout_info=timeout_info,
        )
        return response

    def create_database(
        self,
        database_definition: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> Dict[str, str]:
        """
        Create a new database.

        Args:
            database_definition (dict, optional): A dictionary defining the
                properties of the database to be created.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: {"id": <value of the response's "Location" header>}.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 201.
        """
        r = self._ops_request(
            method=http_methods.POST,
            path="/databases",
            json_data=database_definition,
            timeout_info=timeout_info,
        )
        if r.status_code == 201:
            return {"id": r.headers["Location"]}
        raise self._unexpected_status_error(r, payload=database_definition)

    async def async_create_database(
        self,
        database_definition: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> Dict[str, str]:
        """
        Create a new database - async version of the method.

        Args:
            database_definition (dict, optional): A dictionary defining the
                properties of the database to be created.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: {"id": <value of the response's "Location" header>}.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 201.
        """
        r = await self._async_ops_request(
            method=http_methods.POST,
            path="/databases",
            json_data=database_definition,
            timeout_info=timeout_info,
        )
        if r.status_code == 201:
            return {"id": r.headers["Location"]}
        raise self._unexpected_status_error(r, payload=database_definition)

    def terminate_database(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> str:
        """
        Terminate an existing database.

        Args:
            database (str): The identifier of the database to terminate.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            str: The identifier of the terminated database.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 202.
        """
        r = self._ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/terminate",
            timeout_info=timeout_info,
        )
        if r.status_code == 202:
            return database
        raise self._unexpected_status_error(r)

    async def async_terminate_database(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> str:
        """
        Terminate an existing database - async version of the method.

        Args:
            database (str): The identifier of the database to terminate.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            str: The identifier of the terminated database.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 202.
        """
        r = await self._async_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/terminate",
            timeout_info=timeout_info,
        )
        if r.status_code == 202:
            return database
        raise self._unexpected_status_error(r)

    def get_database(
        self,
        database: str = "",
        options: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> API_RESPONSE:
        """
        Retrieve details of a specific database.

        Args:
            database (str): The identifier of the database to retrieve.
            options (dict, optional): Additional options for the request.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A JSON response containing the details of the specified database.
        """
        return cast(
            API_RESPONSE,
            self._json_ops_request(
                method=http_methods.GET,
                path=f"/databases/{database}",
                options=options,
                timeout_info=timeout_info,
            ),
        )

    async def async_get_database(
        self,
        database: str = "",
        options: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> API_RESPONSE:
        """
        Retrieve details of a specific database - async version of the method.

        Args:
            database (str): The identifier of the database to retrieve.
            options (dict, optional): Additional options for the request.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A JSON response containing the details of the specified database.
        """
        return cast(
            API_RESPONSE,
            await self._async_json_ops_request(
                method=http_methods.GET,
                path=f"/databases/{database}",
                options=options,
                timeout_info=timeout_info,
            ),
        )

    def create_keyspace(
        self,
        database: str = "",
        keyspace: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> Dict[str, str]:
        """
        Create a keyspace in a specified database.

        Args:
            database (str): The identifier of the database where the keyspace will be created.
            keyspace (str): The name of the keyspace to create.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: {"name": <keyspace>} if successful.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 201.
        """
        r = self._ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/keyspaces/{keyspace}",
            timeout_info=timeout_info,
        )
        if r.status_code == 201:
            return {"name": keyspace}
        raise self._unexpected_status_error(r)

    async def async_create_keyspace(
        self,
        database: str = "",
        keyspace: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> Dict[str, str]:
        """
        Create a keyspace in a specified database - async version of the method.

        Args:
            database (str): The identifier of the database where the keyspace will be created.
            keyspace (str): The name of the keyspace to create.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: {"name": <keyspace>} if successful.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 201.
        """
        r = await self._async_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/keyspaces/{keyspace}",
            timeout_info=timeout_info,
        )
        if r.status_code == 201:
            return {"name": keyspace}
        raise self._unexpected_status_error(r)

    def delete_keyspace(
        self,
        database: str = "",
        keyspace: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> str:
        """
        Delete a keyspace from a database.

        Args:
            database (str): The identifier of the database containing the keyspace.
            keyspace (str): The name of the keyspace to delete.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            str: The name of the deleted keyspace.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 202.
        """
        r = self._ops_request(
            method=http_methods.DELETE,
            path=f"/databases/{database}/keyspaces/{keyspace}",
            timeout_info=timeout_info,
        )
        if r.status_code == 202:
            return keyspace
        raise self._unexpected_status_error(r)

    async def async_delete_keyspace(
        self,
        database: str = "",
        keyspace: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> str:
        """
        Delete a keyspace from a database - async version of the method.

        Args:
            database (str): The identifier of the database containing the keyspace.
            keyspace (str): The name of the keyspace to delete.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            str: The name of the deleted keyspace.

        Raises:
            APIRequestError: if the API answers with a 4xx status.
            ValueError: for any other status different from 202.
        """
        r = await self._async_ops_request(
            method=http_methods.DELETE,
            path=f"/databases/{database}/keyspaces/{keyspace}",
            timeout_info=timeout_info,
        )
        if r.status_code == 202:
            return keyspace
        raise self._unexpected_status_error(r)

    def park_database(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Park a specific database, making it inactive.

        Args:
            database (str): The identifier of the database to park.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after parking the database.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/park",
            timeout_info=timeout_info,
        )

    def unpark_database(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Unpark a specific database, making it active again.

        Args:
            database (str): The identifier of the database to unpark.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after unparking the database.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/unpark",
            timeout_info=timeout_info,
        )

    def resize_database(
        self,
        database: str = "",
        options: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Resize a specific database according to provided options.

        Args:
            database (str): The identifier of the database to resize.
            options (dict, optional): The specifications for the resize
                operation; sent as the JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after the resize operation.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/resize",
            json_data=options,
            timeout_info=timeout_info,
        )

    def reset_database_password(
        self,
        database: str = "",
        options: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Reset the password for a specific database.

        Args:
            database (str): The identifier of the database for which to reset the password.
            options (dict, optional): Additional options for the password
                reset; sent as the JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after resetting the password.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/resetPassword",
            json_data=options,
            timeout_info=timeout_info,
        )

    def get_secure_bundle(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a secure bundle URL for a specific database.

        Args:
            database (str): The identifier of the database for which to get the secure bundle.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The secure bundle URL and related information.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/secureBundleURL",
            timeout_info=timeout_info,
        )

    def get_datacenters(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Get a list of datacenters associated with a specific database.

        Args:
            database (str): The identifier of the database for which to list datacenters.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of datacenters and their details.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/databases/{database}/datacenters",
            timeout_info=timeout_info,
        )

    def create_datacenter(
        self,
        database: str = "",
        options: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Create a new datacenter for a specific database.

        Args:
            database (str): The identifier of the database for which to create the datacenter.
            options (dict, optional): Specifications for the new datacenter;
                sent as the JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after creating the datacenter.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/datacenters",
            json_data=options,
            timeout_info=timeout_info,
        )

    def terminate_datacenter(
        self,
        database: str = "",
        datacenter: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Terminate a specific datacenter in a database.

        Args:
            database (str): The identifier of the database containing the datacenter.
            datacenter (str): The identifier of the datacenter to terminate.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after terminating the datacenter.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/datacenters/{datacenter}/terminate",
            timeout_info=timeout_info,
        )

    def get_access_list(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve the access list for a specific database.

        Args:
            database (str): The identifier of the database for which to get the access list.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The current access list for the database.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/databases/{database}/access-list",
            timeout_info=timeout_info,
        )

    def replace_access_list(
        self,
        database: str = "",
        access_list: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Replace the entire access list for a specific database.

        Args:
            database (str): The identifier of the database for which to replace the access list.
            access_list (dict): The new access list to be set.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after replacing the access list.
        """
        return self._json_ops_request(
            method=http_methods.PUT,
            path=f"/databases/{database}/access-list",
            json_data=access_list,
            timeout_info=timeout_info,
        )

    def update_access_list(
        self,
        database: str = "",
        access_list: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Update the access list for a specific database.

        Args:
            database (str): The identifier of the database for which to update the access list.
            access_list (dict): The updates to be applied to the access list.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after updating the access list.
        """
        return self._json_ops_request(
            method=http_methods.PATCH,
            path=f"/databases/{database}/access-list",
            json_data=access_list,
            timeout_info=timeout_info,
        )

    def add_access_list_address(
        self,
        database: str = "",
        address: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Add a new address to the access list for a specific database.

        Args:
            database (str): The identifier of the database for which to add the address.
            address (dict): The address details to add to the access list.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after adding the address.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/databases/{database}/access-list",
            json_data=address,
            timeout_info=timeout_info,
        )

    def delete_access_list(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Delete the access list for a specific database.

        Args:
            database (str): The identifier of the database for which to delete the access list.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after deleting the access list.
        """
        return self._json_ops_request(
            method=http_methods.DELETE,
            path=f"/databases/{database}/access-list",
            timeout_info=timeout_info,
        )

    def get_private_link(
        self, database: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve the private link information for a specified database.

        Args:
            database (str): The identifier of the database.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The private link information for the database.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/organizations/clusters/{database}/private-link",
            timeout_info=timeout_info,
        )

    def get_datacenter_private_link(
        self,
        database: str = "",
        datacenter: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Retrieve the private link information for a specific datacenter in a database.

        Args:
            database (str): The identifier of the database.
            datacenter (str): The identifier of the datacenter.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The private link information for the specified datacenter.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/organizations/clusters/{database}/datacenters/{datacenter}/private-link",
            timeout_info=timeout_info,
        )

    def create_datacenter_private_link(
        self,
        database: str = "",
        datacenter: str = "",
        private_link: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Create a private link for a specific datacenter in a database.

        Args:
            database (str): The identifier of the database.
            datacenter (str): The identifier of the datacenter.
            private_link (dict): The private link configuration details.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after creating the private link.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/organizations/clusters/{database}/datacenters/{datacenter}/private-link",
            json_data=private_link,
            timeout_info=timeout_info,
        )

    def create_datacenter_endpoint(
        self,
        database: str = "",
        datacenter: str = "",
        endpoint: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Create an endpoint for a specific datacenter in a database.

        Args:
            database (str): The identifier of the database.
            datacenter (str): The identifier of the datacenter.
            endpoint (dict): The endpoint configuration details.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after creating the endpoint.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoint",
            json_data=endpoint,
            timeout_info=timeout_info,
        )

    def update_datacenter_endpoint(
        self,
        database: str = "",
        datacenter: str = "",
        endpoint: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Update an existing endpoint for a specific datacenter in a database.

        Args:
            database (str): The identifier of the database.
            datacenter (str): The identifier of the datacenter.
            endpoint (dict): The updated endpoint configuration details.
                Its "id" entry selects the endpoint to update.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after updating the endpoint.

        Raises:
            KeyError: if `endpoint` is omitted or has no "id" key.
        """
        # A None default replaces the former shared mutable `{}` default;
        # an omitted endpoint still fails with KeyError on the "id" lookup.
        _endpoint: Dict[str, Any] = {} if endpoint is None else endpoint
        return self._json_ops_request(
            method=http_methods.PUT,
            path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{_endpoint['id']}",
            json_data=_endpoint,
            timeout_info=timeout_info,
        )

    def get_datacenter_endpoint(
        self,
        database: str = "",
        datacenter: str = "",
        endpoint: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Retrieve information about a specific endpoint in a datacenter of a database.

        Args:
            database (str): The identifier of the database.
            datacenter (str): The identifier of the datacenter.
            endpoint (str): The identifier of the endpoint.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The endpoint information for the specified datacenter.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint}",
            timeout_info=timeout_info,
        )

    def delete_datacenter_endpoint(
        self,
        database: str = "",
        datacenter: str = "",
        endpoint: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Delete a specific endpoint in a datacenter of a database.

        Args:
            database (str): The identifier of the database.
            datacenter (str): The identifier of the datacenter.
            endpoint (str): The identifier of the endpoint to delete.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after deleting the endpoint.
        """
        return self._json_ops_request(
            method=http_methods.DELETE,
            path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint}",
            timeout_info=timeout_info,
        )

    def get_available_classic_regions(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of available classic regions.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of available classic regions.
        """
        return self._json_ops_request(
            method=http_methods.GET, path="/availableRegions", timeout_info=timeout_info
        )

    def get_available_regions(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of available regions for serverless deployment.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of available regions for serverless deployment.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path="/regions/serverless",
            timeout_info=timeout_info,
        )

    def get_roles(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE:
        """
        Retrieve a list of roles within the organization.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of roles within the organization.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path="/organizations/roles",
            timeout_info=timeout_info,
        )

    def create_role(
        self,
        role_definition: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Create a new role within the organization.

        Args:
            role_definition (dict, optional): The definition of the role to be created.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after creating the role.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path="/organizations/roles",
            json_data=role_definition,
            timeout_info=timeout_info,
        )

    def get_role(
        self, role: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve details of a specific role within the organization.

        Args:
            role (str): The identifier of the role.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The details of the specified role.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/organizations/roles/{role}",
            timeout_info=timeout_info,
        )

    def update_role(
        self,
        role: str = "",
        role_definition: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Update the definition of an existing role within the organization.

        Args:
            role (str): The identifier of the role to update.
            role_definition (dict, optional): The new definition of the role.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after updating the role.
        """
        return self._json_ops_request(
            method=http_methods.PUT,
            path=f"/organizations/roles/{role}",
            json_data=role_definition,
            timeout_info=timeout_info,
        )

    def delete_role(
        self, role: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Delete a specific role from the organization.

        Args:
            role (str): The identifier of the role to delete.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after deleting the role.
        """
        return self._json_ops_request(
            method=http_methods.DELETE,
            path=f"/organizations/roles/{role}",
            timeout_info=timeout_info,
        )

    def invite_user(
        self,
        user_definition: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Invite a new user to the organization.

        Args:
            user_definition (dict, optional): The definition of the user to be invited.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after inviting the user.
        """
        return self._json_ops_request(
            method=http_methods.PUT,
            path="/organizations/users",
            json_data=user_definition,
            timeout_info=timeout_info,
        )

    def get_users(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE:
        """
        Retrieve a list of users within the organization.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of users within the organization.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path="/organizations/users",
            timeout_info=timeout_info,
        )

    def get_user(
        self, user: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve details of a specific user within the organization.

        Args:
            user (str): The identifier of the user.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The details of the specified user.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/organizations/users/{user}",
            timeout_info=timeout_info,
        )

    def remove_user(
        self, user: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Remove a user from the organization.

        Args:
            user (str): The identifier of the user to remove.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after removing the user.
        """
        return self._json_ops_request(
            method=http_methods.DELETE,
            path=f"/organizations/users/{user}",
            timeout_info=timeout_info,
        )

    def update_user_roles(
        self,
        user: str = "",
        roles: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Update the roles assigned to a specific user within the organization.

        Args:
            user (str): The identifier of the user.
            roles (dict, optional): The new roles to assign to the user;
                sent as the JSON request body.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after updating the user's roles.
        """
        return self._json_ops_request(
            method=http_methods.PUT,
            path=f"/organizations/users/{user}/roles",
            json_data=roles,
            timeout_info=timeout_info,
        )

    def get_clients(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE:
        """
        Retrieve a list of client IDs and secrets associated with the organization.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of client IDs and their associated secrets.
        """
        return self._json_ops_request(
            method=http_methods.GET, path="/clientIdSecrets", timeout_info=timeout_info
        )

    def create_token(
        self,
        roles: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Create a new token with specific roles.

        Args:
            roles (dict, optional): The roles to associate with the token:
                {"roles": ["<roleId>"]}
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after creating the token.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path="/clientIdSecrets",
            json_data=roles,
            timeout_info=timeout_info,
        )

    def delete_token(
        self, token: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Delete a specific token.

        Args:
            token (str): The identifier of the token to delete.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after deleting the token.
        """
        # NOTE(review): this path uses the singular "clientIdSecret", while
        # get_clients/create_token use "/clientIdSecrets" - confirm against
        # the DevOps API reference before changing either.
        return self._json_ops_request(
            method=http_methods.DELETE,
            path=f"/clientIdSecret/{token}",
            timeout_info=timeout_info,
        )

    def get_organization(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve details of the current organization.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The details of the organization.
        """
        return self._json_ops_request(
            method=http_methods.GET, path="/currentOrg", timeout_info=timeout_info
        )

    def get_access_lists(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of access lists for the organization.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of access lists.
        """
        return self._json_ops_request(
            method=http_methods.GET, path="/access-lists", timeout_info=timeout_info
        )

    def get_access_list_template(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a template for creating an access list.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: An access list template.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path="/access-list/template",
            timeout_info=timeout_info,
        )

    def validate_access_list(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Validate the configuration of the access list.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The validation result of the access list configuration.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path="/access-list/validate",
            timeout_info=timeout_info,
        )

    def get_private_links(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of private link connections for the organization.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of private link connections.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path="/organizations/private-link",
            timeout_info=timeout_info,
        )

    def get_streaming_providers(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of streaming service providers.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of available streaming service providers.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path="/streaming/providers",
            timeout_info=timeout_info,
        )

    def get_streaming_tenants(
        self, timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve a list of streaming tenants.

        Args:
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: A list of streaming tenants and their details.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path="/streaming/tenants",
            timeout_info=timeout_info,
        )

    def create_streaming_tenant(
        self,
        tenant: Optional[Dict[str, Any]] = None,
        timeout_info: TimeoutInfoWideType = None,
    ) -> OPS_API_RESPONSE:
        """
        Create a new streaming tenant.

        Args:
            tenant (dict, optional): The configuration details for the new streaming tenant.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: The response from the server after creating the streaming tenant.
        """
        return self._json_ops_request(
            method=http_methods.POST,
            path="/streaming/tenants",
            json_data=tenant,
            timeout_info=timeout_info,
        )

    def delete_streaming_tenant(
        self,
        tenant: str = "",
        cluster: str = "",
        timeout_info: TimeoutInfoWideType = None,
    ) -> None:
        """
        Delete a specific streaming tenant from a cluster.

        Args:
            tenant (str): The identifier of the tenant to delete.
            cluster (str): The identifier of the cluster from which the tenant is to be deleted.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            None, when the API answers with status 202 ('Accepted').

        Raises:
            ValueError: for any other status; the message is the response body.
        """
        r = self._ops_request(
            method=http_methods.DELETE,
            path=f"/streaming/tenants/{tenant}/clusters/{cluster}",
            timeout_info=timeout_info,
        )
        if r.status_code != 202:  # anything but 'Accepted' is a failure
            raise ValueError(r.text)
        return None

    def get_streaming_tenant(
        self, tenant: str = "", timeout_info: TimeoutInfoWideType = None
    ) -> OPS_API_RESPONSE:
        """
        Retrieve information about the limits and usage of a specific streaming tenant.

        Args:
            tenant (str): The identifier of the streaming tenant.
            timeout_info: either a float (seconds) or a TimeoutInfo dict.

        Returns:
            dict: Details of the specified streaming tenant, including limits and current usage.
        """
        return self._json_ops_request(
            method=http_methods.GET,
            path=f"/streaming/tenants/{tenant}/limits",
            timeout_info=timeout_info,
        )