File size: 7,785 Bytes
0914710 0677d1d 0914710 bb6e1c0 00f8875 0677d1d 0914710 0677d1d 0914710 00f8875 0914710 a5e4002 0914710 0677d1d 0914710 a5e4002 0914710 0677d1d 0914710 0677d1d 0914710 0677d1d 0914710 0677d1d 0914710 c311b69 0677d1d c311b69 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
"""lambda helper functions"""
from typing import Dict
from lisa_on_cuda.utils import session_logger
from lisa_on_cuda.utils.app_helpers import get_cleaned_input
from xyzservices import providers, TileProvider
from samgis_lisa_on_zero import app_logger
from samgis_lisa_on_zero.io_package.coordinates_pixel_conversion import get_latlng_to_pixel_coordinates
from samgis_lisa_on_zero.utilities.constants import COMPLETE_URL_TILES_MAPBOX, COMPLETE_URL_TILES_NEXTZEN
from samgis_lisa_on_zero.utilities.type_hints import (
ApiRequestBody, XYZTerrainProvidersNames, XYZDefaultProvidersNames, StringPromptApiRequestBody)
@session_logger.set_uuid_logging
def get_parsed_bbox_points_with_string_prompt(request_input: StringPromptApiRequestBody) -> Dict:
    """
    Parse the raw input request into bbox, prompt string, zoom and tile source.

    Args:
        request_input: a StringPromptApiRequestBody instance, or a JSON string
            that is validated into one through ``model_validate_json``

    Returns:
        dict with the keys:
            "bbox": [[ne_lat, ne_lng], [sw_lat, sw_lng]] as floats
            "prompt": the string prompt after passing through get_cleaned_input
            "zoom": the zoom as int
            "source": the value returned by get_url_tile for the request source_type
            "source_name": the value returned by get_source_name for the request source_type
    """
    app_logger.info(f"try to parsing input request: {type(request_input)}, {request_input}...")
    if isinstance(request_input, str):
        # log the target model class itself: type(StringPromptApiRequestBody) would only show its metaclass
        app_logger.info(f"string/json input, parsing it to {StringPromptApiRequestBody}...")
        request_input = StringPromptApiRequestBody.model_validate_json(request_input)
        app_logger.info(f"parsed input, now of type {type(request_input)}...")

    bbox = request_input.bbox
    app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
    ne = bbox.ne
    sw = bbox.sw
    app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
    app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
    ne_latlng = [float(ne.lat), float(ne.lng)]
    sw_latlng = [float(sw.lat), float(sw.lng)]
    new_zoom = int(request_input.zoom)
    cleaned_prompt = get_cleaned_input(request_input.string_prompt)

    app_logger.debug(f"bbox => {bbox}.")
    app_logger.debug(f'request_input-prompt cleaned => {cleaned_prompt}.')

    app_logger.info("unpacking elaborated request...")
    return {
        "bbox": [ne_latlng, sw_latlng],
        "prompt": cleaned_prompt,
        "zoom": new_zoom,
        "source": get_url_tile(request_input.source_type),
        "source_name": get_source_name(request_input.source_type)
    }
@session_logger.set_uuid_logging
def get_parsed_bbox_points_with_dictlist_prompt(request_input: ApiRequestBody) -> Dict:
    """
    Turn an API request into the bbox, prompt list, zoom and tile source used downstream.

    Args:
        request_input: request body exposing bbox, prompt, zoom and source_type

    Returns:
        dict with the keys "bbox", "prompt", "zoom", "source" and "source_name"
    """
    app_logger.info(f"try to parsing input request {request_input}...")

    bounding_box = request_input.bbox
    app_logger.debug(f"request bbox: {type(bounding_box)}, value:{bounding_box}.")
    north_east = bounding_box.ne
    south_west = bounding_box.sw
    app_logger.debug(f"request ne: {type(north_east)}, value:{north_east}.")
    app_logger.debug(f"request sw: {type(south_west)}, value:{south_west}.")

    # [[ne_lat, ne_lng], [sw_lat, sw_lng]], every coordinate coerced to float
    corners = [[float(corner.lat), float(corner.lng)] for corner in (north_east, south_west)]
    zoom_level = int(request_input.zoom)
    parsed_prompts = _get_parsed_prompt_list(north_east, south_west, zoom_level, request_input.prompt)

    app_logger.debug(f"bbox => {bounding_box}.")
    app_logger.debug(f'request_input-prompt updated => {parsed_prompts}.')

    app_logger.info("unpacking elaborated request...")
    parsed_request = {"bbox": corners, "prompt": parsed_prompts, "zoom": zoom_level}
    # the tile source is resolved last, after bbox/prompt/zoom have been parsed
    parsed_request["source"] = get_url_tile(request_input.source_type)
    parsed_request["source_name"] = get_source_name(request_input.source_type)
    return parsed_request
