Spaces:
Runtime error
Runtime error
| import asyncio | |
| from typing import Any, Callable, Dict, Iterable | |
| from cassandra.cluster import ResponseFuture | |
| async def call_wrapped_async( | |
| func: Callable[..., ResponseFuture], *args: Any, **kwargs: Any | |
| ) -> Any: | |
| loop = asyncio.get_event_loop() | |
| asyncio_future = loop.create_future() | |
| response_future = func(*args, **kwargs) | |
| def success_handler(_: Any) -> None: | |
| loop.call_soon_threadsafe(asyncio_future.set_result, response_future.result()) | |
| def error_handler(exc: BaseException) -> None: | |
| loop.call_soon_threadsafe(asyncio_future.set_exception, exc) | |
| response_future.add_callbacks(success_handler, error_handler) | |
| return await asyncio_future | |
| def handle_multicolumn_unpacking( | |
| args_dict: Dict[str, Any], | |
| key_name: str, | |
| unpacked_keys: Iterable[str], | |
| ) -> Dict[str, Any]: | |
| """ | |
| Given a dictionary of "args", handle if necessary the replacement of one of its items | |
| with corresponding split-tuple version if the type structure requires it. | |
| So if unpacked_keys is == [key_name], do nothing. | |
| If the key_name is None, remove it from the mapping altogether. | |
| Returns the modified dictionary. | |
| Example: | |
| args_dict = {"k": (1, 20), "x": "..."} | |
| key_name = "k" | |
| unpacked_keys = ["k_0", "k_1"] | |
| results in | |
| {"k_0": 1, "k_1": 20, "x": "..."} | |
| Example: | |
| args_dict = {"k": "k_val", "x": "..."} | |
| key_name = "k" | |
| unpacked_keys = ["k"] | |
| results in (unchanged) | |
| {"k": "k_val", "x": "..."} | |
| Example: | |
| args_dict = {"x": "..."} | |
| args_dict = {"k": None, "x": "..."} | |
| both result in | |
| {"x": "..."} | |
| """ | |
| _unp_keys = list(unpacked_keys) | |
| if args_dict.get(key_name) is not None: | |
| if _unp_keys != [key_name]: | |
| # passing a longer tuple than the keys is meaningless: | |
| assert len(_unp_keys) >= len(args_dict[key_name]) | |
| # unpack the tuple | |
| split_part = { | |
| unp_k: tuple_v for unp_k, tuple_v in zip(_unp_keys, args_dict[key_name]) | |
| } | |
| else: | |
| split_part = {key_name: args_dict[key_name]} | |
| else: | |
| split_part = {} | |
| new_args_dict = { | |
| **split_part, | |
| **{k: v for k, v in args_dict.items() if k != key_name}, | |
| } | |
| return new_args_dict | |
| def handle_multicolumn_packing( | |
| unpacked_row: Dict[str, Any], | |
| key_name: str, | |
| unpacked_keys: Iterable[str], | |
| ) -> Dict[str, Any]: | |
| _unp_keys = list(unpacked_keys) | |
| if _unp_keys != [key_name]: | |
| packed_keys = {k: v for k, v in unpacked_row.items() if k in _unp_keys} | |
| if packed_keys == {}: | |
| return unpacked_row | |
| else: | |
| pk_tuple = tuple(packed_keys[pk_k] for pk_k in _unp_keys) | |
| packed_row_portion = { | |
| key_name: pk_tuple, | |
| } | |
| return { | |
| **packed_row_portion, | |
| **{k: v for k, v in unpacked_row.items() if k not in _unp_keys}, | |
| } | |
| else: | |
| return unpacked_row | |