File size: 5,293 Bytes
39b18be
66630b0
d10b65b
d08fbc6
39b18be
0a1b314
d10b65b
cc5f321
d10b65b
 
 
 
 
 
 
 
66630b0
39b18be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99f75f9
66630b0
 
d10b65b
99f75f9
39b18be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5f8a6a
 
 
 
 
 
 
 
 
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39b18be
 
 
 
 
 
 
0a1b314
d10b65b
 
 
 
 
 
 
d08fbc6
d10b65b
 
cc5f321
d08fbc6
d10b65b
cc5f321
d08fbc6
d10b65b
 
 
d08fbc6
 
 
 
0a1b314
d10b65b
 
 
39b18be
 
 
d10b65b
 
 
 
39b18be
 
 
d10b65b
cc5f321
d10b65b
 
 
 
 
 
 
 
9d5b4c0
 
39b18be
 
 
 
9d5b4c0
 
 
cc5f321
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
from itertools import zip_longest
from typing import Any, Dict, Generator, List, Optional

from .dict_utils import dict_get, dict_set
from .operator import InstanceOperator
from .operators import FieldOperator, StreamOperator
from .stream import Stream
from .utils import recursive_shallow_copy


class Dictify(FieldOperator):
    """Turn a tuple-like field value into a dict keyed by ``with_keys``.

    Keys and values are paired positionally (as with ``zip``), so the result
    stops at whichever of ``with_keys`` / the value is shorter.
    """

    # Keys assigned, in order, to the elements of the incoming value.
    with_keys: List[str]

    def process_value(self, tup: Any) -> Any:
        return {key: element for key, element in zip(self.with_keys, tup)}


class Zip(InstanceOperator):
    """Zip the values of several instance fields into a list of tuples.

    The values of ``fields`` are read with ``dict_get``, combined element-wise
    by :meth:`zip` (builtin ``zip``, truncating to the shortest input), and the
    resulting list is written to ``to_field`` with ``dict_set``. The instance
    is modified in place and returned.
    """

    # Field names (read via dict_get) whose values are zipped together, in order.
    fields: List[str]
    # Destination field (written via dict_set) for the zipped list.
    to_field: str

    def zip(self, values):
        # Subclasses override this to change the pairing strategy.
        return [*zip(*values)]

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        columns = [dict_get(instance, name) for name in self.fields]
        zipped = self.zip(columns)
        dict_set(instance, self.to_field, zipped)
        return instance


class ZipLongest(Zip):
    """Variant of :class:`Zip` that pads shorter inputs instead of truncating.

    Uses ``itertools.zip_longest``; missing positions are filled with
    ``fill_value``.
    """

    fields: List[str]
    # Value used to pad inputs shorter than the longest one.
    fill_value: Any = None

    def zip(self, values):
        padded = zip_longest(*values, fillvalue=self.fill_value)
        return list(padded)


class DictToTuplesList(FieldOperator):
    """Replace a dict field value with its ``(key, value)`` pairs as a list."""

    def process_value(self, dic: Dict) -> Any:
        return [(key, value) for key, value in dic.items()]


def flatten(container):
    """Flatten nested lists/tuples inside *container* into one level.

    Items that are ``list`` or ``tuple`` instances (including subclasses) are
    descended into depth-first, left to right. Every other item, including
    strings and dicts, is kept as a single leaf. The result is built by
    calling ``type(container)`` on the leaves, so a list yields a list and a
    tuple yields a tuple.

    Nesting depth is not limited by the interpreter's recursion limit.
    """

    def _leaves(root):
        # Use an explicit stack of iterators instead of recursive generators.
        # This avoids RecursionError on very deep nesting and the O(depth)
        # per-leaf cost of a long ``yield from`` chain. Resuming the topmost
        # iterator preserves depth-first, left-to-right order.
        pending = [iter(root)]
        while pending:
            for element in pending[-1]:
                if isinstance(element, (list, tuple)):
                    pending.append(iter(element))
                    break
                yield element
            else:
                # Top iterator exhausted: resume its parent.
                pending.pop()

    return type(container)(_leaves(container))


class Flatten(FieldOperator):
    """Flatten nested lists/tuples in a field value using :func:`flatten`."""

    def process_value(self, value: Any) -> Any:
        flat = flatten(value)
        return flat


class Wrap(FieldOperator):
    """Wrap a field value in a one-element list, tuple, or set.

    ``inside`` selects the container: ``"list"``, ``"tuple"`` or ``"set"``.
    Any other value is rejected by :meth:`verify`.
    """

    # Container kind to wrap with: "list", "tuple" or "set".
    inside: str

    def verify(self):
        super().verify()
        supported = ["list", "tuple", "set"]
        if self.inside in supported:
            return
        raise ValueError(
            f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
        )

    def process_value(self, value: Any) -> Any:
        if self.inside == "list":
            wrapped = [value]
        elif self.inside == "tuple":
            wrapped = (value,)
        else:
            # Anything else falls through to a set; verify() limits this to "set".
            wrapped = {value}
        return wrapped


class Chunk(FieldOperator):
    """Split a sliceable collection into consecutive pieces of ``size`` items.

    The final piece is shorter when ``len(collection)`` is not a multiple of
    ``size``. Pieces are produced by slicing, so each piece has whatever type
    slicing the collection returns (e.g. sub-lists for a list).
    """

    # Number of elements per piece; must be a positive integer.
    size: int

    def verify(self):
        super().verify()
        # Reject bad sizes up front. A size of 0 would otherwise make range()
        # raise only at processing time, and a negative size would silently
        # produce no chunks at all.
        if not isinstance(self.size, int) or self.size < 1:
            raise ValueError(
                f"Chunk.size must be a positive integer, got {self.size}"
            )

    def process_value(self, collection: Any) -> Any:
        step = self.size
        return [
            collection[start : start + step]
            for start in range(0, len(collection), step)
        ]


