# Extraction residue from a web file viewer, kept as a comment so the module parses:
# Spaces status "Building"; file size: 6,663 bytes; revision 4187c6f; line-number gutter 1-216.
"""
Contains the filters used to filter out images from the Mapillary API.
"""
import inspect
import yaml
from datetime import datetime
from functools import partial
import numpy as np
import pandas as pd
import shapely
import shapely.geometry
from shapely.prepared import prep
from shapely import contains_xy
from .. import logger
def in_shape_filter(df: pd.DataFrame, geojson_shape):
    """
    Build a mask of the rows whose location lies inside a GeoJSON shape.

    Args:
        df: Frame with "geometry.long" and "geometry.lat" columns.
        geojson_shape: GeoJSON-like mapping. Only the geometry of the first
            entry in its "features" list is used; other features are ignored.

    Returns:
        The per-row result of shapely's ``contains_xy`` for the shape.
    """
    first_feature = geojson_shape["features"][0]
    region = shapely.geometry.shape(first_feature["geometry"])
    longitudes = df["geometry.long"]
    latitudes = df["geometry.lat"]
    return contains_xy(region, x=longitudes, y=latitudes)
def value_range_filter(df: pd.DataFrame, key, from_v=None, to_v=None):
    """
    Build a mask of the rows whose ``key`` value lies in an inclusive range.

    Args:
        df: Frame containing the column ``key``.
        key: Name of the column to test.
        from_v: Inclusive lower bound, or None for no lower bound.
        to_v: Inclusive upper bound, or None for no upper bound.

    Returns:
        Boolean mask with one entry per row of ``df``.

    Raises:
        ValueError: If both ``from_v`` and ``to_v`` are None.
    """
    column = df[key]
    has_lower = from_v is not None
    has_upper = to_v is not None

    if not has_lower and not has_upper:
        # ValueError is a subclass of Exception, so callers that caught the
        # previous bare Exception keep working.
        raise ValueError("from_v and to_v cannot both be None")
    if has_lower and has_upper:
        if from_v == to_v:
            # Degenerate range: a single exact value.
            return column == from_v
        return (column >= from_v) & (column <= to_v)
    if has_lower:
        return column >= from_v
    return column <= to_v
def value_in_list_filter(df: pd.DataFrame, key, lst, exclude=False):
    """
    Build a mask from membership of the ``key`` column in ``lst``.

    Args:
        df: Frame containing the column ``key``.
        key: Name of the column to test.
        lst: Values to look for.
        exclude: When True the mask is inverted, i.e. it selects the rows
            whose value is NOT in ``lst``.

    Returns:
        Boolean mask with one entry per row of ``df``.
    """
    membership = df[key].isin(lst)
    return ~membership if exclude else membership
def value_missing_filter(df: pd.DataFrame, keys):
    """
    Build a mask of the rows that have a value in every one of ``keys``.

    Args:
        df: Frame containing the columns listed in ``keys``.
        keys: Column names that must all be non-missing for a row to pass.

    Returns:
        Boolean mask, True where none of the selected columns is missing.
    """
    present = df[keys].notna()
    # Reduce across columns: a row passes only if every selected cell is set.
    return np.all(present, axis=1)
def date_filter(df: pd.DataFrame, from_year=None, to_year=None):
    """
    Build a mask of the rows whose "captured_at" value lies between two years.

    Each bound is January 1st, 00:00 of the given year, converted to a
    timestamp in milliseconds, and both bounds are inclusive.

    Args:
        from_year: integer representing the year of the lower bound, or None
        to_year: integer representing the year of the upper bound, or None
    """
    def _year_start_ms(year):
        # NOTE(review): the datetime is naive, so timestamp() interprets it in
        # the machine's local timezone - confirm that is intended.
        return int(datetime(year, 1, 1).timestamp())*1e3

    lower = None if from_year is None else _year_start_ms(from_year)
    upper = None if to_year is None else _year_start_ms(to_year)
    return value_range_filter(df, "captured_at", lower, upper)
def quality_score_filter(df: pd.DataFrame, from_score=None, to_score=None):
    """
    Build a mask of the rows whose "quality_score" lies in an inclusive range.

    Args:
        from_score: Inclusive lower bound, or None for no lower bound.
        to_score: Inclusive upper bound, or None for no upper bound.
    """
    bounds = {"from_v": from_score, "to_v": to_score}
    return value_range_filter(df, "quality_score", **bounds)
