Spaces:
Running
Running
# Copyright DataStax, Inc. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from __future__ import annotations | |
import logging | |
from typing import Any, cast, Dict, Optional, TypedDict | |
import httpx | |
from astrapy.core.api import ( | |
APIRequestError, | |
api_request, | |
async_api_request, | |
raw_api_request, | |
async_raw_api_request, | |
) | |
from astrapy.core.utils import ( | |
http_methods, | |
to_httpx_timeout, | |
TimeoutInfoWideType, | |
) | |
from astrapy.core.defaults import ( | |
DEFAULT_DEV_OPS_AUTH_HEADER, | |
DEFAULT_DEV_OPS_API_VERSION, | |
DEFAULT_DEV_OPS_URL, | |
) | |
from astrapy.core.core_types import API_RESPONSE, OPS_API_RESPONSE | |
logger = logging.getLogger(__name__) | |
class AstraDBOpsConstructorParams(TypedDict): | |
token: str | |
dev_ops_url: Optional[str] | |
dev_ops_api_version: Optional[str] | |
caller_name: Optional[str] | |
caller_version: Optional[str] | |
class AstraDBOps: | |
# Initialize the shared httpx clients as class attributes | |
client = httpx.Client() | |
async_client = httpx.AsyncClient() | |
def __init__( | |
self, | |
token: str, | |
dev_ops_url: Optional[str] = None, | |
dev_ops_api_version: Optional[str] = None, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> None: | |
self.caller_name = caller_name | |
self.caller_version = caller_version | |
# constructor params (for the copy() method): | |
self.constructor_params: AstraDBOpsConstructorParams = { | |
"token": token, | |
"dev_ops_url": dev_ops_url, | |
"dev_ops_api_version": dev_ops_api_version, | |
"caller_name": caller_name, | |
"caller_version": caller_version, | |
} | |
# | |
dev_ops_url = (dev_ops_url or DEFAULT_DEV_OPS_URL).strip("/") | |
dev_ops_api_version = ( | |
dev_ops_api_version or DEFAULT_DEV_OPS_API_VERSION | |
).strip("/") | |
self.token = "Bearer " + token | |
self.base_url = f"{dev_ops_url}/{dev_ops_api_version}" | |
def __eq__(self, other: Any) -> bool: | |
if isinstance(other, AstraDBOps): | |
# work on the "normalized" quantities (stripped, etc) | |
return all( | |
[ | |
self.token == other.token, | |
self.base_url == other.base_url, | |
self.caller_name == other.caller_name, | |
self.caller_version == other.caller_version, | |
] | |
) | |
else: | |
return False | |
def copy( | |
self, | |
*, | |
token: Optional[str] = None, | |
dev_ops_url: Optional[str] = None, | |
dev_ops_api_version: Optional[str] = None, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> AstraDBOps: | |
return AstraDBOps( | |
token=token or self.constructor_params["token"], | |
dev_ops_url=dev_ops_url or self.constructor_params["dev_ops_url"], | |
dev_ops_api_version=dev_ops_api_version | |
or self.constructor_params["dev_ops_api_version"], | |
caller_name=caller_name or self.caller_name, | |
caller_version=caller_version or self.caller_version, | |
) | |
def set_caller( | |
self, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> None: | |
self.caller_name = caller_name | |
self.caller_version = caller_version | |
def _ops_request( | |
self, | |
method: str, | |
path: str, | |
options: Optional[Dict[str, Any]] = None, | |
json_data: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> httpx.Response: | |
_options = {} if options is None else options | |
raw_response = raw_api_request( | |
client=self.client, | |
base_url=self.base_url, | |
auth_header=DEFAULT_DEV_OPS_AUTH_HEADER, | |
token=self.token, | |
method=method, | |
json_data=json_data, | |
url_params=_options, | |
path=path, | |
caller_name=self.caller_name, | |
caller_version=self.caller_version, | |
timeout=to_httpx_timeout(timeout_info), | |
) | |
return raw_response | |
async def _async_ops_request( | |
self, | |
method: str, | |
path: str, | |
options: Optional[Dict[str, Any]] = None, | |
json_data: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> httpx.Response: | |
_options = {} if options is None else options | |
raw_response = await async_raw_api_request( | |
client=self.async_client, | |
base_url=self.base_url, | |
auth_header=DEFAULT_DEV_OPS_AUTH_HEADER, | |
token=self.token, | |
method=method, | |
json_data=json_data, | |
url_params=_options, | |
path=path, | |
caller_name=self.caller_name, | |
caller_version=self.caller_version, | |
timeout=to_httpx_timeout(timeout_info), | |
) | |
return raw_response | |
def _json_ops_request( | |
self, | |
method: str, | |
path: str, | |
options: Optional[Dict[str, Any]] = None, | |
json_data: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
_options = {} if options is None else options | |
response = api_request( | |
client=self.client, | |
base_url=self.base_url, | |
auth_header="Authorization", | |
token=self.token, | |
method=method, | |
json_data=json_data, | |
url_params=_options, | |
path=path, | |
skip_error_check=False, | |
caller_name=None, | |
caller_version=None, | |
timeout=to_httpx_timeout(timeout_info), | |
) | |
return response | |
async def _async_json_ops_request( | |
self, | |
method: str, | |
path: str, | |
options: Optional[Dict[str, Any]] = None, | |
json_data: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
_options = {} if options is None else options | |
response = await async_api_request( | |
client=self.async_client, | |
base_url=self.base_url, | |
auth_header="Authorization", | |
token=self.token, | |
method=method, | |
json_data=json_data, | |
url_params=_options, | |
path=path, | |
skip_error_check=False, | |
caller_name=None, | |
caller_version=None, | |
timeout=to_httpx_timeout(timeout_info), | |
) | |
return response | |
def get_databases( | |
self, | |
options: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of databases. | |
Args: | |
options (dict, optional): Additional options for the request. | |
Returns: | |
list: a JSON list of dictionaries, one per database. | |
""" | |
response = self._json_ops_request( | |
method=http_methods.GET, | |
path="/databases", | |
options=options, | |
timeout_info=timeout_info, | |
) | |
return response | |
async def async_get_databases( | |
self, | |
options: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of databases - async version of the method. | |
Args: | |
options (dict, optional): Additional options for the request. | |
Returns: | |
list: a JSON list of dictionaries, one per database. | |
""" | |
response = await self._async_json_ops_request( | |
method=http_methods.GET, | |
path="/databases", | |
options=options, | |
timeout_info=timeout_info, | |
) | |
return response | |
def create_database( | |
self, | |
database_definition: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> Dict[str, str]: | |
""" | |
Create a new database. | |
Args: | |
database_definition (dict, optional): A dictionary defining the properties of the database to be created. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
dict: A dictionary such as: {"id": the ID of the created database} | |
Raises an error if not successful. | |
""" | |
r = self._ops_request( | |
method=http_methods.POST, | |
path="/databases", | |
json_data=database_definition, | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 201: | |
return {"id": r.headers["Location"]} | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=database_definition) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
async def async_create_database( | |
self, | |
database_definition: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> Dict[str, str]: | |
""" | |
Create a new database - async version of the method. | |
Args: | |
database_definition (dict, optional): A dictionary defining the properties of the database to be created. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
dict: A dictionary such as: {"id": the ID of the created database} | |
Raises an error if not successful. | |
""" | |
r = await self._async_ops_request( | |
method=http_methods.POST, | |
path="/databases", | |
json_data=database_definition, | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 201: | |
return {"id": r.headers["Location"]} | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=database_definition) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
def terminate_database( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> str: | |
""" | |
Terminate an existing database. | |
Args: | |
database (str): The identifier of the database to terminate. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
str: The identifier of the terminated database, or None if termination was unsuccessful. | |
""" | |
r = self._ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/terminate", | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 202: | |
return database | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=None) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
return None | |
async def async_terminate_database( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> str: | |
""" | |
Terminate an existing database - async version of the method. | |
Args: | |
database (str): The identifier of the database to terminate. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
str: The identifier of the terminated database, or None if termination was unsuccessful. | |
""" | |
r = await self._async_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/terminate", | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 202: | |
return database | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=None) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
return None | |
def get_database( | |
self, | |
database: str = "", | |
options: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> API_RESPONSE: | |
""" | |
Retrieve details of a specific database. | |
Args: | |
database (str): The identifier of the database to retrieve. | |
options (dict, optional): Additional options for the request. | |
Returns: | |
dict: A JSON response containing the details of the specified database. | |
""" | |
return cast( | |
API_RESPONSE, | |
self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/databases/{database}", | |
options=options, | |
timeout_info=timeout_info, | |
), | |
) | |
async def async_get_database( | |
self, | |
database: str = "", | |
options: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> API_RESPONSE: | |
""" | |
Retrieve details of a specific database - async version of the method. | |
Args: | |
database (str): The identifier of the database to retrieve. | |
options (dict, optional): Additional options for the request. | |
Returns: | |
dict: A JSON response containing the details of the specified database. | |
""" | |
return cast( | |
API_RESPONSE, | |
await self._async_json_ops_request( | |
method=http_methods.GET, | |
path=f"/databases/{database}", | |
options=options, | |
timeout_info=timeout_info, | |
), | |
) | |
def create_keyspace( | |
self, | |
database: str = "", | |
keyspace: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> Dict[str, str]: | |
""" | |
Create a keyspace in a specified database. | |
Args: | |
database (str): The identifier of the database where the keyspace will be created. | |
keyspace (str): The name of the keyspace to create. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
{"ok": 1} if successful. Raises errors otherwise. | |
""" | |
r = self._ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/keyspaces/{keyspace}", | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 201: | |
return {"name": keyspace} | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=None) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
async def async_create_keyspace( | |
self, | |
database: str = "", | |
keyspace: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> Dict[str, str]: | |
""" | |
Create a keyspace in a specified database - async version of the method. | |
Args: | |
database (str): The identifier of the database where the keyspace will be created. | |
keyspace (str): The name of the keyspace to create. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
{"ok": 1} if successful. Raises errors otherwise. | |
""" | |
r = await self._async_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/keyspaces/{keyspace}", | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 201: | |
return {"name": keyspace} | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=None) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
def delete_keyspace( | |
self, | |
database: str = "", | |
keyspace: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> str: | |
""" | |
Delete a keyspace from a database | |
Args: | |
database (str): The identifier of the database to terminate. | |
keyspace (str): The name of the keyspace to create. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
str: The identifier of the deleted keyspace. Otherwise raises an error. | |
""" | |
r = self._ops_request( | |
method=http_methods.DELETE, | |
path=f"/databases/{database}/keyspaces/{keyspace}", | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 202: | |
return keyspace | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=None) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
async def async_delete_keyspace( | |
self, | |
database: str = "", | |
keyspace: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> str: | |
""" | |
Delete a keyspace from a database - async version of the method. | |
Args: | |
database (str): The identifier of the database to terminate. | |
keyspace (str): The name of the keyspace to create. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
str: The identifier of the deleted keyspace. Otherwise raises an error. | |
""" | |
r = await self._async_ops_request( | |
method=http_methods.DELETE, | |
path=f"/databases/{database}/keyspaces/{keyspace}", | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 202: | |
return keyspace | |
elif r.status_code >= 400 and r.status_code < 500: | |
raise APIRequestError(r, payload=None) | |
else: | |
raise ValueError(f"[HTTP {r.status_code}] {r.text}") | |
def park_database( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Park a specific database, making it inactive. | |
Args: | |
database (str): The identifier of the database to park. | |
Returns: | |
dict: The response from the server after parking the database. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/park", | |
timeout_info=timeout_info, | |
) | |
def unpark_database( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Unpark a specific database, making it active again. | |
Args: | |
database (str): The identifier of the database to unpark. | |
Returns: | |
dict: The response from the server after unparking the database. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/unpark", | |
timeout_info=timeout_info, | |
) | |
def resize_database( | |
self, | |
database: str = "", | |
options: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Resize a specific database according to provided options. | |
Args: | |
database (str): The identifier of the database to resize. | |
options (dict, optional): The specifications for the resize operation. | |
Returns: | |
dict: The response from the server after the resize operation. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/resize", | |
json_data=options, | |
timeout_info=timeout_info, | |
) | |
def reset_database_password( | |
self, | |
database: str = "", | |
options: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Reset the password for a specific database. | |
Args: | |
database (str): The identifier of the database for which to reset the password. | |
options (dict, optional): Additional options for the password reset. | |
Returns: | |
dict: The response from the server after resetting the password. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/resetPassword", | |
json_data=options, | |
timeout_info=timeout_info, | |
) | |
def get_secure_bundle( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a secure bundle URL for a specific database. | |
Args: | |
database (str): The identifier of the database for which to get the secure bundle. | |
Returns: | |
dict: The secure bundle URL and related information. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/secureBundleURL", | |
timeout_info=timeout_info, | |
) | |
def get_datacenters( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Get a list of datacenters associated with a specific database. | |
Args: | |
database (str): The identifier of the database for which to list datacenters. | |
Returns: | |
dict: A list of datacenters and their details. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/databases/{database}/datacenters", | |
timeout_info=timeout_info, | |
) | |
def create_datacenter( | |
self, | |
database: str = "", | |
options: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Create a new datacenter for a specific database. | |
Args: | |
database (str): The identifier of the database for which to create the datacenter. | |
options (dict, optional): Specifications for the new datacenter. | |
Returns: | |
dict: The response from the server after creating the datacenter. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/datacenters", | |
json_data=options, | |
timeout_info=timeout_info, | |
) | |
def terminate_datacenter( | |
self, | |
database: str = "", | |
datacenter: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Terminate a specific datacenter in a database. | |
Args: | |
database (str): The identifier of the database containing the datacenter. | |
datacenter (str): The identifier of the datacenter to terminate. | |
Returns: | |
dict: The response from the server after terminating the datacenter. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/datacenters/{datacenter}/terminate", | |
timeout_info=timeout_info, | |
) | |
def get_access_list( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve the access list for a specific database. | |
Args: | |
database (str): The identifier of the database for which to get the access list. | |
Returns: | |
dict: The current access list for the database. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/databases/{database}/access-list", | |
timeout_info=timeout_info, | |
) | |
def replace_access_list( | |
self, | |
database: str = "", | |
access_list: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Replace the entire access list for a specific database. | |
Args: | |
database (str): The identifier of the database for which to replace the access list. | |
access_list (dict): The new access list to be set. | |
Returns: | |
dict: The response from the server after replacing the access list. | |
""" | |
return self._json_ops_request( | |
method=http_methods.PUT, | |
path=f"/databases/{database}/access-list", | |
json_data=access_list, | |
timeout_info=timeout_info, | |
) | |
def update_access_list( | |
self, | |
database: str = "", | |
access_list: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Update the access list for a specific database. | |
Args: | |
database (str): The identifier of the database for which to update the access list. | |
access_list (dict): The updates to be applied to the access list. | |
Returns: | |
dict: The response from the server after updating the access list. | |
""" | |
return self._json_ops_request( | |
method=http_methods.PATCH, | |
path=f"/databases/{database}/access-list", | |
json_data=access_list, | |
timeout_info=timeout_info, | |
) | |
def add_access_list_address( | |
self, | |
database: str = "", | |
address: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Add a new address to the access list for a specific database. | |
Args: | |
database (str): The identifier of the database for which to add the address. | |
address (dict): The address details to add to the access list. | |
Returns: | |
dict: The response from the server after adding the address. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/databases/{database}/access-list", | |
json_data=address, | |
timeout_info=timeout_info, | |
) | |
def delete_access_list( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Delete the access list for a specific database. | |
Args: | |
database (str): The identifier of the database for which to delete the access list. | |
Returns: | |
dict: The response from the server after deleting the access list. | |
""" | |
return self._json_ops_request( | |
method=http_methods.DELETE, | |
path=f"/databases/{database}/access-list", | |
timeout_info=timeout_info, | |
) | |
def get_private_link( | |
self, database: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve the private link information for a specified database. | |
Args: | |
database (str): The identifier of the database. | |
Returns: | |
dict: The private link information for the database. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/organizations/clusters/{database}/private-link", | |
timeout_info=timeout_info, | |
) | |
def get_datacenter_private_link( | |
self, | |
database: str = "", | |
datacenter: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve the private link information for a specific datacenter in a database. | |
Args: | |
database (str): The identifier of the database. | |
datacenter (str): The identifier of the datacenter. | |
Returns: | |
dict: The private link information for the specified datacenter. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/organizations/clusters/{database}/datacenters/{datacenter}/private-link", | |
timeout_info=timeout_info, | |
) | |
def create_datacenter_private_link( | |
self, | |
database: str = "", | |
datacenter: str = "", | |
private_link: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Create a private link for a specific datacenter in a database. | |
Args: | |
database (str): The identifier of the database. | |
datacenter (str): The identifier of the datacenter. | |
private_link (dict): The private link configuration details. | |
Returns: | |
dict: The response from the server after creating the private link. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/organizations/clusters/{database}/datacenters/{datacenter}/private-link", | |
json_data=private_link, | |
timeout_info=timeout_info, | |
) | |
def create_datacenter_endpoint( | |
self, | |
database: str = "", | |
datacenter: str = "", | |
endpoint: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Create an endpoint for a specific datacenter in a database. | |
Args: | |
database (str): The identifier of the database. | |
datacenter (str): The identifier of the datacenter. | |
endpoint (dict): The endpoint configuration details. | |
Returns: | |
dict: The response from the server after creating the endpoint. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoint", | |
json_data=endpoint, | |
timeout_info=timeout_info, | |
) | |
def update_datacenter_endpoint( | |
self, | |
database: str = "", | |
datacenter: str = "", | |
endpoint: Dict[str, Any] = {}, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Update an existing endpoint for a specific datacenter in a database. | |
Args: | |
database (str): The identifier of the database. | |
datacenter (str): The identifier of the datacenter. | |
endpoint (dict): The updated endpoint configuration details. | |
Returns: | |
dict: The response from the server after updating the endpoint. | |
""" | |
return self._json_ops_request( | |
method=http_methods.PUT, | |
path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint['id']}", | |
json_data=endpoint, | |
timeout_info=timeout_info, | |
) | |
def get_datacenter_endpoint( | |
self, | |
database: str = "", | |
datacenter: str = "", | |
endpoint: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve information about a specific endpoint in a datacenter of a database. | |
Args: | |
database (str): The identifier of the database. | |
datacenter (str): The identifier of the datacenter. | |
endpoint (str): The identifier of the endpoint. | |
Returns: | |
dict: The endpoint information for the specified datacenter. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint}", | |
timeout_info=timeout_info, | |
) | |
def delete_datacenter_endpoint( | |
self, | |
database: str = "", | |
datacenter: str = "", | |
endpoint: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Delete a specific endpoint in a datacenter of a database. | |
Args: | |
database (str): The identifier of the database. | |
datacenter (str): The identifier of the datacenter. | |
endpoint (str): The identifier of the endpoint to delete. | |
Returns: | |
dict: The response from the server after deleting the endpoint. | |
""" | |
return self._json_ops_request( | |
method=http_methods.DELETE, | |
path=f"/organizations/clusters/{database}/datacenters/{datacenter}/endpoints/{endpoint}", | |
timeout_info=timeout_info, | |
) | |
def get_available_classic_regions( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of available classic regions. | |
Returns: | |
dict: A list of available classic regions. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, path="/availableRegions", timeout_info=timeout_info | |
) | |
def get_available_regions( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of available regions for serverless deployment. | |
Returns: | |
dict: A list of available regions for serverless deployment. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path="/regions/serverless", | |
timeout_info=timeout_info, | |
) | |
def get_roles(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of roles within the organization. | |
Returns: | |
dict: A list of roles within the organization. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path="/organizations/roles", | |
timeout_info=timeout_info, | |
) | |
def create_role( | |
self, | |
role_definition: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Create a new role within the organization. | |
Args: | |
role_definition (dict, optional): The definition of the role to be created. | |
Returns: | |
dict: The response from the server after creating the role. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path="/organizations/roles", | |
json_data=role_definition, | |
timeout_info=timeout_info, | |
) | |
def get_role( | |
self, role: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve details of a specific role within the organization. | |
Args: | |
role (str): The identifier of the role. | |
Returns: | |
dict: The details of the specified role. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/organizations/roles/{role}", | |
timeout_info=timeout_info, | |
) | |
def update_role( | |
self, | |
role: str = "", | |
role_definition: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Update the definition of an existing role within the organization. | |
Args: | |
role (str): The identifier of the role to update. | |
role_definition (dict, optional): The new definition of the role. | |
Returns: | |
dict: The response from the server after updating the role. | |
""" | |
return self._json_ops_request( | |
method=http_methods.PUT, | |
path=f"/organizations/roles/{role}", | |
json_data=role_definition, | |
timeout_info=timeout_info, | |
) | |
def delete_role( | |
self, role: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Delete a specific role from the organization. | |
Args: | |
role (str): The identifier of the role to delete. | |
Returns: | |
dict: The response from the server after deleting the role. | |
""" | |
return self._json_ops_request( | |
method=http_methods.DELETE, | |
path=f"/organizations/roles/{role}", | |
timeout_info=timeout_info, | |
) | |
def invite_user( | |
self, | |
user_definition: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Invite a new user to the organization. | |
Args: | |
user_definition (dict, optional): The definition of the user to be invited. | |
Returns: | |
dict: The response from the server after inviting the user. | |
""" | |
return self._json_ops_request( | |
method=http_methods.PUT, | |
path="/organizations/users", | |
json_data=user_definition, | |
timeout_info=timeout_info, | |
) | |
def get_users(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of users within the organization. | |
Returns: | |
dict: A list of users within the organization. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path="/organizations/users", | |
timeout_info=timeout_info, | |
) | |
def get_user( | |
self, user: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve details of a specific user within the organization. | |
Args: | |
user (str): The identifier of the user. | |
Returns: | |
dict: The details of the specified user. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/organizations/users/{user}", | |
timeout_info=timeout_info, | |
) | |
def remove_user( | |
self, user: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Remove a user from the organization. | |
Args: | |
user (str): The identifier of the user to remove. | |
Returns: | |
dict: The response from the server after removing the user. | |
""" | |
return self._json_ops_request( | |
method=http_methods.DELETE, | |
path=f"/organizations/users/{user}", | |
timeout_info=timeout_info, | |
) | |
def update_user_roles( | |
self, | |
user: str = "", | |
roles: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Update the roles assigned to a specific user within the organization. | |
Args: | |
user (str): The identifier of the user. | |
roles (list, optional): The list of new roles to assign to the user. | |
Returns: | |
dict: The response from the server after updating the user's roles. | |
""" | |
return self._json_ops_request( | |
method=http_methods.PUT, | |
path=f"/organizations/users/{user}/roles", | |
json_data=roles, | |
timeout_info=timeout_info, | |
) | |
def get_clients(self, timeout_info: TimeoutInfoWideType = None) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of client IDs and secrets associated with the organization. | |
Returns: | |
dict: A list of client IDs and their associated secrets. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, path="/clientIdSecrets", timeout_info=timeout_info | |
) | |
def create_token( | |
self, | |
roles: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Create a new token with specific roles. | |
Args: | |
roles (dict, optional): The roles to associate with the token: | |
{"roles": ["<roleId>"]} | |
Returns: | |
dict: The response from the server after creating the token. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path="/clientIdSecrets", | |
json_data=roles, | |
timeout_info=timeout_info, | |
) | |
def delete_token( | |
self, token: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Delete a specific token. | |
Args: | |
token (str): The identifier of the token to delete. | |
Returns: | |
dict: The response from the server after deleting the token. | |
""" | |
return self._json_ops_request( | |
method=http_methods.DELETE, | |
path=f"/clientIdSecret/{token}", | |
timeout_info=timeout_info, | |
) | |
def get_organization( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve details of the current organization. | |
Returns: | |
dict: The details of the organization. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, path="/currentOrg", timeout_info=timeout_info | |
) | |
def get_access_lists( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of access lists for the organization. | |
Returns: | |
dict: A list of access lists. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, path="/access-lists", timeout_info=timeout_info | |
) | |
def get_access_list_template( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a template for creating an access list. | |
Returns: | |
dict: An access list template. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path="/access-list/template", | |
timeout_info=timeout_info, | |
) | |
def validate_access_list( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Validate the configuration of the access list. | |
Returns: | |
dict: The validation result of the access list configuration. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path="/access-list/validate", | |
timeout_info=timeout_info, | |
) | |
def get_private_links( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of private link connections for the organization. | |
Returns: | |
dict: A list of private link connections. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path="/organizations/private-link", | |
timeout_info=timeout_info, | |
) | |
def get_streaming_providers( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of streaming service providers. | |
Returns: | |
dict: A list of available streaming service providers. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path="/streaming/providers", | |
timeout_info=timeout_info, | |
) | |
def get_streaming_tenants( | |
self, timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve a list of streaming tenants. | |
Returns: | |
dict: A list of streaming tenants and their details. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path="/streaming/tenants", | |
timeout_info=timeout_info, | |
) | |
def create_streaming_tenant( | |
self, | |
tenant: Optional[Dict[str, Any]] = None, | |
timeout_info: TimeoutInfoWideType = None, | |
) -> OPS_API_RESPONSE: | |
""" | |
Create a new streaming tenant. | |
Args: | |
tenant (dict, optional): The configuration details for the new streaming tenant. | |
Returns: | |
dict: The response from the server after creating the streaming tenant. | |
""" | |
return self._json_ops_request( | |
method=http_methods.POST, | |
path="/streaming/tenants", | |
json_data=tenant, | |
timeout_info=timeout_info, | |
) | |
def delete_streaming_tenant( | |
self, | |
tenant: str = "", | |
cluster: str = "", | |
timeout_info: TimeoutInfoWideType = None, | |
) -> None: | |
""" | |
Delete a specific streaming tenant from a cluster. | |
Args: | |
tenant (str): The identifier of the tenant to delete. | |
cluster (str): The identifier of the cluster from which the tenant is to be deleted. | |
timeout_info: either a float (seconds) or a TimeoutInfo dict (see) | |
Returns: | |
dict: The response from the server after deleting the streaming tenant. | |
""" | |
r = self._ops_request( | |
method=http_methods.DELETE, | |
path=f"/streaming/tenants/{tenant}/clusters/{cluster}", | |
timeout_info=timeout_info, | |
) | |
if r.status_code == 202: # 'Accepted' | |
return None | |
else: | |
raise ValueError(r.text) | |
def get_streaming_tenant( | |
self, tenant: str = "", timeout_info: TimeoutInfoWideType = None | |
) -> OPS_API_RESPONSE: | |
""" | |
Retrieve information about the limits and usage of a specific streaming tenant. | |
Args: | |
tenant (str): The identifier of the streaming tenant. | |
Returns: | |
dict: Details of the specified streaming tenant, including limits and current usage. | |
""" | |
return self._json_ops_request( | |
method=http_methods.GET, | |
path=f"/streaming/tenants/{tenant}/limits", | |
timeout_info=timeout_info, | |
) | |