Draken007's picture
Upload 7228 files
2a0bc63 verified
"""
Facilities to manage the conversion between secure-connect-bundle (plus
additional info) and the "init string".
"""
import base64
import json
import os
import shutil
import tempfile
from typing import Any, Dict, Optional, Tuple
from zipfile import ZipFile
# Name of the JSON manifest looked up inside the secure-connect-bundle zip.
CONFIG_FILE_NAME: str = "config.json"
# Keys of config.json whose values name the bundle members that get embedded
# in the init string, each mapped to the path used when the key is absent
# from config.json. A leading "./" is stripped before the member is looked up.
USED_CONFIG_KEYS_AND_DEFAULTS: Dict[str, str] = {
    "caCertLocation": "./ca.crt",
    "keyLocation": "./key",
    "certLocation": "./cert",
}
def encode_str(sng: str) -> str:
    """
    Return the base64 representation of a text string, as text.
    The input is turned into bytes with the default `str.encode()` codec,
    base64-encoded, and the resulting bytes are decoded back to a `str`.
    """
    raw_bytes = sng.encode()
    b64_bytes = base64.b64encode(raw_bytes)
    return b64_bytes.decode()
def decode_str(b64: str) -> str:
    """
    Inverse of the base64 text encoding: take a base64 string and
    return the text it stands for (bytes decoded with the default codec).
    """
    raw_bytes = base64.b64decode(b64)
    return raw_bytes.decode()
def _encode_from_string(s: str) -> str:
    """
    Base64-encode the given text; a thin private alias of `encode_str`.
    """
    encoded = encode_str(s)
    return encoded
def _encode_from_file(file_dir: str, file_title: str) -> str:
file_path = os.path.join(file_dir, file_title)
with open(file_path) as o_file:
return o_file.read()
def _clean_filename(fn: str) -> str:
if fn[:2] == "./":
return fn[2:]
else:
return fn
def infer_keyspace_from_bundle(bundle_path: Optional[str]) -> Optional[str]:
    """
    Given a bundle zipfile path, try to peek at its config.json
    and extract a default keyspace name on it.
    Do not raise errors: rather, silently return None instead.

    Returns the keyspace (coerced to `str`) when config.json is a JSON
    object with a non-null "keyspace" entry. Returns None in every other
    case: no path given, unreadable/invalid zip, missing or malformed
    config.json, missing or null "keyspace".
    """
    if not bundle_path:
        return None
    try:
        # Both the archive and the member are context-managed so that no
        # file handle is left open on the bundle after the peek.
        with ZipFile(bundle_path) as bundle_zip:
            with bundle_zip.open("config.json") as open_config:
                config = json.loads(open_config.read().decode())
        if not isinstance(config, dict):
            return None
        keyspace = config.get("keyspace")
        # A JSON null is "no keyspace", not a keyspace called "None".
        return None if keyspace is None else str(keyspace)
    except Exception:
        # Deliberate best-effort: any failure means "could not infer".
        return None
