
hydro_event

The hydro_event module provides utilities for flood event extraction and analysis from hydrological time series data.

Core Functions

extract_flood_events

def extract_flood_events(
    df: pd.DataFrame,
    warmup_length: int = 0,
    flood_event_col: str = "flood_event",
    time_col: str = "time"
) -> List[Dict]

Extracts flood events from a DataFrame based on a binary flood event indicator.

Args:

- df: DataFrame with flood_event and time columns
- warmup_length: Number of time steps to include as a warmup period before each event
- flood_event_col: Name of the flood event indicator column
- time_col: Name of the time column

Returns:

- List of flood event dictionaries containing event data and metadata

Example:

import hydroutils as hu
import pandas as pd

# Sample data with flood events
df = pd.DataFrame({
    'time': pd.date_range('2020-01-01', periods=10),
    'flood_event': [0, 0, 1, 1, 1, 0, 0, 1, 1, 0],
    'flow': [100, 120, 200, 300, 250, 150, 130, 180, 220, 140]
})

# Extract flood events
events = hu.extract_flood_events(df, warmup_length=1)
print(f"Found {len(events)} flood events")
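You can check the event count for the sample data above with pandas alone. Count the rising edges of the indicator: rows flagged > 0 whose previous row is not flagged. This is a minimal sketch that does not call hydroutils. The helper name `count_rising_edges` is hypothetical.

```python
import pandas as pd

def count_rising_edges(flags: pd.Series) -> int:
    """Count contiguous runs of flags > 0 (hypothetical helper, not part of hydroutils)."""
    in_event = flags > 0
    # A run starts where the current row is flagged and the previous row is not.
    # fill_value=False treats the position before the first row as "not in an event".
    starts = in_event & ~in_event.shift(1, fill_value=False)
    return int(starts.sum())

df = pd.DataFrame({
    'time': pd.date_range('2020-01-01', periods=10),
    'flood_event': [0, 0, 1, 1, 1, 0, 0, 1, 1, 0],
    'flow': [100, 120, 200, 300, 250, 150, 130, 180, 220, 140]
})
print(count_rising_edges(df['flood_event']))  # 2
```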

time_to_ten_digits

def time_to_ten_digits(time_obj) -> str

Converts time objects to ten-digit format (YYYYMMDDHH).

Example:

from datetime import datetime
import hydroutils as hu

dt = datetime(2020, 1, 1, 12, 0)
time_str = hu.time_to_ten_digits(dt)  # Returns '2020010112'
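If pandas is already in use, `pandas.Timestamp` accepts all three input kinds listed for `time_to_ten_digits`: datetime, numpy.datetime64 and ISO-8601 strings. An equivalent conversion can therefore be sketched in one line. This is an alternative under that assumption, not how hydroutils implements the function, and `ten_digits_via_pandas` is a hypothetical name. Unlike `time_to_ten_digits`, it raises an exception on unparseable input instead of returning "0000000000".

```python
from datetime import datetime

import numpy as np
import pandas as pd

def ten_digits_via_pandas(time_obj) -> str:
    """Hypothetical alternative to hu.time_to_ten_digits built on pandas.Timestamp."""
    # pd.Timestamp normalizes datetime, np.datetime64 and ISO strings to one type.
    return pd.Timestamp(time_obj).strftime("%Y%m%d%H")

print(ten_digits_via_pandas(datetime(2020, 1, 1, 12, 0)))     # 2020010112
print(ten_digits_via_pandas(np.datetime64("2020-01-01T12")))  # 2020010112
print(ten_digits_via_pandas("2020-01-01T12:00:00"))           # 2020010112
```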

extract_peaks

def extract_peaks(
    data: np.ndarray,
    threshold: float = None,
    min_distance: int = 1
) -> Tuple[np.ndarray, np.ndarray]

Extracts peak values and their indices from time series data.

calculate_event_statistics

def calculate_event_statistics(events: List[Dict]) -> pd.DataFrame

Calculates statistical summary for extracted flood events.

API Reference

Author: Wenyu Ouyang
Date: 2025-01-17
LastEditTime: 2025-08-17 09:29:42
LastEditors: Wenyu Ouyang
Description: Flood event extraction utilities for hydrological data processing
FilePath: \hydromodeld:\Code\hydroutils\hydroutils\hydro_event.py
Copyright (c) 2023-2026 Wenyu Ouyang. All rights reserved.

extract_event_data_by_columns(df, event_indices, data_columns)

Extract event data for specified columns using event indices.

This function extracts data from specified columns for a flood event using the index information from get_event_indices or extract_flood_events.

Parameters:

- df (DataFrame, required): Original DataFrame containing all data.
- event_indices (Dict, required): Event index information dictionary containing:
  - warmup_start_idx (int): Start index including the warmup period
  - end_idx (int): End index of the event (exclusive, since it is used as a slice bound)
- data_columns (List[str], required): List of column names to extract.

Returns:

- Dict: Dictionary mapping column names to NumPy arrays containing the extracted data. If a column is not found, it will contain an array of NaN values of the same length.

Example

>>> df = pd.DataFrame({
...     'time': pd.date_range('2020-01-01', periods=5),
...     'flow': [100, 200, 300, 250, 150]
... })
>>> indices = {'warmup_start_idx': 1, 'end_idx': 4}
>>> data = extract_event_data_by_columns(df, indices, ['flow'])
>>> data['flow']
array([200, 300, 250])

Source code in hydroutils/hydro_event.py
def extract_event_data_by_columns(
    df: pd.DataFrame, event_indices: Dict, data_columns: List[str]
) -> Dict:
    """Extract event data for specified columns using event indices.

    This function extracts data from specified columns for a flood event using
    the index information from get_event_indices or extract_flood_events.

    Args:
        df (pd.DataFrame): Original DataFrame containing all data.
        event_indices (Dict): Event index information dictionary containing:
            - warmup_start_idx (int): Start index including warmup period
            - end_idx (int): End index of event
        data_columns (List[str]): List of column names to extract.

    Returns:
        Dict: Dictionary mapping column names to numpy arrays containing the
            extracted data. If a column is not found, it will contain an array
            of NaN values.

    Example:
        >>> df = pd.DataFrame({
        ...     'time': pd.date_range('2020-01-01', periods=5),
        ...     'flow': [100, 200, 300, 250, 150]
        ... })
        >>> indices = {'warmup_start_idx': 1, 'end_idx': 4}
        >>> data = extract_event_data_by_columns(df, indices, ['flow'])
        >>> data['flow']
        array([200, 300, 250])
    """
    start_idx = event_indices["warmup_start_idx"]
    end_idx = event_indices["end_idx"]

    event_data = {}
    for col in data_columns:
        if col in df.columns:
            event_data[col] = df.iloc[start_idx:end_idx][col].values
        else:
            # If the column does not exist, fill it with a NaN array
            event_data[col] = np.full(end_idx - start_idx, np.nan)

    return event_data
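The slice is `df.iloc[start_idx:end_idx]`, so `end_idx` is treated as exclusive. This matches the `end_idx` produced by `get_event_indices`, which adds 1 to the last flagged row. The standalone sketch below mirrors that behavior to show both the half-open slice and the NaN fill for a missing column. `slice_event_columns` is a hypothetical name, not part of hydroutils.

```python
import numpy as np
import pandas as pd

def slice_event_columns(df, warmup_start_idx, end_idx, columns):
    """Hypothetical stand-in for extract_event_data_by_columns (end_idx exclusive)."""
    out = {}
    for col in columns:
        if col in df.columns:
            # Positional, half-open slice: rows warmup_start_idx .. end_idx - 1.
            out[col] = df[col].to_numpy()[warmup_start_idx:end_idx]
        else:
            # Missing column: NaN array with the same length as the slice.
            out[col] = np.full(end_idx - warmup_start_idx, np.nan)
    return out

df = pd.DataFrame({"flow": [100, 200, 300, 250, 150]})
data = slice_event_columns(df, 1, 4, ["flow", "rain"])
print(data["flow"])  # [200 300 250]
print(data["rain"])  # [nan nan nan]
```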

extract_flood_events(df, warmup_length=0, flood_event_col='flood_event', time_col='time')

Extract flood events from a DataFrame based on a flood event indicator column.

This function extracts flood events based on a binary indicator column (flood_event). The design philosophy is to be agnostic about other columns, letting the caller decide how to handle the data columns. The function only requires the flood_event column to mark events and a time column for event naming.

Parameters:

- df (DataFrame, required): DataFrame containing site data. Must have flood_event and time columns.
- warmup_length (int, default 0): Number of time steps to include as a warmup period before each event.
- flood_event_col (str, default 'flood_event'): Name of the flood event indicator column.
- time_col (str, default 'time'): Name of the time column.

Returns:

- List[Dict]: List of flood events. Each dictionary contains:
  - event_name (str): Event name based on start/end times
  - start_idx (int): Start index of the actual event in the original DataFrame
  - end_idx (int): End index of the actual event in the original DataFrame
  - warmup_start_idx (int): Start index including the warmup period
  - data (pd.DataFrame): Event data including the warmup period
  - is_warmup_mask (np.ndarray): Boolean array marking warmup rows
  - actual_start_time: Start time of the actual event
  - actual_end_time: End time of the actual event

Raises:

- ValueError: If required columns are missing from the DataFrame.

Example

>>> df = pd.DataFrame({
...     'time': pd.date_range('2020-01-01', periods=5),
...     'flood_event': [0, 1, 1, 1, 0],
...     'flow': [100, 200, 300, 250, 150]
... })
>>> events = extract_flood_events(df, warmup_length=1)
>>> len(events)
1
>>> events[0]['data']
   time  flood_event  flow
0  2020-01-01    0  100  # warmup period
1  2020-01-02    1  200  # event start
2  2020-01-03    1  300
3  2020-01-04    1  250  # event end

Source code in hydroutils/hydro_event.py
def extract_flood_events(
    df: pd.DataFrame,
    warmup_length: int = 0,
    flood_event_col: str = "flood_event",
    time_col: str = "time",
) -> List[Dict]:
    """Extract flood events from a DataFrame based on a flood event indicator column.

    This function extracts flood events based on a binary indicator column (flood_event).
    The design philosophy is to be agnostic about other columns, letting the caller
    decide how to handle the data columns. The function only requires the flood_event
    column to mark events and a time column for event naming.

    Args:
        df (pd.DataFrame): DataFrame containing site data. Must have flood_event and
            time columns.
        warmup_length (int, optional): Number of time steps to include as warmup
            period before each event. Defaults to 0.
        flood_event_col (str, optional): Name of the flood event indicator column.
            Defaults to "flood_event".
        time_col (str, optional): Name of the time column. Defaults to "time".

    Returns:
        List[Dict]: List of flood events. Each dictionary contains:
            - event_name (str): Event name based on start/end times
            - start_idx (int): Start index of actual event in original DataFrame
            - end_idx (int): End index of actual event in original DataFrame
            - warmup_start_idx (int): Start index including warmup period
            - data (pd.DataFrame): Event data including warmup period
            - is_warmup_mask (np.ndarray): Boolean array marking warmup rows
            - actual_start_time: Start time of actual event
            - actual_end_time: End time of actual event

    Raises:
        ValueError: If required columns are missing from DataFrame.

    Example:
        >>> df = pd.DataFrame({
        ...     'time': pd.date_range('2020-01-01', periods=5),
        ...     'flood_event': [0, 1, 1, 1, 0],
        ...     'flow': [100, 200, 300, 250, 150]
        ... })
        >>> events = extract_flood_events(df, warmup_length=1)
        >>> len(events)
        1
        >>> events[0]['data']
           time  flood_event  flow
        0  2020-01-01    0  100  # warmup period
        1  2020-01-02    1  200  # event start
        2  2020-01-03    1  300
        3  2020-01-04    1  250  # event end
    """
    events: List[Dict] = []

    # Check that the required columns exist
    required_cols = [flood_event_col, time_col]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"DataFrame is missing required columns: {missing_cols}")

    # Find contiguous intervals where flood_event > 0
    flood_mask = df[flood_event_col] > 0
    if not flood_mask.any():
        return events

    # Scan for contiguous intervals
    in_event = False
    start_idx = None

    for idx, is_flood in enumerate(flood_mask):
        if is_flood and not in_event:
            start_idx = idx
            in_event = True
        elif not is_flood and in_event and start_idx is not None:
            # The event has ended; extract its data
            event_dict = _extract_single_event(
                df, start_idx, idx, warmup_length, flood_event_col, time_col
            )
            if event_dict is not None:
                events.append(event_dict)
            in_event = False

    # Handle the last event (if the data ends while still inside an event)
    if in_event and start_idx is not None:
        event_dict = _extract_single_event(
            df, start_idx, len(df), warmup_length, flood_event_col, time_col
        )
        if event_dict is not None:
            events.append(event_dict)

    return events
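The loop above is a two-state scan:

- It opens an event on the first flagged row.
- It closes the event on the next unflagged row, passing that row's index as an exclusive end.
- It finally closes any event still open when the data ends.

A standalone sketch of just that scan, with the warmup start clipped at 0, could look like this. `scan_events` is hypothetical. The clipping is an assumption carried over from `find_flood_event_segments_from_array`, because `_extract_single_event` is not shown on this page.

```python
from typing import List, Sequence, Tuple

def scan_events(flags: Sequence[float], warmup_length: int = 0) -> List[Tuple[int, int, int]]:
    """Return (warmup_start, start, end_exclusive) for each contiguous run of flags > 0."""
    events = []
    start = None
    for idx, value in enumerate(flags):
        is_flood = value > 0  # NaN compares False, so it counts as "not flooded"
        if is_flood and start is None:
            start = idx  # an event opens
        elif not is_flood and start is not None:
            # The event closes; idx is the first row after it (exclusive end).
            events.append((max(0, start - warmup_length), start, idx))
            start = None
    if start is not None:
        # The data ended while still inside an event.
        events.append((max(0, start - warmup_length), start, len(flags)))
    return events

print(scan_events([0, 1, 1, 1, 0], warmup_length=1))        # [(0, 1, 4)]
print(scan_events([0, 0, 1, 1, 0, 1, 1], warmup_length=1))  # [(1, 2, 4), (4, 5, 7)]
```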

find_flood_event_segments_as_tuples(flood_event_array, warmup_length=0)

Find continuous flood event segments and return them as tuples.

This is a convenience function that returns event segments as tuples instead of dictionaries, for compatibility with existing code.

Parameters:

- flood_event_array (ndarray, required): Binary array where values > 0 indicate flood events.
- warmup_length (int, default 0): Number of time steps to include as a warmup period before each event.

Returns:

- List[Tuple[int, int, int, int]]: List of tuples, each containing (extended_start, extended_end, original_start, original_end), where:
  - extended_start: Start index including the warmup period
  - extended_end: End index of the event (inclusive)
  - original_start: Start index of the actual event
  - original_end: End index of the actual event (inclusive)

Example

>>> arr = np.array([0, 0, 1, 1, 1, 0])
>>> segments = find_flood_event_segments_as_tuples(arr, warmup_length=1)
>>> segments[0]  # (warmup_start, event_end, event_start, event_end)
(1, 4, 2, 4)

Source code in hydroutils/hydro_event.py
def find_flood_event_segments_as_tuples(
    flood_event_array: np.ndarray,
    warmup_length: int = 0,
) -> List[Tuple[int, int, int, int]]:
    """Find continuous flood event segments and return them as tuples.

    This is a convenience function that returns event segments as tuples instead
    of dictionaries, for compatibility with existing code.

    Args:
        flood_event_array (np.ndarray): Binary array where values > 0 indicate
            flood events.
        warmup_length (int, optional): Number of time steps to include as warmup
            period before each event. Defaults to 0.

    Returns:
        List[Tuple[int, int, int, int]]: List of tuples, each containing:
            (extended_start, extended_end, original_start, original_end)
            where:
            - extended_start: Start index including warmup period
            - extended_end: End index of event
            - original_start: Start index of actual event
            - original_end: End index of actual event

    Example:
        >>> arr = np.array([0, 0, 1, 1, 1, 0])
        >>> segments = find_flood_event_segments_as_tuples(arr, warmup_length=1)
        >>> segments[0]  # (warmup_start, event_end, event_start, event_end)
        (1, 4, 2, 4)
    """
    segments = find_flood_event_segments_from_array(flood_event_array, warmup_length)

    return [
        (
            seg["extended_start"],
            seg["extended_end"],
            seg["original_start"],
            seg["original_end"],
        )
        for seg in segments
    ]
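The `end_idx` returned by `get_event_indices` is exclusive, but these tuples carry inclusive end indices: `extended_end` and `original_end` both point at the last flagged row. Slicing a NumPy array with them therefore needs `end + 1`. The illustration below uses the tuple from the example above. The `flow` values are illustrative only.

```python
import numpy as np

flow = np.array([90.0, 100.0, 200.0, 300.0, 250.0, 150.0])  # illustrative values

# Tuple layout documented for find_flood_event_segments_as_tuples:
# (extended_start, extended_end, original_start, original_end), ends inclusive.
segment = (1, 4, 2, 4)
extended_start, extended_end, original_start, original_end = segment

with_warmup = flow[extended_start:extended_end + 1]  # +1 because the end is inclusive
event_only = flow[original_start:original_end + 1]
print(with_warmup)  # [100. 200. 300. 250.]
print(event_only)   # [200. 300. 250.]
```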

find_flood_event_segments_from_array(flood_event_array, warmup_length=0)

Find continuous flood event segments in a binary indicator array.

This is a low-level function that handles the core logic of segmenting a flood event indicator array into continuous events. It can be reused by different higher-level functions.

Parameters:

- flood_event_array (ndarray, required): Binary array where values > 0 indicate flood events.
- warmup_length (int, default 0): Number of time steps to include as a warmup period before each event.

Returns:

- List[Dict]: List of event segment information. Each dictionary contains:
  - extended_start (int): Start index including the warmup period
  - extended_end (int): End index of the event (inclusive)
  - original_start (int): Start index of the actual event
  - original_end (int): End index of the actual event (inclusive)
  - duration (int): Duration of the actual event in time steps
  - total_length (int): Total length including the warmup period

Example

>>> arr = np.array([0, 0, 1, 1, 1, 0, 0, 1, 1, 0])
>>> segments = find_flood_event_segments_from_array(arr, warmup_length=1)
>>> len(segments)  # Two events found
2
>>> segments[0]  # First event with one timestep warmup
{
    'extended_start': 1,  # Warmup start
    'extended_end': 4,    # Event end
    'original_start': 2,  # Actual event start
    'original_end': 4,    # Actual event end
    'duration': 3,        # Event duration
    'total_length': 4     # Total length with warmup
}

Source code in hydroutils/hydro_event.py
def find_flood_event_segments_from_array(
    flood_event_array: np.ndarray,
    warmup_length: int = 0,
) -> List[Dict]:
    """Find continuous flood event segments in a binary indicator array.

    This is a low-level function that handles the core logic of segmenting a
    flood event indicator array into continuous events. It can be reused by
    different higher-level functions.

    Args:
        flood_event_array (np.ndarray): Binary array where values > 0 indicate
            flood events.
        warmup_length (int, optional): Number of time steps to include as warmup
            period before each event. Defaults to 0.

    Returns:
        List[Dict]: List of event segment information. Each dictionary contains:
            - extended_start (int): Start index including warmup period
            - extended_end (int): End index of event
            - original_start (int): Start index of actual event
            - original_end (int): End index of actual event
            - duration (int): Duration of actual event in time steps
            - total_length (int): Total length including warmup period

    Example:
        >>> arr = np.array([0, 0, 1, 1, 1, 0, 0, 1, 1, 0])
        >>> segments = find_flood_event_segments_from_array(arr, warmup_length=1)
        >>> len(segments)  # Two events found
        2
        >>> segments[0]  # First event with one timestep warmup
        {
            'extended_start': 1,  # Warmup start
            'extended_end': 4,    # Event end
            'original_start': 2,  # Actual event start
            'original_end': 4,    # Actual event end
            'duration': 3,        # Event duration
            'total_length': 4     # Total length with warmup
        }
    """
    segments = []

    # Find all indices where flood_event > 0
    event_indices = np.where(flood_event_array > 0)[0]

    if len(event_indices) == 0:
        return segments

    # Find the split points between contiguous segments
    gaps = np.diff(event_indices) > 1
    split_points = np.where(gaps)[0] + 1
    split_indices = np.split(event_indices, split_points)

    # Build the information for each contiguous segment
    for indices in split_indices:
        if len(indices) > 0:
            original_start = indices[0]
            original_end = indices[-1]

            # Add the warmup period
            extended_start = max(0, original_start - warmup_length)

            segments.append(
                {
                    "extended_start": extended_start,
                    "extended_end": original_end,
                    "original_start": original_start,
                    "original_end": original_end,
                    "duration": original_end - original_start + 1,
                    "total_length": original_end - extended_start + 1,
                }
            )

    return segments
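In the code above, `extended_start` is clipped only at index 0, not at the end of the previous event. With a long warmup, one event's warmup window can therefore overlap an earlier event. The standalone sketch below reuses the same `np.diff`/`np.split` technique to show this. `segments_from_flags` is a hypothetical name, and it returns only the (extended_start, original_start, original_end) triple.

```python
import numpy as np

def segments_from_flags(flags: np.ndarray, warmup_length: int = 0):
    """Same diff/split segmentation as above, reduced to (extended_start, start, end_inclusive)."""
    idx = np.where(flags > 0)[0]
    if idx.size == 0:
        return []
    # A gap larger than 1 between consecutive flagged indices starts a new run.
    runs = np.split(idx, np.where(np.diff(idx) > 1)[0] + 1)
    return [
        (max(0, int(run[0]) - warmup_length), int(run[0]), int(run[-1]))
        for run in runs
    ]

flags = np.array([1, 1, 0, 1, 1])
# The second warmup starts at index 1, which is inside the first event (rows 0-1).
print(segments_from_flags(flags, warmup_length=2))  # [(0, 0, 1), (1, 3, 4)]
```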

get_event_indices(df, warmup_length=0, flood_event_col='flood_event')

Get index information for flood events without extracting data.

This function identifies flood events in the DataFrame and returns their index information, but does not extract the actual data. This is useful when you only need to know the locations and durations of events.

Parameters:

- df (DataFrame, required): DataFrame containing site data.
- warmup_length (int, default 0): Number of time steps to include as a warmup period before each event.
- flood_event_col (str, default 'flood_event'): Name of the flood event indicator column.

Returns:

- List[Dict]: List of event index information. Each dictionary contains:
  - start_idx (int): Start index of the actual event
  - end_idx (int): End index of the actual event (exclusive, i.e. one past the last flagged row)
  - warmup_start_idx (int): Start index including the warmup period
  - duration (int): Duration of the actual event in time steps
  - total_length (int): Total length including the warmup period

Raises:

- ValueError: If flood_event_col is not found in the DataFrame.

Example

>>> df = pd.DataFrame({'flood_event': [0, 1, 1, 1, 0]})
>>> indices = get_event_indices(df, warmup_length=1)
>>> indices[0]
{
    'start_idx': 1,
    'end_idx': 4,
    'warmup_start_idx': 0,
    'duration': 3,
    'total_length': 4
}

Source code in hydroutils/hydro_event.py
def get_event_indices(
    df: pd.DataFrame, warmup_length: int = 0, flood_event_col: str = "flood_event"
) -> List[Dict]:
    """Get index information for flood events without extracting data.

    This function identifies flood events in the DataFrame and returns their index
    information, but does not extract the actual data. This is useful when you only
    need to know the locations and durations of events.

    Args:
        df (pd.DataFrame): DataFrame containing site data.
        warmup_length (int, optional): Number of time steps to include as warmup
            period before each event. Defaults to 0.
        flood_event_col (str, optional): Name of flood event indicator column.
            Defaults to "flood_event".

    Returns:
        List[Dict]: List of event index information. Each dictionary contains:
            - start_idx (int): Start index of actual event
            - end_idx (int): End index of actual event
            - warmup_start_idx (int): Start index including warmup period
            - duration (int): Duration of actual event in time steps
            - total_length (int): Total length including warmup period

    Raises:
        ValueError: If flood_event_col is not found in DataFrame.

    Example:
        >>> df = pd.DataFrame({'flood_event': [0, 1, 1, 1, 0]})
        >>> indices = get_event_indices(df, warmup_length=1)
        >>> indices[0]
        {
            'start_idx': 1,
            'end_idx': 4,
            'warmup_start_idx': 0,
            'duration': 3,
            'total_length': 4
        }
    """
    # Check that the required column exists
    if flood_event_col not in df.columns:
        raise ValueError(f"DataFrame is missing the flood event indicator column: {flood_event_col}")

    # Delegate the segmentation logic to the low-level function
    flood_event_array = df[flood_event_col].values
    segments = find_flood_event_segments_from_array(flood_event_array, warmup_length)

    # Convert to a format compatible with the original interface
    events = []
    for seg in segments:
        events.append(
            {
                "start_idx": seg["original_start"],
                "end_idx": seg["original_end"] + 1,  # +1 because the original interface used an exclusive end index
                "warmup_start_idx": seg["extended_start"],
                "duration": seg["duration"],
                "total_length": seg["total_length"],
            }
        )

    return events
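`end_idx` here is exclusive, and `total_length` counts the warmup rows plus the event rows. So `df.iloc[warmup_start_idx:end_idx]` should return exactly `total_length` rows, and `df.iloc[start_idx:end_idx]` exactly `duration` rows. The check below copies the index dictionary from the example above by hand, so it runs without hydroutils.

```python
import pandas as pd

df = pd.DataFrame({"flood_event": [0, 1, 1, 1, 0]})
# Index dict as documented for get_event_indices(df, warmup_length=1).
event = {"start_idx": 1, "end_idx": 4, "warmup_start_idx": 0, "duration": 3, "total_length": 4}

window = df.iloc[event["warmup_start_idx"]:event["end_idx"]]  # warmup + event
actual = df.iloc[event["start_idx"]:event["end_idx"]]         # event only
print(len(window) == event["total_length"])  # True
print(len(actual) == event["duration"])      # True
print(actual["flood_event"].tolist())        # [1, 1, 1]
```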

time_to_ten_digits(time_obj)

Convert a time object to a ten-digit format YYYYMMDDHH.

Parameters:

- time_obj (Union[datetime, datetime64, str], required): Time object to convert. Can be a datetime, numpy.datetime64, or string.

Returns:

- str: Ten-digit time string in YYYYMMDDHH format, or "0000000000" if the input cannot be parsed.

Example

>>> time_to_ten_digits(datetime.datetime(2020, 1, 1, 12, 0))
'2020010112'
>>> time_to_ten_digits(np.datetime64('2020-01-01T12'))
'2020010112'
>>> time_to_ten_digits('2020-01-01T12:00:00')
'2020010112'

Source code in hydroutils/hydro_event.py
def time_to_ten_digits(time_obj) -> str:
    """Convert a time object to a ten-digit format YYYYMMDDHH.

    Args:
        time_obj (Union[datetime.datetime, np.datetime64, str]): Time object to convert.
            Can be datetime, numpy.datetime64, or string.

    Returns:
        str: Ten-digit time string in YYYYMMDDHH format.

    Example:
        >>> time_to_ten_digits(datetime.datetime(2020, 1, 1, 12, 0))
        '2020010112'
        >>> time_to_ten_digits(np.datetime64('2020-01-01T12'))
        '2020010112'
        >>> time_to_ten_digits('2020-01-01T12:00:00')
        '2020010112'
    """
    if isinstance(time_obj, np.datetime64):
        # numpy datetime64 object
        return (
            time_obj.astype("datetime64[h]")
            .astype(str)
            .replace("-", "")
            .replace("T", "")
            .replace(":", "")
        )
    elif hasattr(time_obj, "strftime"):
        # datetime object (or anything else with a strftime method)
        return time_obj.strftime("%Y%m%d%H")
    else:
        # If it is a string, try to parse it
        try:
            if isinstance(time_obj, str):
                dt = datetime.fromisoformat(time_obj.replace("Z", "+00:00"))
                return dt.strftime("%Y%m%d%H")
            else:
                return "0000000000"  # Default value
        except Exception:
            return "0000000000"  # Default value
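The string branch relies on `datetime.fromisoformat`. It replaces a trailing `Z` with `+00:00` because `fromisoformat` only accepts `Z` from Python 3.11 onward. Two consequences are worth knowing:

- A timezone offset is not converted; the hour is taken as written.
- Anything unparseable silently becomes "0000000000".

The sketch below copies only that branch into a hypothetical `string_to_ten_digits` to demonstrate both. It catches `ValueError` rather than the library's broader `Exception`.

```python
from datetime import datetime

def string_to_ten_digits(text: str) -> str:
    """Standalone copy of the string branch of time_to_ten_digits (hypothetical name)."""
    try:
        # Normalize a trailing "Z" so fromisoformat on Python < 3.11 can parse it.
        dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
        return dt.strftime("%Y%m%d%H")
    except ValueError:
        return "0000000000"  # same fallback value as the library function

print(string_to_ten_digits("2020-01-01T12:00:00Z"))       # 2020010112
print(string_to_ten_digits("2020-01-01T12:00:00+08:00"))  # 2020010112 (offset not applied)
print(string_to_ten_digits("not a date"))                 # 0000000000
```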