Spaces:
Sleeping
Sleeping
File size: 13,163 Bytes
2cdce84 ab52166 d064f2c 1ffd5d5 56d64f4 1ffd5d5 ab52166 1ffd5d5 ab52166 1ffd5d5 ab52166 1ffd5d5 ab52166 1ffd5d5 ab52166 270809d 1ffd5d5 ab52166 2cdce84 46c0e18 2cdce84 46c0e18 2cdce84 46c0e18 2cdce84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
import os
import folium
import confuse
import numpy as np
from math import isnan
import geopandas as gpd
from shapely.geometry import Point
from PIL import Image
from tqdm import tqdm
import geopy
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut, GeocoderUnavailable
import random
import string
import os
from datetime import datetime, timedelta
# Default path of the token list used by generate_random_unique_tokens and
# confirm_api_token (one token per line).
TOKEN_FILE = "tokens.txt"
# Default path of the "token,ISO-8601 expiry" records read by
# load_expired_tokens and appended to by save_expired_token.
EXPIRED_FILE = "tokens_expired.txt"
# HTML snippet explaining how to obtain an SNET API token. Not referenced by
# any function visible in this file; presumably rendered by the UI layer.
NEW_TOKEN_INSTRUCTIONS="""<div style="padding: 20px; border-radius: 5px;">
<h3 style="color: #4CAF50;">Get Your SNET API Token</h3>
<p style="color: #666; font-size: 16px;">
To get an API token for the SNET platform:
<ol>
<li>Purchase tokens on the SNET platform using AGIX</li>
<li> <del> Input the email address you used to sign up for the dashboard </del>. Any email will work for testing purposes</li>
<li>An API token will be generated for you</li>
</ol>
The generated token will be valid for unlimited calls to the forecasting API for 15 minutes from your first call, for any of your fields.
</p>
<p style="color: #666; font-size: 16px;">
You can find us on the SNET platform at:
<a href="https://beta.singularitynet.io/servicedetails/org/EnigmaAi/service/farmingID" style="color: #007bff; text-decoration: none;">SNET Platform</a>
</p>
</div>
"""
import pandas as pd
import os
def manage_user_tokens(current_user, api_token, valid_until):
    """
    Record an API token for a user in ``<current_user>_tokens.csv``.

    The CSV has two columns, ``token`` and ``valid_until``. The file is
    created on the first call; later calls append a row only when
    ``api_token`` is not already listed.

    Args:
        current_user (str): The username of the currently logged-in user;
            used verbatim as the file-name prefix.
        api_token (str): The API token to be stored.
        valid_until (str): The expiration date of the API token
            (in 'YYYY-MM-DD HH:MM:SS' format).

    Returns:
        pandas.DataFrame: The user's token table after the update.
    """
    filename = f'{current_user}_tokens.csv'

    if not os.path.exists(filename):
        # First token for this user: create the table and persist it.
        df = pd.DataFrame({'token': [api_token], 'valid_until': [valid_until]})
        df.to_csv(filename, index=False)
        return df

    # Read tokens back as text. Without this an all-digit token is parsed as
    # an integer, never compares equal to the incoming string, and gets
    # appended again on every call.
    df = pd.read_csv(filename, dtype={'token': str})
    already_stored = df['token'].eq(str(api_token)).any()
    if not already_stored:
        new_row_df = pd.DataFrame({'token': [api_token], 'valid_until': [valid_until]})
        df = pd.concat([df, new_row_df], ignore_index=True)
        df.to_csv(filename, index=False)
    return df
def generate_random_unique_tokens(num_tokens=10, token_file=TOKEN_FILE):
    '''
    Ensure ``token_file`` holds at least ``num_tokens`` unique tokens.

    Tokens already present in the file are kept; new 32-character tokens
    (lowercase letters and digits) are appended, one per line, until the
    total reaches ``num_tokens``. The file is created if it does not exist.

    Args:
        num_tokens (int): Minimum number of tokens the file should contain.
        token_file (str): Path of the newline-separated token file.

    Returns:
        set[str]: Every token now stored in the file.
    '''
    # Local import keeps this block self-contained. These tokens gate API
    # access, so they must come from a cryptographically secure source
    # rather than the `random` module.
    import secrets

    alphabet = string.ascii_lowercase + string.digits

    if os.path.exists(token_file):
        with open(token_file, 'r') as f:
            # Ignore blank lines so an empty string is never counted as a token.
            tokens = {line for line in f.read().splitlines() if line}
    else:
        tokens = set()

    # Append mode creates the file when missing and preserves existing tokens.
    with open(token_file, 'a') as f:
        while len(tokens) < num_tokens:
            token = ''.join(secrets.choice(alphabet) for _ in range(32))
            if token not in tokens:
                tokens.add(token)
                f.write(token + '\n')
    return tokens