def angle_dist(a1, a2):
    """
    Absolute difference between two angles in degrees, wrapped around 360.

    The result lies in [0, 180]; e.g. 350 and 10 are 20 degrees apart.
    """
    delta = a1 - a2
    # Shift, wrap and shift back so the signed difference lands in [-180, 180).
    wrapped = (delta + 180) % 360 - 180
    return np.abs(wrapped)
def angle_discrip_filter(df: pd.DataFrame, thresh, less_than=True):
    """
    Build a mask from the discrepancy between the two compass angle columns.

    The discrepancy is the angular distance between "computed_compass_angle"
    and "compass_angle".

    Args:
        thresh: Threshold in degrees
        less_than: If True select rows whose discrepancy is strictly below
            the threshold, otherwise rows strictly above it.
    """
    computed = df["computed_compass_angle"]
    reported = df["compass_angle"]
    discrepancy = angle_dist(computed, reported)
    return discrepancy < thresh if less_than else discrepancy > thresh
def haversine_np(lon1, lat1, lon2, lat2):
    """
    Great-circle (haversine) distance between two sets of points on the
    earth, returned in meters.

    Coordinates are given in decimal degrees. All args must be of equal
    length.
    """
    lam1 = np.radians(lon1)
    phi1 = np.radians(lat1)
    lam2 = np.radians(lon2)
    phi2 = np.radians(lat2)

    delta_lam = lam2 - lam1
    delta_phi = phi2 - phi1

    # Haversine term, then the central angle between the two points.
    hav = np.sin(delta_phi/2.0)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lam/2.0)**2
    central_angle = 2 * np.arcsin(np.sqrt(hav))

    earth_radius_km = 6378.137
    # Kilometers to meters.
    return earth_radius_km * central_angle * 1e3
def loc_discrip_filter(df: pd.DataFrame, thresh, less_than=True):
    """
    Build a mask from the distance between the two location column pairs.

    The distance is the haversine distance between
    ("computed_geometry.long", "computed_geometry.lat") and
    ("geometry.long", "geometry.lat").

    Args:
        thresh: Threshold in meters
        less_than: If True select rows whose distance is strictly below the
            threshold, otherwise rows strictly above it.
    """
    computed_lat = df["computed_geometry.lat"]
    computed_lon = df["computed_geometry.long"]
    reported_lat = df["geometry.lat"]
    reported_lon = df["geometry.long"]
    distance = haversine_np(computed_lon, computed_lat, reported_lon, reported_lat)
    return distance < thresh if less_than else distance > thresh
def sequence_sparsity_filter(df: pd.DataFrame, dist_thresh):
    """
    TODO
    This filter filters out images that are too close to each other within a sequence

    Args:
        df: Frame of images to filter.
        dist_thresh: Distance threshold (not used yet).

    Raises:
        NotImplementedError: Always, because the filter is not implemented.
    """
    # Fail loudly: silently returning None would be used as a mask by the
    # caller and surface later as an unrelated indexing error.
    raise NotImplementedError("sequence_sparsity_filter is not implemented yet")
class Filter():
    """
    A filter function bundled with the keyword arguments it is called with.
    """

    def __init__(self, filter_func, name=None, **kwargs):
        """
        Args:
            filter_func: Callable invoked as ``filter_func(df, **kwargs)``.
            name: Optional label appended to the function name when printing.
            **kwargs: Keyword arguments forwarded to ``filter_func``.
        """
        self.filter_func = filter_func
        self.name = name
        self.kwargs = kwargs

    def __call__(self, df: pd.DataFrame):
        """Apply the wrapped function to ``df`` with the stored arguments."""
        return self.filter_func(df, **self.kwargs)

    def __str__(self) -> str:
        """Function name, followed by ``:<name>`` when a name was given."""
        func_name = self.filter_func.__name__
        return func_name if self.name is None else f"{func_name}:{self.name}"

    def __repr__(self):
        """The ``str`` form plus the stored keyword arguments."""
        rendered = [f"{key}={value}" for key, value in self.kwargs.items()]
        return f"{self} | kwargs({', '.join(rendered)})"
