Explainers API¶
shap
¶
SHAP methods for deep learning models.
Author: Wenyu Ouyang. Copyright (c) 2023-2024 Wenyu Ouyang. All rights reserved.
deep_explain_model_heatmap(dl_model, test_dataset)
¶
Generate a feature heatmap for the prediction at a start time.

Parameters¶
dl_model
    trained model
test_dataset
    test dataset

Returns¶
None
Source code in torchhydro/explainers/shap.py
def deep_explain_model_heatmap(dl_model, test_dataset) -> None:
    """Generate a feature heatmap for the prediction at a start time

    Parameters
    ----------
    dl_model
        trained model
    test_dataset
        test dataset

    Returns
    -------
    None
    """
    dl_model.eval()
    history = test_dataset.__getitem__(0)[0]
    # background shape (L, N, M)
    # L - batch size, N - history length, M - feature size
    # for each element in each N x M batch in L,
    # attribute to each prediction in forecast len
    deep_explainer = shap.DeepExplainer(dl_model, history)
    shap_values = shap_results(deep_explainer, history)
    plot_shap_value_heatmaps(shap_values)
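Both SHAP helpers on this page wrap the same `shap.DeepExplainer` call shown above. A minimal sketch of that pattern with a toy model (the network and tensors below are illustrative stand-ins, not torchhydro objects):

```python
import shap
import torch
import torch.nn as nn

# Toy stand-ins for `dl_model` and `history`: any eval-mode torch module plus
# a background batch of inputs will do.
model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
model.eval()
background = torch.randn(32, 4)  # plays the role of `history`

explainer = shap.DeepExplainer(model, background)
# attributions of each input feature for a batch of new samples
shap_values = explainer.shap_values(torch.randn(8, 4))
```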
deep_explain_model_summary_plot(dl_model, test_dataset)
¶
Generate a feature summary plot for trained deep learning models.

Parameters¶
dl_model
    trained model
test_dataset
    test dataset
Source code in torchhydro/explainers/shap.py
def deep_explain_model_summary_plot(dl_model, test_dataset) -> None:
    """Generate a feature summary plot for trained deep learning models

    Parameters
    ----------
    dl_model
        trained model
    test_dataset
        test dataset
    """
    dl_model.eval()
    history = test_dataset.__getitem__(0)[0].unsqueeze(0)
    dl_model = dl_model.to("cpu")
    deep_explainer = shap.DeepExplainer(dl_model, history)
    shap_values = shap_results(deep_explainer, history)
    # summary plot shows overall feature ranking
    # by average absolute shap values
    plot_summary_shap_values(shap_values, test_dataset.df.columns)
    # summary plot for multi-step outputs
    plot_summary_shap_values_over_time_series(
        shap_values, test_dataset.df.columns
    )
    # a named copy of the background tensor for the per-feature history plots
    history_numpy = torch.tensor(
        history.cpu().numpy(), names=["batches", "observations", "features"]
    )
    plot_shap_values_from_history(shap_values, history_numpy)
jitter(values, jitter_strength=0.005)
¶
Add some jitter to the values.
Source code in torchhydro/explainers/shap.py
def jitter(values, jitter_strength=0.005):
    """Add some jitter to the values."""
    return values + jitter_strength * np.random.randn(*values.shape)
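A quick usage sketch: nudging coincident points apart before a scatter plot.

```python
import numpy as np
from torchhydro.explainers.shap import jitter

points = np.zeros((5, 2))  # five coincident points
spread = jitter(points)    # each coordinate nudged by Gaussian noise with std 0.005
```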
uncertainty_analysis
¶
bin_aggregated_data(z_values, r_values, num_bins=10)
¶
Aggregate the z-values and r-values into bins, and calculate the average for each bin.
Parameters¶
z_values : np.ndarray
    Aggregated z-values (quantiles) across all basins and time steps
r_values : np.ndarray
    Aggregated r-values (ECDF) across all basins and time steps
num_bins : int
    Number of bins to divide the data into (default is 10)

Returns¶
binned_z : np.ndarray
    Average z-values for each bin
binned_r : np.ndarray
    Average r-values for each bin
Source code in torchhydro/explainers/uncertainty_analysis.py
def bin_aggregated_data(z_values, r_values, num_bins=10):
    """
    Aggregate the z-values and r-values into bins, and calculate the average for each bin.

    Parameters
    ----------
    z_values: np.ndarray
        Aggregated z-values (quantiles) across all basins and time steps
    r_values: np.ndarray
        Aggregated r-values (ECDF) across all basins and time steps
    num_bins: int
        Number of bins to divide the data into (default is 10)

    Returns
    -------
    binned_z: np.ndarray
        Average z-values for each bin
    binned_r: np.ndarray
        Average r-values for each bin
    """
    bins = np.linspace(0, 1, num_bins + 1)  # Create bin edges from 0 to 1
    # Find out which bin each z-value belongs to; clip so that values exactly
    # equal to 1 land in the last bin instead of an out-of-range index
    bin_indices = np.clip(np.digitize(z_values, bins), 1, num_bins)
    binned_z = np.zeros(num_bins)
    binned_r = np.zeros(num_bins)
    for i in range(1, num_bins + 1):
        # Select the z and r values that fall into the current bin
        in_bin = bin_indices == i
        if np.sum(in_bin) > 0:
            # Calculate the average z and r values for this bin
            binned_z[i - 1] = np.mean(z_values[in_bin])
            binned_r[i - 1] = np.mean(r_values[in_bin])
        else:
            # If no values fall in this bin, assign NaN
            binned_z[i - 1] = np.nan
            binned_r[i - 1] = np.nan
    return binned_z, binned_r
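A usage sketch with synthetic inputs (the uniform samples stand in for real aggregated z- and r-values):

```python
import numpy as np
from torchhydro.explainers.uncertainty_analysis import bin_aggregated_data

rng = np.random.default_rng(0)
z = rng.uniform(0, 1, 500)  # synthetic aggregated quantiles
r = rng.uniform(0, 1, 500)  # synthetic aggregated ECDF values

binned_z, binned_r = bin_aggregated_data(z, r, num_bins=10)
print(binned_z.shape, binned_r.shape)  # (10,) (10,)
```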
calculate_empirical_cdf(predictions, obs_values)
¶
Calculate the empirical cumulative distribution function (CDF) of model predictions for each time step and compute the quantiles z_i of the observed values.
Parameters¶
predictions : np.ndarray
    2D array of predictions (shape = (MC_dropout_times, time_steps))
obs_values : np.ndarray
    Time series of observed values (1D array, shape = (time_steps,))

Returns¶
z_values : np.ndarray
    Quantiles of the observed values in the prediction distribution for each time step (1D array)
Source code in torchhydro/explainers/uncertainty_analysis.py
def calculate_empirical_cdf(predictions, obs_values):
    """
    Calculate the empirical cumulative distribution function (CDF) of model predictions
    for each time step and compute the quantiles z_i of the observed values.

    Parameters
    ----------
    predictions: np.ndarray
        2D array of predictions (shape = (MC_dropout_times, time_steps))
    obs_values: np.ndarray
        Time series of observed values (1D array, shape = (time_steps,))

    Returns
    -------
    z_values: np.ndarray
        Quantiles of the observed values in the prediction distribution for each time step (1D array)
    """
    time_steps = obs_values.shape[0]
    mc_dropout_times = predictions.shape[0]
    z_values = np.zeros(time_steps)
    for t in range(time_steps):
        # The quantile of the observed value is the fraction of ensemble
        # members that do not exceed it (no sorting is needed for the count)
        z_values[t] = np.sum(predictions[:, t] <= obs_values[t]) / mc_dropout_times
    return z_values
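A sketch with a fabricated MC-dropout ensemble, just to illustrate the expected shapes and value range:

```python
import numpy as np
from torchhydro.explainers.uncertainty_analysis import calculate_empirical_cdf

rng = np.random.default_rng(0)
predictions = rng.normal(size=(50, 100))  # (MC_dropout_times, time_steps)
obs_values = rng.normal(size=100)         # (time_steps,)

z_values = calculate_empirical_cdf(predictions, obs_values)
# each z_i is the fraction of ensemble members below the observation
assert z_values.shape == (100,) and ((0 <= z_values) & (z_values <= 1)).all()
```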
calculate_observed_ecdf(obs_values)
¶
Calculate the empirical cumulative distribution function (ECDF) of observed values.
Parameters¶
obs_values : np.ndarray
    Time series of observed values (1D array, shape = (time_steps,))

Returns¶
ecdf : np.ndarray
    Proportion of observed values in the cumulative distribution (1D array, same shape as obs_values)
Source code in torchhydro/explainers/uncertainty_analysis.py
def calculate_observed_ecdf(obs_values):
    """
    Calculate the empirical cumulative distribution function (ECDF) of observed values.

    Parameters
    ----------
    obs_values: np.ndarray
        Time series of observed values (1D array, shape = (time_steps,))

    Returns
    -------
    ecdf: np.ndarray
        Proportion of observed values in the cumulative distribution (1D array, same shape as obs_values)
    """
    time_steps = obs_values.shape[0]
    # ECDF: for each observed value, the proportion of values <= that value
    # (counting against the raw array equals counting against a sorted copy)
    ecdf = np.zeros(time_steps)
    for i in range(time_steps):
        ecdf[i] = np.sum(obs_values <= obs_values[i]) / time_steps
    return ecdf
plot_probability_plot(z_values, r_values, basin_name='Basin', scatter=True)
¶
Plot the probability plot for a single basin.
Parameters¶
z_values : np.ndarray
    Quantiles of the observed values for each time step (1D array)
r_values : np.ndarray
    Empirical cumulative distribution of observed values (ECDF, 1D array)
basin_name : str
    Name of the basin (for the title of the plot)
scatter : bool
    If True, draw markers only; otherwise connect the points with a line
Source code in torchhydro/explainers/uncertainty_analysis.py
def plot_probability_plot(z_values, r_values, basin_name="Basin", scatter=True):
    """
    Plot the probability plot for a single basin.

    Parameters
    ----------
    z_values: np.ndarray
        Quantiles of the observed values for each time step (1D array)
    r_values: np.ndarray
        Empirical cumulative distribution of observed values (ECDF, 1D array)
    basin_name: str
        Name of the basin (for the title of the plot)
    scatter: bool
        If True, draw markers only; otherwise connect the points with a line
    """
    # Sort both z_values and r_values by z_values
    sorted_indices = np.argsort(z_values)
    z_sorted = z_values[sorted_indices]
    r_sorted = r_values[sorted_indices]
    if scatter:
        # markers only (no lines)
        plt.scatter(
            z_sorted, r_sorted, label=f"Probability Plot ({basin_name})", color="b"
        )
    else:
        plt.plot(
            z_sorted, r_sorted, label=f"Probability Plot ({basin_name})", color="b"
        )
    plt.plot([0, 1], [0, 1], "k--", label="1:1 Line")
    # Add labels and grid
    plt.xlabel("Predicted CDF Quantiles (z_i)")
    plt.ylabel("Observed ECDF (R_i/n)")
    plt.title(f"Probability Plot - {basin_name}")
    plt.legend()
    plt.grid(True)
    plt.show()
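Chaining the two CDF helpers into the plot for one synthetic basin; since the fake observations come from the same distribution as the fake ensemble, the points should roughly follow the 1:1 line:

```python
import numpy as np
from torchhydro.explainers.uncertainty_analysis import (
    calculate_empirical_cdf,
    calculate_observed_ecdf,
    plot_probability_plot,
)

rng = np.random.default_rng(42)
predictions = rng.normal(size=(100, 365))  # fake MC-dropout ensemble
obs_values = rng.normal(size=365)          # fake observations, same distribution

z = calculate_empirical_cdf(predictions, obs_values)
r = calculate_observed_ecdf(obs_values)
plot_probability_plot(z, r, basin_name="Synthetic Basin")
```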
process_and_aggregate_basins(basins_data, num_bins=0)
¶
Process multiple basins: aggregate z-values and r-values across basins and time steps.
Parameters¶
basins_data : list of dict
    List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.
num_bins : int
    Number of bins to divide the data into (default is 0, which does not bin the data)

Returns¶
all_z_values : np.ndarray
    Aggregated z-values (quantiles) across all basins and time steps
all_r_values : np.ndarray
    Aggregated r-values (ECDF) across all basins and time steps
Source code in torchhydro/explainers/uncertainty_analysis.py
def process_and_aggregate_basins(basins_data, num_bins=0):
    """
    Process multiple basins: aggregate z-values and r-values across basins and time steps.

    Parameters
    ----------
    basins_data: list of dict
        List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.
    num_bins: int
        Number of bins to divide the data into (default is 0, which does not bin the data)

    Returns
    -------
    all_z_values: np.ndarray
        Aggregated z-values (quantiles) across all basins and time steps
    all_r_values: np.ndarray
        Aggregated r-values (ECDF) across all basins and time steps
    """
    all_z_values = []
    all_r_values = []
    for basin_data in basins_data:
        predictions = basin_data["predictions"]
        obs_values = basin_data["obs_values"]
        # Calculate z-values and r-values for the current basin
        z_values = calculate_empirical_cdf(predictions, obs_values)
        r_values = calculate_observed_ecdf(obs_values)
        # Append the results to the global list
        all_z_values.extend(z_values)
        all_r_values.extend(r_values)
    # Convert lists to numpy arrays
    all_z_values = np.array(all_z_values)
    all_r_values = np.array(all_r_values)
    # Optionally bin the data
    if num_bins > 0:
        all_z_values, all_r_values = bin_aggregated_data(
            all_z_values, all_r_values, num_bins
        )
    return all_z_values, all_r_values
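A sketch of the aggregated, binned reliability curve over several synthetic basins (all arrays are random placeholders):

```python
import numpy as np
from torchhydro.explainers.uncertainty_analysis import (
    plot_probability_plot,
    process_and_aggregate_basins,
)

rng = np.random.default_rng(1)
basins_data = [
    {
        "predictions": rng.normal(size=(100, 200)),
        "obs_values": rng.normal(size=200),
        "name": f"basin_{i}",
    }
    for i in range(3)
]
all_z, all_r = process_and_aggregate_basins(basins_data, num_bins=10)
plot_probability_plot(all_z, all_r, basin_name="All basins", scatter=False)
```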
process_basin(predictions, obs_values, basin_name='Basin')
¶
Process a single basin: calculate z-values, ECDF, and plot the probability plot.
Parameters¶
predictions : np.ndarray
    2D array of predictions for the basin (shape = (MC_dropout_times, time_steps))
obs_values : np.ndarray
    Time series of observed values for the basin (1D array, shape = (time_steps,))
basin_name : str
    Name of the basin (for the title of the plot)
Source code in torchhydro/explainers/uncertainty_analysis.py
def process_basin(predictions, obs_values, basin_name="Basin"):
    """
    Process a single basin: calculate z-values, ECDF, and plot the probability plot.

    Parameters
    ----------
    predictions: np.ndarray
        2D array of predictions for the basin (shape = (MC_dropout_times, time_steps))
    obs_values: np.ndarray
        Time series of observed values for the basin (1D array, shape = (time_steps,))
    basin_name: str
        Name of the basin (for the title of the plot)
    """
    # Calculate z-values (quantiles of the observed values in prediction distribution)
    z_values = calculate_empirical_cdf(predictions, obs_values)
    # Calculate observed ECDF
    r_values = calculate_observed_ecdf(obs_values)
    # Plot the probability plot for the basin
    plot_probability_plot(z_values, r_values, basin_name)
process_multiple_basins(basins_data)
¶
Process multiple basins: for each basin, process and plot its probability plot.
Parameters¶
basins_data : list of dict
    List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.
Source code in torchhydro/explainers/uncertainty_analysis.py
def process_multiple_basins(basins_data):
    """
    Process multiple basins: for each basin, process and plot its probability plot.

    Parameters
    ----------
    basins_data: list of dict
        List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.
    """
    for basin_data in basins_data:
        predictions = basin_data["predictions"]
        obs_values = basin_data["obs_values"]
        basin_name = basin_data["name"]
        process_basin(predictions, obs_values, basin_name)
weight_anlysis
¶
get_latest_event_file(event_file_lst)
¶
Get the latest event file from a list of TensorBoard event files.

Returns¶
Path
    The latest event file.
Source code in torchhydro/explainers/weight_anlysis.py
def get_latest_event_file(event_file_lst):
    """Get the latest event file from a list of TensorBoard event files.

    Parameters
    ----------
    event_file_lst : list
        paths of the candidate event files

    Returns
    -------
    Path
        The latest event file.
    """
    event_files = [Path(f) for f in event_file_lst]
    event_file_names_lst = [event_file.stem.split(".") for event_file in event_files]
    # the integer after "tfevents" in the file name is the creation timestamp
    ctimes = [
        int(event_file_names[event_file_names.index("tfevents") + 1])
        for event_file_names in event_file_names_lst
    ]
    return event_files[ctimes.index(max(ctimes))]
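For example, given two hypothetical event-file paths, the one whose timestamp after `tfevents` is larger is returned:

```python
from torchhydro.explainers.weight_anlysis import get_latest_event_file

# Hypothetical TensorBoard event files; the integer after "tfevents" is the
# creation timestamp, so the file with the largest one wins.
files = [
    "runs/exp1/events.out.tfevents.1697700000.host0",
    "runs/exp1/events.out.tfevents.1697800000.host0",
]
print(get_latest_event_file(files).name)  # events.out.tfevents.1697800000.host0
```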
plot_layer_hist_for_basin_model_fold(model_name, chosen_layer_values, layers, save_fig_dir, cmap_str='Oranges')
¶
Plot the weight/bias histograms of the chosen layers for one model.

Parameters¶
model_name : str
    name of the DL model
chosen_layer_values : dict
    histogram values for each chosen layer, keyed by layer name and then model name
layers : list
    names of the layers to plot
save_fig_dir : str
    directory where the figures are saved
cmap_str : str, optional
    choose from here: https://matplotlib.org/stable/tutorials/colors/colormaps.html#sequential, by default "Oranges"
Source code in torchhydro/explainers/weight_anlysis.py
def plot_layer_hist_for_basin_model_fold(
    model_name,
    chosen_layer_values,
    layers,
    save_fig_dir,
    cmap_str="Oranges",
):
    """Plot the weight/bias histograms of the chosen layers for one model.

    Parameters
    ----------
    model_name : str
        name of the DL model
    chosen_layer_values : dict
        histogram values for each chosen layer, keyed by layer name and then model name
    layers : list
        names of the layers to plot
    save_fig_dir : str
        directory where the figures are saved
    cmap_str : str, optional
        choose from here: https://matplotlib.org/stable/tutorials/colors/colormaps.html#sequential,
        by default "Oranges"
    """
    if not os.path.exists(save_fig_dir):
        os.makedirs(save_fig_dir)
    for layer in layers:
        two_model_layers = chosen_layer_values[layer]
        try:
            df_lstm_show = two_model_layers[model_name]
        except KeyError:
            # if the model does not have this layer, skip
            continue
        lstm_x_lst = []
        lstm_y_lst = []
        lstm_dash_lines = []
        lw_lst = []
        alpha_lst = []
        cmap = cm.get_cmap(cmap_str)
        rgb_lst = []
        norm_color = colors.Normalize(vmin=0, vmax=df_lstm_show.shape[1])
        for i in df_lstm_show:
            lstm_x_lst.append(df_lstm_show.index.values)
            lstm_y_lst.append(df_lstm_show[i].values)
            lstm_dash_lines.append(True)
            rgba = cmap(norm_color(i))
            rgb_lst.append(rgba)
            alpha_lst.append(0.5)
            lw_lst.append(0.5)
        # the first and last line should be solid, have a dark color and a wide width
        rgb_lst[0] = rgba
        lstm_dash_lines[0] = False
        lstm_dash_lines[-1] = False
        alpha_lst[-1] = 1
        alpha_lst[0] = 1
        lw_lst[-1] = 1
        lw_lst[0] = 1
        hplt.plot_ts(
            lstm_x_lst,
            lstm_y_lst,
            dash_lines=lstm_dash_lines,
            fig_size=(8, 4),
            xlabel="weight_bias_value",
            ylabel="hist_num",
            c_lst=rgb_lst,
            linewidth=lw_lst,
            alpha=alpha_lst,
            leg_lst=None,
        )
        plt.savefig(
            os.path.join(
                save_fig_dir,
                f"{model_name}_{layer}_hist.png",
            ),
            dpi=600,
            bbox_inches="tight",
        )
plot_param_hist_model(model_name, the_exp_dir, batchsize, chosen_layer_for_hist, start_epoch=0, end_epoch=100, epoch_interval=10, save_fig_dir=None)
¶
Plot a parameter histogram for each basin.

Parameters¶
model_name : str
    name of a DL model
the_exp_dir : str
    saving directory for one experiment
batchsize : int
    batch size
chosen_layer_for_hist : list
    names of the layers whose histograms are plotted

Returns¶
tuple[dict, dict]
    per-layer histogram values, and the cosine similarity between the start-epoch and end-epoch histograms
Source code in torchhydro/explainers/weight_anlysis.py
def plot_param_hist_model(
    model_name,
    the_exp_dir,
    batchsize,
    chosen_layer_for_hist,
    start_epoch=0,
    end_epoch=100,
    epoch_interval=10,
    save_fig_dir=None,
):
    """Plot a parameter histogram for each basin

    Parameters
    ----------
    model_name : str
        name of a DL model
    the_exp_dir : str
        saving directory for one experiment
    batchsize : int
        batch size
    chosen_layer_for_hist : list
        names of the layers whose histograms are plotted
    start_epoch : int, optional
        first epoch to compare, by default 0
    end_epoch : int, optional
        last epoch to compare, by default 100
    epoch_interval : int, optional
        interval between plotted epochs, by default 10
    save_fig_dir : str, optional
        directory for saving figures, by default None (the_exp_dir is used)

    Returns
    -------
    tuple[dict, dict]
        per-layer histogram values, and the cosine similarity between the
        start-epoch and end-epoch histograms
    """
    chosen_layer_values = {layer: {} for layer in chosen_layer_for_hist}
    chosen_layer_values_cosine = {layer: {} for layer in chosen_layer_for_hist}
    _, df_histogram = read_tb_log(the_exp_dir, batchsize)
    hist_cols = df_histogram.columns.values
    model_layers = read_layer_name_from_tb_hist(hist_cols)
    chosen_layers = chosen_layer_in_layers(model_layers, chosen_layer_for_hist)
    k = 0
    for layer in chosen_layer_for_hist:
        if layer not in chosen_layers[k]:
            continue
        df_epochs_hist = epochs_hist_for_chosen_layer(
            epoch_interval, chosen_layers[k], df_histogram
        )
        chosen_layer_values[layer][model_name] = df_epochs_hist
        end_epoch_idx = int((end_epoch / epoch_interval - 1) * epoch_interval)
        # cosine similarity between the start-epoch and end-epoch histograms
        chosen_layer_values_cosine[layer][model_name] = 1 - cosine(
            df_epochs_hist[start_epoch], df_epochs_hist[end_epoch_idx]
        )
        k = k + 1
    if save_fig_dir is None:
        save_fig_dir = the_exp_dir
    plot_layer_hist_for_basin_model_fold(
        model_name,
        chosen_layer_values,
        chosen_layer_for_hist,
        save_fig_dir=save_fig_dir,
        cmap_str="Reds",
    )
    return chosen_layer_values, chosen_layer_values_cosine
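A call sketch; the experiment directory and layer names below are hypothetical and must match what your model actually logged to TensorBoard:

```python
from torchhydro.explainers.weight_anlysis import plot_param_hist_model

# "results/camels_exp001" and the layer names are hypothetical placeholders
values, cosine_sims = plot_param_hist_model(
    model_name="LSTM",
    the_exp_dir="results/camels_exp001",
    batchsize=256,
    chosen_layer_for_hist=["linearIn", "lstm", "linearOut"],
    start_epoch=0,
    end_epoch=100,
    epoch_interval=10,
)
```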
read_tb_log(the_exp_dir, shown_batchsize, where_save=None)
¶
Copy a recent log file to the current directory and read the log file.
Parameters¶
the_exp_dir : str
    saving directory for one experiment
shown_batchsize : int
    batch size for the experiment
where_save : str, optional
    A directory for saving plots, by default None, which means the same directory as the_exp_dir

Returns¶
tuple[pd.DataFrame, pd.DataFrame]
    the scalar records and the histogram records read from the TensorBoard log

Raises¶
FileNotFoundError
    if the expected log directory does not exist
Source code in torchhydro/explainers/weight_anlysis.py
def read_tb_log(the_exp_dir, shown_batchsize, where_save=None):
    """Copy a recent log file to the current directory and read the log file.

    Parameters
    ----------
    the_exp_dir : str
        saving directory for one experiment
    shown_batchsize : int
        batch size for the experiment
    where_save : str, optional
        A directory for saving plots, by default None,
        which means the same directory as the_exp_dir

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        the scalar records and the histogram records read from the TensorBoard log

    Raises
    ------
    FileNotFoundError
        if the expected log directory does not exist
    """
    log_dir = os.path.join(
        the_exp_dir,
        f"opt_Adadelta_lr_1.0_bsize_{str(shown_batchsize)}",
    )
    if not os.path.exists(log_dir):
        raise FileNotFoundError(f"Log dir {log_dir} not found!")
    if where_save is None:
        where_save = the_exp_dir
    plot_save_dir = os.path.join(
        where_save,
        f"opt_Adadelta_lr_1.0_bsize_{str(shown_batchsize)}",
    )
    copy_latest_tblog_file(log_dir, plot_save_dir)
    # read scalars from a cached CSV if it exists, otherwise from the event file
    scalar_file = os.path.join(plot_save_dir, "scalars.csv")
    if not os.path.exists(scalar_file):
        reader = SummaryReader(plot_save_dir)
        df_scalar = reader.scalars
        df_scalar.to_csv(scalar_file, index=False)
    else:
        df_scalar = pd.read_csv(scalar_file)
    # read histograms from a cached pickle if it exists, otherwise from the event file
    histogram_file = os.path.join(plot_save_dir, "histograms.pkl")
    if not os.path.exists(histogram_file):
        reader = SummaryReader(plot_save_dir, pivot=True)
        df_histogram = reader.histograms
        df_histogram.to_pickle(histogram_file)
    else:
        df_histogram = pd.read_pickle(histogram_file)
    return df_scalar, df_histogram
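A usage sketch; `results/camels_exp001` is a hypothetical experiment directory that must contain an `opt_Adadelta_lr_1.0_bsize_256` run written by the trainer:

```python
from torchhydro.explainers.weight_anlysis import read_tb_log

df_scalar, df_histogram = read_tb_log("results/camels_exp001", shown_batchsize=256)
print(df_scalar.head())      # training/validation scalars
print(df_histogram.columns)  # per-layer weight/bias histograms
```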