"""
Facilities to manage the conversion between
secure-connect-bundle (plus additional info) and the "init string".
"""

import base64
import json
import os
import shutil
import tempfile
from typing import Any, Dict, Optional, Tuple
from zipfile import ZipFile

# Name of the JSON configuration member expected inside the bundle zip.
CONFIG_FILE_NAME = "config.json"
# Keys of config.json that point at other bundle members, each with the
# member path to fall back on when the key is missing from the config.
USED_CONFIG_KEYS_AND_DEFAULTS = {
    "caCertLocation": "./ca.crt",
    "keyLocation": "./key",
    "certLocation": "./cert",
}


def encode_str(sng: str) -> str:
    """Return the base64 representation (as text) of the string `sng`."""
    return base64.b64encode(sng.encode()).decode()


def decode_str(b64: str) -> str:
    """Return the string whose base64 representation is `b64`."""
    return base64.b64decode(b64).decode()


def _encode_from_string(s: str) -> str:
    """Base64-encode the string `s` (thin alias of `encode_str`)."""
    return encode_str(s)


def _encode_from_file(file_dir: str, file_title: str) -> str:
    """
    Return the text content of the file `file_title` located in `file_dir`.

    Despite the name, no base64 step is applied here: the text is returned
    as read, which is what `init_string_to_bundle_path_and_options` later
    writes back verbatim into the reconstructed zip.
    """
    file_path = os.path.join(file_dir, file_title)
    with open(file_path) as o_file:
        return o_file.read()


def _clean_filename(fn: str) -> str:
    """Strip a single leading "./" from `fn`, if present."""
    if fn[:2] == "./":
        return fn[2:]
    else:
        return fn


def infer_keyspace_from_bundle(bundle_path: Optional[str]) -> Optional[str]:
    """
    Given a bundle zipfile path, try to peek at its config.json
    and extract a default keyspace name on it.

    Do not raise errors: rather, silently return None instead
    (this covers a missing/empty path, an unreadable zip, a missing or
    malformed config.json and a config without a "keyspace" entry).
    """
    if not bundle_path:
        return None
    try:
        # Context managers make sure both the archive and the member
        # stream are closed, whatever happens while parsing.
        with ZipFile(bundle_path) as bundle_zip:
            with bundle_zip.open(CONFIG_FILE_NAME) as open_config:
                config = json.loads(open_config.read().decode())
        return str(config["keyspace"]) if "keyspace" in config else None
    except Exception:
        # Deliberate best-effort: any failure means "no keyspace inferred".
        return None


def bundle_path_to_init_string(
    secure_bundle: str,
    keyspace: Optional[str] = None,
    token: Optional[str] = None,
    tempfile_basedir: Optional[str] = None,
) -> str:
    """
    Utility function. Make the provided bundle zip into a "init string".

        1. Unzip the provided bundle to a temp dir
        2. Read and acquire (selected) contents of the zip
        3. prepare the big json bundle description
        4. make the latter into a big "init string"
        5. delete the temporary dir

    Parameters:
        secure_bundle: path to the secure-connect-bundle zip file.
        keyspace: optional keyspace, stored among the "options".
        token: optional token, stored among the "options".
        tempfile_basedir: directory under which the temporary dir is
            created; the system temp dir is used if not provided.

    Returns:
        the init string, i.e. the base64 form of a compact JSON document
        with keys "bundle_data" (file name -> file text) and "options".

    Errors opening the zip, or members missing from it, are not handled
    here and propagate to the caller.
    """
    with ZipFile(secure_bundle) as bundle_zip:
        base_dir = tempfile_basedir if tempfile_basedir else tempfile.gettempdir()
        temp_dir = tempfile.mkdtemp(dir=base_dir)
        try:
            # read config.json and find the filenames to get
            bundle_zip.extract(CONFIG_FILE_NAME, path=temp_dir)
            # The handle must be closed before the `finally` cleanup runs,
            # otherwise the file leaks (and rmtree may fail on Windows).
            with open(os.path.join(temp_dir, CONFIG_FILE_NAME)) as config_file:
                config = json.load(config_file)
            referenced_files = [
                _clean_filename(config.get(cfg_k, default))
                for cfg_k, default in USED_CONFIG_KEYS_AND_DEFAULTS.items()
            ]
            # config.json is already on disk: extract only what it refers to.
            for filename in referenced_files:
                bundle_zip.extract(filename, path=temp_dir)
            filenames = [CONFIG_FILE_NAME] + referenced_files
            #
            bundle_data = {
                filename: _encode_from_file(temp_dir, filename)
                for filename in filenames
            }
            #
            bundle_description = {
                "bundle_data": bundle_data,
                "options": {
                    "version": "1",
                    "bundle_file_name": os.path.split(secure_bundle)[1],
                    "keyspace": keyspace,
                    "token": token,
                },
            }
            # compact separators keep the resulting init string short
            return encode_str(
                json.dumps(
                    bundle_description,
                    separators=(",", ":"),
                )
            )
        finally:
            shutil.rmtree(temp_dir)


def init_string_to_bundle_path_and_options(
    init_string: str, target_dir: str
) -> Tuple[str, Dict[str, Any]]:
    """
    Make an init string back into a usable secure-connect-bundle.

    Return (bundle_path, options_dict) where options_dict is any
    additional info found in the init string beyond the bundle contents.

    Parameters:
        init_string: the base64 string, such as the one produced
            by `bundle_path_to_init_string`.
        target_dir: existing directory where the zip file gets written.

    Raises:
        ValueError: if the "version" option in the init string is not "1".
    """
    bundle_description = json.loads(decode_str(init_string))
    options_dict = bundle_description.get("options", {})
    version = options_dict.get("version")
    if version != "1":
        raise ValueError(f"Init string has unsupported or unknown version {version}.")

    bundle_data = bundle_description["bundle_data"]
    # NOTE(review): the file name is used exactly as found in the init
    # string; confirm init strings only ever come from trusted sources,
    # since a crafted name could point outside of target_dir.
    bundle_file_name = options_dict.get(
        "bundle_file_name", "secure-connect-bundle.zip"
    )
    bundle_filepath = os.path.join(target_dir, bundle_file_name)
    with ZipFile(bundle_filepath, "w") as bundle_zip_file:
        for f_name, f_contents in bundle_data.items():
            bundle_zip_file.writestr(f_name, data=f_contents)
    return bundle_filepath, options_dict


def create_init_string_utility() -> None:
    """
    Utility command-line: converts the full bundle path (argument)
    to an "init string" and prints it as an export line.

    Accepts one to three positional arguments (bundle path, keyspace,
    token); with any other argument count, a usage line is printed instead.
    """
    import sys

    if sys.argv[1:] == [] or len(sys.argv) > 4:
        cmd = sys.argv[0]
        print(f"Usage: {cmd} bundle_path [keyspace [token]]")
    else:
        bundle_path = sys.argv[1]
        keyspace = None if len(sys.argv) < 3 else sys.argv[2]
        token = None if len(sys.argv) < 4 else sys.argv[3]
        # Only report whether keyspace/token were given, not their values.
        print(f'- Input bundle path: "{bundle_path}".')
        print(f"- Keyspace={'Y' if keyspace else 'N'}.")
        print(f"- Token={'Y' if token else 'N'}.")
        this_init_string = bundle_path_to_init_string(bundle_path, keyspace, token)
        print("=> Export command:")
        print(f'export ASTRA_DB_INIT_STRING="{this_init_string}"')


if __name__ == "__main__":
    create_init_string_utility()