diff --git a/README.rst b/README.rst index 0482d25b..7852c29e 100644 --- a/README.rst +++ b/README.rst @@ -649,6 +649,114 @@ Attributes: institution: US National Weather Service - NCEP] +Alternatively, you can also use the ``filter_heterogeneous`` argument; *cfgrib* will try to load +all heterogeneous variables at once, and then merge them into a single ``xr.Dataset``. + +When coordinate values mismatch, the name will be incremented, e.g. ``pressureFromGroundLayer1``. + +Please note that the variable name will be the concatenation of ``{shortName}__{typeOfLevel}__{stepType}``, +cf. example below. + +.. code-block:: python + +>>> import xarray as xr +>>> xr.open_dataset('nam.t00z.awp21100.tm00.grib2', engine='cfgrib', + backend_kwargs={'filter_heterogeneous': True}) + +Dimensions: (y: 65, x: 93, isobaricInhPa: 19, + pressureFromGroundLayer: 5, + isobaricInhPa1: 5, + pressureFromGroundLayer1: 2, + pressureFromGroundLayer2: 2, + heightAboveGroundLayer: 2) +Coordinates: + time datetime64[ns] ... + step timedelta64[ns] ... + meanSea float64 ... + latitude (y, x) float64 ... + longitude (y, x) float64 ... + valid_time datetime64[ns] ... + surface float64 ... + * isobaricInhPa (isobaricInhPa) float64 1e+03 950.0 ... 100.0 + cloudBase float64 ... + cloudTop float64 ... + maxWind float64 ... + isothermZero float64 ... + tropopause float64 ... + * pressureFromGroundLayer (pressureFromGroundLayer) float64 3e+03 ... 1.5... + * isobaricInhPa1 (isobaricInhPa1) float64 1e+03 850.0 ... 250.0 + heightAboveGround float64 ... + heightAboveGround1 float64 ... + heightAboveGround2 float64 ... + * pressureFromGroundLayer1 (pressureFromGroundLayer1) float64 9e+03 1.8e+04 + * pressureFromGroundLayer2 (pressureFromGroundLayer2) float64 9e+03 1.8e+04 + atmosphereSingleLayer float64 ... + * heightAboveGroundLayer (heightAboveGroundLayer) float64 1e+03 3e+03 + pressureFromGroundLayer3 float64 ... + pressureFromGroundLayer4 float64 ... 
+Dimensions without coordinates: y, x +Data variables: + prmsl__meanSea__instant (y, x) float32 ... + gust__surface__instant (y, x) float32 ... + gh__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + gh__cloudBase__instant (y, x) float32 ... + gh__cloudTop__instant (y, x) float32 ... + gh__maxWind__instant (y, x) float32 ... + gh__isothermZero__instant (y, x) float32 ... + t__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + t__cloudTop__instant (y, x) float32 ... + t__tropopause__instant (y, x) float32 ... + t__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + r__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + r__isothermZero__instant (y, x) float32 ... + r__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + w__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + u__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + u__tropopause__instant (y, x) float32 ... + u__maxWind__instant (y, x) float32 ... + u__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + v__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + v__tropopause__instant (y, x) float32 ... + v__maxWind__instant (y, x) float32 ... + v__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + absv__isobaricInhPa__instant (isobaricInhPa1, y, x) float32 ... + mslet__meanSea__instant (y, x) float32 ... + sp__surface__instant (y, x) float32 ... + orog__surface__instant (y, x) float32 ... + t2m__heightAboveGround__instant (y, x) float32 ... + r2__heightAboveGround__instant (y, x) float32 ... + u10__heightAboveGround__instant (y, x) float32 ... + v10__heightAboveGround__instant (y, x) float32 ... + tp__surface__accum (y, x) float32 ... + acpcp__surface__accum (y, x) float32 ... + csnow__surface__instant (y, x) float32 ... + cicep__surface__instant (y, x) float32 ... + cfrzr__surface__instant (y, x) float32 ... + crain__surface__instant (y, x) float32 ... 
+ cape__surface__instant (y, x) float32 ... + cape__pressureFromGroundLayer__instant (pressureFromGroundLayer1, y, x) float32 ... + cin__surface__instant (y, x) float32 ... + cin__pressureFromGroundLayer__instant (pressureFromGroundLayer2, y, x) float32 ... + pwat__atmosphereSingleLayer__instant (y, x) float32 ... + pres__cloudBase__instant (y, x) float32 ... + pres__cloudTop__instant (y, x) float32 ... + pres__maxWind__instant (y, x) float32 ... + hlcy__heightAboveGroundLayer__instant (heightAboveGroundLayer, y, x) float32 ... + trpp__tropopause__instant (y, x) float32 ... + pli__pressureFromGroundLayer__instant (y, x) float32 ... + 4lftx__pressureFromGroundLayer__instant (y, x) float32 ... + unknown__surface__instant (y, x) float32 ... +Attributes: + GRIB_edition: 2 + GRIB_centre: kwbc + GRIB_centreDescription: US National Weather Service - NCEP + GRIB_subCentre: 0 + Conventions: CF-1.7 + institution: US National Weather Service - NCEP + history: 2023-12-04T16:56 GRIB to CDM+CF via cfgrib-0.9.1... 
+ + + Advanced usage ============== diff --git a/cfgrib/__main__.py b/cfgrib/__main__.py index 0404c4e6..73b6c62f 100644 --- a/cfgrib/__main__.py +++ b/cfgrib/__main__.py @@ -176,5 +176,18 @@ def dump(inpaths, variable, cdm, engine): print(ds_or_da) +@cfgrib_cli.command("build_index") +@click.argument("inpaths", nargs=-1, required=True) +@click.option("--index-basedir", default=None) +@click.option("--force-index-creation", default=None) +def build_index(inpaths, index_basedir, force_index_creation): + # type: (T.List[str], str, bool) -> None + from .dataset import get_or_create_index + + for fp in inpaths: + print(f"{fp}: Creating index") + get_or_create_index(str(fp), index_basedir, force_index_creation) + + if __name__ == "__main__": # pragma: no cover cfgrib_cli() diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index bf5eea9a..15a4c85f 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -22,7 +22,9 @@ import json import logging import os +import itertools import typing as T +from pathlib import Path import attr import numpy as np @@ -299,6 +301,17 @@ def __eq__(self, other): equal = (self.dimensions, self.attributes) == (other.dimensions, other.attributes) return equal and np.array_equal(self.data, other.data) + def rename_coordinate(self, existing_coord_name, new_coord_name): + # type: (str, str) -> None + # Update data_var dimensions, eg. ('isobaricInhPa', 'y', 'x') + self.dimensions = tuple( + [x.replace(existing_coord_name, new_coord_name) for x in self.dimensions] + ) + if 'coordinates' in self.attributes: + # Update data_var attributes, eg. 'time step isobaricInhPa latitude longitude valid_time' + new_coordinates = self.attributes['coordinates'].replace(existing_coord_name, new_coord_name) + self.attributes['coordinates'] = new_coordinates + def expand_item(item, shape): # type: (T.Tuple[T.Any, ...], T.Sequence[int]) -> T.Tuple[T.List[int], ...] 
@@ -661,13 +674,31 @@ def build_dataset_components( time_dims: T.Sequence[str] = ("time", "step"), extra_coords: T.Dict[str, str] = {}, cache_geo_coords: bool = True, + filter_heterogeneous: bool = False, ) -> T.Tuple[T.Dict[str, int], T.Dict[str, Variable], T.Dict[str, T.Any], T.Dict[str, T.Any]]: dimensions = {} # type: T.Dict[str, int] variables = {} # type: T.Dict[str, Variable] filter_by_keys = index.filter_by_keys - for param_id in index.get("paramId", []): - var_index = index.subindex(paramId=param_id) + subindex_kwargs_list = [{"paramId": param_id} for param_id in index.get("paramId", [])] + if filter_heterogeneous: + # Generate all possible combinations of paramId/typeOfLevel/stepType + subindex_kwargs_list = [] + combinations = index.get('paramId', []), index.get('typeOfLevel', []), index.get('stepType', []) + for param_id, type_of_level, step_type in itertools.product(*combinations): + subindex_kwargs_list.append({ + 'paramId': param_id, + 'typeOfLevel': type_of_level, + 'stepType': step_type + }) + + for subindex_kwargs in subindex_kwargs_list: + var_index = index.subindex(**subindex_kwargs) + if not var_index.header_values: + # For some combinations, no match availables + log.debug(f"No match for {subindex_kwargs}") + continue + try: dims, data_var, coord_vars = build_variable_components( var_index, @@ -692,10 +723,40 @@ def build_dataset_components( fbks.append(fbk) error_message += "\n filter_by_keys=%r" % fbk raise DatasetBuildError(error_message, key, fbks) + param_id = subindex_kwargs['paramId'] short_name = data_var.attributes.get("GRIB_shortName", "paramId_%d" % param_id) var_name = data_var.attributes.get("GRIB_cfVarName", "unknown") if "parameter" in encode_cf and var_name not in ("undef", "unknown"): short_name = var_name + + if filter_heterogeneous: + # Unique variable name looking like "t__surface__instant" + short_name = "__".join([ + short_name, + data_var.attributes.get("GRIB_typeOfLevel", "unknown"), + 
data_var.attributes.get("GRIB_stepType", "unknown"), + ]) + + # Handle coordinates name/values collision + for coord_name in list(coord_vars.keys()): + if coord_name in variables: + current_coord_var = coord_vars[coord_name] + existing_coord_var = variables[coord_name] + if current_coord_var == existing_coord_var: + continue + + # Coordinates have same name, but values not equal -> we need to rename to avoid collision + # Renaming will follow something like isobaricInhPa, isobaricInhPa1, etc. + coord_name_count = len([x for x in variables.keys() if x.startswith(coord_name)]) + coord_name_unique = f"{coord_name}{coord_name_count}" + + # Update attributes + if coord_name in dims: + dims[coord_name_unique] = dims.pop(coord_name) + data_var.rename_coordinate(coord_name, coord_name_unique) + current_coord_var.rename_coordinate(coord_name, coord_name_unique) + coord_vars[coord_name_unique] = coord_vars.pop(coord_name) + try: dict_merge(variables, coord_vars) dict_merge(variables, {short_name: data_var}) @@ -797,3 +858,16 @@ def open_file( index = open_fileindex(stream, indexpath, index_keys, filter_by_keys=filter_by_keys) return open_from_index(index, read_keys, time_dims, extra_coords, errors=errors, **kwargs) + + +def get_or_create_index(fp: str | Path, index_basedir: str | Path, force_index_creation: bool=False) -> messages.FileIndex: + """ Create a pygrib index file """ + index_keys = compute_index_keys() + stream = messages.FileStream(str(fp)) + index = messages.FileIndex.from_indexpath_or_filestream( + filestream=stream, + index_keys=index_keys, + indexpath=str(os.path.join(index_basedir, '{path}.idx')), + force_index_creation=force_index_creation + ) + return index diff --git a/cfgrib/messages.py b/cfgrib/messages.py index f7d725fb..9de18942 100644 --- a/cfgrib/messages.py +++ b/cfgrib/messages.py @@ -23,6 +23,7 @@ import os import pickle import typing as T +from pathlib import Path import attr import eccodes # type: ignore @@ -520,9 +521,10 @@ class 
FileIndex(FieldsetIndex): @classmethod def from_indexpath_or_filestream( - cls, filestream, index_keys, indexpath=DEFAULT_INDEXPATH, computed_keys={}, log=LOG + cls, filestream, index_keys, indexpath=DEFAULT_INDEXPATH, computed_keys={}, log=LOG, + force_index_creation=False ): - # type: (FileStream, T.Sequence[str], str, ComputedKeysType, logging.Logger) -> FileIndex + # type: (FileStream, T.Sequence[str], str, ComputedKeysType, logging.Logger, bool) -> FileIndex # Reading and writing the index can be explicitly suppressed by passing indexpath==''. if not indexpath: @@ -530,6 +532,15 @@ def from_indexpath_or_filestream( hash = hashlib.md5(repr(index_keys).encode("utf-8")).hexdigest() indexpath = indexpath.format(path=filestream.path, hash=hash, short_hash=hash[:5]) + + if not Path(indexpath).parent.exists(): + # When reading from read-only partition, we can define indexes in another + # directory, eg. indexpath='/tmp/indexes/{path}.idx' + Path(indexpath).parent.mkdir(parents=True, exist_ok=True) + + if force_index_creation and os.path.exists(indexpath): + os.unlink(indexpath) + try: with compat_create_exclusive(indexpath) as new_index_file: self = cls.from_fieldset(filestream, index_keys, computed_keys) diff --git a/cfgrib/xarray_plugin.py b/cfgrib/xarray_plugin.py index a9268208..a80b8dcd 100644 --- a/cfgrib/xarray_plugin.py +++ b/cfgrib/xarray_plugin.py @@ -98,6 +98,7 @@ def open_dataset( lock: T.Union[T.ContextManager[T.Any], None] = None, indexpath: str = messages.DEFAULT_INDEXPATH, filter_by_keys: T.Dict[str, T.Any] = {}, + filter_heterogeneous: bool = False, read_keys: T.Iterable[str] = (), encode_cf: T.Sequence[str] = ("parameter", "time", "geography", "vertical"), squeeze: bool = True, @@ -110,6 +111,7 @@ def open_dataset( filename_or_obj, indexpath=indexpath, filter_by_keys=filter_by_keys, + filter_heterogeneous=filter_heterogeneous, read_keys=read_keys, encode_cf=encode_cf, squeeze=squeeze, diff --git a/tests/test_20_messages.py 
b/tests/test_20_messages.py index 490e3e18..25928aee 100644 --- a/tests/test_20_messages.py +++ b/tests/test_20_messages.py @@ -197,6 +197,15 @@ def test_FileIndex_from_indexpath_or_filestream(tmpdir: py.path.local) -> None: ) assert isinstance(res, messages.FileIndex) + # trigger index dir creation + res = messages.FileIndex.from_indexpath_or_filestream( + messages.FileStream(str(grib_file)), + ["paramId"], + indexpath=str(tmpdir / "non-existent-folder" / "{path}.idx"), + ) + assert isinstance(res, messages.FileIndex) + assert (tmpdir / "non-existent-folder").exists() + def test_FileIndex_errors() -> None: computed_keys = {"error_key": (lambda m: bool(1 / 0), lambda m, v: None)} # pragma: no branch diff --git a/tests/test_30_dataset.py b/tests/test_30_dataset.py index 5523d3ee..94acb316 100644 --- a/tests/test_30_dataset.py +++ b/tests/test_30_dataset.py @@ -324,3 +324,11 @@ def test_missing_field_values() -> None: t2 = res.variables["t2m"] assert np.isclose(np.nanmean(t2.data[0, :, :]), 268.375) assert np.isclose(np.nanmean(t2.data[1, :, :]), 270.716) + + +def test_get_or_create_index(tmpdir) -> None: + index = dataset.get_or_create_index(TEST_DATA, os.path.join(tmpdir, "indexes")) + assert isinstance(index, messages.FileIndex) + + index = dataset.get_or_create_index(TEST_DATA, os.path.join(tmpdir, "indexes"), force_index_creation=True) + assert isinstance(index, messages.FileIndex) diff --git a/tests/test_40_xarray_store.py b/tests/test_40_xarray_store.py index 0f51613b..a4bc6544 100644 --- a/tests/test_40_xarray_store.py +++ b/tests/test_40_xarray_store.py @@ -126,6 +126,16 @@ def test_open_datasets() -> None: assert res[0].attrs["GRIB_centre"] == "ecmf" +def test_open_filter_heterogeneous() -> None: + res = xarray_store.open_dataset(TEST_DATA, backend_kwargs={ + 'filter_heterogeneous': True + }) + + assert isinstance(res, xr.Dataset) + assert "t__isobaricInhPa__instant" in res + assert res.attrs["GRIB_centre"] == "ecmf" + + def test_cached_geo_coords() -> 
None: ds1 = xarray_store.open_dataset(TEST_DATA_MULTIPLE_FIELDS) ds2 = xarray_store.open_dataset( diff --git a/tests/test_50_sample_data.py b/tests/test_50_sample_data.py index c4384dd7..3a16ac77 100644 --- a/tests/test_50_sample_data.py +++ b/tests/test_50_sample_data.py @@ -64,6 +64,37 @@ def test_open_datasets(grib_name: str) -> None: assert len(res) > 1 +@pytest.mark.parametrize( + "grib_name", + [ + "era5-levels-members", + "fields_with_missing_values", + "lambert_grid", + "reduced_gg", + "regular_gg_sfc", + "regular_gg_pl", + "regular_gg_ml", + "regular_gg_ml_g2", + "regular_ll_sfc", + "regular_ll_msl", + "scanning_mode_64", + "single_gridpoint", + "spherical_harmonics", + "t_analysis_and_fc_0", + "hpa_and_pa", + "uv_on_different_levels", + ] +) +def test_open_dataset_heterogeneous(grib_name: str) -> None: + grib_path = os.path.join(SAMPLE_DATA_FOLDER, grib_name + ".grib") + + res = xarray_store.open_dataset(grib_path, backend_kwargs={ + 'filter_heterogeneous': True + }) + + assert isinstance(res, xr.Dataset) + + @pytest.mark.parametrize( "grib_name", [