Merge pull request #12 from nasa/HARMONY-953

Harmony 953

hailiangzhang authored Nov 1, 2021
2 parents 4040407 + d03d21f commit c2d8a67

Showing 3 changed files with 136 additions and 15 deletions.
80 changes: 66 additions & 14 deletions harmony_netcdf_to_zarr/convert.py
@@ -3,6 +3,8 @@
import sys
import multiprocessing
from multiprocessing import Semaphore
+from typing import Union
+import re

import s3fs
import numpy as np
@@ -11,6 +13,9 @@

region = os.environ.get('AWS_DEFAULT_REGION') or 'us-west-2'

+# Some globals that may be shared by different methods
+binary_prefix_conversion_map = {"Ki": 1024, "Mi": 1048576, "Gi": 1073741824}


def make_localstack_s3fs():
    host = os.environ.get('LOCALSTACK_HOST') or 'host.docker.internal'
@@ -94,33 +99,80 @@ def scale_attribute(src, attr, scale_factor, add_offset):
    return scale_fn(unscaled)


-def regenerate_chunks(shape, chunks):
+def compute_chunksize(shape: Union[tuple, list],
+                      datatype: str,
+                      compression_ratio: float = 7.2,
+                      compressed_chunksize_byte: Union[int, str] = '10 Mi'):
    """
-    Regenerate new chunks based on given zarr chunks
+    Compute the chunksize for a given shape and datatype,
+    based on the compression requirement.
+    We try to make the chunksize equal along the different dimensions,
+    without exceeding the given shape boundary.

    Parameters
    ----------
    shape : list/tuple
        the zarr shape
-    chunks : list/tuple
-        the original zarr chunks
+    datatype : str
+        the zarr data type, which must be recognized by numpy
+    compression_ratio : float
+        expected compression ratio for each chunk;
+        defaults to 7.2, the ratio observed when a chunk of shape
+        (3000, 3000) in double precision is compressed to 10 Mi
+    compressed_chunksize_byte : int/str
+        expected chunk size in bytes after compression.
+        If it is a string, it is assumed to follow the NIST standard
+        for binary prefixes (https://physics.nist.gov/cuu/Units/binary.html),
+        except that only Ki, Mi, and Gi are allowed.
+        Space is optional between number and unit.

    Returns
    -------
-    list
+    list/tuple
        the regenerated new zarr chunks
    """
-    # regenerate new chunks
-    # NOTE currently make each chunk dimension to be its multiplier closest to 3000
-    #      with a max chunksize of 3000
-    new_chunks = map(
-        lambda x: min(x[0], int(3000 / x[1]) * x[1] if x[1] < 3000 else 3000),
-        zip(shape, chunks),
-    )
+    # convert compressed_chunksize_byte to an integer if it is a str
+    if type(compressed_chunksize_byte) == str:
+        try:
+            (value, unit) = re.findall(
+                r"^\s*([\d.]+)\s*(Ki|Mi|Gi)\s*$", compressed_chunksize_byte
+            )[0]
+        except IndexError:
+            err_message = """Chunksize needs to be either an integer or string.
+            If it's a string, assuming it follows NIST standard for binary prefix
+            (https://physics.nist.gov/cuu/Units/binary.html)
+            except that only Ki, Mi, and Gi are allowed."""
+            raise ValueError(err_message)
+        compressed_chunksize_byte = int(float(value)) * int(binary_prefix_conversion_map[unit])

+    # number of elements per chunk before compression
+    if compression_ratio < 1.:
+        raise ValueError("Compression ratio < 1 found when estimating chunk size.")
+    chunksize_unrolled = int(
+        compressed_chunksize_byte * compression_ratio / np.dtype(datatype).itemsize
+    )
-    new_chunks = type(chunks)(list(new_chunks))

+    # compute the chunksize by trying to make it equal along the different
+    # dimensions, without exceeding the given shape boundary
+    suggested_chunksize = np.full(len(shape), 0)
+    shape_array = np.array(shape)
+    dim_to_process = np.full(len(shape), True)
+    while not (~dim_to_process).all():
+        chunksize_remaining = chunksize_unrolled // suggested_chunksize[~dim_to_process].prod()
+        chunksize_oneside = int(pow(chunksize_remaining, 1 / dim_to_process.sum()))
+        if (shape_array[dim_to_process] >= chunksize_oneside).all():
+            suggested_chunksize[dim_to_process] = chunksize_oneside
+            dim_to_process[:] = False
+        else:
+            dim_to_fill = dim_to_process & (shape_array < chunksize_oneside)
+            suggested_chunksize[dim_to_fill] = shape_array[dim_to_fill]
+            dim_to_process[dim_to_fill] = False

-    # return new chunks
-    return new_chunks
+    suggested_chunksize = type(shape)(suggested_chunksize.tolist())
+    return suggested_chunksize


def __copy_variable(src, dst_group, name, sema=Semaphore(20)):
@@ -168,7 +220,7 @@ def __copy_variable(src, dst_group, name, sema=Semaphore(20)):
    dtype = src.dtype
    dtype = src.scale_factor.dtype if hasattr(src, 'scale_factor') else dtype
    dtype = src.add_offset.dtype if hasattr(src, 'add_offset') else dtype
-    new_chunks = regenerate_chunks(src.shape, chunks)
+    new_chunks = compute_chunksize(src.shape, dtype)
    dst = dst_group.create_dataset(name,
                                   data=src,
                                   shape=src.shape,
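For reference, a standalone sketch (not part of the commit) of the binary-prefix parsing used by compute_chunksize above; the wrapper function here is illustrative only. Note that int(float(value)) drops the fractional part, so '26.8 Mi' parses as 26 Mi, which is what the new tests assume:

    import re

    binary_prefix_conversion_map = {"Ki": 1024, "Mi": 1048576, "Gi": 1073741824}

    def parse_compressed_chunksize(text):
        # illustrative helper mirroring the regex in convert.py
        value, unit = re.findall(r"^\s*([\d.]+)\s*(Ki|Mi|Gi)\s*$", text)[0]
        return int(float(value)) * binary_prefix_conversion_map[unit]

    print(parse_compressed_chunksize('10 Mi'))    # 10485760
    print(parse_compressed_chunksize('26.8 Mi'))  # 27262976 (26 * 1048576)
    # Chaining this through the sizing math reproduces the non-default test:
    elements = int(parse_compressed_chunksize('26.8 Mi') * 6.8 / 4)  # 'i4' itemsize is 4
    print(int(pow(elements // 100, 1 / 2)))       # 680 -> the (100, 680, 680) case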
2 changes: 1 addition & 1 deletion tests/test_adapter.py
@@ -212,7 +212,7 @@ def test_end_to_end_large_file_conversion(self, _callback_post):
        self.assertEqual(str(out.tree()), contents)

        # -- Data Assertions --
-        self.assertEqual(out['data/var'].chunks, (2920,))
+        self.assertEqual(out['data/var'].chunks, (10000,))


    @patch.object(argparse.ArgumentParser, 'error', return_value=None)
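The updated assertion follows directly from the new sizing: for a one-dimensional variable, the element budget (9437184 for 'f8' with the defaults, as computed above; any common dtype leaves it far above 10000) exceeds the 10000-element shape, so the suggested chunk spans the whole dimension. Effectively, in the 1-D case:

    budget = int(10 * 1048576 * 7.2 / 8)  # 9437184 elements, assuming 'f8' for illustration
    print(min(budget, 10000))             # 10000 -> chunks become (10000,)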
69 changes: 69 additions & 0 deletions tests/test_convert.py
@@ -0,0 +1,69 @@
"""
Tests the Harmony convert module
"""

import unittest
import pytest

from harmony_netcdf_to_zarr import convert


class TestConvert(unittest.TestCase):
"""
Tests the Harmony adapter
"""
def setUp(self):
pass

def test_compute_chunksize_small(self):
"""
Test of compute_chunksize method for a small input shape
"""
chunksize_expected = (100, 100, 100)
chunksize_result = convert.compute_chunksize(shape=(100, 100,100), datatype='f8')
assert chunksize_expected == chunksize_result

def test_compute_chunksize_medium(self):
"""
Test of compute_chunksize method for a medium input shape
"""
chunksize_expected = (100, 307, 307)
chunksize_result = convert.compute_chunksize(shape=(100, 1000,1000), datatype='f8')
assert chunksize_expected == chunksize_result

def test_compute_chunksize_large(self):
"""
Test of compute_chunksize method for a large input shape
"""
chunksize_expected = (211, 211, 211)
chunksize_result = convert.compute_chunksize(shape=(1000, 1000,1000), datatype='f8')
assert chunksize_expected == chunksize_result

def test_compute_chunksize_with_compression_args(self):
"""
Test of compute_chunksize method with non-default compression args
"""
chunksize_expected = (100, 680, 680)
chunksize_result = convert.compute_chunksize(shape=(100, 1000,1000),
datatype='i4',
compression_ratio = 6.8,
compressed_chunksize_byte = '26.8 Mi')
assert chunksize_expected == chunksize_result

def test_compute_chunksize_wrong_arguments(self):
"""
Test of compute_chunksize method for a large input shape
"""
with pytest.raises(ValueError) as execinfo:
chunksize_result = convert.compute_chunksize(shape=(100, 1000,1000),
datatype='i4',
compression_ratio = 6.8,
compressed_chunksize_byte = '26.8 MB')
err_message_expected = """Chunksize needs to be either an integer or string.
If it's a string, assuming it follows NIST standard for binary prefix
(https://physics.nist.gov/cuu/Units/binary.html)
except that only Ki, Mi, and Gi are allowed."""
assert str(execinfo.value) == err_message_expected

def tearDown(self):
pass
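Assuming the package and pytest are installed, the new tests can be run in the usual way:

    python -m pytest tests/test_convert.py -v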
