Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download data simultaneously and cache files #385

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 155 additions & 11 deletions atlite/datasets/era5.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation
"""

import hashlib
import logging
import os
import time
import warnings
import weakref
from tempfile import mkstemp

import cdsapi
import numpy as np
import pandas as pd
import requests
import xarray as xr
from dask import compute, delayed
from dask.array import arctan2, sqrt
Expand All @@ -25,6 +28,10 @@
from atlite.gis import maybe_swap_spatial_dims
from atlite.pv.solar_position import SolarPosition

download_status = {}
file_aliases = {}
MAX_DISPLAY_FILES = 3

# Null context for running a with statements wihout any context
try:
from contextlib import nullcontext
Expand Down Expand Up @@ -323,6 +330,125 @@
logger.error(f"Unable to delete file {path}, as it is still in use.")


def get_cache_filename(request, cache_dir):
"""
Generate a unique cache filename based on the request parameters.
"""
# Serialize the request dictionary into a sorted string to ensure consistency
request_str = "_".join(

Check warning on line 338 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L338

Added line #L338 was not covered by tests
f"{key}-{sorted(value) if isinstance(value, list) else value}"
for key, value in sorted(request.items())
)
# Generate a hash of the request string
request_hash = hashlib.md5(request_str.encode("utf-8")).hexdigest()

Check warning on line 343 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L343

Added line #L343 was not covered by tests
# Use the first 8 characters of the hash for brevity
return f"{request_hash}.nc"

Check warning on line 345 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L345

Added line #L345 was not covered by tests


def custom_download(url, size, target, lock, filename):
"""
Optimized download function that uses a simple progress bar and removes
completed files from the display.
"""
if target is None:
target = url.split("/")[-1]

Check warning on line 354 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L354

Added line #L354 was not covered by tests

# Assign a short alias for the filename (e.g. f1, f2, ...)
file_number = len(file_aliases) + 1
file_aliases[filename] = f"f{file_number}"

Check warning on line 358 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L357-L358

Added lines #L357 - L358 were not covered by tests

logging.info(f"Downloading {filename} to {target} ({size} bytes)")
start = time.time()

Check warning on line 361 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L360-L361

Added lines #L360 - L361 were not covered by tests

mode = "wb"
total = 0
sleep = 10
tries = 0
headers = None

Check warning on line 367 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L363-L367

Added lines #L363 - L367 were not covered by tests

while tries < 5:
r = requests.get(url, stream=True, headers=headers)
try:
r.raise_for_status()

Check warning on line 372 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L370-L372

Added lines #L370 - L372 were not covered by tests

with open(target, mode) as f:

Check warning on line 374 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L374

Added line #L374 was not covered by tests
for chunk in r.iter_content(chunk_size=1024 * 1024):
if chunk:
f.write(chunk)
total += len(chunk)
with lock:
download_status[filename] = total / size * 100
update_progress_bar()

Check warning on line 381 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L377-L381

Added lines #L377 - L381 were not covered by tests

except requests.exceptions.ConnectionError as e:
logging.error(f"Download interrupted: {e}")
break

Check warning on line 385 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L383-L385

Added lines #L383 - L385 were not covered by tests
finally:
r.close()

Check warning on line 387 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L387

Added line #L387 was not covered by tests

if total >= size:
break

Check warning on line 390 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L390

Added line #L390 was not covered by tests

logging.error(f"Download incomplete, downloaded {total} bytes out of {size}")
logging.warning(f"Sleeping {sleep} seconds")
time.sleep(sleep)
mode = "ab"
total = os.path.getsize(target)
sleep *= 1.5
headers = {"Range": f"bytes={total}-"}
tries += 1

Check warning on line 399 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L392-L399

Added lines #L392 - L399 were not covered by tests

if total != size:
raise Exception(f"Download failed: downloaded {total} bytes out of {size}")

Check warning on line 402 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L402

Added line #L402 was not covered by tests

elapsed = time.time() - start

Check warning on line 404 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L404

Added line #L404 was not covered by tests
if elapsed:
logging.info(f"Download rate {total / elapsed:.2f} bytes/s")

Check warning on line 406 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L406

Added line #L406 was not covered by tests

return target

Check warning on line 408 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L408

Added line #L408 was not covered by tests


def update_progress_bar():
"""
Update a progress bar that shows the percentage of all files being
downloaded.

