Skip to content

Dahuofang (DHF) Model

The Dahuofang (DHF) model is a lumped conceptual hydrological model proposed by the Dahuofang Reservoir Administration in 1973. It consists of two main components: an 8-parameter infiltration excess runoff calculation model and an 8-parameter empirical unit hydrograph routing model with variable intensity and variable confluence velocity.

The model employs a dual-layer infiltration curve for loss calculation and uses a parabolic function to describe the upper layer water storage and dual-layer infiltration rate distribution. This is a lumped conceptual model specifically designed for flood forecasting applications.

References

The implementation is primarily based on the Chinese textbook: - "水库防洪预报调度方法及应用" (Reservoir Flood Control Forecasting and Dispatching Methods and Applications), edited by Dalian University of Technology and the Office of State Flood Control and Drought Relief Headquarters, published by China Water & Power Press.

For English references, see: - Li, X., Ye, L., Gu, X. et al. Development of A Distributed Modeling Framework Considering Spatiotemporally Varying Hydrological Processes for Sub-Daily Flood Forecasting in Semi-Humid and Semi-Arid Watersheds. Water Resour Manage 38, 3725–3754 (2024). https://doi.org/10.1007/s11269-024-03837-5

Model Overview

The main entry point for the DHF model is the dhf() function, which provides a complete implementation of the Dahuofang hydrological model. This function integrates both runoff generation and routing components in a single, vectorized interface that can process multiple basins simultaneously.

Key features of the dhf() function: - Vectorized computation: Processes all basins simultaneously using NumPy operations - Complete water balance: Handles both runoff generation and channel routing - State management: Supports warmup periods and state variable tracking
- Flexible configuration: Configurable time intervals, basin parameters, and routing options - High performance: Optimized for large-scale flood forecasting applications

Model Components

1. Runoff Generation (产流计算)

The runoff generation component uses an 8-parameter infiltration excess model that:

  • Dual-layer infiltration curve: Models water loss through a two-layer soil structure
  • Parabolic distribution: Describes upper layer water storage capacity using parabolic functions
  • Surface and subsurface separation: Distinguishes between surface runoff, interflow, and groundwater components
  • Evapotranspiration handling: Accounts for both net precipitation and evaporation deficit conditions
  • Storage dynamics: Updates soil moisture states for surface, subsurface, and deep layers

Key parameters include surface storage capacity (S0), subsurface storage capacity (U0), deep storage capacity (D0), and various coefficients controlling infiltration and flow generation.

2. Flow Routing (汇流计算)

The routing component implements an 8-parameter empirical unit hydrograph model featuring:

  • Variable intensity routing: Adapts routing parameters based on antecedent conditions and current runoff intensity
  • Variable confluence velocity: Adjusts flow velocity according to flow magnitude and basin characteristics
  • Empirical unit hydrograph: Uses trigonometric functions to describe the unit hydrograph shape
  • Surface and subsurface separation: Routes surface flow and subsurface flow with different time constants
  • Channel parameters: Incorporates main channel length, basin area, and routing coefficients

The routing process considers both immediate surface flow response and delayed subsurface contributions, providing realistic flood hydrograph simulation for Chinese watershed conditions.

API Reference

Copyright (c) 2023-2026 Wenyu Ouyang. All rights reserved.

calculate_dhf_evaporation_deficit(precipitation, edt, sa, ua, s0, u0, a)

Calculate evaporation when precipitation is insufficient.

Parameters:

Name Type Description Default
precipitation np.ndarray

Precipitation values

required
edt np.ndarray

Evapotranspiration demand

required
sa np.ndarray

Surface water storage

required
ua np.ndarray

Subsurface water storage

required
s0 np.ndarray

Surface storage capacity

required
u0 np.ndarray

Subsurface storage capacity

required
a np.ndarray

Surface storage exponent

required

Returns:

Type Description
Tuple[np.ndarray, np.ndarray]

Updated surface storage (sa_new) and subsurface storage (ua_new) after evaporation

