CAMELSH

Overview

CAMELSH is an hourly-resolution hydrological dataset for catchments in the United States, providing the high-temporal-resolution data needed for detailed hydrological analysis.

Dataset Information

  • Region: United States
  • Temporal Resolution: Hourly
  • Module: hydrodataset.camelsh
  • Class: Camelsh

Key Features

Hourly Resolution

Unlike daily CAMELS datasets, CAMELSH provides hourly timeseries data, enabling:

  • Sub-daily hydrological process analysis
  • Flash flood and storm event studies
  • High-frequency streamflow dynamics
  • Detailed precipitation event analysis
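
The sketch below is one minimal, hedged illustration of what hourly resolution enables (it assumes a Camelsh instance ds initialized as in the Usage section below, and that streamflow comes back on an hourly time axis): comparing within-day maxima to daily means exposes sub-daily peaks that a daily series would smooth away.

# Sketch: sub-daily "flashiness" from hourly streamflow
basins = ds.read_object_ids()[:3]
hourly_q = ds.read_ts_xrdataset(
    gage_id_lst=basins,
    t_range=["2015-06-01", "2015-06-30"],
    var_lst=["streamflow"],
)["streamflow"]

daily_max = hourly_q.resample(time="1D").max()
daily_mean = hourly_q.resample(time="1D").mean()
# Ratios well above 1 indicate peaks that a daily average would hide
flashiness = (daily_max / daily_mean).mean(dim="time")
print(flashiness)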

Static Attributes

Static catchment attributes include:

  • Basin area
  • Mean precipitation
  • Topographic characteristics
  • Land cover information
  • Soil properties
  • Climate indices
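
As a quick, hedged sketch (again assuming a Camelsh instance ds as in the Usage section below), the available attribute names can be listed and a subset read; "area", "p_mean", and "aridity" follow the standardized keys defined by the class, and other names come from available_static_features.

# Sketch: list static attribute names, then read a few of them
print(ds.available_static_features)

static = ds.read_attr_xrdataset(
    gage_id_lst=ds.read_object_ids()[:5],
    var_lst=["area", "p_mean", "aridity"],
)
print(static)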

Dynamic Variables

Hourly timeseries variables available:

  • Streamflow (hourly)
  • Precipitation (hourly)
  • Temperature
  • Potential evapotranspiration
  • Solar radiation
  • And more...
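
A similarly hedged sketch for the hourly variables: the lowercase standard names used here mirror the Usage examples and the class's variable mapping, and availability can be checked first via available_dynamic_features.

# Sketch: list dynamic variable names, then read a few standardized ones
print(ds.available_dynamic_features)

met = ds.read_ts_xrdataset(
    gage_id_lst=ds.read_object_ids()[:2],
    t_range=["2015-07-01", "2015-07-07"],
    var_lst=["precipitation", "potential_evapotranspiration", "solar_radiation"],
)
print(met)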

Usage

Basic Usage

from hydrodataset.camelsh import Camelsh
from hydrodataset import SETTING

# Initialize dataset
data_path = SETTING["local_data_path"]["datasets-origin"]
ds = Camelsh(data_path)

# Get basin IDs
basin_ids = ds.read_object_ids()
print(f"Number of basins: {len(basin_ids)}")

# Check available features
print("Static features:", ds.available_static_features)
print("Dynamic features:", ds.available_dynamic_features)

# Read hourly timeseries data
timeseries = ds.read_ts_xrdataset(
    gage_id_lst=basin_ids[:5],
    t_range=["2015-01-01", "2015-01-31"],  # One month of hourly data
    var_lst=["streamflow", "precipitation"]
)
print(timeseries)

# Read attribute data
attributes = ds.read_attr_xrdataset(
    gage_id_lst=basin_ids[:5],
    var_lst=["area", "p_mean"]
)
print(attributes)
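
read_ts_xrdataset also accepts a sources argument that pins the forcing source per variable. For CAMELSH the variable mapping in the API reference below lists "nldas" as the only source, so the following sketch is equivalent to the defaults; it simply illustrates the call pattern.

# Sketch: explicitly select the data source for a variable
forced = ds.read_ts_xrdataset(
    gage_id_lst=basin_ids[:5],
    t_range=["2015-01-01", "2015-01-31"],
    var_lst=["streamflow", "precipitation"],
    sources={"precipitation": "nldas"},
)
print(forced)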

Analyzing Storm Events

# Read hourly data for a specific storm event
storm_data = ds.read_ts_xrdataset(
    gage_id_lst=basin_ids[:3],
    t_range=["2015-06-15 00:00:00", "2015-06-20 23:00:00"],
    var_lst=["streamflow", "precipitation", "temperature_mean"]
)

# Analyze sub-daily patterns
hourly_precip = storm_data["precipitation"]
daily_total = hourly_precip.resample(time="1D").sum()
print("Daily precipitation totals:", daily_total)

Reading Specific Variables

# Read with specific time range (note hourly timestamps)
ts_data = ds.read_ts_xrdataset(
    gage_id_lst=basin_ids[:10],
    t_range=["2015-01-01 00:00:00", "2015-12-31 23:00:00"],
    var_lst=["streamflow", "precipitation", "temperature_mean"]
)

# Read basin area
areas = ds.read_area(gage_id_lst=basin_ids[:10])

# Read mean precipitation
mean_precip = ds.read_mean_prcp(gage_id_lst=basin_ids[:10])
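
For a quick visual check of one basin's hourly hydrograph, xarray's plotting (backed by matplotlib, assumed installed) can be used on the data read above; this is only an illustrative sketch.

import matplotlib.pyplot as plt

# Plot the hourly hydrograph of the first basin for the year read above
q = ts_data["streamflow"].isel(basin=0)
q.plot()  # xarray draws a 1-D line plot via matplotlib
plt.title(f"Hourly streamflow, basin {q.basin.item()}")
plt.show()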

Data Considerations

Large Data Volumes

Hourly data results in significantly larger datasets compared to daily data:

  • 24x more data points per day
  • Larger cache files
  • Longer initial cache generation time
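
One practical way to manage this (a sketch, assuming basin_ids as in the Usage examples above) is to build the cache up front with a smaller batch size so each intermediate NetCDF file stays manageable; subsequent reads then come from the batch*_camelsh_timeseries.nc files in the cache directory rather than re-fetching everything.

# Sketch: pre-build the hourly cache in smaller batches, then read a subset
ds.cache_timeseries_xrdataset(batch_size=50)  # default is 100 stations per cache file

subset = ds.read_ts_xrdataset(
    gage_id_lst=basin_ids[:20],
    t_range=["2015-01-01", "2015-12-31 23:00:00"],
    var_lst=["streamflow"],
)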

Time Range Selection

When working with hourly data:

# Specify full datetime for hourly data
t_range = ["2015-01-01 00:00:00", "2015-01-31 23:00:00"]

# Or use date strings (defaults to 00:00:00)
t_range = ["2015-01-01", "2015-01-31"]

API Reference

hydrodataset.camelsh.Camelsh

Bases: HydroDataset

CAMELSH (CAMELS-Hourly) dataset class extending HydroDataset.

This class provides access to the CAMELSH dataset, which contains hourly hydrological and meteorological data for various watersheds.

Attributes:

  • region: Geographic region identifier
  • download: Whether to download data automatically
  • ds_description: Dictionary containing dataset file paths

Source code in hydrodataset/camelsh.py
class Camelsh(HydroDataset):
    """CAMELSH (CAMELS-Hourly) dataset class extending RainfallRunoff.

    This class provides access to the CAMELSH dataset, which contains hourly
    hydrological and meteorological data for various watersheds.

    Attributes:
        region: Geographic region identifier
        download: Whether to download data automatically
        ds_description: Dictionary containing dataset file paths
    """

    def __init__(
        self, data_path: str, region: Optional[str] = None, download: bool = False
    ) -> None:
        """Initialize CAMELSH dataset.

        Args:
            data_path: Path to the CAMELSH data directory
            region: Geographic region identifier (optional)
            download: Whether to download data automatically (default: False)
        """
        super().__init__(data_path)
        self.region = region
        self.download = download

        # Use the fixed CAMELSH class with corrected directory paths
        self.aqua_fetch = CAMELSH(data_path)

    @property
    def _attributes_cache_filename(self):
        return "camelsh_attributes.nc"

    @property
    def _timeseries_cache_filename(self):
        return "camelsh_timeseries.nc"

    @property
    def default_t_range(self):
        return ["1980-01-01", "2024-12-31"]

    _subclass_static_definitions = {
        # Basic station information
        "area": {"specific_name": "area_km2", "unit": "km^2"},
        "p_mean": {"specific_name": "p_mean", "unit": "mm/day"},
        "p_seasonality": {"specific_name": "p_seasonality", "unit": "none"},
        "frac_snow": {"specific_name": "frac_snow", "unit": "none"},
        "aridity": {"specific_name": "aridity_index", "unit": "none"},
    }
    _dynamic_variable_mapping = {
        # unit in aquafetch is m^3/s.in paper is kg/m^2
        StandardVariable.STREAMFLOW: {
            "default_source": "nldas",
            "sources": {"nldas": {"specific_name": "q_cms_obs", "unit": "kg/m^2"}},
        },
        StandardVariable.PRECIPITATION: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "pcp_mm", "unit": "mm"},
            },
        },
        StandardVariable.TEMPERATURE_MEAN: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "airtemp_c_mean", "unit": "°C"},
            },
        },
        StandardVariable.LONGWAVE_SOLAR_RADIATION: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "lwdown", "unit": "W/m^2"},
            },
        },
        # Shortwave radiation flux downwards (surface)
        StandardVariable.SOLAR_RADIATION: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "swdown", "unit": "W/m^2"},
            },
        },
        # unit in aquafetch is mm/day.in paper is kg/m^2
        StandardVariable.POTENTIAL_EVAPOTRANSPIRATION: {
            "default_source": "nldas",
            "sources": {"nldas": {"specific_name": "pet_mm", "unit": "kg/m^2"}},
        },
        StandardVariable.SURFACE_PRESSURE: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "psurf", "unit": "Pa"},
            },
        },
        # 10-meter above ground Zonal wind speed(east to west)
        StandardVariable.U_WIND_SPEED: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "wind_e", "unit": "m/s"},
            },
        },
        # 10-meter above ground Meridional wind speed(north to south)
        StandardVariable.V_WIND_SPEED: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "wind_n", "unit": "m/s"},
            },
        },
        StandardVariable.RELATIVE_HUMIDITY: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "qair", "unit": "kg/kg"},
            },
        },
        StandardVariable.WATER_LEVEL: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "water_level", "unit": "m"},
            },
        },
        StandardVariable.CAPE: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "cape", "unit": "J/kg"},
            },
        },
        StandardVariable.CRAINF_FRAC: {
            "default_source": "nldas",
            "sources": {
                "nldas": {"specific_name": "crainf_frac", "unit": "Fraction"},
            },
        },
    }

    def cache_timeseries_xrdataset(self, batch_size=100):
        """
        Cache timeseries data to NetCDF files in batches, each batch saved as a separate file

        Args:
            batch_size: Number of stations to process per batch, default is 100 stations
        """
        if not hasattr(self, "aqua_fetch"):
            raise NotImplementedError("aqua_fetch attribute is required")

        # Build mapping from variable names to units
        unit_lookup = {}
        if hasattr(self, "_dynamic_variable_mapping"):
            for std_name, mapping_info in self._dynamic_variable_mapping.items():
                for source, source_info in mapping_info["sources"].items():
                    unit_lookup[source_info["specific_name"]] = source_info["unit"]

        # Get all station IDs
        gage_id_lst = self.read_object_ids().tolist()
        total_stations = len(gage_id_lst)

        # Get original variable list and clean
        original_var_lst = self.aqua_fetch.dynamic_features
        cleaned_var_lst = self._clean_feature_names(original_var_lst)
        var_name_mapping = dict(zip(original_var_lst, cleaned_var_lst))

        print(
            f"Start batch processing {total_stations} stations, {batch_size} stations per batch"
        )
        print(
            f"Total number of batches: {(total_stations + batch_size - 1)//batch_size}"
        )

        # Ensure cache directory exists
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Process stations in batches and save independently
        batch_num = 1
        for batch_idx in range(0, total_stations, batch_size):
            batch_end = min(batch_idx + batch_size, total_stations)
            batch_stations = gage_id_lst[batch_idx:batch_end]

            print(
                f"\nProcessing batch {batch_num}/{(total_stations + batch_size - 1)//batch_size}"
            )
            print(
                f"Station range: {batch_idx} - {batch_end-1} (total {len(batch_stations)} stations)"
            )

            try:
                # Get data for this batch
                batch_data = self.aqua_fetch.fetch_stations_features(
                    stations=batch_stations,
                    dynamic_features=original_var_lst,
                    static_features=None,
                    st=self.default_t_range[0],
                    en=self.default_t_range[1],
                    as_dataframe=False,
                )

                dynamic_data = (
                    batch_data[1] if isinstance(batch_data, tuple) else batch_data
                )

                # Process variables
                new_data_vars = {}
                time_coord = dynamic_data.coords["time"]

                for original_var in tqdm(
                    original_var_lst,
                    desc=f"Processing variables (batch {batch_num})",
                    total=len(original_var_lst),
                ):
                    cleaned_var = var_name_mapping[original_var]
                    var_data = []
                    for station in batch_stations:
                        if station in dynamic_data.data_vars:
                            station_data = dynamic_data[station].sel(
                                dynamic_features=original_var
                            )
                            if "dynamic_features" in station_data.coords:
                                station_data = station_data.drop("dynamic_features")
                            var_data.append(station_data)

                    if var_data:
                        combined = xr.concat(var_data, dim="basin")
                        combined["basin"] = batch_stations
                        combined.attrs["units"] = unit_lookup.get(
                            cleaned_var, "unknown"
                        )
                        new_data_vars[cleaned_var] = combined

                # Create Dataset for this batch
                batch_ds = xr.Dataset(
                    data_vars=new_data_vars,
                    coords={
                        "basin": batch_stations,
                        "time": time_coord,
                    },
                )

                # Save this batch to independent file
                batch_filename = f"batch{batch_num:03d}_camelsh_timeseries.nc"
                batch_filepath = self.cache_dir.joinpath(batch_filename)

                print(f"Saving batch {batch_num} to: {batch_filepath}")
                batch_ds.to_netcdf(batch_filepath)
                print(f"Batch {batch_num} saved successfully")

            except Exception as e:
                print(f"Batch {batch_num} processing failed: {e}")
                import traceback

                traceback.print_exc()
                continue

            batch_num += 1

        print(f"\nAll batches processed! Total {batch_num - 1} batch files saved")

    def read_ts_xrdataset(
        self,
        gage_id_lst: list = None,
        t_range: list = None,
        var_lst: list = None,
        sources: dict = None,
        **kwargs,
    ) -> xr.Dataset:
        """
        Read timeseries data (supports standardized variable names and multiple data sources)

        Read data from batch-saved cache files

        Args:
            gage_id_lst: List of station IDs
            t_range: Time range [start, end]
            var_lst: List of standard variable names
            sources: Data source dictionary, format is {variable_name: data_source} or {variable_name: [data_source_list]}

        Returns:
            xr.Dataset: xarray dataset containing requested data
        """
        if (
            not hasattr(self, "_dynamic_variable_mapping")
            or not self._dynamic_variable_mapping
        ):
            raise NotImplementedError(
                "This dataset does not support the standardized variable mapping."
            )

        if var_lst is None:
            var_lst = list(self._dynamic_variable_mapping.keys())

        if t_range is None:
            t_range = self.default_t_range

        target_vars_to_fetch = []
        rename_map = {}

        # Process variable name mapping and data source selection
        for std_name in var_lst:
            if std_name not in self._dynamic_variable_mapping:
                raise ValueError(
                    f"'{std_name}' is not a recognized standard variable for this dataset."
                )

            mapping_info = self._dynamic_variable_mapping[std_name]

            # Determine which data source(s) to use
            is_explicit_source = sources and std_name in sources
            sources_to_use = []
            if is_explicit_source:
                provided_sources = sources[std_name]
                if isinstance(provided_sources, list):
                    sources_to_use.extend(provided_sources)
                else:
                    sources_to_use.append(provided_sources)
            else:
                sources_to_use.append(mapping_info["default_source"])

            # Only need suffix when user explicitly requests multiple data sources
            needs_suffix = is_explicit_source and len(sources_to_use) > 1
            for source in sources_to_use:
                if source not in mapping_info["sources"]:
                    raise ValueError(
                        f"Source '{source}' is not available for variable '{std_name}'."
                    )

                actual_var_name = mapping_info["sources"][source]["specific_name"]
                target_vars_to_fetch.append(actual_var_name)
                output_name = f"{std_name}_{source}" if needs_suffix else std_name
                rename_map[actual_var_name] = output_name

        # Find all batch files
        import glob

        batch_pattern = str(self.cache_dir / "batch*_camelsh_timeseries.nc")
        batch_files = sorted(glob.glob(batch_pattern))

        if not batch_files:
            print("No batch cache files found, starting cache creation...")
            self.cache_timeseries_xrdataset()
            batch_files = sorted(glob.glob(batch_pattern))

            if not batch_files:
                raise FileNotFoundError("Cache creation failed, no batch files found")

        print(f"Found {len(batch_files)} batch files")

        # If no stations specified, read all stations
        if gage_id_lst is None:
            print("No station list specified, will read all stations...")
            gage_id_lst = self.read_object_ids().tolist()

        # Convert station IDs to strings (ensure consistency)
        gage_id_lst = [str(gid) for gid in gage_id_lst]

        # Iterate through batch files to find batches containing required stations
        relevant_datasets = []
        for batch_file in batch_files:
            try:
                # First open only coordinates, don't load data
                ds_batch = xr.open_dataset(batch_file)
                batch_basins = [str(b) for b in ds_batch.basin.values]

                # Check if this batch contains required stations
                common_basins = list(set(gage_id_lst) & set(batch_basins))

                if common_basins:
                    print(
                        f"Batch {os.path.basename(batch_file)}: contains {len(common_basins)} required stations"
                    )

                    # Check if variables exist
                    missing_vars = [
                        v for v in target_vars_to_fetch if v not in ds_batch.data_vars
                    ]
                    if missing_vars:
                        ds_batch.close()
                        raise ValueError(
                            f"Batch {os.path.basename(batch_file)} missing variables: {missing_vars}"
                        )

                    # Select variables and stations
                    ds_subset = ds_batch[target_vars_to_fetch]
                    ds_selected = ds_subset.sel(
                        basin=common_basins, time=slice(t_range[0], t_range[1])
                    )

                    relevant_datasets.append(ds_selected)
                    ds_batch.close()
                else:
                    ds_batch.close()

            except Exception as e:
                print(f"Failed to read batch file {batch_file}: {e}")
                continue

        if not relevant_datasets:
            raise ValueError(
                f"Specified stations not found in any batch files: {gage_id_lst}"
            )

        print(f"Reading data from {len(relevant_datasets)} batches...")

        # Merge data from all relevant batches
        if len(relevant_datasets) == 1:
            final_ds = relevant_datasets[0]
        else:
            final_ds = xr.concat(relevant_datasets, dim="basin")

        # Rename to standard variable names
        final_ds = final_ds.rename(rename_map)

        # Ensure stations are arranged in input order
        if len(gage_id_lst) > 0:
            # Only select actually existing stations
            existing_basins = [b for b in gage_id_lst if b in final_ds.basin.values]
            if existing_basins:
                final_ds = final_ds.sel(basin=existing_basins)

        return final_ds

default_t_range property

__init__(data_path, region=None, download=False)

Initialize CAMELSH dataset.

Parameters:

  • data_path (str, required): Path to the CAMELSH data directory
  • region (Optional[str], default None): Geographic region identifier
  • download (bool, default False): Whether to download data automatically
Source code in hydrodataset/camelsh.py
def __init__(
    self, data_path: str, region: Optional[str] = None, download: bool = False
) -> None:
    """Initialize CAMELSH dataset.

    Args:
        data_path: Path to the CAMELSH data directory
        region: Geographic region identifier (optional)
        download: Whether to download data automatically (default: False)
    """
    super().__init__(data_path)
    self.region = region
    self.download = download

    # Use the fixed CAMELSH class with corrected directory paths
    self.aqua_fetch = CAMELSH(data_path)

read_ts_xrdataset(gage_id_lst=None, t_range=None, var_lst=None, sources=None, **kwargs)

Read timeseries data (supports standardized variable names and multiple data sources)

Read data from batch-saved cache files

Parameters:

  • gage_id_lst (list, default None): List of station IDs
  • t_range (list, default None): Time range [start, end]
  • var_lst (list, default None): List of standard variable names
  • sources (dict, default None): Data source dictionary; format is {variable_name: data_source} or {variable_name: [data_source_list]}

Returns:

  • xr.Dataset: xarray dataset containing the requested data

Source code in hydrodataset/camelsh.py
def read_ts_xrdataset(
    self,
    gage_id_lst: list = None,
    t_range: list = None,
    var_lst: list = None,
    sources: dict = None,
    **kwargs,
) -> xr.Dataset:
    """
    Read timeseries data (supports standardized variable names and multiple data sources)

    Read data from batch-saved cache files

    Args:
        gage_id_lst: List of station IDs
        t_range: Time range [start, end]
        var_lst: List of standard variable names
        sources: Data source dictionary, format is {variable_name: data_source} or {variable_name: [data_source_list]}

    Returns:
        xr.Dataset: xarray dataset containing requested data
    """
    if (
        not hasattr(self, "_dynamic_variable_mapping")
        or not self._dynamic_variable_mapping
    ):
        raise NotImplementedError(
            "This dataset does not support the standardized variable mapping."
        )

    if var_lst is None:
        var_lst = list(self._dynamic_variable_mapping.keys())

    if t_range is None:
        t_range = self.default_t_range

    target_vars_to_fetch = []
    rename_map = {}

    # Process variable name mapping and data source selection
    for std_name in var_lst:
        if std_name not in self._dynamic_variable_mapping:
            raise ValueError(
                f"'{std_name}' is not a recognized standard variable for this dataset."
            )

        mapping_info = self._dynamic_variable_mapping[std_name]

        # Determine which data source(s) to use
        is_explicit_source = sources and std_name in sources
        sources_to_use = []
        if is_explicit_source:
            provided_sources = sources[std_name]
            if isinstance(provided_sources, list):
                sources_to_use.extend(provided_sources)
            else:
                sources_to_use.append(provided_sources)
        else:
            sources_to_use.append(mapping_info["default_source"])

        # Only need suffix when user explicitly requests multiple data sources
        needs_suffix = is_explicit_source and len(sources_to_use) > 1
        for source in sources_to_use:
            if source not in mapping_info["sources"]:
                raise ValueError(
                    f"Source '{source}' is not available for variable '{std_name}'."
                )

            actual_var_name = mapping_info["sources"][source]["specific_name"]
            target_vars_to_fetch.append(actual_var_name)
            output_name = f"{std_name}_{source}" if needs_suffix else std_name
            rename_map[actual_var_name] = output_name

    # Find all batch files
    import glob

    batch_pattern = str(self.cache_dir / "batch*_camelsh_timeseries.nc")
    batch_files = sorted(glob.glob(batch_pattern))

    if not batch_files:
        print("No batch cache files found, starting cache creation...")
        self.cache_timeseries_xrdataset()
        batch_files = sorted(glob.glob(batch_pattern))

        if not batch_files:
            raise FileNotFoundError("Cache creation failed, no batch files found")

    print(f"Found {len(batch_files)} batch files")

    # If no stations specified, read all stations
    if gage_id_lst is None:
        print("No station list specified, will read all stations...")
        gage_id_lst = self.read_object_ids().tolist()

    # Convert station IDs to strings (ensure consistency)
    gage_id_lst = [str(gid) for gid in gage_id_lst]

    # Iterate through batch files to find batches containing required stations
    relevant_datasets = []
    for batch_file in batch_files:
        try:
            # First open only coordinates, don't load data
            ds_batch = xr.open_dataset(batch_file)
            batch_basins = [str(b) for b in ds_batch.basin.values]

            # Check if this batch contains required stations
            common_basins = list(set(gage_id_lst) & set(batch_basins))

            if common_basins:
                print(
                    f"Batch {os.path.basename(batch_file)}: contains {len(common_basins)} required stations"
                )

                # Check if variables exist
                missing_vars = [
                    v for v in target_vars_to_fetch if v not in ds_batch.data_vars
                ]
                if missing_vars:
                    ds_batch.close()
                    raise ValueError(
                        f"Batch {os.path.basename(batch_file)} missing variables: {missing_vars}"
                    )

                # Select variables and stations
                ds_subset = ds_batch[target_vars_to_fetch]
                ds_selected = ds_subset.sel(
                    basin=common_basins, time=slice(t_range[0], t_range[1])
                )

                relevant_datasets.append(ds_selected)
                ds_batch.close()
            else:
                ds_batch.close()

        except Exception as e:
            print(f"Failed to read batch file {batch_file}: {e}")
            continue

    if not relevant_datasets:
        raise ValueError(
            f"Specified stations not found in any batch files: {gage_id_lst}"
        )

    print(f"Reading data from {len(relevant_datasets)} batches...")

    # Merge data from all relevant batches
    if len(relevant_datasets) == 1:
        final_ds = relevant_datasets[0]
    else:
        final_ds = xr.concat(relevant_datasets, dim="basin")

    # Rename to standard variable names
    final_ds = final_ds.rename(rename_map)

    # Ensure stations are arranged in input order
    if len(gage_id_lst) > 0:
        # Only select actually existing stations
        existing_basins = [b for b in gage_id_lst if b in final_ds.basin.values]
        if existing_basins:
            final_ds = final_ds.sel(basin=existing_basins)

    return final_ds