import os
import shutil
import tempfile
from typing import Any, Dict, List, Optional, Union

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session

from cassio.config.bundle_download import download_astra_bundle_url
from cassio.config.bundle_management import (
    infer_keyspace_from_bundle,
    init_string_to_bundle_path_and_options,
)

# Username used together with the Astra DB token for cloud authentication.
ASTRA_CLOUD_AUTH_USERNAME = "token"
# File name given to a secure-connect bundle downloaded into the temp dir.
DOWNLOADED_BUNDLE_FILE_NAME = "secure-connect-bundle_devopsapi.zip"

# Process-wide defaults, set by `init` and read by the `resolve_*` helpers.
default_session: Optional[Session] = None
default_keyspace: Optional[str] = None


def init(
    auto: bool = False,
    session: Optional[Session] = None,
    secure_connect_bundle: Optional[str] = None,
    init_string: Optional[str] = None,
    token: Optional[str] = None,
    database_id: Optional[str] = None,
    keyspace: Optional[str] = None,
    contact_points: Optional[Union[str, List[str]]] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    cluster_kwargs: Optional[Dict[str, Any]] = None,
    tempfile_basedir: Optional[str] = None,
    bundle_url_template: Optional[str] = None,
    cloud_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Globally set the default Cassandra connection (/keyspace) for CassIO.
    This default will be used by all other db-requiring CassIO instantiations,
    unless passed to the respective classes' __init__.

    There are various ways to achieve this, depending on the passed parameters.
    Broadly speaking, this method allows to pass one's own ready Session,
    or to have it created in the method. For this second case, both Astra DB
    and a regular Cassandra cluster can be targeted.
    One can also simply call cassio.init(auto=True) and let it figure out
    what to do based on standard environment variables.

    CASSANDRA
    If one passes `contact_points` (and none of `secure_connect_bundle`,
    `init_string`, `token`), it is assumed that this is Cassandra.
    In that case, only the following arguments will be used:
    `contact_points`, `keyspace`, `username`, `password`, `cluster_kwargs`
    Note that when passing a `session` no new connection is created: the
    other parameters are not used to build a Cluster. The default keyspace
    is still resolved from `keyspace` (or `init_string`, if given), and the
    "`secure_connect_bundle` requires `token`" check is still enforced.

    ASTRA DB:
    If `contact_points` is not passed, one of several methods to connect to
    Astra should be supplied for the connection to Astra. These overlap:
    see below for their precedence.
    The note above about passing a `session` applies here as well.

    AUTO:
    If passing auto=True, no other parameter is allowed except for
    `tempfile_basedir` and `cluster_kwargs`.
    The rest, including choosing Astra DB / Cassandra, is autofilled
    by inspecting the following environment variables:
        CASSANDRA_CONTACT_POINTS
        CASSANDRA_USERNAME
        CASSANDRA_PASSWORD
        CASSANDRA_KEYSPACE
        ASTRA_DB_APPLICATION_TOKEN
        ASTRA_DB_INIT_STRING
        ASTRA_DB_SECURE_BUNDLE_PATH
        ASTRA_DB_KEYSPACE
        ASTRA_DB_DATABASE_ID
    If CASSANDRA_CONTACT_POINTS is set, the Cassandra variables are used and
    the Astra DB ones are not read; otherwise the Astra DB variables are read
    provided that at least one of ASTRA_DB_APPLICATION_TOKEN and
    ASTRA_DB_INIT_STRING is set.

    PARAMETERS:
        `auto`: (bool = False), whether to auto-guess all connection params.
        `session` (optional cassandra.cluster.Session), an established connection.
        `secure_connect_bundle` (optional str), full path to a Secure Bundle.
        `init_string` (optional str), a stand-alone "db init string" credential
            (which can optionally contain keyspace and/or token).
        `token` (optional str), the Astra DB "AstraCS:..." token.
        `database_id` (optional str), the Astra DB ID. Used only for Astra
            connections with no `secure_connect_bundle` parameter passed.
        `keyspace` (optional str), the keyspace to work.
        `contact_points` (optional List[str] or str), for Cassandra connection.
            A string is split on commas, with blank entries dropped.
        `username` (optional str), username for Cassandra connection
        `password` (optional str), password for Cassandra connection
        `cluster_kwargs` (optional dict), additional arguments to `Cluster(...)`.
        `tempfile_basedir` (optional str), where to create temporary work
            directories. Defaults to the system temporary directory.
        `bundle_url_template` (optional str), url template for getting the
            database secure bundle. The database-ID placeholder (see the
            default below) is resolved with the actual value.
            Default (for Astra DB):
                "https://api.astra.datastax.com/v2/databases/{database_id}/secureBundleURL"
        `cloud_kwargs` (optional dict), additional arguments to
            `Cluster(cloud={...})` (i.e. additional key-value pairs within
            the passed `cloud` dict).

    ASTRA DB:
    The Astra-related parameters are arranged in a chain of fallbacks.
    In case redundant information is supplied, these are the precedences:
        session > secure_connect_bundle > init_string
        token > (from init_string if any)
        keyspace > (from init_string if any) > (from bundle if any)
    If a secure-connect-bundle is needed and not passed, it will be downloaded:
        this requires `database_id` to be passed, suitable token permissions.

    Constraints and caveats:
        `secure_connect_bundle` requires `token`.
        `session` does not make use of `cluster_kwargs` and will ignore it.

    The Session is created at `init` time and kept around, available to any
    subsequent table creation. If calling `init` a second time, a new Session
    will be made available replacing the previous one.

    RAISES:
        ValueError: if `auto=True` is combined with disallowed parameters;
            if `secure_connect_bundle` is passed without `token`;
            if only one of `username` / `password` is given (Cassandra);
            if, for Astra DB, no token is available, or no bundle is
            available and `database_id` is missing.
    """
    global default_session
    global default_keyspace
    temp_dir_created: bool = False
    temp_dir: Optional[str] = None
    direct_session: Optional[Session] = None
    bundle_from_is: Optional[str] = None
    bundle_from_arg: Optional[str] = None
    bundle_from_download: Optional[str] = None
    keyspace_from_is: Optional[str] = None
    keyspace_from_bundle: Optional[str] = None
    keyspace_from_arg: Optional[str] = None
    token_from_is: Optional[str] = None
    token_from_arg: Optional[str] = None
    #
    if auto:
        if any(
            v is not None
            for v in (
                session,
                secure_connect_bundle,
                init_string,
                token,
                database_id,
                keyspace,
                contact_points,
                username,
                password,
                # cluster_kwargs is allowed
                # tempfile_basedir is allowed
                bundle_url_template,
                cloud_kwargs,
            )
        ):
            raise ValueError(
                "When auto=True, no arguments can be passed other than "
                "tempfile_basedir and cluster_kwargs."
            )
        # setting some arguments from environment variables
        if "CASSANDRA_CONTACT_POINTS" in os.environ:
            contact_points = os.environ["CASSANDRA_CONTACT_POINTS"]
            username = os.environ.get("CASSANDRA_USERNAME")
            password = os.environ.get("CASSANDRA_PASSWORD")
            keyspace = os.environ.get("CASSANDRA_KEYSPACE")
        elif any(
            avar in os.environ
            for avar in [
                "ASTRA_DB_APPLICATION_TOKEN",
                "ASTRA_DB_INIT_STRING",
            ]
        ):
            token = os.environ.get("ASTRA_DB_APPLICATION_TOKEN")
            init_string = os.environ.get("ASTRA_DB_INIT_STRING")
            secure_connect_bundle = os.environ.get("ASTRA_DB_SECURE_BUNDLE_PATH")
            keyspace = os.environ.get("ASTRA_DB_KEYSPACE")
            database_id = os.environ.get("ASTRA_DB_DATABASE_ID")
    #
    try:
        # process init_string
        base_dir = tempfile_basedir if tempfile_basedir else tempfile.gettempdir()
        if init_string:
            # the bundle carried by the init string is unpacked in a temp dir,
            # which the `finally` clause below removes
            temp_dir = tempfile.mkdtemp(dir=base_dir)
            temp_dir_created = True
            bundle_from_is, options_from_is = init_string_to_bundle_path_and_options(
                init_string,
                target_dir=temp_dir,
            )
            keyspace_from_is = options_from_is.get("keyspace")
            token_from_is = options_from_is.get("token")
        # for the session
        if session:
            direct_session = session
        if secure_connect_bundle:
            if not token:
                raise ValueError(
                    "`token` is required if `secure_connect_bundle` is passed"
                )
        # params from arguments:
        bundle_from_arg = secure_connect_bundle
        token_from_arg = token
        keyspace_from_arg = keyspace
        can_be_astra = any(
            [
                secure_connect_bundle is not None,
                init_string is not None,
                token is not None,
            ]
        )
        # resolution of priority among args
        if direct_session:
            default_session = direct_session
        else:
            # first determine if Cassandra or Astra
            is_cassandra = all(
                [
                    secure_connect_bundle is None,
                    init_string is None,
                    token is None,
                    contact_points is not None,
                ]
            )
            if is_cassandra:
                is_astra_db = False
            else:
                # determine if Astra DB
                is_astra_db = can_be_astra
            #
            if is_cassandra:
                # need to take care of:
                #   contact_points, username, password, cluster_kwargs
                chosen_contact_points: Union[List[str], None]
                if contact_points:
                    if isinstance(contact_points, str):
                        chosen_contact_points = [
                            cp.strip() for cp in contact_points.split(",") if cp.strip()
                        ]
                    else:
                        # assume it's a list already
                        chosen_contact_points = contact_points
                else:
                    # normalize "" to None for later `Cluster(...)` call
                    chosen_contact_points = None
                #
                chosen_auth_provider: Optional[PlainTextAuthProvider]
                if username is not None and password is not None:
                    chosen_auth_provider = PlainTextAuthProvider(
                        username,
                        password,
                    )
                elif username is not None or password is not None:
                    # exactly one of the two credentials was supplied
                    raise ValueError(
                        "Please provide both username/password or none."
                    )
                else:
                    chosen_auth_provider = None
                #
                if chosen_contact_points is None:
                    # no contact_points keyword is passed to Cluster here
                    cluster = Cluster(
                        auth_provider=chosen_auth_provider,
                        **(cluster_kwargs if cluster_kwargs is not None else {}),
                    )
                else:
                    cluster = Cluster(
                        contact_points=chosen_contact_points,
                        auth_provider=chosen_auth_provider,
                        **(cluster_kwargs if cluster_kwargs is not None else {}),
                    )
                default_session = cluster.connect()
            elif is_astra_db:
                # Astra DB
                chosen_token = _first_valid(token_from_arg, token_from_is)
                if chosen_token is None:
                    raise ValueError(
                        "A token must be supplied if connection is to be established."
                    )
                chosen_bundle_pre_token = _first_valid(bundle_from_arg, bundle_from_is)
                # Try to get the bundle from the token if not supplied otherwise
                if chosen_bundle_pre_token is None:
                    if database_id is None:
                        raise ValueError(
                            "A database_id must be supplied if no "
                            "secure_connect_bundle is provided."
                        )
                    if not temp_dir_created:
                        temp_dir = tempfile.mkdtemp(dir=base_dir)
                        temp_dir_created = True
                    bundle_from_download = os.path.join(
                        temp_dir or "", DOWNLOADED_BUNDLE_FILE_NAME
                    )
                    download_astra_bundle_url(
                        database_id,
                        chosen_token,
                        bundle_from_download,
                        bundle_url_template,
                    )
                # After the devops-api part, re-evaluate chosen_bundle:
                chosen_bundle = _first_valid(
                    bundle_from_download, chosen_bundle_pre_token
                )
                #
                if chosen_bundle:
                    keyspace_from_bundle = infer_keyspace_from_bundle(chosen_bundle)
                    cluster = Cluster(
                        cloud={
                            "secure_connect_bundle": chosen_bundle,
                            **(cloud_kwargs if cloud_kwargs is not None else {}),
                        },
                        auth_provider=PlainTextAuthProvider(
                            ASTRA_CLOUD_AUTH_USERNAME,
                            chosen_token,
                        ),
                        **(cluster_kwargs if cluster_kwargs is not None else {}),
                    )
                    default_session = cluster.connect()
                else:
                    raise ValueError("No secure-connect-bundle was available.")
        # keyspace to be resolved in any case
        chosen_keyspace = _first_valid(
            keyspace_from_arg, keyspace_from_is, keyspace_from_bundle
        )
        default_keyspace = chosen_keyspace
    finally:
        # remove the temporary work directory on success and on failure alike
        if temp_dir_created and temp_dir is not None:
            shutil.rmtree(temp_dir)


def resolve_session(arg_session: Optional[Session] = None) -> Optional[Session]:
    """Utility to fall back to the global defaults when null args are passed."""
    if arg_session is not None:
        return arg_session
    return default_session


def check_resolve_session(arg_session: Optional[Session] = None) -> Session:
    """
    Same as `resolve_session`, but raise a ValueError ("DB session not set.")
    when neither the argument nor the global default provides a Session.
    """
    s = resolve_session(arg_session)
    if s is None:
        raise ValueError("DB session not set.")
    return s


def resolve_keyspace(arg_keyspace: Optional[str] = None) -> Optional[str]:
    """Utility to fall back to the global defaults when null args are passed."""
    if arg_keyspace is not None:
        return arg_keyspace
    return default_keyspace


def check_resolve_keyspace(arg_keyspace: Optional[str] = None) -> str:
    """
    Same as `resolve_keyspace`, but raise a ValueError ("DB keyspace not set.")
    when neither the argument nor the global default provides a keyspace.
    """
    s = resolve_keyspace(arg_keyspace)
    if s is None:
        raise ValueError("DB keyspace not set.")
    return s


def _first_valid(*pargs: Any) -> Union[Any, None]:
    """
    Return the first positional argument that is not None
    (falsy values such as "" or 0 do count as valid), or None if there is none.
    """
    return next((entry for entry in pargs if entry is not None), None)