import os
import random
import secrets
import string
from datetime import datetime, timedelta
from math import isnan

import confuse
import folium
import geopandas as gpd
import geopy
import numpy as np
from geopy.exc import GeocoderTimedOut, GeocoderUnavailable
from geopy.geocoders import Nominatim
from PIL import Image
from shapely.geometry import Point
from tqdm import tqdm

TOKEN_FILE = "tokens.txt"
EXPIRED_FILE = "tokens_expired.txt"

# Alphabet and length used for generated API tokens.
_TOKEN_ALPHABET = string.ascii_lowercase + string.digits
_TOKEN_LENGTH = 32


def _read_tokens(token_file):
    '''Return the set of non-empty lines in token_file (empty set if the file is missing).'''
    if not os.path.exists(token_file):
        return set()
    with open(token_file, 'r') as f:
        # Blank lines are dropped so that an empty string is never a valid token.
        return {line for line in f.read().splitlines() if line}


def generate_random_unique_tokens(num_tokens=10, token_file=TOKEN_FILE):
    '''
    Make sure token_file holds at least num_tokens unique tokens.

    Existing tokens are kept; missing ones are generated and appended.
    The file is created when it does not exist.

    Returns:
        The set of all tokens stored in the file after the call.
    '''
    tokens = _read_tokens(token_file)
    new_tokens = []
    while len(tokens) < num_tokens:
        # `secrets` is used instead of `random` because these tokens gate API access.
        token = ''.join(secrets.choice(_TOKEN_ALPHABET) for _ in range(_TOKEN_LENGTH))
        if token not in tokens:
            tokens.add(token)
            new_tokens.append(token)
    # Append mode creates the file if needed and never rewrites existing tokens.
    with open(token_file, 'a') as f:
        f.writelines(token + '\n' for token in new_tokens)
    return tokens


def confirm_api_token(token, token_file=TOKEN_FILE, expired_file=EXPIRED_FILE):
    '''
    Check that the given token is known and has not expired.

    The first successful check of a token records an expiry one hour in the
    future in expired_file; later checks succeed only before that moment.

    Returns:
        True when the token is listed in token_file and is still valid,
        False otherwise (including when token_file does not exist).
    '''
    if token not in _read_tokens(token_file):
        return False
    now = datetime.now()
    expiry_date = load_token_expiration(token, expired_file)
    if expiry_date is None:
        # First use of this token: start its one-hour validity window.
        save_expired_token(token, now + timedelta(hours=1), expired_file)
        return True
    return now < expiry_date


