File size: 7,785 Bytes
0914710
 
 
0677d1d
0914710
bb6e1c0
 
 
00f8875
0677d1d
 
 
0914710
 
0677d1d
0914710
 
 
 
 
 
 
 
 
 
 
00f8875
 
 
 
 
0914710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a5e4002
 
0914710
 
 
0677d1d
0914710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a5e4002
 
0914710
 
 
0677d1d
0914710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0677d1d
0914710
 
 
 
 
 
0677d1d
0914710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0677d1d
0914710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c311b69
 
0677d1d
c311b69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""lambda helper functions"""
from typing import Dict

from lisa_on_cuda.utils import session_logger
from lisa_on_cuda.utils.app_helpers import get_cleaned_input
from xyzservices import providers, TileProvider

from samgis_lisa_on_zero import app_logger
from samgis_lisa_on_zero.io_package.coordinates_pixel_conversion import get_latlng_to_pixel_coordinates
from samgis_lisa_on_zero.utilities.constants import COMPLETE_URL_TILES_MAPBOX, COMPLETE_URL_TILES_NEXTZEN
from samgis_lisa_on_zero.utilities.type_hints import (
    ApiRequestBody, XYZTerrainProvidersNames, XYZDefaultProvidersNames, StringPromptApiRequestBody)


@session_logger.set_uuid_logging
def get_parsed_bbox_points_with_string_prompt(request_input: StringPromptApiRequestBody) -> Dict:
    """
        Parse the raw input request into bbox, prompt string and zoom

        Args:
            request_input: input dict

        Returns:
            dict with bounding box, prompt string and zoom
        """

    app_logger.info(f"try to parsing input request: {type(request_input)}, {request_input}...")
    if isinstance(request_input, str):
        app_logger.info(f"string/json input, parsing it to {type(StringPromptApiRequestBody)}...")
        request_input = StringPromptApiRequestBody.model_validate_json(request_input)
        app_logger.info(f"parsed input, now of type {type(request_input)}...")

    bbox = request_input.bbox
    app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
    ne = bbox.ne
    sw = bbox.sw
    app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
    app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
    ne_latlng = [float(ne.lat), float(ne.lng)]
    sw_latlng = [float(sw.lat), float(sw.lng)]
    new_zoom = int(request_input.zoom)
    cleaned_prompt = get_cleaned_input(request_input.string_prompt)

    app_logger.debug(f"bbox => {bbox}.")
    app_logger.debug(f'request_input-prompt cleaned => {cleaned_prompt}.')

    app_logger.info("unpacking elaborated request...")
    return {
        "bbox": [ne_latlng, sw_latlng],
        "prompt": cleaned_prompt,
        "zoom": new_zoom,
        "source": get_url_tile(request_input.source_type),
        "source_name": get_source_name(request_input.source_type)
    }


@session_logger.set_uuid_logging
def get_parsed_bbox_points_with_dictlist_prompt(request_input: ApiRequestBody) -> Dict:
    """
    Parse the raw input request into bbox, prompt and zoom

    Args:
        request_input: input dict

    Returns:
        dict with bounding box, prompt and zoom
    """

    app_logger.info(f"try to parsing input request {request_input}...")

    bbox = request_input.bbox
    app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
    ne = bbox.ne
    sw = bbox.sw
    app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
    app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
    ne_latlng = [float(ne.lat), float(ne.lng)]
    sw_latlng = [float(sw.lat), float(sw.lng)]
    new_zoom = int(request_input.zoom)
    new_prompt_list = _get_parsed_prompt_list(ne, sw, new_zoom, request_input.prompt)

    app_logger.debug(f"bbox => {bbox}.")
    app_logger.debug(f'request_input-prompt updated => {new_prompt_list}.')

    app_logger.info("unpacking elaborated request...")
    return {
        "bbox": [ne_latlng, sw_latlng],
        "prompt": new_prompt_list,
        "zoom": new_zoom,
        "source": get_url_tile(request_input.source_type),
        "source_name": get_source_name(request_input.source_type)
    }


@session_logger.set_uuid_logging
def _get_parsed_prompt_list(bbox_ne, bbox_sw, zoom, prompt_list):
    new_prompt_list = []
    for prompt in prompt_list:
        app_logger.debug(f"current prompt: {type(prompt)}, value:{prompt}.")
        new_prompt = {"type": prompt.type.value}
        if prompt.type == "point":
            new_prompt_data = _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom)
            new_prompt["label"] = prompt.label.value
        elif prompt.type == "rectangle":
            new_prompt_data = _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom)
        else:
            msg = "Valid prompt type: 'point' or 'rectangle', not '{}'. Check ApiRequestBody parsing/validation."
            raise TypeError(msg.format(prompt.type))
        app_logger.debug(f"new_prompt_data: {type(new_prompt_data)}, value:{new_prompt_data}.")
        new_prompt["data"] = new_prompt_data
        new_prompt_list.append(new_prompt)
    return new_prompt_list


@session_logger.set_uuid_logging
def _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom):
    current_point = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data, zoom, prompt.type)
    app_logger.debug(f"current prompt: {type(current_point)}, value:{current_point}, label: {prompt.label}.")
    return [current_point['x'], current_point['y']]


@session_logger.set_uuid_logging
def _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom):
    current_point_ne = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.ne, zoom, prompt.type)
    app_logger.debug(
        f"rectangle:: current_point_ne prompt: {type(current_point_ne)}, value:{current_point_ne}.")
    current_point_sw = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.sw, zoom, prompt.type)
    app_logger.debug(
        f"rectangle:: current_point_sw prompt: {type(current_point_sw)}, value:{current_point_sw}.")
    # correct order for rectangle prompt
    return [
        current_point_sw["x"],
        current_point_ne["y"],
        current_point_ne["x"],
        current_point_sw["y"]
    ]


mapbox_terrain_rgb = TileProvider(
    name=XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME,
    url=COMPLETE_URL_TILES_MAPBOX,
    attribution=""
)
nextzen_terrain_rgb = TileProvider(
    name=XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME,
    url=COMPLETE_URL_TILES_NEXTZEN,
    attribution=""
)


@session_logger.set_uuid_logging
def get_url_tile(source_type: str):
    try:
        match source_type.lower():
            case XYZDefaultProvidersNames.DEFAULT_TILES_NAME_SHORT:
                return providers.query_name(XYZDefaultProvidersNames.DEFAULT_TILES_NAME)
            case XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME:
                return mapbox_terrain_rgb
            case XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME:
                app_logger.info("nextzen_terrain_rgb:", nextzen_terrain_rgb)
                return nextzen_terrain_rgb
            case _:
                return providers.query_name(source_type)
    except ValueError as ve:
        from pydantic_core import ValidationError

        app_logger.error("ve:", str(ve))
        raise ValidationError(ve)


def check_source_type_is_terrain(source: str | TileProvider):
    return isinstance(source, TileProvider) and source.name in list(XYZTerrainProvidersNames)


@session_logger.set_uuid_logging
def get_source_name(source: str | TileProvider) -> str | bool:
    try:
        match source.lower():
            case XYZDefaultProvidersNames.DEFAULT_TILES_NAME_SHORT:
                source_output = providers.query_name(XYZDefaultProvidersNames.DEFAULT_TILES_NAME)
            case _:
                source_output = providers.query_name(source)
        if isinstance(source_output, str):
            return source_output
        try:
            source_dict = dict(source_output)
            app_logger.info(f"source_dict:{type(source_dict)}, {'name' in source_dict}, source_dict:{source_dict}.")
            return source_dict["name"]
        except KeyError as ke:
            app_logger.error(f"ke:{ke}.")
    except ValueError as ve:
        app_logger.info(f"source name::{source}, ve:{ve}.")
    app_logger.info(f"source name::{source}.")

    return False