Source code in hydromodel/models/dhf.py
@jit(nopython=True)
def calculate_dhf_evaporation_deficit(
    precipitation: np.ndarray,
    edt: np.ndarray,
    sa: np.ndarray,
    ua: np.ndarray,
    s0: np.ndarray,
    u0: np.ndarray,
    a: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Calculate evaporation when precipitation is insufficient.

    Args:
        precipitation (np.ndarray): Precipitation values
        edt (np.ndarray): Evapotranspiration demand
        sa (np.ndarray): Surface water storage
        ua (np.ndarray): Subsurface water storage
        s0 (np.ndarray): Surface storage capacity
        u0 (np.ndarray): Subsurface storage capacity
        a (np.ndarray): Surface storage exponent

    Returns:
        Tuple[np.ndarray, np.ndarray]: Updated surface storage (sa_new) and subsurface storage (ua_new) after evaporation
    """
    ec = edt - precipitation
    eb = ec  # accumulated deficit

    # Calculate surface evaporation
    temp1 = (1 - (eb - ec) / (a * s0)) ** a
    temp2 = (1 - eb / (a * s0)) ** a

    eu = np.where(
        (eb / (a * s0) <= 0.999999) & ((eb - ec) / (a * s0) <= 0.999999),
        s0 * (temp1 - temp2),
        np.where(
            (eb / (a * s0) >= 1.00001) & ((eb - ec) / (a * s0) <= 0.999999),
            s0 * temp1,
            0.00001,
        ),
    )

    # Update storages after evaporation
    el = np.where(sa - eu < 0.0, (ec - sa) * ua / u0, (ec - eu) * ua / u0)
    sa_new = np.where(sa - eu < 0.0, 0.0, sa - eu)
    ua_new = ua - el
    ua_new = np.maximum(ua_new, 0.0)

    return sa_new, ua_new

calculate_dhf_evapotranspiration(precipitation, potential_evapotranspiration, kc)

Calculate evapotranspiration and net precipitation for DHF model.

TODO: We writes some atomic functions for DHF model, but it is not used yet.

Parameters:

Name Type Description Default
precipitation np.ndarray

Precipitation values

required
potential_evapotranspiration np.ndarray

Potential evapotranspiration values

required
kc np.ndarray

Evaporation coefficient

required

Returns:

Type Description
Tuple[np.ndarray, np.ndarray]

Net precipitation (pe) and evapotranspiration (edt)

Source code in hydromodel/models/dhf.py
@jit(nopython=True)
def calculate_dhf_evapotranspiration(
    precipitation: np.ndarray,
    potential_evapotranspiration: np.ndarray,
    kc: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Calculate evapotranspiration and net precipitation for DHF model.

    TODO: We writes some atomic functions for DHF model, but it is not used yet.

    Args:
        precipitation (np.ndarray): Precipitation values
        potential_evapotranspiration (np.ndarray): Potential evapotranspiration values
        kc (np.ndarray): Evaporation coefficient

    Returns:
        Tuple[np.ndarray, np.ndarray]: Net precipitation (pe) and evapotranspiration (edt)
    """
    edt = kc * potential_evapotranspiration
    pe = precipitation - edt  # net precipitation
    return pe, edt

calculate_dhf_routing(runoff_sim, rl, tm, k3, k3l, aa, aal, tt, ts, dd, cc, ddl, ccl, time_steps, pai)

Calculate routing for DHF model.

Parameters:

Name Type Description Default
runoff_sim np.ndarray

Simulated runoff

required
rl np.ndarray

Subsurface flow

required
tm np.ndarray

Time parameter

required
k3 np.ndarray

Surface routing coefficient

required
k3l np.ndarray

Subsurface routing coefficient

required
aa np.ndarray

Surface routing parameter aa

required
aal np.ndarray

Subsurface routing parameter aal

required
tt np.ndarray

Time index for subsurface flow

required
ts np.ndarray

Time index for surface flow

required
dd np.ndarray

Surface routing parameter dd

required
cc np.ndarray

Surface routing parameter cc

required
ddl np.ndarray

Subsurface routing parameter ddl

required
ccl np.ndarray

Subsurface routing parameter ccl

required
time_steps int

Number of time steps

required
pai float

Pi constant

required

Returns:

Type Description
Tuple[np.ndarray, np.ndarray]

Surface flow (qs) and subsurface flow (ql)

Source code in hydromodel/models/dhf.py
@jit(nopython=True)
def calculate_dhf_routing(
    runoff_sim: np.ndarray,
    rl: np.ndarray,
    tm: np.ndarray,
    k3: np.ndarray,
    k3l: np.ndarray,
    aa: np.ndarray,
    aal: np.ndarray,
    tt: np.ndarray,
    ts: np.ndarray,
    dd: np.ndarray,
    cc: np.ndarray,
    ddl: np.ndarray,
    ccl: np.ndarray,
    time_steps: int,
    pai: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """Calculate routing for DHF model.

    Args:
        runoff_sim (np.ndarray): Simulated runoff
        rl (np.ndarray): Subsurface flow
        tm (np.ndarray): Time parameter
        k3 (np.ndarray): Surface routing coefficient
        k3l (np.ndarray): Subsurface routing coefficient
        aa (np.ndarray): Surface routing parameter aa
        aal (np.ndarray): Subsurface routing parameter aal
        tt (np.ndarray): Time index for subsurface flow
        ts (np.ndarray): Time index for surface flow
        dd (np.ndarray): Surface routing parameter dd
        cc (np.ndarray): Surface routing parameter cc
        ddl (np.ndarray): Subsurface routing parameter ddl
        ccl (np.ndarray): Subsurface routing parameter ccl
        time_steps (int): Number of time steps
        pai (float): Pi constant

    Returns:
        Tuple[np.ndarray, np.ndarray]: Surface flow (qs) and subsurface flow (ql)
    """
    qs = np.zeros_like(runoff_sim)
    ql = np.zeros_like(runoff_sim)

    for i in range(time_steps):
        tl = tt[i] + ts[i] - 1
        tl = max(tl, 0)

        for j in range(int(tl) + 1):
            if i + j >= time_steps:
                break

            # Surface routing
            if tm[i] > 0:
                temp0 = pai * j / tm[i]
                temp1 = temp0 ** dd[i]
                temp2 = np.exp(-aa[i] * temp1)
                temp3 = (np.sin(temp0)) ** cc[i]
                qs_contrib = (
                    (runoff_sim[i] - rl[i]) * k3[i] / tm[i] * temp2 * temp3
                )
            else:
                qs_contrib = 0.0

            # Subsurface routing
            if tt[i] > 0 and j >= ts[i]:
                temp00 = pai * (j - ts[i]) / tt[i]
                temp10 = temp00 ** ddl[i]
                temp20 = np.exp(-aal[i] * temp10)
                temp30 = (np.sin(temp00)) ** ccl[i]
                ql_contrib = rl[i] * k3l[i] / tt[i] * temp20 * temp30
            else:
                ql_contrib = 0.0

            # Add contributions based on timing conditions
            if j <= tm[i]:
                if j <= ts[i]:
                    qs[i + j] += qs_contrib
                else:
                    qs[i + j] += qs_contrib
                    ql[i + j] += ql_contrib
            else:
                ql[i + j] += ql_contrib

    return qs, ql

calculate_dhf_routing_params(ya, runoff_sim, l, b0, k0, n, coe, dd, cc, ddl, ccl, time_interval, pai)

Calculate routing parameters for DHF model.

Parameters:

Name Type Description Default
ya np.ndarray

Precedent rain parameter

required
runoff_sim np.ndarray

Simulated runoff

required
l np.ndarray

Main channel length

required
b0 np.ndarray

Routing parameter b0

required
k0 np.ndarray

Routing parameter k0

required
n np.ndarray

Routing parameter n

required
coe np.ndarray

Routing coefficient

required
dd np.ndarray

Surface routing parameter dd

required
cc np.ndarray

Surface routing parameter cc

required
ddl np.ndarray

Subsurface routing parameter ddl

required
ccl np.ndarray

Subsurface routing parameter ccl

required
time_interval float

Time interval in hours

required
pai float

Pi constant

required

Returns:

Type Description
Tuple[np.ndarray, ...]

Routing parameters including tm, k3, k3l, aa, aal, tt, ts

Source code in hydromodel/models/dhf.py
@jit(nopython=True)
def calculate_dhf_routing_params(
    ya: np.ndarray,
    runoff_sim: np.ndarray,
    l: np.ndarray,
    b0: np.ndarray,
    k0: np.ndarray,
    n: np.ndarray,
    coe: np.ndarray,
    dd: np.ndarray,
    cc: np.ndarray,
    ddl: np.ndarray,
    ccl: np.ndarray,
    time_interval: float,
    pai: float,
):
    """Calculate routing parameters for DHF model.

    Args:
        ya (np.ndarray): Precedent rain parameter
        runoff_sim (np.ndarray): Simulated runoff
        l (np.ndarray): Main channel length
        b0 (np.ndarray): Routing parameter b0
        k0 (np.ndarray): Routing parameter k0
        n (np.ndarray): Routing parameter n
        coe (np.ndarray): Routing coefficient
        dd (np.ndarray): Surface routing parameter dd
        cc (np.ndarray): Surface routing parameter cc
        ddl (np.ndarray): Subsurface routing parameter ddl
        ccl (np.ndarray): Subsurface routing parameter ccl
        time_interval (float): Time interval in hours
        pai (float): Pi constant

    Returns:
        Tuple[np.ndarray, ...]: Routing parameters including tm, k3, k3l, aa, aal, tt, ts
    """
    # Ensure ya >= 0.5 for stability
    ya = np.maximum(ya, 0.5)

    # Calculate timing parameters
    temp_tm = (ya + runoff_sim) ** (-k0)
    lb = l / b0
    tm = lb * temp_tm

    tt = (n * tm).astype(np.int32)
    ts = (coe * tm).astype(np.int32)

    # Surface flow routing coefficient
    w0 = 1.0 / time_interval

    # Calculate surface routing coefficient K3
    k3 = np.zeros_like(tm)
    aa = np.zeros_like(tm)

    mask = tm > 0
    if np.any(mask):
        temp_aa = (pai * coe[mask]) ** (dd[mask] - 1)
        aa[mask] = cc[mask] / (dd[mask] * temp_aa * np.tan(pai * coe[mask]))

        for j in range(int(np.max(tm[mask])) + 1):
            j_mask = mask & (j < tm)
            if np.any(j_mask):
                temp = (pai * j / tm[j_mask]) ** dd[j_mask]
                temp1 = (np.sin(pai * j / tm[j_mask])) ** cc[j_mask]
                k3[j_mask] += np.exp(-aa[j_mask] * temp) * temp1

        k3[mask] = tm[mask] * w0 / k3[mask]

    # Calculate subsurface routing coefficient K3L
    k3l = np.zeros_like(tm)
    aal = np.zeros_like(tm)

    tt_mask = tt > 0
    if np.any(tt_mask):
        temp_aal = (pai * coe[tt_mask] / n[tt_mask]) ** (ddl[tt_mask] - 1)
        aal[tt_mask] = ccl[tt_mask] / (
            ddl[tt_mask] * temp_aal * np.tan(pai * coe[tt_mask] / n[tt_mask])
        )

        for j in range(int(np.max(tt[tt_mask])) + 1):
            j_mask = tt_mask & (j < tt)
            if np.any(j_mask):
                temp = (pai * j / tt[j_mask]) ** ddl[j_mask]
                temp1 = (np.sin(pai * j / tt[j_mask])) ** ccl[j_mask]
                k3l[j_mask] += np.exp(-aal[j_mask] * temp) * temp1

        k3l[tt_mask] = tt[tt_mask] * w0 / k3l[tt_mask]

    return tm, k3, k3l, aa, aal, tt, ts

calculate_dhf_storage_update(sa, ua, pc, rr, y, s0, u0, a)

Update water storage states.

Parameters:

Name Type Description Default
sa np.ndarray

Surface water storage

required
ua np.ndarray

Subsurface water storage

required
pc np.ndarray

Net infiltration

required
rr np.ndarray

Surface runoff

required
y np.ndarray

Total flow

required
s0 np.ndarray

Surface storage capacity

required
u0 np.ndarray

Subsurface storage capacity

required
a np.ndarray

Surface storage exponent

required

Returns:

Type Description
Tuple[np.ndarray, np.ndarray]

Updated surface storage (sa_new) and subsurface storage (ua_new)

Source code in hydromodel/models/dhf.py
@jit(nopython=True)
def calculate_dhf_storage_update(
    sa: np.ndarray,
    ua: np.ndarray,
    pc: np.ndarray,
    rr: np.ndarray,
    y: np.ndarray,
    s0: np.ndarray,
    u0: np.ndarray,
    a: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Update water storage states.

    Args:
        sa (np.ndarray): Surface water storage
        ua (np.ndarray): Subsurface water storage
        pc (np.ndarray): Net infiltration
        rr (np.ndarray): Surface runoff
        y (np.ndarray): Total flow
        s0 (np.ndarray): Surface storage capacity
        u0 (np.ndarray): Subsurface storage capacity
        a (np.ndarray): Surface storage exponent

    Returns:
        Tuple[np.ndarray, np.ndarray]: Updated surface storage (sa_new) and subsurface storage (ua_new)
    """
    # Calculate surface water storage parameters
    temp = np.where(sa > 0, (1 - sa / s0) ** (1 / a), 0.0)
    sm = a * s0 * (1 - temp)

    # Update surface storage
    sa_new = np.where(
        pc > 0.0,
        np.where(
            sm + pc < a * s0,
            s0 * (1 - (1 - (sm + pc) / (a * s0)) ** a),
            sa + pc - rr,
        ),
        sa,
    )
    sa_new = np.clip(sa_new, 0.0, s0)

    # Update subsurface storage
    ua_new = ua + rr - y
    ua_new = np.clip(ua_new, 0.0, u0)

    return sa_new, ua_new

calculate_dhf_subsurface_flow(rr, ua, u0, d0, b, k2, kw, time_interval)

Calculate subsurface flow components.

Parameters:

Name Type Description Default
rr np.ndarray

Surface runoff

required
ua np.ndarray

Subsurface water storage

required
u0 np.ndarray

Subsurface storage capacity

required
d0 np.ndarray

Deep storage capacity

required
b np.ndarray

Subsurface storage exponent

required
k2 np.ndarray

Percolation coefficient

required
kw np.ndarray

Subsurface flow coefficient

required
time_interval float

Time interval in hours

required

Returns:

Type Description
Tuple[np.ndarray, np.ndarray, np.ndarray]

Total flow (y), interflow (yu), and groundwater runoff (yl)

Source code in hydromodel/models/dhf.py
@jit(nopython=True)
def calculate_dhf_subsurface_flow(
    rr: np.ndarray,
    ua: np.ndarray,
    u0: np.ndarray,
    d0: np.ndarray,
    b: np.ndarray,
    k2: np.ndarray,
    kw: np.ndarray,
    time_interval: float,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Calculate subsurface flow components.

    Args:
        rr (np.ndarray): Surface runoff
        ua (np.ndarray): Subsurface water storage
        u0 (np.ndarray): Subsurface storage capacity
        d0 (np.ndarray): Deep storage capacity
        b (np.ndarray): Subsurface storage exponent
        k2 (np.ndarray): Percolation coefficient
        kw (np.ndarray): Subsurface flow coefficient
        time_interval (float): Time interval in hours

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: Total flow (y), interflow (yu), and groundwater runoff (yl)
    """
    # Calculate subsurface flow parameters
    temp = np.where(ua > 0, (1 - ua / u0) ** (1 / b), 0.0)
    un = b * u0 * (1 - temp)
    temp = np.where(ua > 0, (1 - ua / u0) ** (u0 / (b * d0)), 0.0)
    dn = b * d0 * (1 - temp)

    z1 = 1 - np.exp(-k2 * time_interval * u0 / d0)
    z2 = 1 - np.exp(-k2 * time_interval)

    # Calculate total flow
    y = np.where(
        rr + z2 * un < z2 * b * u0,
        rr
        + z2 * (ua - u0)
        + z2 * u0 * (1 - (z2 * un + rr) / (z2 * b * u0)) ** b,
        rr + z2 * (ua - u0),
    )

    # Calculate interflow
    temp = np.where(ua > 0, (1 - ua / u0) ** (u0 / d0), 0.0)
    yu = np.where(
        z1 * dn + rr < z1 * b * d0,
        rr
        - z1 * d0 * temp
        + z1 * d0 * (1 - (z1 * dn + rr) / (z1 * b * d0)) ** b,
        rr - z1 * d0 * temp,
    )

    # Calculate groundwater runoff
    yl = (y - yu) * kw

    return y, yu, yl

calculate_dhf_surface_runoff(pe, sa, s0, a, g)

Calculate surface runoff and impervious area runoff.

Parameters:

Name Type Description Default
pe np.ndarray

Net precipitation

required
sa np.ndarray

Surface water storage

required
s0 np.ndarray

Surface storage capacity

required
a np.ndarray

Surface storage exponent

required
g np.ndarray

Impervious area ratio

required

Returns:

Type Description
Tuple[np.ndarray, np.ndarray, np.ndarray]

Impervious area runoff (y0), net infiltration (pc), and surface runoff (rr)

Source code in hydromodel/models/dhf.py
@jit(nopython=True)
def calculate_dhf_surface_runoff(
    pe: np.ndarray,
    sa: np.ndarray,
    s0: np.ndarray,
    a: np.ndarray,
    g: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Calculate surface runoff and impervious area runoff.

    Args:
        pe (np.ndarray): Net precipitation
        sa (np.ndarray): Surface water storage
        s0 (np.ndarray): Surface storage capacity
        a (np.ndarray): Surface storage exponent
        g (np.ndarray): Impervious area ratio

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: Impervious area runoff (y0), net infiltration (pc), and surface runoff (rr)
    """
    y0 = g * pe  # impervious area runoff
    pc = pe - y0  # net infiltration

    # Calculate surface water storage contribution (vectorized)
    temp = np.where(sa > 0, (1 - sa / s0) ** (1 / a), 0.0)
    sm = a * s0 * (1 - temp)

    # Calculate runoff from surface storage
    rr = np.where(
        pc > 0.0,
        np.where(
            sm + pc < a * s0,
            pc + sa - s0 + s0 * (1 - (sm + pc) / (a * s0)) ** a,
            pc - (s0 - sa),
        ),
        0.0,
    )

    return y0, pc, rr

dhf(p_and_e, parameters, warmup_length=365, return_state=False, return_warmup_states=False, normalized_params='auto', **kwargs)

Vectorized DHF (Dahuofang) hydrological model - fully parallelized version.

This function implements the DHF model with full NumPy vectorization, processing all basins simultaneously using [seq, basin, feature] tensor operations.

Parameters:

Name Type Description Default
p_and_e np.ndarray

Precipitation and potential evapotranspiration, 3-dim: [time, basin, feature=2] where feature=0 is precipitation, feature=1 is potential evapotranspiration

required
parameters np.ndarray

Model parameters, 2-dim: [basin, parameter] Parameters: [S0, U0, D0, KC, KW, K2, KA, G, A, B, B0, K0, N, DD, CC, COE, DDL, CCL]

required
warmup_length int

The length of warmup period (default: 365)

365
return_state bool

If True, return internal state variables (default: False)

False
return_warmup_states bool

If True, return initial states after warmup period (default: False) Returns a dict with keys: "sa0", "ua0", "ya0" containing initial states

False
normalized_params Union[bool, str]

Parameter format specification: - "auto": automatically detect parameter format (default) - True: parameters are normalized (0-1 range), convert to original scale - False: parameters are already in original scale, use as-is

'auto'
**kwargs

Additional keyword arguments, including: - time_interval_hours (default: 3.0) - main_river_length (default: None) means length of the main channel (km), for example, dahuofang's is 155.763 - basin_area (default: None) means basin area (km^2), for example, dahuofang's is 5482.0 - initial_states (default: None) dict to override specific initial state values after warmup, e.g., {"sa0": 10, "ya0": 15} will set sa0=10 and ya0=15 for all basins after warmup

{}

Returns:

Type Description
Union[np.ndarray, Tuple]

Depends on return_state and return_warmup_states parameters:

  • return_state=False, return_warmup_states=False: QSim array [time, basin, 1]

  • return_state=False, return_warmup_states=True: (QSim, warmup_states_dict) where warmup_states_dict contains

  • return_state=True, return_warmup_states=False: (QSim, runoffSim, y0, yu, yl, y, sa, ua, ya)

  • return_state=True, return_warmup_states=True: (QSim, runoffSim, y0, yu, yl, y, sa, ua, ya, warmup_states_dict)

Source code in hydromodel/models/dhf.py
def dhf(
    p_and_e: np.ndarray,
    parameters: np.ndarray,
    warmup_length: int = 365,
    return_state: bool = False,
    return_warmup_states: bool = False,
    normalized_params: Union[bool, str] = "auto",
    **kwargs,
) -> Union[
    np.ndarray,  # return_state=False, return_warmup_states=False
    Tuple[
        np.ndarray, Dict[str, np.ndarray]
    ],  # return_state=False, return_warmup_states=True
    Tuple[
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
    ],  # return_state=True, return_warmup_states=False
    Tuple[
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        np.ndarray,
        Dict[str, np.ndarray],
    ],  # return_state=True, return_warmup_states=True
]:
    """Vectorized DHF (Dahuofang) hydrological model - fully parallelized version.

    This function implements the DHF model with full NumPy vectorization,
    processing all basins simultaneously using [seq, basin, feature] tensor operations.

    Args:
        p_and_e (np.ndarray): Precipitation and potential evapotranspiration, 3-dim: [time, basin, feature=2]
            where feature=0 is precipitation, feature=1 is potential evapotranspiration
        parameters (np.ndarray): Model parameters, 2-dim: [basin, parameter]
            Parameters: [S0, U0, D0, KC, KW, K2, KA, G, A, B, B0, K0, N, DD, CC, COE, DDL, CCL]
        warmup_length (int, optional): The length of warmup period (default: 365)
        return_state (bool, optional): If True, return internal state variables (default: False)
        return_warmup_states (bool, optional): If True, return initial states after warmup period (default: False)
            Returns a dict with keys: "sa0", "ua0", "ya0" containing initial states
        normalized_params (Union[bool, str], optional): Parameter format specification:
            - "auto": automatically detect parameter format (default)
            - True: parameters are normalized (0-1 range), convert to original scale
            - False: parameters are already in original scale, use as-is
        **kwargs: Additional keyword arguments, including:
            - time_interval_hours (default: 3.0)
            - main_river_length (default: None) means length of the main channel (km), for example, dahuofang's is 155.763
            - basin_area (default: None) means basin area (km^2), for example, dahuofang's is 5482.0
            - initial_states (default: None) dict to override specific initial state values after warmup,
              e.g., {"sa0": 10, "ya0": 15} will set sa0=10 and ya0=15 for all basins after warmup

    Returns:
        Union[np.ndarray, Tuple]: Depends on return_state and return_warmup_states parameters:

        - return_state=False, return_warmup_states=False:
            QSim array [time, basin, 1]

        - return_state=False, return_warmup_states=True:
            (QSim, warmup_states_dict) where warmup_states_dict contains
            {"sa0": [basin], "ua0": [basin], "ya0": [basin]}

        - return_state=True, return_warmup_states=False:
            (QSim, runoffSim, y0, yu, yl, y, sa, ua, ya)

        - return_state=True, return_warmup_states=True:
            (QSim, runoffSim, y0, yu, yl, y, sa, ua, ya, warmup_states_dict)
    """

    # Get data dimensions
    time_steps, num_basins, _ = p_and_e.shape
    time_interval = kwargs.get("time_interval_hours", 3.0)
    pai = np.pi
    l = kwargs.get("main_river_length", None)  # km
    f = kwargs.get("basin_area", None)  # km^2

    if l is None or f is None:
        raise ValueError(
            "main_river_length (l) and basin_area (f) must be provided for DHF model"
        )

    # Process parameters using unified parameter handling
    processed_parameters = parameters.copy()
    if normalized_params != False:
        model_param_dict = MODEL_PARAM_DICT.get("dhf")
        if model_param_dict is not None:
            param_ranges = model_param_dict["param_range"]
            processed_parameters = process_parameters(
                parameters, param_ranges, normalized=normalized_params
            )

    # Extract parameters - all are [basin] arrays
    s0 = processed_parameters[:, 0]  # Surface storage capacity
    u0 = processed_parameters[:, 1]  # Subsurface storage capacity
    d0 = processed_parameters[:, 2]  # Deep storage capacity
    kc = processed_parameters[:, 3]  # Evaporation coefficient
    kw = processed_parameters[:, 4]  # Subsurface flow coefficient
    k2 = processed_parameters[:, 5]  # Percolation coefficient
    ka = processed_parameters[:, 6]  # Total runoff adjustment coefficient
    g = processed_parameters[:, 7]  # Impervious area ratio
    a = processed_parameters[:, 8]  # Surface storage exponent
    b = processed_parameters[:, 9]  # Subsurface storage exponent
    b0 = processed_parameters[:, 10]  # Routing parameter
    k0 = processed_parameters[:, 11]  # Routing parameter
    n = processed_parameters[:, 12]  # Routing parameter
    dd = processed_parameters[:, 13]  # Surface routing parameter
    cc = processed_parameters[:, 14]  # Surface routing parameter
    coe = processed_parameters[:, 15]  # Routing parameter
    ddl = processed_parameters[:, 16]  # Subsurface routing parameter
    ccl = processed_parameters[:, 17]  # Subsurface routing parameter

    # Handle warmup period
    if warmup_length > 0:
        p_and_e_warmup = p_and_e[0:warmup_length, :, :]
        # Remove initial_states from kwargs for warmup period to avoid applying override during warmup
        warmup_kwargs = {
            k: v for k, v in kwargs.items() if k != "initial_states"
        }
        *_, sa, ua, ya = dhf(
            p_and_e_warmup,
            parameters,
            warmup_length=0,
            return_state=True,
            normalized_params=False,  # Already processed
            **warmup_kwargs,
        )
        sa0 = sa[-1, :, 0].copy()
        ua0 = ua[-1, :, 0].copy()
        ya0 = ya[-1, :, 0].copy()
    else:
        # Default initial states
        sa0 = np.zeros(s0.shape)
        ua0 = np.zeros(u0.shape)
        # just use d0's shape, ya0 is not d0, it is Pa, while d0 is the deep storage capacity
        ya0 = np.full(d0.shape, 0.5)

    # Apply initial state overrides if provided (only after warmup in main call)
    initial_states = kwargs.get("initial_states", None)
    if initial_states is not None:
        # Only apply initial_states when we just finished a warmup period
        if "sa0" in initial_states:
            sa0.fill(initial_states["sa0"])
        if "ua0" in initial_states:
            ua0.fill(initial_states["ua0"])
        if "ya0" in initial_states:
            ya0.fill(initial_states["ya0"])

    # Save warmup states before applying overrides (for return_warmup_states)
    # NOTE: this part must be set after the initial_states override, otherwise override initial states will be ignored
    warmup_states = None
    if return_warmup_states:
        warmup_states = {
            "sa0": sa0.copy(),  # [basin] array
            "ua0": ua0.copy(),  # [basin] array
            "ya0": ya0.copy(),  # [basin] array
        }

    inputs = p_and_e[warmup_length:, :, :]
    # Get actual time steps after warmup
    actual_time_steps = inputs.shape[0]

    # Initialize state and output arrays - [time, basin]
    sa = np.zeros((actual_time_steps, num_basins))
    ua = np.zeros((actual_time_steps, num_basins))
    ya = np.zeros((actual_time_steps, num_basins))
    # to store the accumulated deficit
    ebs = np.zeros((actual_time_steps, num_basins))

    # Initialize output arrays
    runoff_sim = np.zeros((actual_time_steps, num_basins))
    q_sim = np.zeros((actual_time_steps, num_basins))
    y0_out = np.zeros((actual_time_steps, num_basins))
    yu_out = np.zeros((actual_time_steps, num_basins))
    yl_out = np.zeros((actual_time_steps, num_basins))
    y_out = np.zeros((actual_time_steps, num_basins))

    # Main time loop - DHF generation (runoff production)
    for i in range(actual_time_steps):
        # Current precipitation and PET for all basins
        prcp = inputs[i, :, 0]
        pet = inputs[i, :, 1]
        if i == 0:
            eb = np.zeros(kc.shape)
        else:
            sa0 = sa[i - 1, :]
            ua0 = ua[i - 1, :]
            # NOTE: Because Chu version init eb as 0 at every time step, we keep the same now
            # if we want to make it same as the book, we need to value ebs after a time step's calculation
            eb = ebs[i - 1, :]

        # Limit current states
        sa0 = np.minimum(sa0, s0)
        ua0 = np.minimum(ua0, u0)

        # Calculate evapotranspiration and net precipitation (vectorized)
        edt = kc * pet
        pe = prcp - edt
        # Surface runoff calculation (vectorized)
        y0 = g * pe  # impervious area runoff
        pc = pe - y0  # net infiltration
        # Process based on whether we have net precipitation or evaporation
        # Actually, we should use pe > 0.0, but we use pc > 0.0 to make it same as Chu's version
        # as g<1, hence pe>0 means pc>0, so it is fine.
        net_precip_mask = pc > 0.0

        # For basins with net precipitation (pe > 0) - vectorized operations
        if np.any(net_precip_mask):
            # Apply mask to get relevant basin data
            sa_pos = sa0[net_precip_mask]
            ua_pos = ua0[net_precip_mask]
            s0_pos = s0[net_precip_mask]
            u0_pos = u0[net_precip_mask]
            d0_pos = d0[net_precip_mask]
            a_pos = a[net_precip_mask]
            b_pos = b[net_precip_mask]
            k2_pos = k2[net_precip_mask]
            kw_pos = kw[net_precip_mask]

            # Surface water storage calculation
            temp = (1 - sa_pos / s0_pos) ** (1 / a_pos)
            sm = a_pos * s0_pos * (1 - temp)

            # Calculate surface runoff
            rr = np.where(
                pc > 0.0,
                np.where(
                    sm + pc < a_pos * s0_pos,
                    pc
                    + sa_pos
                    - s0_pos
                    + s0_pos * (1 - (sm + pc) / (a_pos * s0_pos)) ** a_pos,
                    pc - (s0_pos - sa_pos),
                ),
                0.0,
            )

            # Subsurface flow calculation (vectorized)
            temp = (1 - ua_pos / u0_pos) ** (1 / b_pos)
            un = b_pos * u0_pos * (1 - temp)
            temp = (1 - ua_pos / u0_pos) ** (u0_pos / (b_pos * d0_pos))
            dn = b_pos * d0_pos * (1 - temp)

            z1 = 1 - np.exp(-k2_pos * time_interval * u0_pos / d0_pos)
            z2 = 1 - np.exp(-k2_pos * time_interval)

            # Calculate total flow
            y = np.where(
                rr + z2 * un < z2 * b_pos * u0_pos,
                rr
                + z2 * (ua_pos - u0_pos)
                + z2
                * u0_pos
                * (1 - (z2 * un + rr) / (z2 * b_pos * u0_pos)) ** b_pos,
                rr + z2 * (ua_pos - u0_pos),
            )

            # Calculate interflow
            temp = (1 - ua_pos / u0_pos) ** (u0_pos / d0_pos)
            yu = np.where(
                z1 * dn + rr < z1 * b_pos * d0_pos,
                rr
                - z1 * d0_pos * temp
                + z1
                * d0_pos
                * (1 - (z1 * dn + rr) / (z1 * b_pos * d0_pos)) ** b_pos,
                rr - z1 * d0_pos * temp,
            )

            # Calculate groundwater runoff
            yl = (y - yu) * kw_pos

            # Update storage states (vectorized)
            sa_new = np.where(
                pc > 0.0,
                np.where(
                    sm + pc < a_pos * s0_pos,
                    s0_pos * (1 - (1 - (sm + pc) / (a_pos * s0_pos)) ** a_pos),
                    sa_pos + pc - rr,
                ),
                sa_pos,
            )
            sa_new = np.clip(sa_new, 0.0, s0_pos)

            ua_new = ua_pos + rr - y
            ua_new = np.clip(ua_new, 0.0, u0_pos)
            # eb will be set to 0 when pc > 0
            eb = np.where(pc > 0.0, 0.0, eb)

            # Store results for basins with net precipitation
            y0_out[i, net_precip_mask] = y0[net_precip_mask]
            yu_out[i, net_precip_mask] = yu
            yl_out[i, net_precip_mask] = yl
            y_out[i, net_precip_mask] = y
            sa[i, net_precip_mask] = sa_new
            ua[i, net_precip_mask] = ua_new

        # For basins with evaporation deficit (pe <= 0) - vectorized operations
        evap_mask = ~net_precip_mask
        if np.any(evap_mask):
            prcp_neg = prcp[evap_mask]
            edt_neg = edt[evap_mask]
            sa_neg = sa0[evap_mask]
            ua_neg = ua0[evap_mask]
            s0_neg = s0[evap_mask]
            u0_neg = u0[evap_mask]
            a_neg = a[evap_mask]

            ec = edt_neg - prcp_neg
            eb = eb + ec  # accumulated deficit

            # Calculate surface evaporation (vectorized)
            temp1 = (1 - (eb - ec) / (a_neg * s0_neg)) ** a_neg
            temp2 = (1 - eb / (a_neg * s0_neg)) ** a_neg

            eu = np.where(
                (eb / (a_neg * s0_neg) <= 0.999999)
                & ((eb - ec) / (a_neg * s0_neg) <= 0.999999),
                s0_neg * (temp1 - temp2),
                np.where(
                    (eb / (a_neg * s0_neg) >= 1.00001)
                    & ((eb - ec) / (a_neg * s0_neg) <= 0.999999),
                    s0_neg * temp1,
                    0.00001,
                ),
            )

            # Update storages after evaporation
            el = np.where(
                sa_neg - eu < 0.0,
                (ec - sa_neg) * ua_neg / u0_neg,
                (ec - eu) * ua_neg / u0_neg,
            )
            sa_new = np.where(sa_neg - eu < 0.0, 0.0, sa_neg - eu)
            ua_new = ua_neg - el
            ua_new = np.maximum(ua_new, 0.0)

            # Set runoff components to zero for evaporation basins
            y0_out[i, evap_mask] = 0.0
            yu_out[i, evap_mask] = 0.0
            yl_out[i, evap_mask] = 0.0
            y_out[i, evap_mask] = 0.0
            sa[i, evap_mask] = sa_new
            ua[i, evap_mask] = ua_new

        # Ensure states are within bounds
        sa[i, :] = np.clip(sa[i, :], 0.0, s0)
        ua[i, :] = np.clip(ua[i, :], 0.0, u0)

    # get total runoff
    runoff_sim = np.maximum(y_out + y0_out, 0.0)

    # DHF routing calculation - vectorized version without basin loop
    qs = np.zeros((actual_time_steps, num_basins))
    ql = np.zeros((actual_time_steps, num_basins))

    w0 = f / (3.6 * time_interval)  # tmp value used for testing

    # Main time loop for routing (before convolution, getting ya)
    for i in range(actual_time_steps):
        # ya is precedent rain -- Pa
        if i > 0:
            ya0 = ya[i - 1, :]
        ya[i, :] = np.maximum((ya0 + runoff_sim[i, :]) * ka, 0.0)
        # Get current state for all basins (vectorized)
        # here we keep the same as Chu's version
        # You can see that when getting temp_tm, we use ya_val + runoff_sim[i, :]
        ya_val = np.maximum(ya0, 0.5)  # Ensure stability for all basins
        # yl is rL in Chu's version; it's subsurface reservoir's infiltration
        rl = yl_out[i, :]
        # keep same as Chu's version
        rl = np.maximum(rl, 0.0)

        # Calculate routing parameters for all basins (vectorized)
        temp_tm = (ya_val + runoff_sim[i, :]) ** (-k0)
        lb = l / b0
        tm = lb * temp_tm

        # Time indices for all basins (vectorized)
        tt = (n * tm).astype(int)
        ts = (coe * tm).astype(int)

        # Surface routing coefficient calculation (vectorized)
        k3 = np.zeros(num_basins)
        aa_val = np.zeros(num_basins)

        # Calculate routing coefficients for all basins
        temp_aa = (pai * coe) ** (dd - 1)
        aa_val = cc / (dd * temp_aa * np.tan(pai * coe))

        # Calculate k3 for all basins
        max_tm = int(np.ceil(np.max(tm)))
        # we use max_tm for all basins
        for j in range(max_tm):
            # Then, we only process basins for its periods, where j < tm
            j_mask = j < tm
            if np.any(j_mask):
                temp = (pai * j / tm[j_mask]) ** dd[j_mask]
                temp1 = (np.sin(pai * j / tm[j_mask])) ** cc[j_mask]
                k3[j_mask] += np.exp(-aa_val[j_mask] * temp) * temp1

        # Final k3 calculation with division check
        nonzero_k3 = k3 != 0
        if np.any(nonzero_k3):
            k3[nonzero_k3] = tm[nonzero_k3] * w0 / k3[nonzero_k3]

        # Subsurface routing coefficient calculation (vectorized)
        k3l = np.zeros(num_basins)
        aal_val = np.zeros(num_basins)

        # Calculate subsurface routing coefficients for all basins
        temp_aal = (pai * coe / n) ** (ddl - 1)
        aal_val = ccl / (ddl * temp_aal * np.tan(pai * coe / n))

        # Calculate k3l for all basins
        max_tt = int(np.ceil(np.max(tt)))
        for j in range(max_tt):
            # Only process basins where j < tt
            j_mask = j < tt
            if np.any(j_mask):
                temp = (pai * j / tt[j_mask]) ** ddl[j_mask]
                temp1 = (np.sin(pai * j / tt[j_mask])) ** ccl[j_mask]
                k3l[j_mask] += np.exp(-aal_val[j_mask] * temp) * temp1

        # Final k3l calculation with division check
        nonzero_k3l = k3l != 0
        if np.any(nonzero_k3l):
            k3l[nonzero_k3l] = tt[nonzero_k3l] * w0 / k3l[nonzero_k3l]

        # Calculate tl for all basins (vectorized)
        tl = np.maximum(tt + ts - 1, 0)

        # Process routing time steps (still need this loop for convolution)
        max_tl = int(np.ceil(np.max(tl)))
        for j in range(max_tl):
            # Check bounds to prevent overflow
            valid_idx = i + j < actual_time_steps
            if not valid_idx:
                break

            # Surface routing calculation (vectorized)
            q_surface = np.zeros(num_basins)
            # Subsurface routing calculation (vectorized)
            q_subsurface = np.zeros(num_basins)
            _j_mask = j <= tl
            if np.any(_j_mask):
                temp0 = pai * j / tm[_j_mask]
                temp1 = temp0 ** dd[_j_mask]
                temp2 = np.exp(-aa_val[_j_mask] * temp1)
                temp3 = (np.sin(temp0)) ** cc[_j_mask]
                q_surface[_j_mask] = (
                    (runoff_sim[i, _j_mask] - rl[_j_mask])
                    * k3[_j_mask]
                    / tm[_j_mask]
                    * temp2
                    * temp3
                )
                # Handle NaN values
                q_surface[np.isnan(q_surface)] = 0.0

                temp00 = pai * (j - ts[_j_mask]) / tt[_j_mask]
                temp10 = temp00 ** ddl[_j_mask]
                temp20 = np.exp(-aal_val[_j_mask] * temp10)
                temp30 = (np.sin(temp00)) ** ccl[_j_mask]
                q_subsurface[_j_mask] = (
                    rl[_j_mask] * k3l[_j_mask] / tt[_j_mask] * temp20 * temp30
                )

            # Add contributions based on timing conditions (vectorized)
            # Case 1: j <= tm and j <= ts
            mask1 = (j <= tm) & (j <= ts)
            qs[i + j, mask1] += q_surface[mask1]

            # Case 2: j <= tm and j > ts
            mask2 = (j <= tm) & (j > ts)
            qs[i + j, mask2] += q_surface[mask2]
            ql[i + j, mask2] += q_subsurface[mask2]

            # Case 3: j > tm
            mask3 = j > tm
            ql[i + j, mask3] += q_subsurface[mask3]

    # Total discharge
    q_sim = qs + ql
    q_sim = np.maximum(q_sim, 0.0)

    # seq, batch, feature
    q_sim = np.expand_dims(q_sim, axis=2)
    runoff_sim = np.expand_dims(runoff_sim, axis=2)
    y0_out = np.expand_dims(y0_out, axis=2)
    yu_out = np.expand_dims(yu_out, axis=2)
    yl_out = np.expand_dims(yl_out, axis=2)
    y_out = np.expand_dims(y_out, axis=2)
    sa = np.expand_dims(sa, axis=2)
    ua = np.expand_dims(ua, axis=2)
    ya = np.expand_dims(ya, axis=2)

    if return_state:
        result = (
            q_sim,
            runoff_sim,
            y0_out,
            yu_out,
            yl_out,
            y_out,
            sa,
            ua,
            ya,
        )
        # If warmup states are requested, add them as the last element
        if return_warmup_states and warmup_states is not None:
            return result + (warmup_states,)
        else:
            return result
    else:
        # For non-state return, only return warmup states if specifically requested
        if return_warmup_states and warmup_states is not None:
            return q_sim, warmup_states
        else:
            return q_sim