Spaces:
Running
Running
File size: 14,765 Bytes
2a0bc63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 |
import os
import shutil
import tempfile
from typing import Any, Dict, List, Optional, Union
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session
from cassio.config.bundle_download import download_astra_bundle_url
from cassio.config.bundle_management import (
infer_keyspace_from_bundle,
init_string_to_bundle_path_and_options,
)
ASTRA_CLOUD_AUTH_USERNAME = "token"
DOWNLOADED_BUNDLE_FILE_NAME = "secure-connect-bundle_devopsapi.zip"
default_session: Optional[Session] = None
default_keyspace: Optional[str] = None
def init(
auto: bool = False,
session: Optional[Session] = None,
secure_connect_bundle: Optional[str] = None,
init_string: Optional[str] = None,
token: Optional[str] = None,
database_id: Optional[str] = None,
keyspace: Optional[str] = None,
contact_points: Optional[Union[str, List[str]]] = None,
username: Optional[str] = None,
password: Optional[str] = None,
cluster_kwargs: Optional[Dict[str, Any]] = None,
tempfile_basedir: Optional[str] = None,
bundle_url_template: Optional[str] = None,
cloud_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
"""
Globally set the default Cassandra connection (/keyspace) for CassIO.
This default will be used by all other db-requiring CassIO instantiations,
unless passed to the respective classes' __init__.
There are various ways to achieve this, depending on the passed parameters.
Broadly speaking, this method allows to pass one's own ready Session,
or to have it created in the method. For this second case, both Astra DB
and a regular Cassandra cluster can be targeted.
One can also simply call cassio.init(auto=True) and let it figure out
what to do based on standard environment variables.
CASSANDRA
If one passes `contact_points`, it is assumed that this is Cassandra.
In that case, only the following arguments will be used:
`contact_points`, `keyspace`, `username`, `password`, `cluster_kwargs`
Note that when passing a `session` all other parameters are ignored.
ASTRA DB:
If `contact_points` is not passed, one of several methods to connect to
Astra should be supplied for the connection to Astra. These overlap:
see below for their precedence.
Note that when passing a `session` all other parameters are ignored.
AUTO:
If passing auto=True, no other parameter is allowed except for
`tempfile_basedir`. The rest, including choosing Astra DB / Cassandra,
is autofilled by inspecting the following environment variables:
CASSANDRA_CONTACT_POINTS
CASSANDRA_USERNAME
CASSANDRA_PASSWORD
CASSANDRA_KEYSPACE
ASTRA_DB_APPLICATION_TOKEN
ASTRA_DB_INIT_STRING
ASTRA_DB_SECURE_BUNDLE_PATH
ASTRA_DB_KEYSPACE
ASTRA_DB_DATABASE_ID
PARAMETERS:
`auto`: (bool = False), whether to auto-guess all connection params.
`session` (optional cassandra.cluster.Session), an established connection.
`secure_connect_bundle` (optional str), full path to a Secure Bundle.
`init_string` (optional str), a stand-alone "db init string" credential
(which can optionally contain keyspace and/or token).
`token` (optional str), the Astra DB "AstraCS:..." token.
`database_id` (optional str), the Astra DB ID. Used only for Astra
connections with no `secure_connect_bundle` parameter passed.
`keyspace` (optional str), the keyspace to work.
`contact_points` (optional List[str]), for Cassandra connection
`username` (optional str), username for Cassandra connection
`password` (optional str), password for Cassandra connection
`cluster_kwargs` (optional dict), additional arguments to `Cluster(...)`.
`tempfile_basedir` (optional str), where to create temporary work directories.
`bundle_url_template` (optional str), url template for getting the database
secure bundle. The "databaseId" variable is resolved with the actual value.
Default (for Astra DB):
"https://api.astra.datastax.com/v2/databases/{database_id}/secureBundleURL"
`cloud_kwargs` (optional dict), additional arguments to `Cluster(cloud={...})`
(i.e. additional key-value pairs within the passed `cloud` dict).
ASTRA DB:
The Astra-related parameters are arranged in a chain of fallbacks.
In case redundant information is supplied, these are the precedences:
session > secure_connect_bundle > init_string
token > (from init_string if any)
keyspace > (from init_string if any) > (from bundle if any)
If a secure-connect-bundle is needed and not passed, it will be downloaded:
this requires `database_id` to be passed, suitable token permissions.
Constraints and caveats:
`secure_connect_bundle` requires `token`.
`session` does not make use of `cluster_kwargs` and will ignore it.
The Session is created at `init` time and kept around, available to any
subsequent table creation. If calling `init` a second time, a new Session
will be made available replacing the previous one.
"""
global default_session
global default_keyspace
temp_dir_created: bool = False
temp_dir: Optional[str] = None
direct_session: Optional[Session] = None
bundle_from_is: Optional[str] = None
bundle_from_arg: Optional[str] = None
bundle_from_download: Optional[str] = None
keyspace_from_is: Optional[str] = None
keyspace_from_bundle: Optional[str] = None
keyspace_from_arg: Optional[str] = None
token_from_is: Optional[str] = None
token_from_arg: Optional[str] = None
#
if auto:
if any(
v is not None
for v in (
session,
secure_connect_bundle,
init_string,
token,
database_id,
keyspace,
contact_points,
username,
password,
# cluster_kwargs is allowed
# tempfile_basedir is allowed
bundle_url_template,
cloud_kwargs,
)
):
raise ValueError(
"When auto=True, no arguments can be passed other than "
"tempfile_basedir and cluster_kwargs."
)
# setting some arguments from environment variables
if "CASSANDRA_CONTACT_POINTS" in os.environ:
contact_points = os.environ["CASSANDRA_CONTACT_POINTS"]
username = os.environ.get("CASSANDRA_USERNAME")
password = os.environ.get("CASSANDRA_PASSWORD")
keyspace = os.environ.get("CASSANDRA_KEYSPACE")
elif any(
avar in os.environ
for avar in [
"ASTRA_DB_APPLICATION_TOKEN",
"ASTRA_DB_INIT_STRING",
]
):
token = os.environ.get("ASTRA_DB_APPLICATION_TOKEN")
init_string = os.environ.get("ASTRA_DB_INIT_STRING")
secure_connect_bundle = os.environ.get("ASTRA_DB_SECURE_BUNDLE_PATH")
keyspace = os.environ.get("ASTRA_DB_KEYSPACE")
database_id = os.environ.get("ASTRA_DB_DATABASE_ID")
#
try:
# process init_string
base_dir = tempfile_basedir if tempfile_basedir else tempfile.gettempdir()
if init_string:
temp_dir = tempfile.mkdtemp(dir=base_dir)
temp_dir_created = True
bundle_from_is, options_from_is = init_string_to_bundle_path_and_options(
init_string,
target_dir=temp_dir,
)
keyspace_from_is = options_from_is.get("keyspace")
token_from_is = options_from_is.get("token")
# for the session
if session:
direct_session = session
if secure_connect_bundle:
if not token:
raise ValueError(
"`token` is required if `secure_connect_bundle` is passed"
)
# params from arguments:
bundle_from_arg = secure_connect_bundle
token_from_arg = token
keyspace_from_arg = keyspace
can_be_astra = any(
[
secure_connect_bundle is not None,
init_string is not None,
token is not None,
]
)
# resolution of priority among args
if direct_session:
default_session = direct_session
else:
# first determine if Cassandra or Astra
is_cassandra = all(
[
secure_connect_bundle is None,
init_string is None,
token is None,
contact_points is not None,
]
)
if is_cassandra:
is_astra_db = False
else:
# determine if Astra DB
is_astra_db = can_be_astra
#
if is_cassandra:
# need to take care of:
# contact_points, username, password, cluster_kwargs
chosen_contact_points: Union[List[str], None]
if contact_points:
if isinstance(contact_points, str):
chosen_contact_points = [
cp.strip() for cp in contact_points.split(",") if cp.strip()
]
else:
# assume it's a list already
chosen_contact_points = contact_points
else:
# normalize "" to None for later `Cluster(...)` call
chosen_contact_points = None
#
if username is not None and password is not None:
chosen_auth_provider = PlainTextAuthProvider(
username,
password,
)
else:
if username is not None or password is not None:
raise ValueError(
"Please provide both usename/password or none."
)
else:
chosen_auth_provider = None
#
if chosen_contact_points is None:
cluster = Cluster(
auth_provider=chosen_auth_provider,
**(cluster_kwargs if cluster_kwargs is not None else {}),
)
else:
cluster = Cluster(
contact_points=chosen_contact_points,
auth_provider=chosen_auth_provider,
**(cluster_kwargs if cluster_kwargs is not None else {}),
)
default_session = cluster.connect()
elif is_astra_db:
# Astra DB
chosen_token = _first_valid(token_from_arg, token_from_is)
if chosen_token is None:
raise ValueError(
"A token must be supplied if connection is to be established."
)
chosen_bundle_pre_token = _first_valid(bundle_from_arg, bundle_from_is)
# Try to get the bundle from the token if not supplied otherwise
if chosen_bundle_pre_token is None:
if database_id is None:
raise ValueError(
"A database_id must be supplied if no "
"secure_connect_bundle is provided."
)
if not temp_dir_created:
temp_dir = tempfile.mkdtemp(dir=base_dir)
temp_dir_created = True
bundle_from_download = os.path.join(
temp_dir or "", DOWNLOADED_BUNDLE_FILE_NAME
)
download_astra_bundle_url(
database_id,
chosen_token,
bundle_from_download,
bundle_url_template,
)
# After the devops-api part, re-evaluate chosen_bundle:
chosen_bundle = _first_valid(
bundle_from_download, chosen_bundle_pre_token
)
#
if chosen_bundle:
keyspace_from_bundle = infer_keyspace_from_bundle(chosen_bundle)
cluster = Cluster(
cloud={
"secure_connect_bundle": chosen_bundle,
**(cloud_kwargs if cloud_kwargs is not None else {}),
},
auth_provider=PlainTextAuthProvider(
ASTRA_CLOUD_AUTH_USERNAME,
chosen_token,
),
**(cluster_kwargs if cluster_kwargs is not None else {}),
)
default_session = cluster.connect()
else:
raise ValueError("No secure-connect-bundle was available.")
# keyspace to be resolved in any case
chosen_keyspace = _first_valid(
keyspace_from_arg, keyspace_from_is, keyspace_from_bundle
)
default_keyspace = chosen_keyspace
finally:
if temp_dir_created and temp_dir is not None:
shutil.rmtree(temp_dir)
def resolve_session(arg_session: Optional[Session] = None) -> Optional[Session]:
"""Utility to fall back to the global defaults when null args are passed."""
if arg_session is not None:
return arg_session
else:
return default_session
def check_resolve_session(arg_session: Optional[Session] = None) -> Session:
s = resolve_session(arg_session)
if s is None:
raise ValueError("DB session not set.")
else:
return s
def resolve_keyspace(arg_keyspace: Optional[str] = None) -> Optional[str]:
"""Utility to fall back to the global defaults when null args are passed."""
if arg_keyspace is not None:
return arg_keyspace
else:
return default_keyspace
def check_resolve_keyspace(arg_keyspace: Optional[str] = None) -> str:
s = resolve_keyspace(arg_keyspace)
if s is None:
raise ValueError("DB keyspace not set.")
else:
return s
def _first_valid(*pargs: Any) -> Union[Any, None]:
for entry in pargs:
if entry is not None:
return entry
return None
|