Spaces:
Running
Running
import asyncio | |
from typing import Any, Callable, Dict, Iterable | |
from cassandra.cluster import ResponseFuture | |
async def call_wrapped_async(
    func: Callable[..., ResponseFuture], *args: Any, **kwargs: Any
) -> Any:
    """
    Await the outcome of a callback-based driver call from asyncio code.

    `func` is invoked immediately as `func(*args, **kwargs)` and must return
    an object exposing `add_callbacks(callback, errback)` and `result()`
    (a driver `ResponseFuture`). The coroutine then suspends until one of
    the two callbacks fires.

    Returns whatever `response_future.result()` yields on success.
    Raises the exception passed to the error callback on failure.

    If the awaiting task is cancelled before the driver responds, a late
    callback is silently ignored instead of triggering an
    `InvalidStateError` inside the event loop.
    """
    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    asyncio_future = loop.create_future()
    response_future = func(*args, **kwargs)

    def _settle(setter: Callable[[Any], None], outcome: Any) -> None:
        # Executed on the loop thread. The future may already be done
        # (typically: cancelled by the awaiting task), in which case calling
        # set_result/set_exception would raise InvalidStateError.
        if not asyncio_future.done():
            setter(outcome)

    def success_handler(_: Any) -> None:
        # The callback argument is ignored; the value handed to the awaiter
        # is the one returned by response_future.result().
        # call_soon_threadsafe is used since the callback may be invoked
        # from a thread other than the one running the event loop.
        loop.call_soon_threadsafe(
            _settle, asyncio_future.set_result, response_future.result()
        )

    def error_handler(exc: BaseException) -> None:
        loop.call_soon_threadsafe(_settle, asyncio_future.set_exception, exc)

    response_future.add_callbacks(success_handler, error_handler)
    return await asyncio_future
def handle_multicolumn_unpacking(
    args_dict: Dict[str, Any],
    key_name: str,
    unpacked_keys: Iterable[str],
) -> Dict[str, Any]:
    """
    Expand a tuple-valued entry of `args_dict` into one entry per column.

    The value stored under `key_name` is distributed, position by position,
    over the names in `unpacked_keys`. Three situations are handled:

    - `unpacked_keys` is exactly `[key_name]`: the value is kept as is.
    - the value under `key_name` is missing or None: the key does not
      appear in the output at all.
    - otherwise: the value is iterated and zipped with `unpacked_keys`.
      It may be shorter than `unpacked_keys` (trailing names are then
      omitted) but not longer.

    All other items of `args_dict` are carried over unchanged, placed after
    the expanded entries. The input mapping is not modified; a new dict is
    returned.

    Example:
        args_dict = {"k": (1, 20), "x": "..."}
        key_name = "k"
        unpacked_keys = ["k_0", "k_1"]
    gives
        {"k_0": 1, "k_1": 20, "x": "..."}

    Example:
        args_dict = {"k": "k_val", "x": "..."}
        key_name = "k"
        unpacked_keys = ["k"]
    gives (same content)
        {"k": "k_val", "x": "..."}

    Example:
        args_dict = {"x": "..."}
        args_dict = {"k": None, "x": "..."}
    both give
        {"x": "..."}
    """
    target_names = list(unpacked_keys)
    result: Dict[str, Any] = {}

    if args_dict.get(key_name) is not None:
        packed_value = args_dict[key_name]
        if target_names == [key_name]:
            # single-column case: nothing to split
            result[key_name] = packed_value
        else:
            # more values than target names cannot be mapped meaningfully
            assert len(target_names) >= len(packed_value)
            result.update(zip(target_names, packed_value))

    # pass-through of every other argument (these win on a name clash)
    for name, value in args_dict.items():
        if name != key_name:
            result[name] = value

    return result
def handle_multicolumn_packing(
    unpacked_row: Dict[str, Any],
    key_name: str,
    unpacked_keys: Iterable[str],
) -> Dict[str, Any]:
    """
    Collapse several columns of a row into a single tuple-valued entry.

    This is the reverse of splitting a tuple over `unpacked_keys`: the
    values found under those names are gathered, in the order given by
    `unpacked_keys`, into a tuple stored under `key_name`.

    The row is returned as is (the very same object) when either
    `unpacked_keys` is exactly `[key_name]`, or none of the names in
    `unpacked_keys` occurs in the row. Otherwise a new dict is returned,
    with the packed entry first and the remaining items after it.

    Raises KeyError if only some of the names in `unpacked_keys` are
    present in the row.

    Example:
        unpacked_row = {"k_0": 1, "k_1": 20, "x": "..."}
        key_name = "k"
        unpacked_keys = ["k_0", "k_1"]
    gives
        {"k": (1, 20), "x": "..."}
    """
    column_names = list(unpacked_keys)

    # single-column case: the row already has the desired shape
    if column_names == [key_name]:
        return unpacked_row

    found: Dict[str, Any] = {}
    for column, value in unpacked_row.items():
        if column in column_names:
            found[column] = value

    # none of the split columns is in this row: leave it alone
    if not found:
        return unpacked_row

    packed_row: Dict[str, Any] = {
        key_name: tuple(found[column] for column in column_names)
    }
    for column, value in unpacked_row.items():
        if column not in column_names:
            packed_row[column] = value
    return packed_row