|
"""Various utilities (logger, time benchmark, args dump, numerical and stats info)""" |
|
import numpy as np |
|
|
|
from src import app_logger |
|
from src.utilities.type_hints import ts_float64_1, ts_float64_2 |
|
|
|
|
|
def is_base64(sb):
    """Return True if *sb* (str or bytes) is valid, canonical base64; False otherwise.

    A round-trip check is used: decode with validation, re-encode, and compare
    against the original bytes. Non-str/bytes input and non-ASCII strings
    yield False (the raised ValueError/UnicodeEncodeError is caught below).
    """
    import base64

    try:
        if isinstance(sb, bytes):
            raw = sb
        elif isinstance(sb, str):
            raw = sb.encode("ascii")
        else:
            raise ValueError("Argument must be string or bytes")
        decoded = base64.b64decode(raw, validate=True)
        return base64.b64encode(decoded) == raw
    except ValueError:
        # binascii.Error (invalid base64) and UnicodeEncodeError are
        # both subclasses of ValueError, so all failure modes land here.
        return False
|
|
|
|
|
def get_system_info():
    """Log CPU counts (torch.mp, multiprocessing, os, sched affinity) plus
    `lscpu` and `free -m` output for runtime diagnostics. Returns nothing."""
    import multiprocessing
    import os
    import subprocess

    import torch.multiprocessing as mp

    cpu_lines = (
        f"mp::cpu_count:{mp.cpu_count()}.",
        f"multiprocessing::cpu_count:{multiprocessing.cpu_count()}.",
        f"os::cpu_count:{os.cpu_count()}",
        # sched_getaffinity is Linux-only; counts CPUs this process may run on
        f"os::sched_getaffinity:{len(os.sched_getaffinity(0))}",
    )
    for line in cpu_lines:
        app_logger.info(line)

    lscpu = subprocess.run("/usr/bin/lscpu", capture_output=True)
    app_logger.info(f"lscpu:{lscpu.stdout.decode('utf-8')}.")
    free_mem = subprocess.run(["/usr/bin/free", "-m"], capture_output=True)
    app_logger.info(f"free_mem_output:{free_mem.stdout.decode('utf-8')}.")
|
|
|
|
|
def base64_decode(s):
    """Decode *s* from base64 to a utf-8 string when it is a base64-encoded str.

    Anything that is not a str, or not valid base64 (per is_base64), is
    returned unchanged.
    """
    import base64

    if not (isinstance(s, str) and is_base64(s)):
        return s
    return base64.b64decode(s, validate=True).decode("utf-8")
|
|
|
|
|
def get_constants(event: dict, debug=False) -> dict:
    """
    Return constants we need to use from event, context and environment variables (both production and test).

    Extracts event["body"] (falling back to the whole event when the key is
    missing), JSON-decodes it when it is a string, then pulls out the "bbox",
    "point" and optional "debug" fields.

    Args:
        event: request event
        debug: logging debug argument; overridden by body["debug"] when present

    Returns:
        dict: project constants object with keys "bbox", "point", "debug"

    Raises:
        KeyError: when the parsed body lacks the required "bbox" or "point" keys
    """
    import json

    try:
        body = event["body"]
    except Exception as e_constants1:
        # Best-effort fallback: treat the whole event as the body.
        app_logger.error(f"e_constants1:{e_constants1}.")
        body = event

    if isinstance(body, str):
        # BUGFIX: parse the string we already extracted instead of re-reading
        # event["body"], which fails whenever the fallback path set body = event.
        body = json.loads(body)

    try:
        debug = body["debug"]
        app_logger.info(f"re-try get debug value:{debug}, log_level:{app_logger.level}.")
    except KeyError:
        # debug is optional; keep the argument value.
        app_logger.error("get_constants:: no debug key, pass...")
    app_logger.info(f"constants debug:{debug}, log_level:{app_logger.level}, body:{body}.")

    try:
        return {
            "bbox": body["bbox"],
            "point": body["point"],
            "debug": debug
        }
    except KeyError as e_key_constants2:
        app_logger.error(f"e_key_constants2:{e_key_constants2}.")
        # Chain the original exception so the real cause is preserved.
        raise KeyError(f"e_key_constants2:{e_key_constants2}.") from e_key_constants2
    except Exception as e_constants2:
        app_logger.error(f"e_constants2:{e_constants2}.")
        # Bare raise preserves the original traceback.
        raise
