File size: 7,129 Bytes
59ebac7 6d1f220 3d82071 6d1f220 59ebac7 8ef9cc0 da5737b 59ebac7 da5737b 8ef9cc0 da5737b 59ebac7 3d82071 da5737b 59ebac7 243f395 59ebac7 243f395 59ebac7 6d1f220 59ebac7 6d1f220 59ebac7 6d1f220 59ebac7 243f395 59ebac7 6d1f220 59ebac7 6d1f220 59ebac7 6d1f220 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
"""Various utilities (logger, time benchmark, args dump, numerical and stats info)"""
import numpy as np
from src import app_logger
from src.utilities.type_hints import ts_float64_1, ts_float64_2
def is_base64(sb):
    """
    Tell whether a value is a canonical base64 encoding.

    The value is strictly decoded and then re-encoded: it counts as base64 only when that round trip
    reproduces exactly the same bytes.

    Args:
        sb: candidate value, either a str (must contain only ASCII characters) or bytes

    Returns:
        bool: True when sb survives the decode/encode round trip unchanged, False otherwise
            (invalid base64, non-ASCII strings, or arguments that are neither str nor bytes)
    """
    import base64

    try:
        if isinstance(sb, bytes):
            candidate = sb
        elif isinstance(sb, str):
            # non-ASCII characters raise UnicodeEncodeError, a ValueError subclass handled below
            candidate = sb.encode("ascii")
        else:
            raise ValueError("Argument must be string or bytes")
        decoded = base64.b64decode(candidate, validate=True)
        round_trip = base64.b64encode(decoded)
        return round_trip == candidate
    except ValueError:
        return False
def get_system_info():
    """
    Log CPU and memory details of the current host through app_logger.

    Reported, in order: the CPU count seen by torch.multiprocessing, multiprocessing and os, the size of
    the CPU affinity set of the current process, then the raw output of /usr/bin/lscpu and
    /usr/bin/free -m.

    Returns:
        None: the information is only written to the log
    """
    import multiprocessing
    import os
    import subprocess

    import torch.multiprocessing as mp

    # each value is read right before being logged, so a failing probe never hides the earlier log lines
    torch_cpu_count = mp.cpu_count()
    app_logger.info(f"mp::cpu_count:{torch_cpu_count}.")

    stdlib_cpu_count = multiprocessing.cpu_count()
    app_logger.info(f"multiprocessing::cpu_count:{stdlib_cpu_count}.")

    os_cpu_count = os.cpu_count()
    app_logger.info(f"os::cpu_count:{os_cpu_count}")

    usable_cpus = os.sched_getaffinity(0)
    app_logger.info(f"os::sched_getaffinity:{len(usable_cpus)}")

    lscpu_output = subprocess.run("/usr/bin/lscpu", capture_output=True)
    lscpu_text = lscpu_output.stdout.decode('utf-8')
    app_logger.info(f"lscpu:{lscpu_text}.")

    free_mem_output = subprocess.run(["/usr/bin/free", "-m"], capture_output=True)
    free_mem_text = free_mem_output.stdout.decode('utf-8')
    app_logger.info(f"free_mem_output:{free_mem_text}.")
def base64_decode(s):
    """
    Decode a base64 string into UTF-8 text, handing back any other input untouched.

    Args:
        s: value to decode; only str instances accepted by is_base64() are decoded

    Returns:
        the decoded text when s is a base64 str whose payload is valid UTF-8, otherwise s unchanged
    """
    import base64

    if not (isinstance(s, str) and is_base64(s)):
        return s
    try:
        return base64.b64decode(s, validate=True).decode("utf-8")
    except UnicodeDecodeError:
        # Ordinary words (e.g. "test") are syntactically valid base64 yet decode to bytes that are not
        # UTF-8 text: treat them as "not encoded" like every other non-base64 input instead of crashing.
        return s
def get_constants(event: dict, debug=False) -> dict:
    """
    Extract the constants needed by the project from the request event.

    The payload is read from event["body"]; when that lookup fails the event itself is used as payload.
    A payload given as a string is parsed as JSON.

    Args:
        event: request event
        debug: logging debug argument, used when the payload has no "debug" key

    Returns:
        dict: project constants object with the "bbox", "point" and "debug" keys

    Raises:
        KeyError: if the payload has no "bbox" or "point" key
    """
    import json

    try:
        body = event["body"]
    except Exception as e_constants1:
        app_logger.error(f"e_constants1:{e_constants1}.")
        body = event

    if isinstance(body, str):
        # Parse the payload actually selected above: after the fallback, event["body"] is not available
        # and looking it up again would raise instead of decoding the JSON string.
        body = json.loads(body)

    try:
        debug = body["debug"]
        app_logger.info(f"re-try get debug value:{debug}, log_level:{app_logger.level}.")
    except KeyError:
        # a missing "debug" key is not fatal: keep the value received as argument
        app_logger.error("get_constants:: no debug key, pass...")

    app_logger.info(f"constants debug:{debug}, log_level:{app_logger.level}, body:{body}.")

    try:
        return {
            "bbox": body["bbox"],
            "point": body["point"],
            "debug": debug
        }
    except KeyError as e_key_constants2:
        app_logger.error(f"e_key_constants2:{e_key_constants2}.")
        # keep the original lookup failure attached as the explicit cause of the new error
        raise KeyError(f"e_key_constants2:{e_key_constants2}.") from e_key_constants2
    except Exception as e_constants2:
        app_logger.error(f"e_constants2:{e_constants2}.")
        raise e_constants2
