Draken007's picture
Upload 7228 files
2a0bc63 verified
import asyncio
from typing import Any, Callable, Dict, Iterable
from cassandra.cluster import ResponseFuture
async def call_wrapped_async(
    func: Callable[..., ResponseFuture], *args: Any, **kwargs: Any
) -> Any:
    """
    Await the outcome of a callback-based driver call from asyncio code.

    `func` is invoked as `func(*args, **kwargs)` and must return an object
    exposing `add_callbacks(callback, errback)` and `result()`. The outcome is
    forwarded to an asyncio future bound to the running event loop:
    on success the value of `response_future.result()` is returned,
    on failure the exception handed to the errback is raised.

    Completion is scheduled with `loop.call_soon_threadsafe`, so it is safe for
    the callbacks to be invoked from a thread other than the event loop's.
    """
    loop = asyncio.get_running_loop()
    asyncio_future = loop.create_future()
    response_future = func(*args, **kwargs)

    # If the awaiting task was cancelled in the meantime, the future is already
    # done: completing it again would raise InvalidStateError inside the event
    # loop, so late outcomes are dropped instead.
    def _resolve(value: Any) -> None:
        if not asyncio_future.done():
            asyncio_future.set_result(value)

    def _reject(exc: BaseException) -> None:
        if not asyncio_future.done():
            asyncio_future.set_exception(exc)

    def success_handler(_: Any) -> None:
        # The callback argument is ignored on purpose: the awaited value is
        # whatever `response_future.result()` returns.
        loop.call_soon_threadsafe(_resolve, response_future.result())

    def error_handler(exc: BaseException) -> None:
        loop.call_soon_threadsafe(_reject, exc)

    response_future.add_callbacks(success_handler, error_handler)
    return await asyncio_future
def handle_multicolumn_unpacking(
    args_dict: Dict[str, Any],
    key_name: str,
    unpacked_keys: Iterable[str],
) -> Dict[str, Any]:
    """
    Given a dictionary of "args", handle if necessary the replacement of one of
    its items with the corresponding split-tuple version.

    If unpacked_keys is == [key_name], the item is kept as is.
    If the value for key_name is None (or key_name is absent), the item is
    removed from the mapping altogether.
    Otherwise the value is treated as a tuple and its elements are assigned,
    in order, to the names in unpacked_keys. A tuple shorter than
    unpacked_keys is accepted: only the leading names are set.

    Returns a new dictionary (the input is not modified), with the unpacked
    entries first, followed by the remaining items in their original order.

    Raises AssertionError if the tuple has more elements than unpacked_keys.

    Example:
        args_dict = {"k": (1, 20), "x": "..."}
        key_name = "k"
        unpacked_keys = ["k_0", "k_1"]
    results in
        {"k_0": 1, "k_1": 20, "x": "..."}

    Example:
        args_dict = {"k": "k_val", "x": "..."}
        key_name = "k"
        unpacked_keys = ["k"]
    results in (unchanged)
        {"k": "k_val", "x": "..."}

    Example:
        args_dict = {"x": "..."}
        args_dict = {"k": None, "x": "..."}
    both result in
        {"x": "..."}
    """
    _unp_keys = list(unpacked_keys)
    key_value = args_dict.get(key_name)

    split_part: Dict[str, Any]
    if key_value is None:
        split_part = {}
    elif _unp_keys == [key_name]:
        split_part = {key_name: key_value}
    else:
        # Passing a longer tuple than the keys is meaningless. This is an
        # explicit raise (not an `assert` statement) so the check survives
        # `python -O`; otherwise zip() would silently drop the extra values.
        if len(key_value) > len(_unp_keys):
            raise AssertionError(
                f"Too many values ({len(key_value)}) for '{key_name}': "
                f"at most {len(_unp_keys)} expected."
            )
        # unpack the tuple
        split_part = dict(zip(_unp_keys, key_value))

    return {
        **split_part,
        **{k: v for k, v in args_dict.items() if k != key_name},
    }
def handle_multicolumn_packing(
    unpacked_row: Dict[str, Any],
    key_name: str,
    unpacked_keys: Iterable[str],
) -> Dict[str, Any]:
    """
    Collapse the columns named in unpacked_keys into a single tuple-valued
    entry stored under key_name.

    The row is returned as is (same object) when unpacked_keys is
    == [key_name], or when none of the unpacked keys occur in the row.
    Otherwise a new dictionary is returned, with the packed entry first,
    followed by the other items of the row in their original order; the tuple
    elements follow the order of unpacked_keys.

    Raises KeyError if only some of the unpacked keys are found in the row.

    Example:
        unpacked_row = {"k_0": 1, "k_1": 20, "x": "..."}
        key_name = "k"
        unpacked_keys = ["k_0", "k_1"]
    results in
        {"k": (1, 20), "x": "..."}
    """
    column_names = list(unpacked_keys)

    # Single-column key stored under its own name: nothing to pack.
    if column_names == [key_name]:
        return unpacked_row

    # One pass over the row, separating key columns from everything else.
    key_columns: Dict[str, Any] = {}
    other_columns: Dict[str, Any] = {}
    for column, value in unpacked_row.items():
        if column in column_names:
            key_columns[column] = value
        else:
            other_columns[column] = value

    # No key column at all in this row: hand it back untouched.
    if not key_columns:
        return unpacked_row

    packed_row: Dict[str, Any] = {
        key_name: tuple(key_columns[column] for column in column_names),
    }
    packed_row.update(other_columns)
    return packed_row