def load_expired_tokens(expired_file=EXPIRED_FILE):
    '''
    Load the recorded token expiry dates from the file.

    Each line has the form "<token>,<ISO datetime>". Blank lines are ignored.
    When a token appears more than once, the last entry wins.

    Returns:
        A dict mapping token -> datetime (empty if the file does not exist).

    Raises:
        ValueError: if a non-blank line is malformed.
    '''
    expired_tokens = {}
    if os.path.exists(expired_file):
        with open(expired_file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                token, expiry_date = line.rsplit(',', 1)
                expired_tokens[token] = datetime.fromisoformat(expiry_date)
    return expired_tokens


def load_token_expiration(token, expired_file=EXPIRED_FILE):
    '''Return the recorded expiry datetime for a token, or None if there is none.'''
    return load_expired_tokens(expired_file).get(token)


def save_expired_token(token, expiry_date, expired_file=EXPIRED_FILE):
    '''Append a "<token>,<ISO datetime>" expiry record to the file, creating it if needed.'''
    with open(expired_file, 'a') as f:
        f.write(f"{token},{expiry_date.isoformat()}\n")


def get_region_from_coordinates(latitude, longitude, max_retries=3):
    '''
    Reverse-geocode a coordinate pair into a region name using Nominatim.

    The first of state / county / region / province / district present in the
    returned address is used, falling back to the country.

    Returns:
        The region (or country) name, or one of the messages
        "Region not found", "Geocoding service unavailable",
        "Failed to retrieve region information".
    '''
    geolocator = Nominatim(user_agent="my_agent")
    for attempt in range(max_retries):
        try:
            location = geolocator.reverse(f"{latitude}, {longitude}")
        except (GeocoderTimedOut, GeocoderUnavailable):
            # Retry on transient failures; give up after the last attempt.
            if attempt == max_retries - 1:
                return "Geocoding service unavailable"
            continue

        address = location.raw.get('address') if location else None
        if address:
            # Try to get the most relevant administrative level
            for level in ('state', 'county', 'region', 'province', 'district'):
                if level in address:
                    return address[level]
            # If no specific region is found, return the country
            if 'country' in address:
                return address['country']
        return "Region not found"
    return "Failed to retrieve region information"


# Initialize custom basemaps for folium
basemaps = {
    'Google Maps': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=m&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Maps',
        overlay = True,
        control = True
    ),
    'Google Satellite': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=s&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Satellite',
        overlay = True,
        control = True
    ),
    'Google Terrain': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=p&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Terrain',
        overlay = True,
        control = True
    ),
    'Google Satellite Hybrid': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=y&x={x}&y={y}&z={z}',
        attr = 'Google',
        # Was 'Google Satellite', which duplicated the plain satellite layer's
        # label in the layer control.
        name = 'Google Satellite Hybrid',
        overlay = True,
        control = True
    ),
    'Esri Satellite': folium.TileLayer(
        tiles = 'https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
        attr = 'Esri',
        name = 'Esri Satellite',
        overlay = True,
        control = True
    ),
    'openstreetmap': folium.TileLayer('openstreetmap'),
    'cartodbdark_matter': folium.TileLayer('cartodbdark_matter')
}


# Dictionary of JavaScript files (More Readable):
# maps the upper-cased file stem of every ./scripts/*.js file to its contents.
scripts_dir = './scripts/'
scripts_files = [f for f in os.listdir(scripts_dir) if f.endswith('.js')]
Scripts = {}
for script_name in scripts_files:
    key = script_name.split('.')[0].upper()
    with open(scripts_dir + script_name) as script_file:
        Scripts[key] = script_file.read()


def calculate_bbox(df, field):
    '''
    Calculate the bounding box of a specific field ID in a given data frame

    Returns [minx, miny, maxx, maxy] of the first row whose 'name' equals field.
    Raises IndexError when no row matches.
    '''
    bbox = df.loc[df['name'] == field].bounds
    r = bbox.iloc[0]
    return [r.minx, r.miny, r.maxx, r.maxy]


def tiff_to_geodataframe(im, metric, date, crs):
    '''
    Convert a tiff image to a geodataframe

    One Point(x, y) row is produced per pixel whose first band value is not
    NaN; the value is stored in a column named "<metric>_<date>".
    NOTE(review): `v.item()` only works for a single value per pixel, so this
    presumably expects a one-band image indexed as (band, y, x) - confirm.
    '''
    x_cords = im.coords['x'].values
    y_cords = im.coords['y'].values
    vals = im.values
    dims = vals.shape

    points = []
    v_s = []
    for lat in range(dims[1]):
        y = y_cords[lat]
        for lon in range(dims[2]):
            x = x_cords[lon]
            v = vals[:, lat, lon]
            if isnan(v[0]):
                continue
            points.append(Point(x, y))
            v_s.append(v.item())

    d = {f'{metric}_{date}': v_s, 'geometry': points}
    return gpd.GeoDataFrame(d, crs=crs)


def get_bearer_token_headers(bearer_token):
    '''
    Get the bearer token headers to be used in the request to the SentinelHub API
    '''
    return {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + bearer_token,
    }