def get_rasters_info(rasters_list: list, names_list: list, title: str = "", debug: bool = False) -> str:
    """
    Analyze a list of rasters to build a string containing some useful information. For every raster:

        - name of the raster (from names_list)
        - type of raster
        - raster.dtype if that's instance of np.ndarray, None otherwise
        - raster shape (length for a plain list)
        - min, max, mean, median, standard deviation and variance as returned by get_stats_raster()

    Args:
        rasters_list: list of rasters (numpy arrays or lists) to analyze
        names_list: list of names, one for each raster
        title: title of current analytic session
        debug: logging debug argument, forwarded to get_stats_raster()

    Returns:
        str: the collected information

    Raises:
        ValueError: if rasters_list and names_list have a different number of elements, if a raster is
            neither a numpy array nor a list, or if computing its statistics fails
    """
    if len(rasters_list) != len(names_list):
        msg = "raster_list and names_list should have the same number of elements:\n"
        msg += f"len(rasters_list):{len(rasters_list)}, len(names_list):{len(names_list)}."
        raise ValueError(msg)

    # collect the report pieces and join them once at the end
    report = [f"get_rasters_info::title:{title},\n"]
    for raster, name in zip(rasters_list, names_list):
        try:
            if isinstance(raster, np.ndarray):
                shape_or_len = raster.shape
                dtype = raster.dtype
            elif isinstance(raster, list):
                shape_or_len = len(raster)
                dtype = None
            else:
                # report the offending type and the raster name, not the (possibly huge) raster content
                raise ValueError(f"wrong argument type:{type(raster)}, variable:{name}.")
            zmin, zmax, zmean, zmedian, zstd, zvar = get_stats_raster(raster, debug=debug)
            report.append(
                "name:{}:type:{},dtype:{},shape:{},min:{},max:{},mean:{},median:{},std:{},var:{}\n".format(
                    name, type(raster), dtype, shape_or_len, zmin, zmax, zmean, zmedian, zstd, zvar
                )
            )
        except Exception as get_rasters_types_e:
            # every failure on a single raster is logged and surfaced to the caller as a ValueError
            msg = f"get_rasters_types_e::{get_rasters_types_e}, type_raster:{type(raster)}."
            app_logger.error(msg)
            raise ValueError(msg) from get_rasters_types_e
    return "".join(report) + "\n=============================\n"
def get_stats_raster(
        raster: np.ndarray, get_rms: bool = False, debug: bool = False
) -> "ts_float64_1 | ts_float64_2":
    """
    Analyze a numpy array to extract a tuple of statistics, all computed ignoring NaN values and over
    all axes (flattening the array):

        - min of raster values
        - max of raster values
        - mean of raster values
        - median of raster values
        - standard deviation of raster values
        - variance of raster values
        - Root Mean Square of raster values (only if get_rms is True)

    Args:
        raster: numpy array to analyze
        get_rms: bool to append the Root Mean Square as seventh element of the output tuple
        debug: logging debug argument (currently unused by this function)

    Returns:
        tuple: (min, max, mean, median, standard deviation, variance) of raster, followed by the rms
            when get_rms is True (None if its computation failed)
    """
    # The return annotation is a string because `A or B` is evaluated eagerly and yields only its first
    # operand: it never described the seven-element variant.
    std = np.nanstd(raster)
    stats = (np.nanmin(raster), np.nanmax(raster), np.nanmean(raster), np.nanmedian(raster), std,
             np.nanvar(raster))
    if not get_rms:
        return stats

    try:
        rms = np.sqrt(np.nanmean(np.square(raster)))
    except Exception as rms_e:
        # the rms is optional: on failure keep the other statistics and report None in its place
        rms = None
        app_logger.error(f"get_stats_raster::rms_Exception:{rms_e}.")
    app_logger.info(f"nanmin:{type(stats[0])}.")
    return (*stats, rms)
|