def confirm_api_token(token, token_file=TOKEN_FILE, expired_file=EXPIRED_FILE):
    '''
    Check whether ``token`` is a known token that is still inside its
    validity window.

    A token listed in ``token_file`` becomes active on its first successful
    check: an expiry 15 minutes in the future is recorded in
    ``expired_file``. Later checks succeed until that recorded time passes.

    Args:
        token (str): Token presented by the caller.
        token_file (str): Path of the newline-separated list of issued tokens.
        expired_file (str): Path of the ``token,expiry`` activation log.

    Returns:
        dict: ``{'valid': bool, 'message': str}`` where message is one of
        'Token is valid', 'Token has expired' or 'Token is invalid'.
    '''
    print(f'Checking token: {token} >>>>>>>>>>>>>>>>>>>')

    try:
        with open(token_file, 'r') as f:
            tokens = set(f.read().splitlines())  # Load all possible tokens
    except FileNotFoundError:
        # No token file means no token has been issued: reject, don't crash.
        tokens = set()

    # An empty token must never match a stray blank line in the file.
    if not token or token not in tokens:
        return {'valid': False, 'message': 'Token is invalid'}

    now = datetime.now()
    # Parse the activation log once and reuse the result for both the
    # "already used?" check and the expiry lookup.
    expiry = load_expired_tokens(expired_file).get(token)

    if expiry is None:
        # First use: start the 15-minute validity window now.
        save_expired_token(token, now + timedelta(minutes=15), expired_file)
        return {'valid': True, 'message': 'Token is valid'}

    if now < expiry:
        return {'valid': True, 'message': 'Token is valid'}
    return {'valid': False, 'message': 'Token has expired'}
def load_expired_tokens(expired_file=EXPIRED_FILE):
    '''
    Load the ``token,expiry`` records from ``expired_file``.

    Each non-blank line is expected to be ``<token>,<ISO-8601 datetime>``.
    If a token appears more than once, the last record wins.

    Args:
        expired_file (str): Path of the activation log.

    Returns:
        dict[str, datetime]: Expiry time per token; empty when the file does
        not exist.

    Raises:
        ValueError: If a non-blank line is not a valid ``token,datetime`` pair.
    '''
    expired_tokens = {}
    if not os.path.exists(expired_file):
        return expired_tokens

    with open(expired_file, 'r') as f:
        for line in f:
            record = line.strip()
            if not record:
                # Tolerate blank lines instead of failing on the unpack below.
                continue
            token, expiry_date = record.split(',')
            expired_tokens[token] = datetime.fromisoformat(expiry_date)
    return expired_tokens
def load_token_expiration(token, expired_file=EXPIRED_FILE):
    '''
    Return the recorded expiry for ``token``, or ``None`` when
    ``expired_file`` holds no record for it.
    '''
    return load_expired_tokens(expired_file).get(token)
def save_expired_token(token, expiry_date, expired_file=EXPIRED_FILE):
    '''
    Append a ``token,expiry`` record to ``expired_file``.

    Args:
        token (str): Token being activated.
        expiry_date (datetime): Expiry time; written with ``isoformat()``.
        expired_file (str): Path of the activation log.
    '''
    # Append mode creates the file when it is missing, so no existence check
    # (and no separate 'w' branch) is needed.
    with open(expired_file, 'a') as f:
        f.write(f"{token},{expiry_date.isoformat()}\n")
def get_region_from_coordinates(latitude, longitude, max_retries=3):
    '''
    Reverse-geocode a coordinate pair with Nominatim and return a region name.

    The first key present in the returned address wins, checked in the order
    state, county, region, province, district, country. Timeouts and
    service-unavailable errors are retried up to ``max_retries`` times.

    Returns one of: the region name, "Region not found",
    "Geocoding service unavailable" (every attempt failed), or
    "Failed to retrieve region information" (no attempt was made).
    '''
    # Most specific administrative levels first, country as the last resort.
    lookup_order = ('state', 'county', 'region', 'province', 'district', 'country')
    geolocator = Nominatim(user_agent="my_agent")

    for attempt_number in range(1, max_retries + 1):
        try:
            location = geolocator.reverse(f"{latitude}, {longitude}")
        except (GeocoderTimedOut, GeocoderUnavailable):
            if attempt_number == max_retries:
                return "Geocoding service unavailable"
            continue

        address = location.raw.get('address') if location else None
        if address:
            for key in lookup_order:
                if key in address:
                    return address[key]
        return "Region not found"

    return "Failed to retrieve region information"
