"""Various utilities (logger, time benchmark, args dump, numerical and stats info)"""

from typing import Union

import numpy as np

from src import app_logger
from src.utilities.type_hints import ts_float64_1, ts_float64_2


def is_base64(sb):
    """Return True if the input string or bytes object is valid base64, False otherwise."""
    import base64
    try:
        if isinstance(sb, str):
            # Non-ASCII input raises UnicodeEncodeError (a ValueError subclass), caught below
            sb_bytes = bytes(sb, 'ascii')
elif isinstance(sb, bytes):
sb_bytes = sb
else:
raise ValueError("Argument must be string or bytes")
return base64.b64encode(base64.b64decode(sb_bytes, validate=True)) == sb_bytes
except ValueError:
return False
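# Usage sketch (doctest-style, values are illustrative):
#     is_base64("aGVsbG8=")    -> True   ("aGVsbG8=" decodes to b"hello")
#     is_base64("not base64!") -> False  (contains characters outside the base64 alphabet)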


def get_system_info():
    """Log CPU count (from several sources) and memory usage; Linux-only (sched_getaffinity, lscpu, free)."""
    import multiprocessing
    import os
    import subprocess
    import torch.multiprocessing as mp
app_logger.info(f"mp::cpu_count:{mp.cpu_count()}.")
app_logger.info(f"multiprocessing::cpu_count:{multiprocessing.cpu_count()}.")
app_logger.info(f"os::cpu_count:{os.cpu_count()}")
app_logger.info(f"os::sched_getaffinity:{len(os.sched_getaffinity(0))}")
lscpu_output = subprocess.run("/usr/bin/lscpu", capture_output=True)
app_logger.info(f"lscpu:{lscpu_output.stdout.decode('utf-8')}.")
free_mem_output = subprocess.run(["/usr/bin/free", "-m"], capture_output=True)
app_logger.info(f"free_mem_output:{free_mem_output.stdout.decode('utf-8')}.")


def base64_decode(s):
    """Decode a base64 string to utf-8; return the input unchanged when it is not valid base64."""
    import base64
if isinstance(s, str) and is_base64(s):
return base64.b64decode(s, validate=True).decode("utf-8")
return s
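# Usage sketch (illustrative values):
#     base64_decode("aGVsbG8=")   -> "hello"
#     base64_decode("plain text") -> "plain text"  (not valid base64: returned unchanged)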


def get_constants(event: dict, debug=False) -> dict:
    """
    Return constants we need to use from event, context and environment variables (both production and test).

    Args:
        event: request event
        debug: logging debug argument

    Returns:
        dict: project constants object

    """
import json
try:
body = event["body"]
except Exception as e_constants1:
app_logger.error(f"e_constants1:{e_constants1}.")
body = event
if isinstance(body, str):
body = json.loads(event["body"])
try:
debug = body["debug"]
app_logger.info(f"re-try get debug value:{debug}, log_level:{app_logger.level}.")
except KeyError:
app_logger.error("get_constants:: no debug key, pass...")
app_logger.info(f"constants debug:{debug}, log_level:{app_logger.level}, body:{body}.")
try:
return {
"bbox": body["bbox"],
"point": body["point"],
"debug": debug
}
    except KeyError as e_key_constants2:
        app_logger.error(f"e_key_constants2:{e_key_constants2}.")
        raise KeyError(f"e_key_constants2:{e_key_constants2}.") from e_key_constants2
    except Exception as e_constants2:
        app_logger.error(f"e_constants2:{e_constants2}.")
        raise
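# Example event handled by get_constants() (a sketch; the exact bbox/point schema is an
# assumption, the function only requires the "bbox" and "point" keys to be present):
#     event = {"body": '{"bbox": ..., "point": ..., "debug": true}'}
#     constants = get_constants(event)  # -> {"bbox": ..., "point": ..., "debug": True}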


def get_rasters_info(rasters_list: list, names_list: list, title: str = "", debug: bool = False) -> str:
    """
    Analyze a list of numpy arrays and collect some useful information into a string. For every raster:

    - type of the raster
    - raster.dtype, if the raster is an instance of np.ndarray
    - raster shape
    - minimum of the raster values, over all axes (flattening the array)
    - maximum of the raster values, over all axes (flattening the array)
    - mean of the raster values, over all axes (flattening the array)
    - median of the raster values, over all axes (flattening the array)
    - standard deviation of the raster values, over all axes (flattening the array)
    - variance of the raster values, over all axes (flattening the array)

    Args:
        rasters_list: list of numpy array rasters to analyze
        names_list: list of names, one per raster
        title: title of the current analytic session
        debug: logging debug argument

    Returns:
        str: the collected information

    Raises:
        ValueError: if rasters_list and names_list have a different number of elements

    """
msg = f"get_rasters_info::title:{title},\n"
    if len(rasters_list) != len(names_list):
        msg = "rasters_list and names_list should have the same number of elements:\n"
        msg += f"len(rasters_list):{len(rasters_list)}, len(names_list):{len(names_list)}."
raise ValueError(msg)
try:
for raster, name in zip(rasters_list, names_list):
try:
if isinstance(raster, np.ndarray):
shape_or_len = raster.shape
elif isinstance(raster, list):
shape_or_len = len(raster)
else:
raise ValueError(f"wrong argument type:{raster}, variable:{raster}.")
zmin, zmax, zmean, zmedian, zstd, zvar = get_stats_raster(raster, debug=debug)
msg += "name:{}:type:{},dtype:{},shape:{},min:{},max:{},mean:{},median:{},std:{},var:{}\n".format(
name, type(raster), raster.dtype if isinstance(raster, np.ndarray) else None, shape_or_len, zmin,
zmax, zmean, zmedian, zstd, zvar
)
except Exception as get_rasters_types_e:
msg = f"get_rasters_types_e::{get_rasters_types_e}, type_raster:{type(raster)}."
app_logger.error(msg)
                raise ValueError(msg) from get_rasters_types_e
except IndexError as get_rasters_types_ie:
app_logger.error(f"get_rasters_types::len:rasters_list:{len(rasters_list)}, len_names_list:{len(names_list)}.")
raise get_rasters_types_ie
return msg + "\n=============================\n"
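# Usage sketch ("mask" and "img" are illustrative arrays, not defined in this module):
#     summary = get_rasters_info([mask, img], ["mask", "img"], title="prediction output")
#     app_logger.info(summary)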


def get_stats_raster(raster: np.ndarray, get_rms: bool = False, debug: bool = False) -> Union[ts_float64_1, ts_float64_2]:
    """
    Analyze a numpy array and extract a tuple of useful information (NaN values are ignored):

    - minimum of the raster values, over all axes (flattening the array)
    - maximum of the raster values, over all axes (flattening the array)
    - mean of the raster values, over all axes (flattening the array)
    - median of the raster values, over all axes (flattening the array)
    - standard deviation of the raster values, over all axes (flattening the array)
    - variance of the raster values, over all axes (flattening the array)
    - Root Mean Square of the raster values, appended only if get_rms is True

    Args:
        raster: numpy array to analyze
        get_rms: if True, also compute the Root Mean Square and append it to the returned tuple
        debug: logging debug argument

    Returns:
        tuple: float values (min, max, mean, median, standard deviation, variance[, rms])

    """
    stats = (np.nanmin(raster), np.nanmax(raster), np.nanmean(raster), np.nanmedian(raster), np.nanstd(raster),
             np.nanvar(raster))
    if get_rms:
        try:
            rms = np.sqrt(np.nanmean(np.square(raster)))
        except Exception as rms_e:
            rms = None
            app_logger.error(f"get_stats_raster::rms_Exception:{rms_e}.")
        if debug:
            app_logger.info(f"nanmin type:{type(stats[0])}.")
        return stats + (rms,)
    return stats
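

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the request/response workflow):
    # compute and print stats for a small random raster using the helpers above.
    demo_raster = np.random.default_rng(42).random((4, 4))
    print(get_stats_raster(demo_raster, get_rms=True))
    print(get_rasters_info([demo_raster], ["demo_raster"], title="demo"))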