|
import pickle

import numpy as np
import pandas as pd
import shap
from tensorflow.keras.models import load_model


class UhiPredictor:
    """
    Urban Heat Island (UHI) predictor that scores new samples with a trained
    Keras model and explains each prediction using SHAP.

    INPUTS
    ---
    model_path: str - Path to the trained Keras model file.
    scaler_path: str - Path to the pickled standard scaler file.
    explainer_type: SHAP explainer class (e.g., shap.DeepExplainer, shap.GradientExplainer).
    ref_data: pd.DataFrame or np.ndarray - Background dataset for the SHAP explainer.
    feature_names: list - Feature names used in the SHAP analysis.
    """

    def __init__(self, model_path, scaler_path, explainer_type, ref_data, feature_names):
        """
        Initializes the UHI predictor with a trained model, scaler, and SHAP explainer.

        INPUTS
        ---
        model_path: str - Path to the model file.
        scaler_path: str - Path to the standard scaler file.
        explainer_type: SHAP explainer class (e.g., shap.DeepExplainer, shap.GradientExplainer).
        ref_data: pd.DataFrame or np.ndarray - Background dataset for the SHAP explainer.
        feature_names: list - Feature names for the SHAP explanation.
        """
        self.model = load_model(model_path)
        with open(scaler_path, 'rb') as f:
            self.scaler = pickle.load(f)

        # SHAP explainers expect a plain array as the background dataset.
        ref_data = np.array(ref_data) if isinstance(ref_data, pd.DataFrame) else ref_data

        # This constructor call assumes an explainer that accepts (model, background)
        # directly, such as shap.DeepExplainer or shap.GradientExplainer for Keras
        # models; shap.KernelExplainer would instead need a prediction function
        # (e.g., self.model.predict) as its first argument.
        self.explainer_type = explainer_type
        self.explainer = self.explainer_type(self.model, ref_data)
        self.feature_names = feature_names

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Preprocess the input DataFrame to create the derived features the model expects.

        INPUT
        -----
        df: pd.DataFrame
            The input DataFrame containing the raw features.

        OUTPUT
        ------
        pd.DataFrame
            The preprocessed DataFrame with the derived features.
        """
        # Decompose the wind direction into orthogonal east-west (X) and
        # north-south (Y) components.
        wind_direction_radians = np.radians(df["Wind_Direction_deg"])
        wind_x = np.sin(wind_direction_radians)
        wind_y = np.cos(wind_direction_radians)

        # Interaction features combining terrain, built form, vegetation,
        # traffic, and wind at the 100 m / 150 m / 300 m buffer scales.
        m100_Elevation_Wind_X = df["100m_Ground_Elevation"] * df["Avg_Wind_Speed"] * wind_x
        m150_Elevation_Wind_Y = df["150m_Ground_Elevation"] * df["Avg_Wind_Speed"] * wind_y
        m150_Humidity_NDVI = df["Relative_Humidity"] * df["150m_NDVI"]
        m150_Traffic_NDBI = df["Traffic_Volume"] * df["150m_NDBI"]
        m300_Building_Wind_X = df["300m_Building_Height"] * df["Avg_Wind_Speed"] * wind_x
        m300_Building_Wind_Y = df["300m_Building_Height"] * df["Avg_Wind_Speed"] * wind_y
        m300_Elevation_Wind_Y = df["300m_Ground_Elevation"] * df["Avg_Wind_Speed"] * wind_y
        m300_BldgHeight_Count = df["300m_Building_Height"] * df["300m_Building_Count"]
        m300_TotalBuildingArea_NDVI = df["300m_Total_Building_Area_m2"] * df["300m_NDVI"]
        m300_Traffic_NDVI = df["Traffic_Volume"] * df["300m_NDVI"]
        m300_Traffic_NDBI = df["Traffic_Volume"] * df["300m_NDBI"]
        # Small epsilons guard against division by zero.
        m300_Building_Aspect_Ratio = df["300m_Building_Height"] / np.sqrt(df["300m_Total_Building_Area_m2"] + 1e-6)
        m300_Sky_View_Factor = 1 - df["300m_Building_Density"]
        m300_Canopy_Cover_Ratio = df["300m_NDVI"] / (df["300m_Building_Density"] + 1e-6)
        m300_GHG_Proxy = df["300m_Building_Count"] * df["Traffic_Volume"] * df["Solar_Flux"]

        output = {
            "150m_NPCRI": df["150m_NPCRI"],
            "100m_Elevation_Wind_X": m100_Elevation_Wind_X,
            "150m_Traffic_Volume": df["Traffic_Volume"],
            "150m_Elevation_Wind_Y": m150_Elevation_Wind_Y,
            "150m_Humidity_NDVI": m150_Humidity_NDVI,
            "150m_Traffic_NDBI": m150_Traffic_NDBI,
            "300m_SI": df["300m_SI"],
            "300m_NPCRI": df["300m_NPCRI"],
            "300m_Coastal_Aerosol": df["300m_Coastal_Aerosol"],
            "300m_Total_Building_Area_m2": df["300m_Total_Building_Area_m2"],
            "300m_Building_Construction_Year": df["300m_Building_Construction_Year"],
            "300m_Ground_Elevation": df["300m_Ground_Elevation"],
            "300m_Building_Wind_X": m300_Building_Wind_X,
            "300m_Building_Wind_Y": m300_Building_Wind_Y,
            "300m_Elevation_Wind_Y": m300_Elevation_Wind_Y,
            "300m_BldgHeight_Count": m300_BldgHeight_Count,
            "300m_TotalBuildingArea_NDVI": m300_TotalBuildingArea_NDVI,
            "300m_Traffic_NDVI": m300_Traffic_NDVI,
            "300m_Traffic_NDBI": m300_Traffic_NDBI,
            "300m_Building_Aspect_Ratio": m300_Building_Aspect_Ratio,
            "300m_Sky_View_Factor": m300_Sky_View_Factor,
            "300m_Canopy_Cover_Ratio": m300_Canopy_Cover_Ratio,
            "300m_GHG_Proxy": m300_GHG_Proxy
        }

        # Build the frame on the input's index so the Series values align
        # instead of being reindexed to NaN when the index is not [0].
        output = pd.DataFrame(output, index=df.index)

        return output
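
    # Worked example of the wind decomposition above (illustrative numbers):
    # a wind direction of 90 degrees (due east) gives
    #     wind_x = sin(pi / 2) = 1.0,  wind_y = cos(pi / 2) = 0.0,
    # so the *_Wind_X interaction features carry the full Avg_Wind_Speed
    # while the *_Wind_Y features vanish for that sample.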
|
|
|
    def scale(self, X: pd.DataFrame) -> np.ndarray:
        """
        Apply the scaler that was fitted during training to the new data.

        INPUT
        -----
        X: pd.DataFrame - The data to be scaled.

        OUTPUT
        ------
        np.ndarray - The scaled data.
        """
        return self.scaler.transform(X)
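
    # Note (assumption): the scaler is expected to have been fitted on columns
    # in the same order as preprocess() produces them. If it was fitted on a
    # plain array, scikit-learn performs no feature-name check, so a different
    # column order would silently misalign the features.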
|
|
|
    def compute_shap_values(self, X):
        """
        Computes SHAP values for a single scaled record and returns them as a
        1-D array aligned with self.feature_names.
        """
        # DeepExplainer's additivity check can fail spuriously on some Keras
        # graphs, so it is disabled for that explainer.
        if self.explainer_type == shap.DeepExplainer:
            shap_values = self.explainer.shap_values(X, check_additivity=False)
        else:
            shap_values = self.explainer.shap_values(X)

        # Collapse singleton output/sample axes so the values can be zipped
        # directly with the feature names downstream.
        shap_values = np.asarray(shap_values)
        if shap_values.ndim == 3 and shap_values.shape[-1] == 1:
            shap_values = np.squeeze(shap_values, axis=-1)
        if shap_values.ndim == 2 and shap_values.shape[0] == 1:
            shap_values = shap_values[0]
        return shap_values
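
    # SHAP's local-accuracy property, which predict() relies on below:
    #     f(x) = E[f(X)] + sum_i phi_i
    # i.e., the explainer's expected (base) value plus the per-feature SHAP
    # values should reconstruct the model's output for this sample
    # (approximately, in the DeepExplainer case with additivity unchecked).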
|
|
|
    def predict(self, X: pd.DataFrame, location=(None, None)) -> dict:
        """
        Make a prediction on one sample and explain the prediction using SHAP.

        INPUT
        -----
        X: pd.DataFrame - The data to predict a UHI index for (must be one sample).
        location: tuple (longitude, latitude) - Optional location data.

        OUTPUT
        ------
        dict - A dictionary containing the predicted UHI index and the SHAP reasoning behind it.
        """
        if X.shape[0] != 1:
            raise ValueError(f"Input must contain exactly one sample, but {X.shape[0]} samples were found.")

        # Derive the engineered features and apply the training-time scaling.
        X_processed = self.preprocess(X)
        X_scaled = self.scale(X_processed).reshape(1, -1)

        y_pred = self.model.predict(X_scaled)
        uhi = float(y_pred[0][0]) if y_pred.ndim == 2 else float(y_pred[0])

        shap_values = self.compute_shap_values(X_scaled)

        # DeepExplainer exposes its base value as a tensor-like object;
        # normalize it to an array before indexing.
        if self.explainer_type == shap.DeepExplainer:
            expected_value = np.array(self.explainer.expected_value)
        else:
            expected_value = self.explainer.expected_value

        if isinstance(expected_value, np.ndarray):
            expected_value = expected_value[0]

        # Reconstruct the model output from the SHAP decomposition
        # (see the local-accuracy note above).
        shap_final_prediction = expected_value + shap_values.sum()

        feature_contributions = [
            {
                "feature": feature,
                "shap_value": float(value),
                "impact": "increase" if value > 0 else "decrease"
            }
            for feature, value in zip(self.feature_names, shap_values)
        ]

        prediction_output = {
            "longitude": location[0],
            "latitude": location[1],
            "predicted_uhi_index": uhi,
            "base_value": expected_value,
            "shap_final_prediction": shap_final_prediction,
            # A UHI index above 1 marks the location as warmer than the reference average.
            "uhi_status": "Urban Heat Island" if shap_final_prediction > 1 else "Cooler Region",
            "feature_contributions": feature_contributions,
        }

        return prediction_output
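

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The file names, the background CSV, and
# the input row below are hypothetical placeholders for whatever artifacts
# the training pipeline actually produced; the point is the call order:
# construct the predictor once, then call predict() on a one-row DataFrame.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # The 23 engineered features produced by UhiPredictor.preprocess, in order.
    feature_names = [
        "150m_NPCRI", "100m_Elevation_Wind_X", "150m_Traffic_Volume",
        "150m_Elevation_Wind_Y", "150m_Humidity_NDVI", "150m_Traffic_NDBI",
        "300m_SI", "300m_NPCRI", "300m_Coastal_Aerosol",
        "300m_Total_Building_Area_m2", "300m_Building_Construction_Year",
        "300m_Ground_Elevation", "300m_Building_Wind_X", "300m_Building_Wind_Y",
        "300m_Elevation_Wind_Y", "300m_BldgHeight_Count",
        "300m_TotalBuildingArea_NDVI", "300m_Traffic_NDVI", "300m_Traffic_NDBI",
        "300m_Building_Aspect_Ratio", "300m_Sky_View_Factor",
        "300m_Canopy_Cover_Ratio", "300m_GHG_Proxy",
    ]

    # Hypothetical background sample of already-scaled training rows for SHAP.
    background = pd.read_csv("train_scaled_background.csv").values

    predictor = UhiPredictor(
        model_path="uhi_model.h5",     # hypothetical model file
        scaler_path="uhi_scaler.pkl",  # hypothetical scaler file
        explainer_type=shap.DeepExplainer,
        ref_data=background,
        feature_names=feature_names,
    )

    # One raw sample with the columns preprocess() expects.
    sample = pd.read_csv("new_samples.csv").head(1)
    result = predictor.predict(sample, location=(-73.97, 40.78))
    print(result["predicted_uhi_index"], result["uhi_status"])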