def get_downloaded_location_img_path(clientName, metric, date, field, extension='tiff'):
    '''
    Get the path of the downloaded image for a client / metric / date / field.

    The raw directory for that combination is created if missing. The image is
    expected at "<first sub-directory>/response.<extension>".

    Returns:
        The image path, or None when there is no sub-directory or no image.
    '''
    date_dir = f'./data/{clientName}/raw/{metric}/{date}/field_{field}/'
    print(f'True Color Date Dir: {date_dir}')
    os.makedirs(date_dir, exist_ok=True)
    intermediate_dirs = os.listdir(date_dir)
    print(f'Intermediate Dirs: {intermediate_dirs}')
    if not intermediate_dirs:
        return None
    imagePath = f'{date_dir}{intermediate_dirs[0]}/response.{extension}'
    print(f'Image Path: {imagePath}')
    if not os.path.exists(imagePath):
        return None
    return imagePath


def get_masked_location_img_path(clientName, metric, date, field):
    '''
    Get the path of the masked TIFF for a client / metric / date / field.
    The path is returned whether or not the file exists.
    '''
    date_dir = f'./data/{clientName}/processed/{metric}/{date}/field_{field}/'
    return date_dir + 'masked.tiff'


def get_curated_location_img_path(clientName, metric, date, field):
    '''
    Get the path of the masked image converted to GeoJSON format for a
    client / metric / date / field, or None when that file does not exist.
    '''
    date_dir = f'./data/{clientName}/curated/{metric}/{date}/field_{field}/'
    imagePath = date_dir + 'masked.geojson'
    if os.path.exists(imagePath):
        return imagePath
    return None


def parse_app_config(path=r'config-fgm-dev.yaml'):
    '''Load the application configuration from the given YAML file.'''
    config = confuse.Configuration('CropHealth', __name__)
    config.set_file(path)
    return config


def fix_image(img):
    '''
    Brighten the first three bands of an image.

    Each of bands 0, 1, 2 (along the last axis) is multiplied by 3 and clipped
    to [0, 255]; the result is an (H, W, 3) array.

    NOTE(review): the original code also computed gamma-corrected (gamma=0.9)
    and min-max normalized versions of the bands but never returned them; the
    result variable was named as if they were intended. The returned value is
    kept as the brightened bands only - confirm whether the normalized
    composite was the intended output before changing it.
    '''
    alpha = 3
    beta = 0
    return np.clip(alpha * img[:, :, :3] + beta, 0, 255)


def creat_gif(dataset, gif_name, duration=50):
    '''
    Create a gif from a list of images

    Each image is scaled by 255 and cast to uint8 before being written.
    Raises IndexError when dataset is empty.
    '''
    imgs = [Image.fromarray((255 * img).astype(np.uint8)) for img in dataset]
    # duration is the number of milliseconds between frames;
    # the default of 50 ms is 20 frames per second
    imgs[0].save(gif_name, save_all=True, append_images=imgs[1:], duration=duration, loop=1)


def add_lat_lon_to_gdf_from_geometry(gdf):
    '''
    Add 'Lat' and 'Lon' columns taken from the point geometry (modifies gdf in place).

    NOTE(review): 'Lat' receives the point's x and 'Lon' its y, which is the
    reverse of the usual convention (x = longitude, y = latitude). Kept as-is
    because the row/column orientation of gdf_column_to_one_band_array depends
    on these columns - confirm with callers before swapping.
    '''
    gdf['Lat'] = gdf['geometry'].apply(lambda p: p.x)
    gdf['Lon'] = gdf['geometry'].apply(lambda p: p.y)
    return gdf


def gdf_column_to_one_band_array(gdf, column_name):
    '''
    Arrange a column of a point geodataframe into a 2D array.

    Rows follow the sorted unique 'Lat' values and columns the sorted unique
    'Lon' values. Each value is placed in the cell matching its own Lat/Lon,
    so missing grid points stay 0 instead of shifting later values into the
    wrong row.

    Returns:
        Array of shape (number of unique Lat, number of unique Lon).
    '''
    lat_values, row_idx = np.unique(np.asarray(gdf['Lat']), return_inverse=True)
    lon_values, col_idx = np.unique(np.asarray(gdf['Lon']), return_inverse=True)
    column_values = np.asarray(gdf[column_name])

    grid = np.zeros((len(lat_values), len(lon_values)), dtype=column_values.dtype)
    grid[row_idx, col_idx] = column_values
    return grid