File size: 5,293 Bytes
39b18be 66630b0 d10b65b d08fbc6 39b18be 0a1b314 d10b65b cc5f321 d10b65b 66630b0 39b18be 99f75f9 66630b0 d10b65b 99f75f9 39b18be d10b65b c5f8a6a d10b65b 39b18be 0a1b314 d10b65b d08fbc6 d10b65b cc5f321 d08fbc6 d10b65b cc5f321 d08fbc6 d10b65b d08fbc6 0a1b314 d10b65b 39b18be d10b65b 39b18be d10b65b cc5f321 d10b65b 9d5b4c0 39b18be 9d5b4c0 cc5f321 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
from itertools import zip_longest
from typing import Any, Dict, Generator, List, Optional
from .dict_utils import dict_get, dict_set
from .operator import InstanceOperator
from .operators import FieldOperator, StreamOperator
from .stream import Stream
from .utils import recursive_shallow_copy
class Dictify(FieldOperator):
    """Convert a sequence of values into a dict keyed by ``with_keys``.

    Keys and values are paired positionally; pairing stops at the shorter
    of the two sequences.
    """

    with_keys: List[str]

    def process_value(self, tup: Any) -> Any:
        """Return a dict mapping each configured key to the matching item of ``tup``."""
        return {key: item for key, item in zip(self.with_keys, tup)}
class Zip(InstanceOperator):
    """Zip the values of several instance fields and store the result in ``to_field``."""

    fields: List[str]
    to_field: str

    def zip(self, values):
        """Combine the given iterables into a list of tuples (shortest input wins)."""
        return [*zip(*values)]

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Read every field in ``fields``, zip them, and write the result to ``to_field``.

        The instance is modified in place and returned.
        """
        columns = [dict_get(instance, name) for name in self.fields]
        zipped = self.zip(columns)
        dict_set(instance, self.to_field, zipped)
        return instance
class ZipLongest(Zip):
    """Variant of ``Zip`` that pads shorter inputs with ``fill_value`` instead of truncating."""

    fields: List[str]
    fill_value: Any = None

    def zip(self, values):
        """Combine the iterables into a list of tuples, padding to the longest input."""
        padded = zip_longest(*values, fillvalue=self.fill_value)
        return list(padded)
class DictToTuplesList(FieldOperator):
    """Convert a dict into a list of its ``(key, value)`` pairs."""

    def process_value(self, dic: Dict) -> Any:
        """Return the items of ``dic`` as a list, in the dict's iteration order."""
        return [*dic.items()]
def flatten(container):
    """Flatten arbitrarily nested lists/tuples into a single-level container.

    Only ``list`` and ``tuple`` items are descended into; every other item
    (including strings, sets and dicts) is emitted as-is.

    Args:
        container: The outer iterable to flatten.

    Returns:
        A new object of the same type as ``container`` holding the leaf items
        in depth-first, left-to-right order.
    """
    end = object()  # sentinel marking an exhausted iterator

    def _leaves(root):
        # An explicit stack of iterators replaces recursion, so the nesting
        # depth is not bounded by the interpreter's recursion limit.
        stack = [iter(root)]
        while stack:
            item = next(stack[-1], end)
            if item is end:
                stack.pop()
            elif isinstance(item, (list, tuple)):
                stack.append(iter(item))
            else:
                yield item

    return type(container)(_leaves(container))
class Flatten(FieldOperator):
    """Field operator that applies the module-level ``flatten`` to a field value."""

    def process_value(self, value: Any) -> Any:
        """Return ``value`` with nested lists/tuples flattened."""
        flattened = flatten(value)
        return flattened
class Wrap(FieldOperator):
    """Wrap a field value in a single-element list, tuple, or set.

    ``inside`` names the wrapping container: ``"list"``, ``"tuple"`` or ``"set"``.
    """

    inside: str

    def verify(self):
        """Check that ``inside`` names a supported container type.

        Raises:
            ValueError: If ``inside`` is not one of the supported names.
        """
        super().verify()
        if self.inside in ("list", "tuple", "set"):
            return
        raise ValueError(
            f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
        )

    def process_value(self, value: Any) -> Any:
        """Return ``value`` wrapped in the container selected by ``inside``."""
        kind = self.inside
        if kind == "list":
            wrapped = [value]
        elif kind == "tuple":
            wrapped = (value,)
        else:
            # Any other setting falls through to a set, as verify() only
            # admits "set" beyond the two cases above.
            wrapped = {value}
        return wrapped
