possible data corruption when retrieving data for a large number of steps #24

Open
notoriusjack opened this issue Jul 28, 2023 · 1 comment

Comments

@notoriusjack

I think there might be a problem with the downloaded data when a large number of steps is requested.

In this example I request all steps from 0h to 240h, for ensemble numbers 1 to 50, for the parameters 10u and 10v.
I open the file with xarray, filter the dataset by an arbitrary ensemble number, and loop through every step, printing all the values of one of the variables.

The code fails with the error reported below. However, if I split the list of steps into two halves and process them separately, the code runs correctly, which makes me think there might be an issue with the data when a large number of steps is retrieved in one go.
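
The workaround described above, retrieving the steps in smaller batches, can be sketched as follows. This is a minimal sketch and not part of the ecmwf-opendata API: `chunked` and `batch_targets` are hypothetical helper names, the per-batch file names are made up for illustration, and the batch size of 33 is an arbitrary choice that splits the 65 steps into roughly two halves.

```python
from typing import Iterator, List, Sequence, Tuple


def chunked(values: Sequence[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive slices of `values` with at most `size` items each."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(values), size):
        yield list(values[start:start + size])


def batch_targets(
    steps: Sequence[int], size: int, prefix: str = "aaa_enfo"
) -> List[Tuple[str, List[int]]]:
    """Pair each batch of steps with its own (hypothetical) target file name."""
    return [
        (f"{prefix}_part{i}.grib2", batch)
        for i, batch in enumerate(chunked(steps, size))
    ]


# Same step list as in the report: 3-hourly up to 144h, then 6-hourly up to 240h.
steps = list(range(0, 145, 3)) + list(range(150, 241, 6))
batches = batch_targets(steps, size=33)
for target, batch in batches:
    print(target, batch[0], "->", batch[-1], f"({len(batch)} steps)")
```

Each `(target, batch)` pair could then be passed to the retrieval function from the report in place of the single target and the full step list, and each resulting file opened on its own.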

Can you replicate this behaviour?

import sys

import numpy as np
import pandas as pd
import xarray as xr
from ecmwf.opendata import Client

np.set_printoptions(threshold=sys.maxsize)  # print full arrays without truncation
def get_ECMWF_open_data_grib_ensemble(target, stream, step, param, number):
    try:
        client = Client(source="ecmwf")
        downloaded_file = client.retrieve(
            # time=0,
            target = target,
            stream=stream,
            # type=type,
            step=step,
            param=param,
            number=number,
        )
        return downloaded_file

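    except Exception as exc:
        # Include the underlying error instead of hiding it behind a generic message
        print(f'The ECMWF service is not currently available, please try again later ({exc})')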

grib_stream = 'enfo'
# grib_type = 'fc'
grib_format = 'grib2'
grib_target = f'aaa_{grib_stream}.{grib_format}'
step = [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 93, 96, 99, 102, 105, 108, 111, 114, 117, 120, 123, 126, 129, 132, 135, 138, 141, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198, 204, 210, 216, 222, 228, 234, 240]
grib_param = ["10u", "10v"]
grib_number = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
grib_file = get_ECMWF_open_data_grib_ensemble(grib_target, grib_stream, step, grib_param, grib_number)

ds = xr.open_mfdataset(
    grib_target, engine='cfgrib', parallel=True, chunks={'step': 3, 'number': 3},
    backend_kwargs={'filter_by_keys': {'typeOfLevel': 'heightAboveGround', 'topLevel': 10}},
)
print(ds)


ensemble = 11 #21
mask = (ds.number.values == ensemble)
xarr = ds.sel(number=mask).squeeze(drop=True)
print('xarr')
print(xarr)

print(xarr.coords['step'].values)

for hours in step:
    # Convert the step (in hours) to numpy.timedelta64 to match the 'step' coordinate
    res = pd.Timedelta(hours=hours).to_timedelta64()
    print(res)

    # Keep only the data for this forecast step
    new_da = xarr.where(xarr.step == res, drop=True)
    print('new_da')
    print(new_da)

    print(new_da['u10'].values)
    print(new_da['u10'].size)

error:

Traceback (most recent call last):
  File "C:\Users\Giacomo\PycharmProjects\HIT-v3\test.py", line 378, in <module>
    print(new_da['u10'].values)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\dataarray.py", line 732, in values
    return self.variable.values
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\variable.py", line 614, in values
    return _as_array_or_item(self._data)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\variable.py", line 314, in _as_array_or_item
    data = np.asarray(data)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\array\core.py", line 1701, in __array__
    x = self.compute()
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\base.py", line 310, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\base.py", line 595, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\threaded.py", line 89, in get
    results = get_async(
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\local.py", line 319, in reraise
    raise exc
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\optimization.py", line 992, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 151, in get
    result = _execute_task(task, cache)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\array\core.py", line 126, in getter
    c = np.asarray(c)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 484, in __array__
    return np.asarray(self.get_duck_array(), dtype=dtype)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 487, in get_duck_array
    return self.array.get_duck_array()
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 664, in get_duck_array
    return self.array.get_duck_array()
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 551, in get_duck_array
    array = self.array[self.key]
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\xarray_plugin.py", line 155, in __getitem__
    return xr.core.indexing.explicit_indexing_adapter(
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 858, in explicit_indexing_adapter
    result = raw_indexing_method(raw_key.tuple)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\xarray_plugin.py", line 164, in _getitem
    return self.array[key]
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\dataset.py", line 358, in __getitem__
    message = self.index.get_field(message_ids[0])  # type: ignore
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 484, in get_field
    return ComputedKeysAdapter(self.fieldset[message_id], self.computed_keys)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 344, in __getitem__
    return self.message_from_file(file, offset=item)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 340, in message_from_file
    return Message.from_file(file, offset, **kwargs)
  File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 93, in from_file
    file.seek(offset)
OSError: [Errno 22] Invalid argument
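
The traceback ends in `file.seek(offset)` raising `OSError: [Errno 22] Invalid argument`, which is the error Python raises when asked to seek to a negative absolute offset. One possible explanation, an assumption that nothing in this thread confirms, is that the single large file exceeds 2 GiB and a message offset no longer fits in a signed 32-bit integer somewhere in the indexing path, so it wraps to a negative value. The sketch below only illustrates that mechanism; `wrap_int32`, `exceeds_int32` and `seek_errno` are hypothetical helpers, not cfgrib functions.

```python
import errno
import os
import tempfile

INT32_MAX = 2**31 - 1


def wrap_int32(offset: int) -> int:
    """Return the value `offset` would have if stored in a signed 32-bit integer."""
    return (offset + 2**31) % 2**32 - 2**31


def exceeds_int32(path: str) -> bool:
    """True if the file is large enough that some byte offsets exceed INT32_MAX."""
    return os.path.getsize(path) > INT32_MAX


def seek_errno(offset: int) -> int:
    """Seek to `offset` in a small scratch file and return the errno (0 if it worked)."""
    with tempfile.TemporaryFile() as handle:
        handle.write(b"GRIB")
        try:
            handle.seek(offset)
        except OSError as exc:
            return exc.errno
    return 0


# A 3 GiB offset wraps to a negative number when squeezed into 32 bits.
wrapped = wrap_int32(3 * 2**30)
print(wrapped, seek_errno(wrapped) == errno.EINVAL)
```

Calling `exceeds_int32` on the downloaded target file would show whether this hypothesis can apply at all. If the file is smaller than 2 GiB, the cause must lie elsewhere.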
@hammad93

It is possible that the returned data is incomplete, which would cause the error. Perhaps test small subsets of the steps, explore the data, and then proceed?
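
One way to explore whether a downloaded file is complete is to walk through it message by message before handing it to cfgrib. The sketch below relies on the GRIB edition 2 layout: a 16-byte indicator section starting with `GRIB`, the edition number in octet 8, the total message length in octets 9-16, and a closing `7777`. `scan_grib2` and `fake_message` are hypothetical helpers, and the scan assumes the file contains only back-to-back edition 2 messages with no padding between them.

```python
import io
import struct
from typing import BinaryIO, List, Tuple


def scan_grib2(stream: BinaryIO) -> List[Tuple[int, int]]:
    """Return (offset, length) for each complete GRIB2 message in `stream`.

    Raises ValueError at the first truncated or malformed message.
    """
    messages = []
    offset = 0
    while True:
        stream.seek(offset)
        header = stream.read(16)
        if not header:
            return messages  # clean end of file
        if len(header) < 16 or header[:4] != b"GRIB":
            raise ValueError(f"bad or truncated header at offset {offset}")
        if header[7] != 2:
            raise ValueError(f"unsupported GRIB edition {header[7]} at offset {offset}")
        (length,) = struct.unpack(">Q", header[8:16])  # big-endian 64-bit length
        if length < 20:
            raise ValueError(f"implausible message length {length} at offset {offset}")
        stream.seek(offset + length - 4)
        if stream.read(4) != b"7777":
            raise ValueError(f"message at offset {offset} is truncated or corrupt")
        messages.append((offset, length))
        offset += length


def fake_message(payload: bytes) -> bytes:
    """Build synthetic bytes with a GRIB2-style header and end marker (test data only)."""
    length = 16 + len(payload) + 4
    header = b"GRIB" + b"\x00\x00" + b"\x00" + b"\x02" + struct.pack(">Q", length)
    return header + payload + b"7777"


good = fake_message(b"a" * 10) + fake_message(b"b" * 5)
print(scan_grib2(io.BytesIO(good)))
```

On a real download the function would be called on a file opened with `open(path, "rb")`. If every field arrives as its own message, the request in this issue (65 steps, 50 members, 2 parameters) would be expected to yield 6500 entries; a lower count or a `ValueError` would point to an incomplete file.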
