
Explainers API

shap

Author: Wenyu Ouyang
Date: 2023-10-19 21:34:29
LastEditTime: 2025-07-12 11:25:01
LastEditors: Wenyu Ouyang
Description: SHAP methods for deep learning models
FilePath: /torchhydro/torchhydro/explainers/shap.py
Copyright (c) 2023-2024 Wenyu Ouyang. All rights reserved.

deep_explain_model_heatmap(dl_model, test_dataset)

Generate a feature heatmap for the prediction at a start time.

Parameters

dl_model
    trained model
test_dataset
    test dataset

Returns

None

Source code in torchhydro/explainers/shap.py
def deep_explain_model_heatmap(dl_model, test_dataset) -> None:
    """Generate feature heatmap for prediction at a start time
    Parameters
    ----------
    dl_model
        trained model
    test_dataset
        test dataset
    Returns
    -------
    None
    """
    dl_model.eval()
    history = test_dataset.__getitem__(0)[0]
    # background shape (L, N, M)
    # L - batch size, N - history length, M - feature size
    # for each element in each N x M batch in L,
    # attribute to each prediction in forecast len
    deep_explainer = shap.DeepExplainer(dl_model, history)
    shap_values = shap_results(deep_explainer, history)
    figs = plot_shap_value_heatmaps(shap_values)
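
A minimal usage sketch (TinyLSTM and TinyDataset are hypothetical stand-ins, not part of torchhydro; whether shap.DeepExplainer handles a given architecture depends on shap's PyTorch backend). Note that this function uses the dataset item directly as the background batch, so __getitem__(0)[0] is expected to be a batched (L, N, M) tensor:

import torch
from torch import nn
from torch.utils.data import Dataset

class TinyLSTM(nn.Module):
    # hypothetical model: (batch, seq, features) -> (batch, 1)
    def __init__(self, n_features=3, hidden=8):
        super().__init__()
        self.lstm = nn.LSTM(n_features, hidden, batch_first=True)
        self.head = nn.Linear(hidden, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.head(out[:, -1])

class TinyDataset(Dataset):
    def __len__(self):
        return 4

    def __getitem__(self, idx):
        # items are (history, target); history is already batched: (L, N, M)
        return torch.randn(16, 30, 3), torch.randn(16, 1)

deep_explain_model_heatmap(TinyLSTM(), TinyDataset())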

deep_explain_model_summary_plot(dl_model, test_dataset)

Generate a feature summary plot for trained deep learning models.

Parameters

dl_model
    trained model
test_dataset
    test dataset

Source code in torchhydro/explainers/shap.py
def deep_explain_model_summary_plot(dl_model, test_dataset) -> None:
    """Generate feature summary plot for trained deep learning models

    Parameters
    ----------
    dl_model
        trained model
    test_dataset
        test dataset
    """
    dl_model.eval()
    history = test_dataset.__getitem__(0)[0].unsqueeze(0)
    dl_model = dl_model.to("cpu")
    deep_explainer = shap.DeepExplainer(dl_model, history)
    shap_values = shap_results(deep_explainer, history)
    # summary plot shows overall feature ranking
    # by average absolute shap values
    fig = plot_summary_shap_values(shap_values, test_dataset.df.columns)
    abs_mean_shap_values = shap_values.mean(axis=["preds", "batches"])
    multi_shap_values = abs_mean_shap_values.mean(axis="observations")
    # summary plot for multi-step outputs
    # multi_shap_values = shap_values.apply_along_axis(np.mean, 'batches')
    fig = plot_summary_shap_values_over_time_series(
        shap_values, test_dataset.df.columns
    )
    history_numpy = torch.tensor(
        history.cpu().numpy(), names=["batches", "observations", "features"]
    )

    shap_values = shap_results(deep_explainer, history)
    figs = plot_shap_values_from_history(shap_values, history_numpy)
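
Continuing the hypothetical sketch above, the summary plot differs in two ways from the heatmap function: the dataset item is unsqueezed, so __getitem__(0)[0] should be a single unbatched (N, M) sequence, and the dataset must expose a df attribute whose columns name the input features:

import pandas as pd

class TinySummaryDataset(TinyDataset):
    # feature names are read from test_dataset.df.columns
    df = pd.DataFrame(columns=["prcp", "pet", "srad"])

    def __getitem__(self, idx):
        # unbatched (N, M) item; the function adds the batch dimension
        return torch.randn(30, 3), torch.randn(1)

deep_explain_model_summary_plot(TinyLSTM(), TinySummaryDataset())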

jitter(values, jitter_strength=0.005)

Add some jitter to the values.

Source code in torchhydro/explainers/shap.py
def jitter(values, jitter_strength=0.005):
    """Add some jitter to the values."""
    return values + jitter_strength * np.random.randn(*values.shape)
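
For example, jitter can separate overlapping points before a scatter plot:

import numpy as np

values = np.zeros(5)
print(jitter(values))          # five values near 0, spread ~0.005
print(jitter(values, 0.05))    # a visibly larger spread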

uncertainty_analysis

bin_aggregated_data(z_values, r_values, num_bins=10)

Aggregate the z-values and r-values into bins, and calculate the average for each bin.

Parameters

z_values : np.ndarray
    Aggregated z-values (quantiles) across all basins and time steps
r_values : np.ndarray
    Aggregated r-values (ECDF) across all basins and time steps
num_bins : int
    Number of bins to divide the data into (default is 10)

Returns

binned_z : np.ndarray
    Average z-values for each bin
binned_r : np.ndarray
    Average r-values for each bin

Source code in torchhydro/explainers/uncertainty_analysis.py
def bin_aggregated_data(z_values, r_values, num_bins=10):
    """
    Aggregate the z-values and r-values into bins, and calculate the average for each bin.

    Parameters
    ----------
    z_values: np.ndarray
        Aggregated z-values (quantiles) across all basins and time steps
    r_values: np.ndarray
        Aggregated r-values (ECDF) across all basins and time steps
    num_bins: int
        Number of bins to divide the data into (default is 10)

    Returns
    -------
    binned_z: np.ndarray
        Average z-values for each bin
    binned_r: np.ndarray
        Average r-values for each bin
    """
    bins = np.linspace(0, 1, num_bins + 1)  # Create bin edges from 0 to 1
    bin_indices = np.digitize(
        z_values, bins
    )  # Find out which bin each z-value belongs to

    binned_z = np.zeros(num_bins)
    binned_r = np.zeros(num_bins)

    for i in range(1, num_bins + 1):
        # Select the z and r values that fall into the current bin
        in_bin = bin_indices == i

        if np.sum(in_bin) > 0:
            # Calculate the average z and r values for this bin
            binned_z[i - 1] = np.mean(z_values[in_bin])
            binned_r[i - 1] = np.mean(r_values[in_bin])
        else:
            # If no values in this bin, assign NaN (can also handle this differently if needed)
            binned_z[i - 1] = np.nan
            binned_r[i - 1] = np.nan

    return binned_z, binned_r
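
A quick sketch with synthetic, roughly calibrated data (all names and numbers are illustrative):

import numpy as np

rng = np.random.default_rng(0)
z = rng.uniform(0, 1, 500)                       # quantiles in [0, 1]
r = np.clip(z + rng.normal(0, 0.05, 500), 0, 1)  # noisy ECDF near the 1:1 line

binned_z, binned_r = bin_aggregated_data(z, r, num_bins=10)
print(binned_z)  # roughly the bin centers: 0.05, 0.15, ..., 0.95
print(binned_r)  # close to binned_z for a well-calibrated model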

calculate_empirical_cdf(predictions, obs_values)

Calculate the empirical cumulative distribution function (CDF) of model predictions for each time step and compute the quantiles z_i of the observed values.

Parameters

predictions : np.ndarray
    2D array of predictions (shape = (MC_dropout_times, time_steps))
obs_values : np.ndarray
    Time series of observed values (1D array, shape = (time_steps,))

Returns

z_values : np.ndarray
    Quantiles of the observed values in the prediction distribution for each time step (1D array)

Source code in torchhydro/explainers/uncertainty_analysis.py
def calculate_empirical_cdf(predictions, obs_values):
    """
    Calculate the empirical cumulative distribution function (CDF) of model predictions for each time step
    and compute the quantiles z_i of the observed values.

    Parameters
    ----------
    predictions: np.ndarray
        2D array of predictions (shape = (MC_dropout_times, time_steps))
    obs_values: np.ndarray
        Time series of observed values (1D array, shape = (time_steps,))

    Returns
    -------
    z_values: np.ndarray
        Quantiles of the observed values in the prediction distribution for each time step (1D array)
    """
    time_steps = obs_values.shape[0]
    mc_dropout_times = predictions.shape[0]

    z_values = np.zeros(time_steps)

    for t in range(time_steps):
        # Sort the predictions for each time step
        sorted_predictions = np.sort(predictions[:, t])

        # Calculate the quantile of the observed value in the prediction distribution
        z_values[t] = np.sum(sorted_predictions <= obs_values[t]) / mc_dropout_times

    return z_values
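
Sketch: when the predictive distribution is well calibrated, the resulting z-values are approximately Uniform(0, 1) (all values here are synthetic):

import numpy as np

rng = np.random.default_rng(0)
predictions = rng.normal(0, 1, size=(100, 365))  # (MC_dropout_times, time_steps)
obs_values = rng.normal(0, 1, size=365)          # drawn from the same distribution

z_values = calculate_empirical_cdf(predictions, obs_values)
print(z_values.mean())  # ~0.5 for a calibrated ensemble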

calculate_observed_ecdf(obs_values)

Calculate the empirical cumulative distribution function (ECDF) of observed values.

Parameters

obs_values : np.ndarray
    Time series of observed values (1D array, shape = (time_steps,))

Returns

ecdf : np.ndarray
    Proportion of observed values in the cumulative distribution (1D array, same shape as obs_values)

Source code in torchhydro/explainers/uncertainty_analysis.py
def calculate_observed_ecdf(obs_values):
    """
    Calculate the empirical cumulative distribution function (ECDF) of observed values.

    Parameters
    ----------
    obs_values: np.ndarray
        Time series of observed values (1D array, shape = (time_steps,))

    Returns
    -------
    ecdf: np.ndarray
        Proportion of observed values in the cumulative distribution (1D array, same shape as obs_values)
    """
    time_steps = obs_values.shape[0]

    # Sort observed values
    sorted_obs = np.sort(obs_values)

    # Calculate ECDF: for each observed value, the proportion of values <= that value
    ecdf = np.zeros(time_steps)
    for i in range(time_steps):
        ecdf[i] = np.sum(sorted_obs <= obs_values[i]) / time_steps

    return ecdf
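
A tiny worked example (ties count toward the proportion):

import numpy as np

obs = np.array([3.0, 1.0, 2.0, 2.0])
print(calculate_observed_ecdf(obs))  # [1.0, 0.25, 0.75, 0.75]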

plot_probability_plot(z_values, r_values, basin_name='Basin', scatter=True)

Plot the probability plot for a single basin.

Parameters

z_values : np.ndarray
    Quantiles of the observed values for each time step (1D array)
r_values : np.ndarray
    Empirical cumulative distribution of observed values (ECDF, 1D array)
basin_name : str
    Name of the basin (for the title of the plot)
scatter : bool
    If True, draw markers only; otherwise draw a line (default is True)

Source code in torchhydro/explainers/uncertainty_analysis.py
def plot_probability_plot(z_values, r_values, basin_name="Basin", scatter=True):
    """
    Plot the probability plot for a single basin.

    Parameters
    ----------
    z_values: np.ndarray
        Quantiles of the observed values for each time step (1D array)
    r_values: np.ndarray
        Empirical cumulative distribution of observed values (ECDF, 1D array)
    basin_name: str
        Name of the basin (for title of the plot)
    scatter: bool
        If True, draw markers only; otherwise draw a line
    """
    # Sort both z_values and r_values by z_values
    sorted_indices = np.argsort(z_values)
    z_sorted = z_values[sorted_indices]
    r_sorted = r_values[sorted_indices]

    # Plot the probability plot with markers only (no lines)
    if scatter:
        plt.scatter(
            z_sorted, r_sorted, label=f"Probability Plot ({basin_name})", color="b"
        )
    else:
        plt.plot(
            z_sorted, r_sorted, label=f"Probability Plot ({basin_name})", color="b"
        )
    plt.plot([0, 1], [0, 1], "k--", label="1:1 Line")

    # Add labels and grid
    plt.xlabel("Predicted CDF Quantiles (z_i)")
    plt.ylabel("Observed ECDF (R_i/n)")
    plt.title(f"Probability Plot - {basin_name}")
    plt.legend()
    plt.grid(True)
    plt.show()

process_and_aggregate_basins(basins_data, num_bins=0)

Process multiple basins: aggregate z-values and r-values across basins and time steps.

Parameters

basins_data : list of dict
    List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.
num_bins : int
    Number of bins to divide the data into (default is 0, which does not bin the data)

Returns

all_z_values : np.ndarray
    Aggregated z-values (quantiles) across all basins and time steps
all_r_values : np.ndarray
    Aggregated r-values (ECDF) across all basins and time steps

Source code in torchhydro/explainers/uncertainty_analysis.py
def process_and_aggregate_basins(basins_data, num_bins=0):
    """
    Process multiple basins: aggregate z-values and r-values across basins and time steps.

    Parameters
    ----------
    basins_data: list of dict
        List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.
    num_bins: int
        Number of bins to divide the data into (default is 0, which does not bin the data)

    Returns
    -------
    all_z_values: np.ndarray
        Aggregated z-values (quantiles) across all basins and time steps
    all_r_values: np.ndarray
        Aggregated r-values (ECDF) across all basins and time steps
    """
    all_z_values = []
    all_r_values = []

    for basin_data in basins_data:
        predictions = basin_data["predictions"]
        obs_values = basin_data["obs_values"]

        # Calculate z-values and r-values for the current basin
        z_values = calculate_empirical_cdf(predictions, obs_values)
        r_values = calculate_observed_ecdf(obs_values)

        # Append the results to the global list
        all_z_values.extend(z_values)
        all_r_values.extend(r_values)

    # Convert lists to numpy arrays
    all_z_values = np.array(all_z_values)
    all_r_values = np.array(all_r_values)

    # Optionally bin the data
    if num_bins > 0:
        all_z_values, all_r_values = bin_aggregated_data(
            all_z_values, all_r_values, num_bins
        )
    return all_z_values, all_r_values
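
A sketch of the expected basins_data structure, chaining into the aggregated probability plot (all values synthetic); process_multiple_basins below accepts the same structure:

import numpy as np

rng = np.random.default_rng(0)
basins_data = [
    {
        "name": f"basin_{i}",
        "predictions": rng.normal(0, 1, size=(100, 365)),
        "obs_values": rng.normal(0, 1, size=365),
    }
    for i in range(3)
]

all_z, all_r = process_and_aggregate_basins(basins_data, num_bins=10)
plot_probability_plot(all_z, all_r, basin_name="All basins (binned)", scatter=False)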

process_basin(predictions, obs_values, basin_name='Basin')

Process a single basin: calculate z-values, ECDF, and plot the probability plot.

Parameters

predictions : np.ndarray
    2D array of predictions for the basin (shape = (MC_dropout_times, time_steps))
obs_values : np.ndarray
    Time series of observed values for the basin (1D array, shape = (time_steps,))
basin_name : str
    Name of the basin (for the title of the plot)

Source code in torchhydro/explainers/uncertainty_analysis.py
def process_basin(predictions, obs_values, basin_name="Basin"):
    """
    Process a single basin: calculate z-values, ECDF, and plot the probability plot.

    Parameters
    ----------
    predictions: np.ndarray
        2D array of predictions for the basin (shape = (MC_dropout_times, time_steps))
    obs_values: np.ndarray
        Time series of observed values for the basin (1D array, shape = (time_steps,))
    basin_name: str
        Name of the basin (for title of the plot)
    """
    # Calculate z-values (quantiles of the observed values in prediction distribution)
    z_values = calculate_empirical_cdf(predictions, obs_values)

    # Calculate observed ECDF
    r_values = calculate_observed_ecdf(obs_values)

    # Plot the probability plot for the basin
    plot_probability_plot(z_values, r_values, basin_name)

process_multiple_basins(basins_data)

Process multiple basins: for each basin, process and plot its probability plot.

Parameters

basins_data : list of dict
    List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.

Source code in torchhydro/explainers/uncertainty_analysis.py
def process_multiple_basins(basins_data):
    """
    Process multiple basins: for each basin, process and plot its probability plot.

    Parameters
    ----------
    basins_data: list of dict
        List of dictionaries, each containing 'predictions', 'obs_values', and 'name' for a basin.
    """
    for basin_data in basins_data:
        predictions = basin_data["predictions"]
        obs_values = basin_data["obs_values"]
        basin_name = basin_data["name"]
        process_basin(predictions, obs_values, basin_name)

weight_anlysis

get_latest_event_file(event_file_lst)

Get the latest event file from a list of event files.

Parameters

event_file_lst : list of str
    Paths of TensorBoard event files.

Returns

Path
    The latest event file.

Source code in torchhydro/explainers/weight_anlysis.py
def get_latest_event_file(event_file_lst):
    """Get the latest event file in the current directory.

    Returns
    -------
    str
        The latest event file.
    """
    event_files = [Path(f) for f in event_file_lst]
    event_file_names_lst = [event_file.stem.split(".") for event_file in event_files]
    ctimes = [
        int(event_file_names[event_file_names.index("tfevents") + 1])
        for event_file_names in event_file_names_lst
    ]
    return event_files[ctimes.index(max(ctimes))]
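
TensorBoard event filenames embed a Unix timestamp right after the tfevents token, which is what this function compares (filenames below are hypothetical):

event_files = [
    "runs/exp1/events.out.tfevents.1697702069.host-a",
    "runs/exp1/events.out.tfevents.1720756801.host-b",
]
print(get_latest_event_file(event_files))
# runs/exp1/events.out.tfevents.1720756801.host-b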

plot_layer_hist_for_basin_model_fold(model_name, chosen_layer_values, layers, save_fig_dir, cmap_str='Oranges')

Plot weight and bias histograms of the chosen layers for one model.

Parameters

model_name : str
    Name of the model whose layer values are plotted (used in the saved figure names).
chosen_layer_values : dict
    Mapping from layer name to {model_name: DataFrame of histogram values per epoch}.
layers : list
    Names of the layers to plot.
save_fig_dir : str
    Directory where the figures are saved.
cmap_str : str, optional
    Choose from here: https://matplotlib.org/stable/tutorials/colors/colormaps.html#sequential, by default "Oranges"

Source code in torchhydro/explainers/weight_anlysis.py
def plot_layer_hist_for_basin_model_fold(
    model_name,
    chosen_layer_values,
    layers,
    save_fig_dir,
    cmap_str="Oranges",
):
    """_summary_

    Parameters
    ----------
    model_name : _type_
        _description_
    chosen_layer_values : _type_
        _description_
    layers : _type_
        _description_
    bsize : _type_
        _description_
    cmap_str : str, optional
        chose from here: https://matplotlib.org/stable/tutorials/colors/colormaps.html#sequential, by default "Oranges"
    """
    if not os.path.exists(save_fig_dir):
        os.makedirs(save_fig_dir)
    for layer in layers:
        two_model_layers = chosen_layer_values[layer]
        try:
            df_lstm_show = two_model_layers[model_name]
        except KeyError:
            # if the model does not have this layer, skip
            continue
        lstm_x_lst = []
        lstm_y_lst = []
        lstm_dash_lines = []
        color_str = ""
        lw_lst = []
        alpha_lst = []
        cmap = cm.get_cmap(cmap_str)
        rgb_lst = []
        norm_color = colors.Normalize(vmin=0, vmax=df_lstm_show.shape[1])
        for i in df_lstm_show:
            lstm_x_lst.append(df_lstm_show.index.values)
            lstm_y_lst.append(df_lstm_show[i].values)
            lstm_dash_lines.append(True)
            color_str = color_str + "r"
            rgba = cmap(norm_color(i))
            rgb_lst.append(rgba)
            alpha_lst.append(0.5)
            lw_lst.append(0.5)
        # the first and last line should be solid, have dark color and wide width
        rgb_lst[0] = rgba
        lstm_dash_lines[-1] = False
        alpha_lst[-1] = 1
        alpha_lst[0] = 1
        lw_lst[-1] = 1
        lw_lst[0] = 1
        hplt.plot_ts(
            lstm_x_lst,
            lstm_y_lst,
            dash_lines=lstm_dash_lines,
            fig_size=(8, 4),
            xlabel="weight_bias_value",
            ylabel="hist_num",
            # c_lst=color_str,
            c_lst=rgb_lst,
            linewidth=lw_lst,
            alpha=alpha_lst,
            leg_lst=None,
        )
        plt.savefig(
            os.path.join(
                save_fig_dir,
                f"{model_name}_{layer}_hist.png",
            ),
            dpi=600,
            bbox_inches="tight",
        )

plot_param_hist_model(model_name, the_exp_dir, batchsize, chosen_layer_for_hist, start_epoch=0, end_epoch=100, epoch_interval=10, save_fig_dir=None)

Plot parameter histograms for each basin.

Parameters

model_name : str
    name of a DL model
the_exp_dir : str
    saving directory for one experiment
batchsize : int
    batch size
chosen_layer_for_hist : list
    names of the layers whose histograms are plotted
start_epoch : int, optional
    first epoch compared in the cosine-similarity check, by default 0
end_epoch : int, optional
    last epoch compared in the cosine-similarity check, by default 100
epoch_interval : int, optional
    interval between plotted epochs, by default 10
save_fig_dir : str, optional
    directory for saving figures, by default None, which means the_exp_dir is used

Returns

tuple of dict
    chosen_layer_values (histogram values per layer and model) and chosen_layer_values_consine (cosine similarity between the start-epoch and end-epoch histograms)

Source code in torchhydro/explainers/weight_anlysis.py
def plot_param_hist_model(
    model_name,
    the_exp_dir,
    batchsize,
    chosen_layer_for_hist,
    start_epoch=0,
    end_epoch=100,
    epoch_interval=10,
    save_fig_dir=None,
):
    """plot paramter histogram for each basin

    Parameters
    ----------
    model_name: str
        name of a DL model
    the_exp_dir : str
        saving directory for one experiment
    batchsize : int
        batch size
    chosen_layer_for_hist : _type_
        _description_

    Returns
    -------
    _type_
        _description_
    """
    chosen_layer_values = {layer: {} for layer in chosen_layer_for_hist}
    chosen_layer_values_consine = {layer: {} for layer in chosen_layer_for_hist}
    _, df_histgram = read_tb_log(the_exp_dir, batchsize)
    hist_cols = df_histgram.columns.values
    model_layers = read_layer_name_from_tb_hist(hist_cols)
    chosen_layers = chosen_layer_in_layers(model_layers, chosen_layer_for_hist)
    k = 0
    for layer in chosen_layer_for_hist:
        if layer not in chosen_layers[k]:
            continue
        df_epochs_hist = epochs_hist_for_chosen_layer(
            epoch_interval, chosen_layers[k], df_histgram
        )
        chosen_layer_values[layer][model_name] = df_epochs_hist
        end_epoch_idx = int((end_epoch / epoch_interval - 1) * epoch_interval)
        chosen_layer_values_consine[layer][model_name] = 1 - cosine(
            df_epochs_hist[start_epoch], df_epochs_hist[end_epoch_idx]
        )
        k = k + 1
    if save_fig_dir is None:
        save_fig_dir = the_exp_dir
    plot_layer_hist_for_basin_model_fold(
        model_name,
        chosen_layer_values,
        chosen_layer_for_hist,
        save_fig_dir=save_fig_dir,
        cmap_str="Reds",
    )
    return chosen_layer_values, chosen_layer_values_consine
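
A hypothetical call; the paths, batch size, and layer names depend on how the experiment logged its TensorBoard histograms (and on the hardcoded opt_Adadelta_lr_1.0_bsize_* directory layout that read_tb_log below expects):

values, cos_sims = plot_param_hist_model(
    model_name="LSTM",
    the_exp_dir="results/camels_exp1",  # hypothetical experiment directory
    batchsize=256,
    chosen_layer_for_hist=["linearIn", "lstm", "linearOut"],  # hypothetical layer names
    start_epoch=0,
    end_epoch=100,
    epoch_interval=10,
)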

read_tb_log(the_exp_dir, shown_batchsize, where_save=None)

Copy a recent log file to the current directory and read the log file.

Parameters

the_exp_dir : str
    saving directory for one experiment
shown_batchsize : int
    batch size for the experiment
where_save : str, optional
    A directory for saving plots, by default None, which means the same directory as the_exp_dir.

Returns

tuple of pandas.DataFrame
    df_scalar (TensorBoard scalars) and df_histgram (TensorBoard histograms)

Raises

FileNotFoundError
    If the TensorBoard log directory does not exist.

Source code in torchhydro/explainers/weight_anlysis.py
def read_tb_log(the_exp_dir, shown_batchsize, where_save=None):
    """Copy a recent log file to the current directory and read the log file.

    Parameters
    ----------
    the_exp_dir : str
        saving directory for one experiment
    shown_batchsize : int
        batch size for the experiment
    where_save : str, optional
        A directory for saving plots, by default None,
        which means the same directory as the_exp_dir.

    Returns
    -------
    tuple of pandas.DataFrame
        df_scalar (TensorBoard scalars) and df_histgram (TensorBoard histograms)

    Raises
    ------
    FileNotFoundError
        If the TensorBoard log directory does not exist.
    """
    log_dir = os.path.join(
        the_exp_dir,
        f"opt_Adadelta_lr_1.0_bsize_{str(shown_batchsize)}",
    )
    if not os.path.exists(log_dir):
        raise FileNotFoundError(f"Log dir {log_dir} not found!")
    if where_save is None:
        where_save = the_exp_dir
        plot_save_dir = os.path.join(
            where_save,
            f"opt_Adadelta_lr_1.0_bsize_{str(shown_batchsize)}",
        )
    else:
        plot_save_dir = os.path.join(
            where_save,
            f"opt_Adadelta_lr_1.0_bsize_{str(shown_batchsize)}",
        )
        copy_latest_tblog_file(log_dir, plot_save_dir)
    scalar_file = os.path.join(plot_save_dir, "scalars.csv")
    if not os.path.exists(scalar_file):
        reader = SummaryReader(plot_save_dir)
        df_scalar = reader.scalars
        df_scalar.to_csv(scalar_file, index=False)
    else:
        df_scalar = pd.read_csv(scalar_file)

    # reader = SummaryReader(result_dir)
    histgram_file = os.path.join(plot_save_dir, "histograms.pkl")
    if not os.path.exists(histgram_file):
        reader = SummaryReader(plot_save_dir, pivot=True)
        df_histgram = reader.histograms
        # https://www.statology.org/pandas-save-dataframe/
        df_histgram.to_pickle(histgram_file)
    else:
        df_histgram = pd.read_pickle(histgram_file)
    return df_scalar, df_histgram
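
A hypothetical direct call; note the hardcoded optimizer/learning-rate prefix in the log-directory name, so only experiments trained with Adadelta at lr 1.0 match this layout:

df_scalar, df_histgram = read_tb_log("results/camels_exp1", shown_batchsize=256)
print(df_scalar.head())      # loss and metric scalars per step
print(df_histgram.columns)   # one column per logged layer histogram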