class Slice(FieldOperator):
    """Replace a field value with ``value[start:stop:step]``.

    Each bound defaults to None, which means the same as omitting it in
    Python slice syntax.
    """

    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    def process_value(self, collection: Any) -> Any:
        return collection[self.start : self.stop : self.step]


class Get(FieldOperator):
    """Replace a field value with ``value[item]`` (an index or a key)."""

    # Index or key used for subscription.
    item: Any

    def process_value(self, collection: Any) -> Any:
        key = self.item
        return collection[key]


class Pop(FieldOperator):
    """Replace a field value with the element removed by its ``pop`` method.

    ``item`` is passed to ``pop``: an index for lists, a key for dicts. When
    ``item`` is None and the collection is not a dict (e.g. a list, set or
    deque), ``pop()`` is called without an argument. For a list that removes
    and returns the last element.

    NOTE(review): ``pop`` mutates the collection object in place. Whether the
    original instance sees that change depends on whether FieldOperator copies
    the value before calling ``process_value``; confirm.
    """

    # Index/key handed to pop(). None means "default pop()" for non-dict
    # collections; for dicts it is used as the key None.
    item: Any = None

    def process_value(self, collection: Any) -> Any:
        # list.pop(None) / set.pop(None) / deque.pop(None) raise TypeError, so
        # the default item=None could never work on sequences. Fall back to the
        # argument-less pop() for them. Dicts keep pop(None) (key lookup), as
        # before.
        if self.item is None and not isinstance(collection, dict):
            return collection.pop()
        return collection.pop(self.item)


class DuplicateByList(StreamOperator):
    """Emit one copy of each instance per element of a list field.

    For every element of ``dict_get(instance, field)``, the instance is copied
    and the element is written to ``to_field`` (or back to ``field`` when
    ``to_field`` is None) with ``dict_set``. With ``use_deep_copy`` the copy is
    made with ``recursive_shallow_copy``; otherwise ``instance.copy()`` is
    used.
    """

    # Field holding the elements to fan out over.
    field: str
    # Destination for each element; None means overwrite ``field``.
    to_field: Optional[str] = None
    # Copy via recursive_shallow_copy instead of instance.copy().
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            for element in dict_get(instance, self.field):
                duplicate = (
                    recursive_shallow_copy(instance)
                    if self.use_deep_copy
                    else instance.copy()
                )
                dict_set(duplicate, target, element)
                yield duplicate


class Explode(DuplicateByList):
    """Alias of :class:`DuplicateByList`; adds no behavior of its own."""


class DuplicateBySubLists(StreamOperator):
    """Emit one copy of each instance per prefix of a list field.

    For each ``i`` in ``range(start, len(elements) + 1 + end, step)``, a copy
    of the instance is yielded with ``to_field`` (or ``field`` when
    ``to_field`` is None) set to ``elements[:i]``. With the defaults
    (start=1, end=0, step=1) this yields the prefixes of length
    1..len(elements).
    """

    # Field holding the list whose prefixes are emitted.
    field: str
    # Destination for each prefix; None means overwrite ``field``.
    to_field: Optional[str] = None
    # Copy via recursive_shallow_copy instead of a top-level dict copy.
    use_deep_copy: bool = False
    # First prefix length.
    start: int = 1
    # Offset added to len(elements) + 1 to form the exclusive stop length.
    end: int = 0
    # Increment between consecutive prefix lengths.
    step: int = 1

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            elements = dict_get(instance, self.field)
            stop = len(elements) + 1 + self.end
            for length in range(self.start, stop, self.step):
                # NOTE(review): unlike DuplicateByList, both branches write flat
                # top-level keys here instead of using dict_set. If ``field`` /
                # ``to_field`` can be nested paths for dict_get, this may not
                # address the same location; confirm intended.
                if self.use_deep_copy:
                    duplicate = recursive_shallow_copy(instance)
                else:
                    duplicate = {**instance}
                    duplicate[self.field] = elements
                duplicate[target] = elements[:length]
                yield duplicate


class ExplodeSubLists(DuplicateBySubLists):
    """Alias of :class:`DuplicateBySubLists`; adds no behavior of its own."""


class GetLength(FieldOperator):
    """Replace a field value with ``len(value)``."""

    def process_value(self, collection: Any) -> Any:
        size = len(collection)
        return size


class Filter(FieldOperator):
    """Remove the entries listed in ``values`` from a collection field.

    Lists, sets and tuples drop matching items and keep their original type.
    Dicts drop matching keys. Any other type raises TypeError.
    """

    # Items (or dict keys) to remove; membership is tested with ``in``.
    values: List[Any]

    def process_value(self, collection: Any) -> Any:
        excluded = self.values

        # Dictionaries are filtered by key.
        if isinstance(collection, dict):
            return {
                key: value
                for key, value in collection.items()
                if key not in excluded
            }

        # Sequences / sets are filtered by item, preserving their type.
        if isinstance(collection, (list, set, tuple)):
            kept = (item for item in collection if item not in excluded)
            return type(collection)(kept)

        raise TypeError(f"Unsupported collection type: {type(collection)}")