# Initialize custom basemaps for folium.
# Keys are the lookup names used by this module; each layer's `name` is the
# label passed to folium.
basemaps = {
    'Google Maps': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=m&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Maps',
        overlay = True,
        control = True
    ),
    'Google Satellite': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=s&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Satellite',
        overlay = True,
        control = True
    ),
    'Google Terrain': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=p&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Terrain',
        overlay = True,
        control = True
    ),
    'Google Satellite Hybrid': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=y&x={x}&y={y}&z={z}',
        attr = 'Google',
        # Was 'Google Satellite', which duplicated the plain satellite layer's
        # label and made the two layers indistinguishable by name.
        name = 'Google Satellite Hybrid',
        overlay = True,
        control = True
    ),
    'Esri Satellite': folium.TileLayer(
        tiles = 'https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
        attr = 'Esri',
        name = 'Esri Satellite',
        overlay = True,
        control = True
    ),
    'openstreetmap': folium.TileLayer('openstreetmap'),
    'cartodbdark_matter': folium.TileLayer('cartodbdark_matter')
}
# Dictionary of JavaScript files: maps each *.js file in ./scripts/ to its
# source text, keyed by the upper-cased file name up to the first dot
# (e.g. "ndvi.js" -> "NDVI"). Runs at import time, so ./scripts/ must exist.
scripts_dir = './scripts/'
scripts_files = [f for f in os.listdir(scripts_dir) if f.endswith('.js')]
Scripts = {}
for script_name in scripts_files:
    key = script_name.split('.')[0].upper()
    # Separate names for the file name and the handle: the original rebound
    # the loop variable to the open file object inside the loop.
    with open(scripts_dir + script_name) as script_file:
        Scripts[key] = script_file.read()
def calculate_bbox(df, field):
    '''
    Return ``[minx, miny, maxx, maxy]`` for the first row of ``df`` whose
    ``name`` column equals ``field``.

    Raises IndexError when no row matches.
    '''
    matching_bounds = df.loc[df['name'] == field].bounds
    first_match = matching_bounds.iloc[0]
    return [getattr(first_match, edge) for edge in ('minx', 'miny', 'maxx', 'maxy')]
def tiff_to_geodataframe(im, metric, date, crs):
    '''
    Convert a raster image into a GeoDataFrame of points.

    One ``Point(x, y)`` row is produced per pixel whose first band is not
    NaN; the pixel value is stored in a column named ``{metric}_{date}``.
    ``im`` is indexed as ``values[:, row, col]`` with ``coords['y']`` /
    ``coords['x']`` giving the row / column coordinates, and each kept pixel
    must hold exactly one value (``.item()``).
    # assumes im is an xarray-like single-band (band, y, x) array - TODO confirm
    '''
    xs = im.coords['x'].values
    ys = im.coords['y'].values
    data = im.values
    n_rows, n_cols = data.shape[1], data.shape[2]

    geometries = []
    samples = []
    for row in range(n_rows):
        y = ys[row]
        for col in range(n_cols):
            x = xs[col]
            pixel = data[:, row, col]
            # Pixels whose first band is NaN are left out entirely.
            if not isnan(pixel[0]):
                geometries.append(Point(x, y))
                samples.append(pixel.item())

    columns = {f'{metric}_{date}': samples, 'geometry': geometries}
    return gpd.GeoDataFrame(columns, crs = crs)
def get_bearer_token_headers(bearer_token):
    '''
    Build the JSON request headers carrying ``bearer_token`` as a Bearer
    authorization, for requests to the SentinelHub API.
    '''
    authorization = 'Bearer ' + bearer_token
    return {'Content-Type': 'application/json', 'Authorization': authorization}
def get_downloaded_location_img_path(clientName, metric, date, field, extension='tiff'):
    '''
    Get the path of the downloaded raw image for a client, metric, date and
    field.

    The image is looked up at
    ``./data/{clientName}/raw/{metric}/{date}/field_{field}/<subdir>/response.{extension}``
    where ``<subdir>`` is the first entry returned by ``os.listdir`` for the
    field directory. The field directory is created if it does not exist.

    Returns:
        str | None: The image path, or None when the field directory is empty
        or the response file is missing.
    '''
    date_dir = f'./data/{clientName}/raw/{metric}/{date}/field_{field}/'
    print(f'True Color Date Dir: {date_dir}')
    os.makedirs(date_dir, exist_ok=True)

    intermediate_dirs = os.listdir(date_dir)
    print(f'Intermediate Dirs: {intermediate_dirs}')
    if not intermediate_dirs:
        return None

    # Reuse the listing taken above rather than listing the directory a
    # second time, so the printed entries and the chosen one always agree.
    imagePath = f'{date_dir}{intermediate_dirs[0]}/response.{extension}'
    print(f'Image Path: {imagePath}')
    return imagePath if os.path.exists(imagePath) else None