class FilterPipeline():
    """
    An ordered list of filters applied to a DataFrame.

    Each filter is a callable that takes the frame and returns a boolean mask
    with one entry per row; rows where the mask is True are kept.
    """

    def __init__(self, filters: list, sequential=True, name=None, verbose=True):
        """
        Args:
            filters: Callables mapping a DataFrame to a boolean row mask.
            sequential: Whether to apply filters sequentially or compute the masks
                for all of them then apply once at the end.
            name: Optional label used in log messages and when printing.
            verbose: Whether to log the effect of each filter or not
        """
        self.filters = filters
        self.sequential = sequential
        self.name = name
        self.verbose = verbose

    @staticmethod
    def _percent(part, total):
        """Return ``part`` as a percentage of ``total``, or 0.0 when ``total`` is 0."""
        return part / total * 100 if total else 0.0

    def __call__(self, df: pd.DataFrame):
        """
        Run every filter on ``df`` and return the rows that survive.

        In sequential mode each filter only sees the rows kept by the previous
        ones, and the pipeline stops early (returning the empty frame) as soon
        as nothing is left. Otherwise every filter sees the full frame and the
        masks are AND-ed together before being applied once.
        """
        total = df.shape[0]
        if not self.sequential:
            running_mask = np.full(total, True, dtype=bool)

        for f in self.filters:
            mask = f(df)
            if self.verbose:
                kept = np.sum(mask)
                seen = mask.shape[0]
                logger.info(f"{f} keeps {kept}/{seen} ({self._percent(kept, seen):.2f}%) of the images")
            if self.sequential:
                df = df[mask]
                if df.shape[0] == 0:
                    # NOTE(review): `warn` is a deprecated alias on stdlib
                    # loggers; switch to `warning` if `logger` is a
                    # logging.Logger - confirm.
                    logger.warn("No images left during filtering.. Stopping pipeline")
                    return df
            else:
                running_mask = np.logical_and(running_mask, mask)

        if not self.sequential:
            df = df[running_mask]

        # _percent guards against an empty input frame, which used to raise
        # ZeroDivisionError here.
        remaining = df.shape[0]
        logger.info(f"Filter Pipeline {self.name} kept {remaining}/{total} ({self._percent(remaining, total):.2f}%) of the images")
        return df

    def __str__(self):
        return f"Pipeline {self.name}: " + "\n".join([str(x) for x in self.filters])

    def __repr__(self):
        return f"Pipeline {self.name}: " + "\n".join([repr(x) for x in self.filters])

    @staticmethod
    def load_from_yaml(file_path):
        """
        Build a pipeline from a YAML file.

        The file must have a top-level "filter_pipeline" mapping containing a
        "filters" list. Each list entry is a single-key mapping from the name
        of a filter function in this module to its keyword arguments (which
        may be empty). Other keys of "filter_pipeline" that match a
        constructor parameter and hold a primitive value (float, int, bool,
        str) are forwarded to the constructor.

        Args:
            file_path: Path of the YAML file to read.

        Returns:
            The configured FilterPipeline.

        Raises:
            KeyError: If a required key is missing or a filter name does not
                exist in this module.
        """
        with open(file_path, 'r') as stream:
            pipeline_dict = yaml.safe_load(stream)["filter_pipeline"]

        primitive_types = (float, int, bool, str)
        init_params = inspect.signature(FilterPipeline.__init__).parameters
        init_args = {
            param_name: pipeline_dict[param_name]
            for param_name in init_params
            if param_name in pipeline_dict
            and isinstance(pipeline_dict[param_name], primitive_types)
        }

        filters = []
        for filter_dict in pipeline_dict["filters"]:
            filter_func_name, kwargs = next(iter(filter_dict.items()))
            # NOTE: names are resolved against this module's globals, so the
            # YAML file can reference any module-level object; only load
            # pipeline files you trust.
            try:
                filter_func = globals()[filter_func_name]
            except KeyError:
                raise KeyError(
                    f"Unknown filter function '{filter_func_name}' in {file_path}"
                ) from None
            # A filter listed without arguments is parsed as None by YAML.
            filters.append(Filter(filter_func=filter_func, **(kwargs or {})))

        return FilterPipeline(filters, **init_args)
if __name__ == "__main__":
    # Manual smoke test: check that the bundled pipeline definition loads.
    FilterPipeline.load_from_yaml("mia/fpv/filter_pipelines/mia.yaml")