File size: 4,409 Bytes
243f395
f2a79fa
3dbe011
7d6e00c
 
150cbc9
c6054f0
7d6e00c
f2a79fa
 
7d6e00c
 
 
da5737b
243f395
7d6e00c
 
 
 
 
 
 
28ea6eb
7d6e00c
 
 
 
 
 
c6054f0
 
7d6e00c
 
 
 
 
f2a79fa
 
7d6e00c
f2a79fa
 
 
 
 
 
3dbe011
7d6e00c
f2a79fa
 
7d6e00c
f2a79fa
 
7d6e00c
 
 
 
150cbc9
6f1250c
7d6e00c
 
8c1934b
150cbc9
7d6e00c
150cbc9
f2a79fa
 
7d6e00c
 
 
 
 
 
 
 
 
 
3dbe011
7d6e00c
f2a79fa
6f1250c
 
7d6e00c
6f1250c
c6054f0
7d6e00c
 
da5737b
 
 
 
7d6e00c
da5737b
 
7d6e00c
da5737b
 
6f1250c
7d6e00c
6f1250c
da5737b
7d6e00c
f2a79fa
 
7d6e00c
 
 
 
 
 
 
 
 
 
 
f2a79fa
7d6e00c
28ea6eb
c6054f0
7d6e00c
28ea6eb
f2a79fa
7d6e00c
f2a79fa
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import json
import time
from http import HTTPStatus
from typing import Dict, List

from aws_lambda_powertools.event_handler import content_types
from aws_lambda_powertools.utilities.typing import LambdaContext
from geojson_pydantic import FeatureCollection, Feature, Polygon
from pydantic import BaseModel, ValidationError

from src import app_logger
from src.prediction_api.predictor import base_predict
from src.utilities.constants import CUSTOM_RESPONSE_MESSAGES, MODEL_NAME, ZOOM
from src.utilities.utilities import base64_decode

PolygonFeatureCollectionModel = FeatureCollection[Feature[Polygon, Dict]]


class LatLngTupleLeaflet(BaseModel):
    lat: float
    lng: float


class RequestBody(BaseModel):
    ne: LatLngTupleLeaflet
    sw: LatLngTupleLeaflet
    points: List[LatLngTupleLeaflet]
    model: str = MODEL_NAME
    zoom: float = ZOOM


class ResponseBody(BaseModel):
    geojson: Dict = None
    request_id: str
    duration_run: float
    message: str


def get_response(status: int, start_time: float, request_id: str, response_body: ResponseBody = None) -> str:
    """
    Return a response for frontend clients.

    Args:
        status: status response
        start_time: request start time (float)
        request_id: str
        response_body: dict we embed into our response

    Returns:
        str: json response

    """
    response_body.duration_run = time.time() - start_time
    response_body.message = CUSTOM_RESPONSE_MESSAGES[status]
    response_body.request_id = request_id

    response = {
        "statusCode": status,
        "header": {"Content-Type": content_types.APPLICATION_JSON},
        "body": response_body.model_dump_json(),
        "isBase64Encoded": False
    }
    app_logger.info(f"response type:{type(response)} => {response}.")
    return json.dumps(response)


def get_parsed_bbox_points(request_input: RequestBody) -> Dict:
    return {
        "bbox": [
            request_input.ne.lat, request_input.sw.lat,
            request_input.ne.lng, request_input.sw.lng
        ],
        "points": [[p.lat, p.lng] for p in request_input.points]
    }


def lambda_handler(event: dict, context: LambdaContext):
    app_logger.info(f"start with aws_request_id:{context.aws_request_id}.")
    start_time = time.time()

    if "version" in event:
        app_logger.info(f"event version: {event['version']}.")

    try:
        app_logger.info(f"event:{json.dumps(event)}...")
        app_logger.info(f"context:{context}...")

        try:
            body = event["body"]
        except Exception as e_constants1:
            app_logger.error(f"e_constants1:{e_constants1}.")
            body = event

        app_logger.info(f"body: {type(body)}, {body}...")

        if isinstance(body, str):
            body_decoded_str = base64_decode(body)
            app_logger.info(f"body_decoded_str: {type(body_decoded_str)}, {body_decoded_str}...")
            body = json.loads(body_decoded_str)

        app_logger.info(f"body:{body}...")

        try:
            model_name = body["model"] if "model" in body else MODEL_NAME
            zoom = body["zoom"] if "zoom" in body else ZOOM
            body_request_validated = RequestBody(ne=body["ne"], sw=body["sw"], points=body["points"], model=model_name, zoom=zoom)
            body_request = get_parsed_bbox_points(body_request_validated)
            app_logger.info(f"validation ok - body_request:{body_request}, starting prediction...")
            output_geojson_dict = base_predict(bbox=body_request["bbox"], model_name=body_request_validated["model"], zoom=body_request_validated["zoom"])

            # raise ValidationError in case this is not a valid geojson by GeoJSON specification rfc7946
            PolygonFeatureCollectionModel(**output_geojson_dict)
            body_response = ResponseBody(geojson=output_geojson_dict)
            response = get_response(HTTPStatus.OK.value, start_time, context.aws_request_id, body_response)
        except ValidationError as ve:
            app_logger.error(f"validation error:{ve}.")
            response = get_response(HTTPStatus.UNPROCESSABLE_ENTITY.value, start_time, context.aws_request_id)
    except Exception as e:
        app_logger.error(f"exception:{e}.")
        response = get_response(HTTPStatus.INTERNAL_SERVER_ERROR.value, start_time, context.aws_request_id)

    app_logger.info(f"response_dumped:{response}...")
    return response