def get_masked_location_img_path(clientName, metric, date, field):
    '''
    Build the path of the masked TIFF for a client, metric, date and field.

    The path is returned whether or not the file exists.
    '''
    return f'./data/{clientName}/processed/{metric}/{date}/field_{field}/' + 'masked.tiff'
def get_curated_location_img_path(clientName, metric, date, field):
    '''
    Get the path of the masked image in GeoJSON format for a client, metric,
    date and field.

    Returns the path only if the file exists; otherwise None.
    '''
    imagePath = f'./data/{clientName}/curated/{metric}/{date}/field_{field}/' + 'masked.geojson'
    if not os.path.exists(imagePath):
        return None
    return imagePath
def parse_app_config(path=r'config-fgm-dev.yaml'):
    '''
    Create the 'CropHealth' confuse configuration, load the file at ``path``
    into it, and return the configuration object.
    '''
    app_config = confuse.Configuration('CropHealth', __name__)
    app_config.set_file(path)
    return app_config
def fix_image(img):
    '''
    Brighten the first three channels of an image.

    Each of channels 0, 1 and 2 of ``img`` (indexed as ``img[:, :, c]``) is
    multiplied by 3 and clipped to the range [0, 255]; the three results are
    stacked back into a (rows, cols, 3) array.

    NOTE(review): the previous version also computed gamma-corrected and
    normalized bands but discarded them and returned only the brightened
    ones, even though the result was named ``rgb_composite_bgn``. That unused
    work has been removed and the returned values are unchanged. If the
    gamma/normalize steps were meant to be part of the output, that is a
    behavior change for callers and needs confirming.
    '''
    def brighten(band):
        alpha = 3  # gain
        beta = 0   # offset
        return np.clip(alpha * band + beta, 0, 255)

    red_b = brighten(img[:, :, 0])
    green_b = brighten(img[:, :, 1])
    blue_b = brighten(img[:, :, 2])
    return np.dstack((red_b, green_b, blue_b))
def creat_gif(dataset, gif_name, duration=50):
    '''
    Write the images in ``dataset`` to ``gif_name`` as an animated GIF.

    Every image is multiplied by 255 and cast to uint8 before being turned
    into a frame. ``dataset`` must contain at least one image.
    '''
    frames = []
    for img in dataset:
        frames.append(Image.fromarray((255*img).astype(np.uint8)))
    first_frame, remaining_frames = frames[0], frames[1:]
    # duration is the number of milliseconds between frames; the default of
    # 50 ms is 20 frames per second
    first_frame.save(gif_name, save_all=True, append_images=remaining_frames, duration=duration, loop=1)
def add_lat_lon_to_gdf_from_geometry(gdf):
    '''
    Add 'Lat' and 'Lon' columns derived from the 'geometry' column.

    The frame is modified in place and also returned.

    NOTE(review): 'Lat' is filled from each point's ``x`` and 'Lon' from its
    ``y``. For points built as (longitude, latitude) that is the reverse of
    the usual naming. Confirm before changing, since
    gdf_column_to_one_band_array sorts on these columns.
    '''
    for column, axis in (('Lat', 'x'), ('Lon', 'y')):
        gdf[column] = gdf['geometry'].apply(lambda point, axis=axis: getattr(point, axis))
    return gdf
def gdf_column_to_one_band_array(gdf, column_name):
    '''
    Rasterize one column of a point table into a 2-D array.

    Rows correspond to the sorted unique 'Lat' values and columns to the
    sorted unique 'Lon' values; each point's value is written to the cell
    for its own (Lat, Lon). Cells with no point are 0. If several points
    share a cell, the last one in ``gdf`` wins.

    The previous implementation filled rows sequentially
    (``i // unique_lons_count``), which is only correct for a complete grid:
    on sparse or masked grids, values drifted into the wrong rows. The output
    shape and the result on complete grids are unchanged.

    Args:
        gdf: Table with 'Lat', 'Lon' and ``column_name`` columns. Not modified.
        column_name (str): Column whose values fill the array.

    Returns:
        numpy.ndarray: Array of shape (number of unique Lat, number of
        unique Lon); shape (0, 0) for an empty table.
    '''
    lats = np.asarray(gdf['Lat'])
    lons = np.asarray(gdf['Lon'])
    values = np.asarray(gdf[column_name])

    # return_inverse gives each point's position among the sorted unique
    # coordinates, i.e. its row / column index in the output grid.
    unique_lats, row_index = np.unique(lats, return_inverse=True)
    unique_lons, col_index = np.unique(lons, return_inverse=True)

    rows_arr = np.zeros((len(unique_lats), len(unique_lons)), dtype=values.dtype)
    rows_arr[row_index, col_index] = values
    return rows_arr