samgis / src /app.py
aletrn's picture
[feat] re-add samgeo with segment-anything-fast
7d6e00c
raw
history blame
4.41 kB
import json
import time
from http import HTTPStatus
from typing import Dict, List
from aws_lambda_powertools.event_handler import content_types
from aws_lambda_powertools.utilities.typing import LambdaContext
from geojson_pydantic import FeatureCollection, Feature, Polygon
from pydantic import BaseModel, ValidationError
from src import app_logger
from src.prediction_api.predictor import base_predict
from src.utilities.constants import CUSTOM_RESPONSE_MESSAGES, MODEL_NAME, ZOOM
from src.utilities.utilities import base64_decode
PolygonFeatureCollectionModel = FeatureCollection[Feature[Polygon, Dict]]
class LatLngTupleLeaflet(BaseModel):
lat: float
lng: float
class RequestBody(BaseModel):
ne: LatLngTupleLeaflet
sw: LatLngTupleLeaflet
points: List[LatLngTupleLeaflet]
model: str = MODEL_NAME
zoom: float = ZOOM
class ResponseBody(BaseModel):
geojson: Dict = None
request_id: str
duration_run: float
message: str
def get_response(status: int, start_time: float, request_id: str, response_body: ResponseBody = None) -> str:
"""
Return a response for frontend clients.
Args:
status: status response
start_time: request start time (float)
request_id: str
response_body: dict we embed into our response
Returns:
str: json response
"""
response_body.duration_run = time.time() - start_time
response_body.message = CUSTOM_RESPONSE_MESSAGES[status]
response_body.request_id = request_id
response = {
"statusCode": status,
"header": {"Content-Type": content_types.APPLICATION_JSON},
"body": response_body.model_dump_json(),
"isBase64Encoded": False
}
app_logger.info(f"response type:{type(response)} => {response}.")
return json.dumps(response)
def get_parsed_bbox_points(request_input: RequestBody) -> Dict:
return {
"bbox": [
request_input.ne.lat, request_input.sw.lat,
request_input.ne.lng, request_input.sw.lng
],
"points": [[p.lat, p.lng] for p in request_input.points]
}
def lambda_handler(event: dict, context: LambdaContext):
app_logger.info(f"start with aws_request_id:{context.aws_request_id}.")
start_time = time.time()
if "version" in event:
app_logger.info(f"event version: {event['version']}.")
try:
app_logger.info(f"event:{json.dumps(event)}...")
app_logger.info(f"context:{context}...")
try:
body = event["body"]
except Exception as e_constants1:
app_logger.error(f"e_constants1:{e_constants1}.")
body = event
app_logger.info(f"body: {type(body)}, {body}...")
if isinstance(body, str):
body_decoded_str = base64_decode(body)
app_logger.info(f"body_decoded_str: {type(body_decoded_str)}, {body_decoded_str}...")
body = json.loads(body_decoded_str)
app_logger.info(f"body:{body}...")
try:
model_name = body["model"] if "model" in body else MODEL_NAME
zoom = body["zoom"] if "zoom" in body else ZOOM
body_request_validated = RequestBody(ne=body["ne"], sw=body["sw"], points=body["points"], model=model_name, zoom=zoom)
body_request = get_parsed_bbox_points(body_request_validated)
app_logger.info(f"validation ok - body_request:{body_request}, starting prediction...")
output_geojson_dict = base_predict(bbox=body_request["bbox"], model_name=body_request_validated["model"], zoom=body_request_validated["zoom"])
# raise ValidationError in case this is not a valid geojson by GeoJSON specification rfc7946
PolygonFeatureCollectionModel(**output_geojson_dict)
body_response = ResponseBody(geojson=output_geojson_dict)
response = get_response(HTTPStatus.OK.value, start_time, context.aws_request_id, body_response)
except ValidationError as ve:
app_logger.error(f"validation error:{ve}.")
response = get_response(HTTPStatus.UNPROCESSABLE_ENTITY.value, start_time, context.aws_request_id)
except Exception as e:
app_logger.error(f"exception:{e}.")
response = get_response(HTTPStatus.INTERNAL_SERVER_ERROR.value, start_time, context.aws_request_id)
app_logger.info(f"response_dumped:{response}...")
return response