File size: 3,026 Bytes
2a0bc63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import asyncio
from typing import Any, Callable, Dict, Iterable

from cassandra.cluster import ResponseFuture


async def call_wrapped_async(
    func: Callable[..., ResponseFuture], *args: Any, **kwargs: Any
) -> Any:
    loop = asyncio.get_event_loop()
    asyncio_future = loop.create_future()
    response_future = func(*args, **kwargs)

    def success_handler(_: Any) -> None:
        loop.call_soon_threadsafe(asyncio_future.set_result, response_future.result())

    def error_handler(exc: BaseException) -> None:
        loop.call_soon_threadsafe(asyncio_future.set_exception, exc)

    response_future.add_callbacks(success_handler, error_handler)
    return await asyncio_future


def handle_multicolumn_unpacking(
    args_dict: Dict[str, Any],
    key_name: str,
    unpacked_keys: Iterable[str],
) -> Dict[str, Any]:
    """
    Given a dictionary of "args", handle if necessary the replacement of one of its items
    with corresponding split-tuple version if the type structure requires it.
    So if unpacked_keys is == [key_name], do nothing.
    If the key_name is None, remove it from the mapping altogether.
    Returns the modified dictionary.

    Example:
        args_dict = {"k": (1, 20), "x": "..."}
        key_name = "k"
        unpacked_keys = ["k_0", "k_1"]
    results in
        {"k_0": 1, "k_1": 20, "x": "..."}

    Example:
        args_dict = {"k": "k_val", "x": "..."}
        key_name = "k"
        unpacked_keys = ["k"]
    results in (unchanged)
        {"k": "k_val", "x": "..."}

    Example:
        args_dict = {"x": "..."}
        args_dict = {"k": None, "x": "..."}
    both result in
        {"x": "..."}
    """
    _unp_keys = list(unpacked_keys)
    if args_dict.get(key_name) is not None:
        if _unp_keys != [key_name]:
            # passing a longer tuple than the keys is meaningless:
            assert len(_unp_keys) >= len(args_dict[key_name])
            # unpack the tuple
            split_part = {
                unp_k: tuple_v for unp_k, tuple_v in zip(_unp_keys, args_dict[key_name])
            }
        else:
            split_part = {key_name: args_dict[key_name]}
    else:
        split_part = {}

    new_args_dict = {
        **split_part,
        **{k: v for k, v in args_dict.items() if k != key_name},
    }
    return new_args_dict


def handle_multicolumn_packing(
    unpacked_row: Dict[str, Any],
    key_name: str,
    unpacked_keys: Iterable[str],
) -> Dict[str, Any]:
    _unp_keys = list(unpacked_keys)
    if _unp_keys != [key_name]:
        packed_keys = {k: v for k, v in unpacked_row.items() if k in _unp_keys}
        if packed_keys == {}:
            return unpacked_row
        else:
            pk_tuple = tuple(packed_keys[pk_k] for pk_k in _unp_keys)
            packed_row_portion = {
                key_name: pk_tuple,
            }
            return {
                **packed_row_portion,
                **{k: v for k, v in unpacked_row.items() if k not in _unp_keys},
            }
    else:
        return unpacked_row