import os
import shutil
import tempfile
from typing import Any, Dict, List, Optional, Union

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session

from cassio.config.bundle_download import download_astra_bundle_url
from cassio.config.bundle_management import (
    infer_keyspace_from_bundle,
    init_string_to_bundle_path_and_options,
)

ASTRA_CLOUD_AUTH_USERNAME = "token"
DOWNLOADED_BUNDLE_FILE_NAME = "secure-connect-bundle_devopsapi.zip"

default_session: Optional[Session] = None
default_keyspace: Optional[str] = None


def init(
    auto: bool = False,
    session: Optional[Session] = None,
    secure_connect_bundle: Optional[str] = None,
    init_string: Optional[str] = None,
    token: Optional[str] = None,
    database_id: Optional[str] = None,
    keyspace: Optional[str] = None,
    contact_points: Optional[Union[str, List[str]]] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    cluster_kwargs: Optional[Dict[str, Any]] = None,
    tempfile_basedir: Optional[str] = None,
    bundle_url_template: Optional[str] = None,
    cloud_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Globally set the default Cassandra session (and keyspace) for CassIO.
    These defaults are used by every CassIO class that needs a database,
    unless a session/keyspace is passed to that class's __init__.

    There are various ways to achieve this, depending on the passed parameters.
    Broadly speaking, this function either accepts a ready-made Session
    or creates one itself. In the latter case, both Astra DB
    and a regular Cassandra cluster can be targeted.
    One can also simply call cassio.init(auto=True) and let it figure out
    what to do based on standard environment variables.

    CASSANDRA:
    If `contact_points` is passed (and none of `secure_connect_bundle`,
    `init_string`, `token`), a regular Cassandra cluster is assumed.
    In that case, only the following arguments will be used:
    `contact_points`, `keyspace`, `username`, `password`, `cluster_kwargs`
    Note that when passing a `session`, no new connection is created and
    the other connection parameters are ignored.

    ASTRA DB:
    If `contact_points` is not passed, one of the several ways to connect
    to Astra DB should be supplied. These overlap:
    see below for their precedence.
    Note that when passing a `session`, no new connection is created and
    the other connection parameters are ignored.

    AUTO:
    If passing auto=True, no other parameter is allowed except for
    `tempfile_basedir` and `cluster_kwargs`. The rest, including choosing
    Astra DB / Cassandra, is autofilled by inspecting the following
    environment variables:
        CASSANDRA_CONTACT_POINTS
        CASSANDRA_USERNAME
        CASSANDRA_PASSWORD
        CASSANDRA_KEYSPACE
        ASTRA_DB_APPLICATION_TOKEN
        ASTRA_DB_INIT_STRING
        ASTRA_DB_SECURE_BUNDLE_PATH
        ASTRA_DB_KEYSPACE
        ASTRA_DB_DATABASE_ID

    PARAMETERS:
        `auto` (bool = False), whether to auto-guess all connection params.
        `session` (optional cassandra.cluster.Session), an established connection.
        `secure_connect_bundle` (optional str), full path to a Secure Bundle.
        `init_string` (optional str), a stand-alone "db init string" credential
            (which can optionally contain keyspace and/or token).
        `token` (optional str), the Astra DB "AstraCS:..." token.
        `database_id` (optional str), the Astra DB ID. Used only for Astra
            connections with no `secure_connect_bundle` parameter passed.
        `keyspace` (optional str), the keyspace to work in.
        `contact_points` (optional str or List[str]), contact points for a
            Cassandra connection (a string is split on commas).
        `username` (optional str), username for Cassandra connection
        `password` (optional str), password for Cassandra connection
        `cluster_kwargs` (optional dict), additional arguments to `Cluster(...)`.
        `tempfile_basedir` (optional str), where to create temporary work directories.
        `bundle_url_template` (optional str), url template for getting the database
            secure bundle. The "database_id" variable is resolved with the actual
            value. Default (for Astra DB):
                "https://api.astra.datastax.com/v2/databases/{database_id}/secureBundleURL"
        `cloud_kwargs` (optional dict), additional arguments to `Cluster(cloud={...})`
            (i.e. additional key-value pairs within the passed `cloud` dict).

    ASTRA DB:
    The Astra-related parameters are arranged in a chain of fallbacks.
    In case redundant information is supplied, these are the precedences:
        session > secure_connect_bundle > init_string
        token > (from init_string if any)
        keyspace > (from init_string if any) > (from bundle if any)
    If a secure-connect-bundle is needed and not passed, it will be downloaded:
        this requires `database_id` to be passed and suitable token permissions.
    Constraints and caveats:
        `secure_connect_bundle` requires `token`.
        `session` does not make use of `cluster_kwargs` and will ignore it.

    The Session is created at `init` time and kept around, available to any
    subsequent table creation. If calling `init` a second time, a new Session
    will be made available replacing the previous one.
    """
    global default_session
    global default_keyspace
    temp_dir_created: bool = False
    temp_dir: Optional[str] = None
    direct_session: Optional[Session] = None
    bundle_from_is: Optional[str] = None
    bundle_from_arg: Optional[str] = None
    bundle_from_download: Optional[str] = None
    keyspace_from_is: Optional[str] = None
    keyspace_from_bundle: Optional[str] = None
    keyspace_from_arg: Optional[str] = None
    token_from_is: Optional[str] = None
    token_from_arg: Optional[str] = None
    #
    if auto:
        if any(
            v is not None
            for v in (
                session,
                secure_connect_bundle,
                init_string,
                token,
                database_id,
                keyspace,
                contact_points,
                username,
                password,
                # cluster_kwargs is allowed
                # tempfile_basedir is allowed
                bundle_url_template,
                cloud_kwargs,
            )
        ):
            raise ValueError(
                "When auto=True, no arguments can be passed other than "
                "tempfile_basedir and cluster_kwargs."
            )
        # setting some arguments from environment variables
        if "CASSANDRA_CONTACT_POINTS" in os.environ:
            contact_points = os.environ["CASSANDRA_CONTACT_POINTS"]
            username = os.environ.get("CASSANDRA_USERNAME")
            password = os.environ.get("CASSANDRA_PASSWORD")
            keyspace = os.environ.get("CASSANDRA_KEYSPACE")
        elif any(
            avar in os.environ
            for avar in [
                "ASTRA_DB_APPLICATION_TOKEN",
                "ASTRA_DB_INIT_STRING",
            ]
        ):
            token = os.environ.get("ASTRA_DB_APPLICATION_TOKEN")
            init_string = os.environ.get("ASTRA_DB_INIT_STRING")
            secure_connect_bundle = os.environ.get("ASTRA_DB_SECURE_BUNDLE_PATH")
            keyspace = os.environ.get("ASTRA_DB_KEYSPACE")
            database_id = os.environ.get("ASTRA_DB_DATABASE_ID")
    #
    try:
        # process init_string
        base_dir = tempfile_basedir if tempfile_basedir else tempfile.gettempdir()
        if init_string:
            temp_dir = tempfile.mkdtemp(dir=base_dir)
            temp_dir_created = True
            bundle_from_is, options_from_is = init_string_to_bundle_path_and_options(
                init_string,
                target_dir=temp_dir,
            )
            keyspace_from_is = options_from_is.get("keyspace")
            token_from_is = options_from_is.get("token")
        # for the session
        if session:
            direct_session = session
        if secure_connect_bundle:
            if not token:
                raise ValueError(
                    "`token` is required if `secure_connect_bundle` is passed"
                )
        # params from arguments:
        bundle_from_arg = secure_connect_bundle
        token_from_arg = token
        keyspace_from_arg = keyspace
        can_be_astra = any(
            [
                secure_connect_bundle is not None,
                init_string is not None,
                token is not None,
            ]
        )
        # resolution of priority among args
        if direct_session:
            default_session = direct_session
        else:
            # first determine if Cassandra or Astra
            is_cassandra = all(
                [
                    secure_connect_bundle is None,
                    init_string is None,
                    token is None,
                    contact_points is not None,
                ]
            )
            if is_cassandra:
                is_astra_db = False
            else:
                # determine if Astra DB
                is_astra_db = can_be_astra
            #
            if is_cassandra:
                # need to take care of:
                #   contact_points, username, password, cluster_kwargs
                chosen_contact_points: Union[List[str], None]
                if contact_points:
                    if isinstance(contact_points, str):
                        chosen_contact_points = [
                            cp.strip() for cp in contact_points.split(",") if cp.strip()
                        ]
                    else:
                        # assume it's a list already
                        chosen_contact_points = contact_points
                else:
                    # normalize "" to None for later `Cluster(...)` call
                    chosen_contact_points = None
                #
                if username is not None and password is not None:
                    chosen_auth_provider = PlainTextAuthProvider(
                        username,
                        password,
                    )
                else:
                    if username is not None or password is not None:
                        raise ValueError(
                            "Please provide both username and password, or neither."
                        )
                    else:
                        chosen_auth_provider = None
                #
                if chosen_contact_points is None:
                    cluster = Cluster(
                        auth_provider=chosen_auth_provider,
                        **(cluster_kwargs if cluster_kwargs is not None else {}),
                    )
                else:
                    cluster = Cluster(
                        contact_points=chosen_contact_points,
                        auth_provider=chosen_auth_provider,
                        **(cluster_kwargs if cluster_kwargs is not None else {}),
                    )
                default_session = cluster.connect()
            elif is_astra_db:
                # Astra DB
                chosen_token = _first_valid(token_from_arg, token_from_is)
                if chosen_token is None:
                    raise ValueError(
                        "A token must be supplied if connection is to be established."
                    )
                chosen_bundle_pre_token = _first_valid(bundle_from_arg, bundle_from_is)
                # Try to get the bundle from the token if not supplied otherwise
                if chosen_bundle_pre_token is None:
                    if database_id is None:
                        raise ValueError(
                            "A database_id must be supplied if no "
                            "secure_connect_bundle is provided."
                        )
                    if not temp_dir_created:
                        temp_dir = tempfile.mkdtemp(dir=base_dir)
                        temp_dir_created = True
                    bundle_from_download = os.path.join(
                        temp_dir or "", DOWNLOADED_BUNDLE_FILE_NAME
                    )
                    download_astra_bundle_url(
                        database_id,
                        chosen_token,
                        bundle_from_download,
                        bundle_url_template,
                    )
                # After the devops-api part, re-evaluate chosen_bundle:
                chosen_bundle = _first_valid(
                    bundle_from_download, chosen_bundle_pre_token
                )
                #
                if chosen_bundle:
                    keyspace_from_bundle = infer_keyspace_from_bundle(chosen_bundle)
                    cluster = Cluster(
                        cloud={
                            "secure_connect_bundle": chosen_bundle,
                            **(cloud_kwargs if cloud_kwargs is not None else {}),
                        },
                        auth_provider=PlainTextAuthProvider(
                            ASTRA_CLOUD_AUTH_USERNAME,
                            chosen_token,
                        ),
                        **(cluster_kwargs if cluster_kwargs is not None else {}),
                    )
                    default_session = cluster.connect()
                else:
                    raise ValueError("No secure-connect-bundle was available.")
        # keyspace to be resolved in any case
        chosen_keyspace = _first_valid(
            keyspace_from_arg, keyspace_from_is, keyspace_from_bundle
        )
        default_keyspace = chosen_keyspace
    finally:
        if temp_dir_created and temp_dir is not None:
            shutil.rmtree(temp_dir)
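

# Illustrative sketch only (not part of the CassIO API): the helper name
# `_example_normalize_contact_points` is hypothetical. It is meant to mirror,
# in isolation, how `init` above turns a comma-separated `contact_points`
# string (for instance the value of CASSANDRA_CONTACT_POINTS) into the list
# handed to `Cluster(...)`.
def _example_normalize_contact_points(
    contact_points: Optional[Union[str, List[str]]],
) -> Optional[List[str]]:
    if not contact_points:
        # None, "" and [] all mean "let the driver use its default"
        return None
    if isinstance(contact_points, str):
        # split on commas, trimming whitespace and dropping empty entries
        return [cp.strip() for cp in contact_points.split(",") if cp.strip()]
    # assume it is a list already
    return list(contact_points)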


def resolve_session(arg_session: Optional[Session] = None) -> Optional[Session]:
    """Utility to fall back to the global defaults when null args are passed."""
    if arg_session is not None:
        return arg_session
    else:
        return default_session


def check_resolve_session(arg_session: Optional[Session] = None) -> Session:
    """Like `resolve_session`, but raise ValueError if no session is available."""
    s = resolve_session(arg_session)
    if s is None:
        raise ValueError("DB session not set.")
    else:
        return s


def resolve_keyspace(arg_keyspace: Optional[str] = None) -> Optional[str]:
    """Utility to fall back to the global defaults when null args are passed."""
    if arg_keyspace is not None:
        return arg_keyspace
    else:
        return default_keyspace


def check_resolve_keyspace(arg_keyspace: Optional[str] = None) -> str:
    """Like `resolve_keyspace`, but raise ValueError if no keyspace is available."""
    s = resolve_keyspace(arg_keyspace)
    if s is None:
        raise ValueError("DB keyspace not set.")
    else:
        return s


def _first_valid(*pargs: Any) -> Union[Any, None]:
    """Return the first argument that is not None, or None if there is none."""
    for entry in pargs:
        if entry is not None:
            return entry
    return None
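

# Illustrative sketch only (hypothetical helper, not part of the CassIO API):
# the keyspace precedence documented in `init`, i.e.
#     explicit argument > init_string > secure-connect bundle,
# written out as a standalone function. It is intended to behave like
# `_first_valid(keyspace_from_arg, keyspace_from_is, keyspace_from_bundle)`.
def _example_choose_keyspace(
    from_arg: Optional[str] = None,
    from_init_string: Optional[str] = None,
    from_bundle: Optional[str] = None,
) -> Optional[str]:
    candidates = (from_arg, from_init_string, from_bundle)
    # only None counts as "missing": an empty string would still win
    return next((c for c in candidates if c is not None), None)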