@session_logger.set_uuid_logging
def _get_parsed_prompt_list(bbox_ne, bbox_sw, zoom, prompt_list):
new_prompt_list = []
for prompt in prompt_list:
app_logger.debug(f"current prompt: {type(prompt)}, value:{prompt}.")
new_prompt = {"type": prompt.type.value}
if prompt.type == "point":
new_prompt_data = _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom)
new_prompt["label"] = prompt.label.value
elif prompt.type == "rectangle":
new_prompt_data = _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom)
else:
msg = "Valid prompt type: 'point' or 'rectangle', not '{}'. Check ApiRequestBody parsing/validation."
raise TypeError(msg.format(prompt.type))
app_logger.debug(f"new_prompt_data: {type(new_prompt_data)}, value:{new_prompt_data}.")
new_prompt["data"] = new_prompt_data
new_prompt_list.append(new_prompt)
return new_prompt_list
@session_logger.set_uuid_logging
def _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom):
    """
    Convert a point prompt through get_latlng_to_pixel_coordinates.

    Args:
        bbox_ne: north-east corner of the request bounding box
        bbox_sw: south-west corner of the request bounding box
        prompt: point prompt exposing ``data``, ``type`` and ``label``
        zoom: zoom level forwarded to the conversion

    Returns:
        two-element list holding the 'x' and 'y' entries of the converted point
    """
    pixel = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data, zoom, prompt.type)
    app_logger.debug(f"current prompt: {type(pixel)}, value:{pixel}, label: {prompt.label}.")
    return [pixel[axis] for axis in ("x", "y")]
@session_logger.set_uuid_logging
def _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom):
    """
    Convert both corners of a rectangle prompt through get_latlng_to_pixel_coordinates.

    Args:
        bbox_ne: north-east corner of the request bounding box
        bbox_sw: south-west corner of the request bounding box
        prompt: rectangle prompt exposing ``data.ne``, ``data.sw`` and ``type``
        zoom: zoom level forwarded to the conversion

    Returns:
        list ordered as [sw x, ne y, ne x, sw y]
    """
    ne_pixel = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.ne, zoom, prompt.type)
    app_logger.debug(f"rectangle:: current_point_ne prompt: {type(ne_pixel)}, value:{ne_pixel}.")
    sw_pixel = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.sw, zoom, prompt.type)
    app_logger.debug(f"rectangle:: current_point_sw prompt: {type(sw_pixel)}, value:{sw_pixel}.")

    # the rectangle prompt mixes the two corners: x from sw, y from ne, then x from ne, y from sw
    sw_x = sw_pixel["x"]
    ne_y = ne_pixel["y"]
    ne_x = ne_pixel["x"]
    sw_y = sw_pixel["y"]
    return [sw_x, ne_y, ne_x, sw_y]
# Module-level tile providers for the two terrain sources handled by get_url_tile.
# Keyword order is kept as-is: it determines the key order of the resulting provider.
mapbox_terrain_rgb = TileProvider(
    name=XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME,
    url=COMPLETE_URL_TILES_MAPBOX,
    attribution="",
)
nextzen_terrain_rgb = TileProvider(
    name=XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME,
    url=COMPLETE_URL_TILES_NEXTZEN,
    attribution="",
)
@session_logger.set_uuid_logging
def get_url_tile(source_type: str):
try:
match source_type.lower():
case XYZDefaultProvidersNames.DEFAULT_TILES_NAME_SHORT:
return providers.query_name(XYZDefaultProvidersNames.DEFAULT_TILES_NAME)
case XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME:
return mapbox_terrain_rgb
case XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME:
app_logger.info("nextzen_terrain_rgb:", nextzen_terrain_rgb)
return nextzen_terrain_rgb
case _:
return providers.query_name(source_type)
except ValueError as ve:
from pydantic_core import ValidationError
app_logger.error("ve:", str(ve))
raise ValidationError(ve)
def check_source_type_is_terrain(source: str | TileProvider):
    """
    Tell whether the given source is one of the terrain tile providers.

    Args:
        source: a source name or a TileProvider instance

    Returns:
        True only when source is a TileProvider whose name is among the
        XYZTerrainProvidersNames members, False otherwise
    """
    if not isinstance(source, TileProvider):
        return False
    terrain_names = list(XYZTerrainProvidersNames)
    return source.name in terrain_names
@session_logger.set_uuid_logging
def get_source_name(source: str | TileProvider) -> str | bool:
    """
    Look up the provider name for a tile source.

    Args:
        source: tile source name; the short default name is mapped to the
            full default tiles name before the lookup

    Returns:
        the value from ``providers.query_name`` when it is a string, otherwise
        its "name" entry; False when the lookup raises ValueError or the
        "name" key is missing
    """
    try:
        if source.lower() == XYZDefaultProvidersNames.DEFAULT_TILES_NAME_SHORT:
            lookup_name = XYZDefaultProvidersNames.DEFAULT_TILES_NAME
        else:
            # any other source is queried with its original (not lower-cased) value
            lookup_name = source
        resolved = providers.query_name(lookup_name)
        if isinstance(resolved, str):
            return resolved
        try:
            provider_fields = dict(resolved)
            app_logger.info(f"source_dict:{type(provider_fields)}, {'name' in provider_fields}, source_dict:{provider_fields}.")
            return provider_fields["name"]
        except KeyError as ke:
            app_logger.error(f"ke:{ke}.")
    except ValueError as ve:
        app_logger.info(f"source name::{source}, ve:{ve}.")
    # reached only when no name could be returned above
    app_logger.info(f"source name::{source}.")
    return False
|