Files that have reached 100% are removed from the display. Only
short aliases are displayed.
"""
completed_files = [

Check warning on line 419 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L419

Added line #L419 was not covered by tests
file for file, progress in download_status.items() if progress >= 100
]

# Remove completed files from the progress dictionary
for file in completed_files:
del download_status[file]
del file_aliases[file] # Remove alias as well

Check warning on line 426 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L425-L426

Added lines #L425 - L426 were not covered by tests

if not download_status:
# If no active downloads, clear the progress bar
print("\r", end="")
return

Check warning on line 431 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L430-L431

Added lines #L430 - L431 were not covered by tests

# Only display the top N files to avoid multi-line output
displayed_files = list(download_status.items())[:MAX_DISPLAY_FILES]

Check warning on line 434 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L434

Added line #L434 was not covered by tests

# Create progress string using the short aliases
progress = " | ".join(

Check warning on line 437 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L437

Added line #L437 was not covered by tests
[
f"{file_aliases[file]}: {int(progress)}%"
for file, progress in displayed_files
]
)

# If there are more files, show a summary
if len(download_status) > MAX_DISPLAY_FILES:
progress += f" | ... and {len(download_status) - MAX_DISPLAY_FILES} more"

Check warning on line 446 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L446

Added line #L446 was not covered by tests

# Use \r to overwrite the same line
print(f"\r{progress}", end="")

Check warning on line 449 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L449

Added line #L449 was not covered by tests


def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
"""
Download data like ERA5 from the Climate Data Store (CDS).
Expand All @@ -337,6 +463,21 @@
request
), "Need to specify at least 'variable', 'year' and 'month'"

# Use tmpdir for cache directory; if not provided, use current working directory
if tmpdir is None:
tmpdir = os.getcwd()
cache_dir = tmpdir
os.makedirs(cache_dir, exist_ok=True)

Check warning on line 470 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L468-L470

Added lines #L468 - L470 were not covered by tests

# Generate cache filename based on request
cache_filename = get_cache_filename(request, cache_dir)
cache_filepath = os.path.join(cache_dir, cache_filename)

Check warning on line 474 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L473-L474

Added lines #L473 - L474 were not covered by tests

if os.path.exists(cache_filepath):
logging.info(f"Using cached file for request: {cache_filename}")
ds = xr.open_dataset(cache_filepath, chunks=chunks or {})
return ds

Check warning on line 479 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L477-L479

Added lines #L477 - L479 were not covered by tests

client = cdsapi.Client(
info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
)
Expand All @@ -349,25 +490,28 @@
fd, target = mkstemp(suffix=".nc", dir=tmpdir)
os.close(fd)

# Inform user about data being downloaded as "* variable (year-month)"
timestr = f"{request['year']}-{request['month']}"
variables = atleast_1d(request["variable"])
varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
result.download(target)

ds = xr.open_dataset(target, chunks=chunks or {})
# Inform user about data being downloaded as "* variable (year-month)"
timestr = f"{request['year']}-{request['month']}"
variables = atleast_1d(request["variable"])
varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
filename = f"{variables[0]}_{timestr}.nc"
logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
custom_download(result.location, result.content_length, target, lock, filename)

Check warning on line 499 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L494-L499

Added lines #L494 - L499 were not covered by tests

# Move the downloaded file to cache directory
os.rename(target, cache_filepath)
ds = xr.open_dataset(cache_filepath, chunks=chunks or {})

Check warning on line 503 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L502-L503

Added lines #L502 - L503 were not covered by tests
if tmpdir is None:
logger.debug(f"Adding finalizer for {target}")
weakref.finalize(ds._file_obj._manager, noisy_unlink, target)
logger.debug(f"Adding finalizer for {cache_filepath}")
weakref.finalize(ds._file_obj._manager, noisy_unlink, cache_filepath)

Check warning on line 506 in atlite/datasets/era5.py

View check run for this annotation

Codecov / codecov/patch

atlite/datasets/era5.py#L505-L506

Added lines #L505 - L506 were not covered by tests

# Remove default encoding we get from CDSAPI, which can lead to NaN values after loading with subsequent
# saving due to how xarray handles netcdf compression (only float encoded as short int seem affected)
# Fixes issue by keeping "float32" encoded as "float32" instead of internally saving as "short int", see:
# https://stackoverflow.com/questions/75755441/why-does-saving-to-netcdf-without-encoding-change-some-values-to-nan
# and hopefully fixed soon (could then remove), see https://github.com/pydata/xarray/issues/7691
for v in ds.data_vars:
if ds[v].encoding["dtype"] == "int16":
if ds[v].encoding.get("dtype") == "int16":
ds[v].encoding.clear()

return ds
Expand Down