File size: 3,148 Bytes
d10b65b
 
0a1b314
d10b65b
9d5b4c0
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5f8a6a
 
 
 
 
 
 
 
 
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a1b314
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a1b314
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9d5b4c0
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from typing import Any, Generator, List, Optional

from .operators import FieldOperator, StreamOperator
from .stream import Stream
from .utils import deepcopy


class Dictify(FieldOperator):
    """Turn a sequence value into a dict keyed by ``with_keys``.

    Keys and items are paired by position. Like ``zip``, pairing stops at
    the shorter of ``with_keys`` and the incoming value, so surplus keys or
    surplus items are dropped silently.
    """

    with_keys: List[str]

    def process_value(self, tup: Any) -> Any:
        pairs = zip(self.with_keys, tup)
        return dict(pairs)


class Wrap(FieldOperator):
    """Wrap a value in a one-element container.

    ``inside`` names the container: ``"list"``, ``"tuple"`` or ``"set"``.
    Any other name is rejected by ``verify``.
    """

    inside: str

    def verify(self):
        super().verify()
        allowed = ("list", "tuple", "set")
        if self.inside in allowed:
            return
        raise ValueError(
            f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
        )

    def process_value(self, value: Any) -> Any:
        kind = self.inside
        if kind == "list":
            wrapped = [value]
        elif kind == "tuple":
            wrapped = (value,)
        else:
            # The only remaining option permitted by verify() is "set".
            wrapped = {value}
        return wrapped


class Chunk(FieldOperator):
    """Split a sliceable value into consecutive pieces of ``size`` items.

    The last piece is shorter when ``len(value)`` is not a multiple of
    ``size``. An empty value yields ``[]``. Each piece has whatever type
    slicing the input produces (slicing a list gives lists, slicing a
    string gives strings, and so on).
    """

    size: int

    def verify(self):
        super().verify()
        # Without this check, size == 0 only fails later inside range() while
        # processing, and a negative size silently yields an empty result.
        # Reject both at configuration time instead.
        if self.size <= 0:
            raise ValueError(f"Chunk.size must be a positive integer, got {self.size}")

    def process_value(self, collection: Any) -> Any:
        return [
            collection[i : i + self.size] for i in range(0, len(collection), self.size)
        ]


class Slice(FieldOperator):
    """Apply ``value[start:stop:step]`` to a value.

    Each bound left as ``None`` falls back to Python's default slicing
    behavior for that position.
    """

    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    def process_value(self, collection: Any) -> Any:
        return collection[self.start : self.stop : self.step]


class Get(FieldOperator):
    """Return ``value[item]``, using ``item`` as an index or a key."""

    item: Any

    def process_value(self, collection: Any) -> Any:
        key = self.item
        return collection[key]


class DuplicateByList(StreamOperator):
    """Fan each instance out into one instance per element of ``field``.

    For every element of ``instance[field]``, an instance is yielded whose
    ``to_field`` holds that element. ``to_field`` defaults to ``field``
    itself, which overwrites the list in the output.

    When ``use_deep_copy`` is set, each output is built with ``deepcopy``
    from ``.utils``. Otherwise it is a shallow dict copy that shares all
    other values with the input instance.
    """

    field: str
    to_field: Optional[str] = None
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            values = instance[self.field]
            for value in values:
                if not self.use_deep_copy:
                    yield {**instance, self.field: values, target: value}
                    continue
                duplicate = deepcopy(instance)
                duplicate[target] = value
                yield duplicate


class DuplicateBySubLists(StreamOperator):
    """Fan each instance out into one instance per non-empty prefix of ``field``.

    For a value of length n, n instances are yielded. Their ``to_field``
    holds ``value[:1]``, ``value[:2]``, ..., ``value[:n]`` in that order.
    ``to_field`` defaults to ``field`` itself. An empty value yields nothing.

    When ``use_deep_copy`` is set, each output is built with ``deepcopy``
    from ``.utils``. Otherwise it is a shallow dict copy of the input.
    """

    field: str
    to_field: Optional[str] = None
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            values = instance[self.field]
            total = len(values)
            for end in range(1, total + 1):
                if not self.use_deep_copy:
                    yield {**instance, self.field: values, target: values[:end]}
                    continue
                duplicate = deepcopy(instance)
                duplicate[target] = values[:end]
                yield duplicate


class GetLength(FieldOperator):
    """Map a value to its ``len()``."""

    def process_value(self, collection: Any) -> Any:
        length = len(collection)
        return length