def bundle_path_to_init_string(
    secure_bundle: str,
    keyspace: Optional[str] = None,
    token: Optional[str] = None,
    tempfile_basedir: Optional[str] = None,
) -> str:
    """
    Utility function. Make the provided bundle zip into a "init string".
    1. Unzip the provided bundle to a temp dir
    2. Read and acquire (selected) contents of the zip
    3. prepare the big json bundle description
    4. make the latter into a big "init string"
    5. delete the temporary dir

    Parameters:
        secure_bundle: path to the secure-connect-bundle zip file.
        keyspace: stored as-is under options/"keyspace" (may be None).
        token: stored as-is under options/"token" (may be None).
        tempfile_basedir: directory in which the temporary extraction
            directory is created; falls back to `tempfile.gettempdir()`.

    Returns:
        The base64 encoding of a compact JSON document with two entries:
        "bundle_data" (member name -> text content, for config.json and the
        files named by USED_CONFIG_KEYS_AND_DEFAULTS) and "options"
        (version, bundle file name, keyspace, token).

    Errors raised while opening the zip, extracting a member or parsing
    config.json are not handled here and propagate to the caller; the
    temporary directory is removed in all cases.
    """
    with ZipFile(secure_bundle) as bundle_zip:
        base_dir = tempfile_basedir if tempfile_basedir else tempfile.gettempdir()
        temp_dir = tempfile.mkdtemp(dir=base_dir)
        try:
            # read config.json and find the filenames to get
            bundle_zip.extract(CONFIG_FILE_NAME, path=temp_dir)
            # Close the config file explicitly, so that no handle is left
            # open on a file inside the directory removed in `finally`.
            with open(os.path.join(temp_dir, CONFIG_FILE_NAME)) as config_file:
                config = json.load(config_file)
            referenced_files = [
                _clean_filename(config.get(cfg_k, default))
                for cfg_k, default in USED_CONFIG_KEYS_AND_DEFAULTS.items()
            ]
            # config.json is already on disk: extract only the other members
            for filename in referenced_files:
                bundle_zip.extract(filename, path=temp_dir)
            # config.json stays first, matching the original member order
            filenames = [CONFIG_FILE_NAME] + referenced_files
            bundle_data = {
                filename: _encode_from_file(temp_dir, filename)
                for filename in filenames
            }
            bundle_description = {
                "bundle_data": bundle_data,
                "options": {
                    "version": "1",
                    "bundle_file_name": os.path.split(secure_bundle)[1],
                    "keyspace": keyspace,
                    "token": token,
                },
            }
            # compact separators keep the resulting init string short
            init_string = encode_str(
                json.dumps(
                    bundle_description,
                    separators=(",", ":"),
                )
            )
            return init_string
        finally:
            shutil.rmtree(temp_dir)
def init_string_to_bundle_path_and_options(
    init_string: str, target_dir: str
) -> Tuple[str, Dict[str, Any]]:
    """
    Rebuild a secure-connect-bundle zip file out of an init string.

    The init string is decoded into its JSON description; the entries of
    "bundle_data" are written as members of a new zip file created inside
    `target_dir`, named after options/"bundle_file_name" (or
    "secure-connect-bundle.zip" when that option is missing).

    Return
        (bundle_path, options_dict)
    where options_dict is the "options" part of the init string, i.e. the
    additional info beyond the bundle contents.

    Raise ValueError if the init string's version is anything but "1".
    """
    description = json.loads(decode_str(init_string))
    options_dict = description.get("options", {})
    version = options_dict.get("version")
    # Only version "1" is understood: refuse anything else up front.
    if version != "1":
        raise ValueError(f"Init string has unsupported or unknown version {version}.")

    members = description["bundle_data"]
    zip_name = options_dict.get("bundle_file_name", "secure-connect-bundle.zip")
    bundle_filepath = os.path.join(target_dir, zip_name)
    with ZipFile(bundle_filepath, "w") as archive:
        for member_name, member_text in members.items():
            archive.writestr(member_name, data=member_text)
    return bundle_filepath, options_dict
def create_init_string_utility() -> None:
    """
    Command-line entry point: turn a bundle path (first argument, optionally
    followed by a keyspace and a token) into an "init string" and print it
    as a shell `export` line. With no arguments, or more than three, only a
    usage message is printed.
    """
    import sys

    cli_args = sys.argv[1:]
    if not 1 <= len(cli_args) <= 3:
        cmd = sys.argv[0]
        print(f"Usage: {cmd} bundle_path [keyspace [token]]")
        return

    bundle_path = cli_args[0]
    keyspace = cli_args[1] if len(cli_args) > 1 else None
    token = cli_args[2] if len(cli_args) > 2 else None
    # Only report whether keyspace/token were given, not their values.
    print(f'- Input bundle path: "{bundle_path}".')
    print(f"- Keyspace={'Y' if keyspace else 'N'}.")
    print(f"- Token={'Y' if token else 'N'}.")
    this_init_string = bundle_path_to_init_string(bundle_path, keyspace, token)
    print("=> Export command:")
    print(f'export ASTRA_DB_INIT_STRING="{this_init_string}"')
# When executed as a script (rather than imported), run the command-line
# utility that prints the export line for the init string.
if __name__ == "__main__":
    create_init_string_utility()