|
|
|
|
|
def get_rasters_info(rasters_list: list, names_list: list, title: str = "", debug: bool = False) -> str:
    """
    Analyze numpy arrays' list to extract a string containing some useful information. For every raster:

    - type of raster
    - raster.dtype if that's instance of np.ndarray
    - raster shape (or len() for plain lists)
    - min, max, mean, median, standard deviation and variance of the raster
      values over all axes (NaN-aware, via get_stats_raster)

    Raises:
        ValueError: if rasters_list and names_list have a different number of
            elements, or if any element is neither np.ndarray nor list, or if
            the per-raster stats collection fails

    Args:
        rasters_list: list of numpy array raster to analyze
        names_list: string list of numpy array
        title: title of current analytic session
        debug: logging debug argument (forwarded to get_stats_raster)

    Returns:
        str: the collected information
    """
    msg = f"get_rasters_info::title:{title},\n"
    if not len(rasters_list) == len(names_list):
        msg = "raster_list and names_list should have the same number of elements:\n"
        msg += f"len(rasters_list):{len(rasters_list)}, len(names_list):{len(names_list)}."
        raise ValueError(msg)
    try:
        for raster, name in zip(rasters_list, names_list):
            try:
                if isinstance(raster, np.ndarray):
                    shape_or_len = raster.shape
                elif isinstance(raster, list):
                    shape_or_len = len(raster)
                else:
                    raise ValueError(f"wrong argument type:{raster}, variable:{raster}.")
                zmin, zmax, zmean, zmedian, zstd, zvar = get_stats_raster(raster, debug=debug)
                msg += "name:{}:type:{},dtype:{},shape:{},min:{},max:{},mean:{},median:{},std:{},var:{}\n".format(
                    name, type(raster), raster.dtype if isinstance(raster, np.ndarray) else None, shape_or_len, zmin,
                    zmax, zmean, zmedian, zstd, zvar
                )
            except Exception as get_rasters_types_e:
                msg = f"get_rasters_types_e::{get_rasters_types_e}, type_raster:{type(raster)}."
                app_logger.error(msg)
                # BUGFIX: chain the original exception so the real cause is not
                # hidden behind the wrapping ValueError.
                raise ValueError(msg) from get_rasters_types_e
    except IndexError as get_rasters_types_ie:
        # NOTE(review): zip() stops at the shorter sequence and lengths are
        # checked above, so this handler looks unreachable — kept for safety.
        app_logger.error(f"get_rasters_types::len:rasters_list:{len(rasters_list)}, len_names_list:{len(names_list)}.")
        raise get_rasters_types_ie
    return msg + "\n=============================\n"
|
|
|
|
|
def get_stats_raster(raster: np.ndarray, get_rms: bool = False, debug: bool = False) -> "ts_float64_1 | ts_float64_2":
    """
    Analyze a numpy array and return a tuple of NaN-aware statistics:

    - min of raster value, over all axis (flattening the array)
    - max of raster value, over all axis (flattening the array)
    - mean of raster value, over all axis (flattening the array)
    - median of raster value, over all axis (flattening the array)
    - standard deviation of raster value, over all axis (flattening the array)
    - variance of raster value, over all axis (flattening the array)
    - Root Mean Square as a 7th element, only when get_rms=True (None if its
      computation fails)

    Args:
        raster: numpy array to analyze
        get_rms: bool to get Root Mean Square
        debug: logging debug argument (not read inside this function)

    Returns:
        tuple: float values (min, max, mean, median, standard deviation,
        variance of raster), plus rms when get_rms=True
    """
    # BUGFIX (annotation): the original `ts_float64_1 or ts_float64_2` return
    # annotation evaluates to just ts_float64_1; a string union expresses the
    # intended "either tuple shape" without evaluating at def time.
    std = np.nanstd(raster)
    if get_rms:
        try:
            rms = np.sqrt(np.nanmean(np.square(raster)))
        except Exception as rms_e:
            # Best-effort: a failed RMS computation is logged, not raised.
            rms = None
            app_logger.error(f"get_stats_raster::rms_Exception:{rms_e}.")
        app_logger.info(f"nanmin:{type(np.nanmin(raster))}.")
        return (np.nanmin(raster), np.nanmax(raster), np.nanmean(raster), np.nanmedian(raster), std,
                np.nanvar(raster), rms)
    # Reuse the std computed above instead of recomputing np.nanstd(raster).
    return (np.nanmin(raster), np.nanmax(raster), np.nanmean(raster), np.nanmedian(raster), std,
            np.nanvar(raster))
|
|