class Chunk(FieldOperator):
    """Split a sliceable collection into consecutive pieces of at most ``size`` items."""

    size: int

    def verify(self):
        """Check that ``size`` is a positive integer.

        A zero size would make ``range`` fail with an unrelated-looking error,
        and a negative size would silently produce no chunks at all.

        Raises:
            ValueError: If ``size`` is not a positive integer.
        """
        super().verify()
        if not isinstance(self.size, int) or self.size <= 0:
            raise ValueError(
                f"Chunk.size must be a positive integer, got {self.size}"
            )

    def process_value(self, collection: Any) -> Any:
        """Return a list of slices of ``collection``, each ``size`` long.

        The final slice is shorter when ``len(collection)`` is not a multiple
        of ``size``.
        """
        return [
            collection[i : i + self.size] for i in range(0, len(collection), self.size)
        ]
class Slice(FieldOperator):
    """Apply a ``start:stop:step`` slice to a field value.

    Each bound left as ``None`` behaves like an omitted bound in slice syntax.
    """

    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    def process_value(self, collection: Any) -> Any:
        """Return ``collection`` sliced with the configured bounds."""
        return collection[self.start : self.stop : self.step]
class Get(FieldOperator):
    """Replace a field value with one of its items, looked up by ``item``."""

    item: Any

    def process_value(self, collection: Any) -> Any:
        """Return ``collection[item]``."""
        selected = collection[self.item]
        return selected
class Pop(FieldOperator):
    """Pop an element from a field value and return the popped element.

    ``item`` is the index (for sequences) or key (for dicts) to pop. When it
    is left as ``None``, non-dict collections are popped without an argument
    (e.g. the last element of a list).
    """

    item: Any = None

    def process_value(self, collection: Any) -> Any:
        """Remove and return an element from ``collection``.

        The collection is mutated in place by its own ``pop`` method.
        """
        # list.pop(None) and set.pop(None) raise TypeError, so the default
        # must map to an argument-less pop(). Dicts keep the original call
        # because None can be a real key and dict.pop() requires a key.
        if self.item is None and not isinstance(collection, dict):
            return collection.pop()
        return collection.pop(self.item)
class DuplicateByList(StreamOperator):
    """Emit one copy of each instance per element of a list-valued field.

    Each copy has ``to_field`` (or ``field`` when ``to_field`` is ``None``)
    set to one element. ``use_deep_copy`` selects ``recursive_shallow_copy``
    over a plain ``copy()`` of the instance.
    """

    field: str
    to_field: Optional[str] = None
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Yield, for every instance, one copy per element found in ``field``."""
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            for element in dict_get(instance, self.field):
                duplicate = (
                    recursive_shallow_copy(instance)
                    if self.use_deep_copy
                    else instance.copy()
                )
                dict_set(duplicate, target, element)
                yield duplicate
class Explode(DuplicateByList):
    """Alternative name for ``DuplicateByList``; adds no behavior of its own."""
class DuplicateBySubLists(StreamOperator):
    """Emit one copy of each instance per prefix of a list-valued field.

    For prefix lengths ``range(start, len(elements) + 1 + end, step)``, each
    copy has ``to_field`` (or ``field`` when ``to_field`` is ``None``) set to
    ``elements[:length]``.
    """

    field: str
    to_field: Optional[str] = None
    use_deep_copy: bool = False
    start: int = 1
    end: int = 0
    step: int = 1

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Yield, for every instance, one copy per selected prefix of ``field``."""
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            elements = dict_get(instance, self.field)
            # ``end`` is an offset relative to the full length (0 includes the
            # complete list as the last prefix).
            stop = len(elements) + 1 + self.end
            for prefix_len in range(self.start, stop, self.step):
                if self.use_deep_copy:
                    duplicate = recursive_shallow_copy(instance)
                    duplicate[target] = elements[:prefix_len]
                else:
                    # Fresh top-level dict; nested values are shared with the
                    # source instance.
                    duplicate = {**instance}
                    duplicate[self.field] = elements
                    duplicate[target] = elements[:prefix_len]
                yield duplicate
class ExplodeSubLists(DuplicateBySubLists):
    """Alternative name for ``DuplicateBySubLists``; adds no behavior of its own."""
class GetLength(FieldOperator):
    """Replace a field value with its length."""

    def process_value(self, collection: Any) -> Any:
        """Return ``len(collection)``."""
        length = len(collection)
        return length
class Filter(FieldOperator):
    """Remove every entry listed in ``values`` from a collection.

    Lists, tuples and sets are filtered by item; dicts are filtered by key.
    """

    values: List[Any]

    def process_value(self, collection: Any) -> Any:
        """Return a new collection of the same type without the excluded entries.

        Raises:
            TypeError: If ``collection`` is not a list, tuple, set or dict.
        """
        excluded = self.values

        # Dictionaries: keep the pairs whose key is not excluded.
        if isinstance(collection, dict):
            return {
                key: val for key, val in collection.items() if key not in excluded
            }

        # Lists, sets and tuples: rebuild the same type from the kept items.
        if isinstance(collection, (list, set, tuple)):
            kept = (entry for entry in collection if entry not in excluded)
            return type(collection)(kept)

        # Anything else is unsupported.
        raise TypeError(f"Unsupported collection type: {type(collection)}")
|