From 4fab37eb7cb16896f355aed0b8e3be9b6480fa95 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Tue, 10 Sep 2024 14:21:23 -0800 Subject: [PATCH 1/6] Fix sign of horizontal shifts and add test --- tests/test_coreg/test_affine.py | 94 ++++++++++++++++++++++++++++++--- xdem/coreg/affine.py | 28 +++++----- xdem/coreg/base.py | 2 + 3 files changed, 104 insertions(+), 20 deletions(-) diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 0baa6114..01e0bdcd 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -7,6 +7,8 @@ import numpy as np import pytest import rasterio as rio + +import geoutils from geoutils import Raster, Vector from geoutils._typing import NDArrayNum from geoutils.raster import RasterType @@ -96,14 +98,34 @@ class TestAffineCoreg: ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask. inlier_mask = ~outlines.create_mask(ref) - fit_params = dict( - reference_elev=ref.data, - to_be_aligned_elev=tba.data, - inlier_mask=inlier_mask, - transform=ref.transform, - crs=ref.crs, + # Check all point-raster possibilities supported + # Use the reference DEM for both, it will be artificially misaligned during tests + # Raster-Raster + fit_args_rst_rst = dict( + reference_elev=ref, + to_be_aligned_elev=ref, + verbose=True, + ) + + # Convert DEMs to points with a bit of subsampling for speed-up + ref_pts = ref.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + + # Raster-Point + fit_args_rst_pts = dict( + reference_elev=ref, + to_be_aligned_elev=ref_pts, verbose=True, ) + + # Point-Raster + fit_args_pts_rst = dict( + reference_elev=ref_pts, + to_be_aligned_elev=ref, + verbose=True, + ) + + all_fit_args = [fit_args_rst_rst, fit_args_rst_pts, fit_args_pts_rst] + # Create some 3D coordinates with Z coordinates being 0 to try the apply functions. points_arr = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T points = gpd.GeoDataFrame( @@ -172,6 +194,66 @@ def test_from_classmethods(self) -> None: if "non-finite values" not in str(exception): raise exception + @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore + @pytest.mark.parametrize("shifts", [(20, 5, 2), (-50, 100, 2)]) # type: ignore + @pytest.mark.parametrize("coreg_method", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore + def test_shift_coreg__synthetic(self, fit_args, shifts, coreg_method) -> None: + """ + Test the horizontal/vertical shift coregistrations with synthetic shifted data. These tests include NuthKaab, + ICP and GradientDescending. + + We test all combinaison of inputs: raster-raster, point-raster and raster-point. + + We verify that the shifts found by the coregistration are within 1% of the synthetic shifts with opposite sign + of the ones introduced, and that applying the coregistration to the shifted elevations corrects more than + 99% of the variance from the initial elevation differences. + """ + + # Try default "fit" parameters instantiation + horizontal_coreg = coreg_method() + elev_fit_args = fit_args.copy() + + # Create synthetic translation from the reference DEM + ref = self.ref + ref_shifted = ref.translate(shifts[0], shifts[1]) + shifts[2] + # Convert to point cloud if input was point cloud + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + ref_shifted = ref_shifted.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + elev_fit_args["to_be_aligned_elev"] = ref_shifted + + # Run coregistration + horizontal_coreg.fit( + **elev_fit_args, + subsample=40000, + random_state=42, + ) + + # Check all fit parameters are the opposite of those used above, within a relative 1% + fit_shifts = [-horizontal_coreg.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] + assert np.allclose(shifts, fit_shifts, rtol=10e-2) + + # Run apply and check that 99% of the variance was corrected + coreg_dem = horizontal_coreg.apply(elev_fit_args["to_be_aligned_elev"]) + + # For a point cloud output, need to interpolate with the other DEM to get dh + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + init_dh = ref.interp_points((ref_shifted.geometry.x.values, ref_shifted.geometry.y.values)) - ref_shifted["z"] + dh = ref.interp_points((coreg_dem.geometry.x.values, coreg_dem.geometry.y.values)) - coreg_dem["z"] + else: + init_dh = ref - ref_shifted.reproject(ref) + dh = ref - coreg_dem.reproject(ref) + + # Plots for debugging + # if isinstance(dh, geoutils.Raster): + # import matplotlib.pyplot as plt + # init_dh.plot() + # plt.show() + # dh.plot() + # plt.show() + + # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity + assert np.nanvar(dh / np.nanstd(init_dh)) < 0.01 + def test_vertical_shift(self) -> None: # Create a vertical shift correction instance diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index ca4e58cc..eb7d625d 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -271,15 +271,15 @@ def sub_dh_interpolator(shift_x: float, shift_y: float) -> NDArrayf: def sub_dh_interpolator(shift_x: float, shift_y: float) -> NDArrayf: """Elevation difference interpolator for shifted coordinates of the subsample.""" - diff_rst_pts = pts_elev[z_name][sub_mask].values - rst_elev_interpolator( - (sub_coords[1] + shift_y, sub_coords[0] + shift_x) - ) - # Always return ref minus tba if ref == "point": - return diff_rst_pts + return pts_elev[z_name][sub_mask].values - rst_elev_interpolator( + (sub_coords[1] + shift_y, sub_coords[0] + shift_x) + ) + # Also invert the shift direction on the raster interpolator, so that the shift is the same relative to + # the reference (returns the right shift relative to the reference no matter if it is point or raster) else: - return -diff_rst_pts + return rst_elev_interpolator((sub_coords[1] - shift_y, sub_coords[0] - shift_x)) - pts_elev[z_name][sub_mask].values # Interpolate arrays of bias variables to the subsample point coordinates if aux_vars is not None: @@ -689,10 +689,10 @@ def func_cost(offset: tuple[float, float]) -> float: errorcontrol=False, ) - # Get final offsets - offset_east = res.x[0] - offset_north = res.x[1] - offset_vertical = float(-np.nanmedian(dh_interpolator(offset_east, offset_north))) + # Get final offsets with the right sign direction + offset_east = -res.x[0] + offset_north = -res.x[1] + offset_vertical = float(np.nanmedian(dh_interpolator(-offset_east, -offset_north))) return offset_east, offset_north, offset_vertical @@ -1413,7 +1413,7 @@ def _fit_rst_pts( # Write output to class # (Mypy does not pass with normal dict, requires "OutAffineDict" here for some reason...) - output_affine = OutAffineDict(shift_x=easting_offset, shift_y=northing_offset, shift_z=vertical_offset) + output_affine = OutAffineDict(shift_x=-easting_offset, shift_y=-northing_offset, shift_z=vertical_offset) self._meta["outputs"]["affine"] = output_affine self._meta["outputs"]["random"] = {"subsample_final": subsample_final} @@ -1422,8 +1422,8 @@ def _to_matrix_func(self) -> NDArrayf: # We add a translation, on the last column matrix = np.diag(np.ones(4, dtype=float)) - matrix[0, 3] -= self._meta["outputs"]["affine"]["shift_x"] - matrix[1, 3] -= self._meta["outputs"]["affine"]["shift_y"] + matrix[0, 3] += self._meta["outputs"]["affine"]["shift_x"] + matrix[1, 3] += self._meta["outputs"]["affine"]["shift_y"] matrix[2, 3] += self._meta["outputs"]["affine"]["shift_z"] return matrix @@ -1443,7 +1443,7 @@ class GradientDescending(AffineCoreg): def __init__( self, x0: tuple[float, float] = (0, 0), - bounds: tuple[float, float] = (-3, 3), + bounds: tuple[float, float] = (-10, 10), deltainit: int = 2, deltatol: float = 0.004, feps: float = 0.0001, diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index f6ab079e..aa7942ee 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -467,6 +467,8 @@ def _postprocess_coreg_apply_rst( # Resample the array on the original grid if resample: + # applied_elev = _reproject_horizontal_shift_samecrs(raster_arr=applied_elev, src_transform=out_transform, + # dst_transform=transform) # Reproject the DEM from its out_transform onto the transform applied_rst = gu.Raster.from_array(applied_elev, out_transform, crs=crs, nodata=nodata) if not isinstance(elev, gu.Raster): From f7e58c0ada73c7e87937a2d5ca33ff4498b53abc Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Tue, 10 Sep 2024 14:35:39 -0800 Subject: [PATCH 2/6] Linting --- tests/test_coreg/test_affine.py | 6 +++--- xdem/coreg/affine.py | 7 +++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 01e0bdcd..11aff0dc 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -7,8 +7,6 @@ import numpy as np import pytest import rasterio as rio - -import geoutils from geoutils import Raster, Vector from geoutils._typing import NDArrayNum from geoutils.raster import RasterType @@ -237,7 +235,9 @@ def test_shift_coreg__synthetic(self, fit_args, shifts, coreg_method) -> None: # For a point cloud output, need to interpolate with the other DEM to get dh if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): - init_dh = ref.interp_points((ref_shifted.geometry.x.values, ref_shifted.geometry.y.values)) - ref_shifted["z"] + init_dh = ( + ref.interp_points((ref_shifted.geometry.x.values, ref_shifted.geometry.y.values)) - ref_shifted["z"] + ) dh = ref.interp_points((coreg_dem.geometry.x.values, coreg_dem.geometry.y.values)) - coreg_dem["z"] else: init_dh = ref - ref_shifted.reproject(ref) diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index eb7d625d..700559fd 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -275,11 +275,14 @@ def sub_dh_interpolator(shift_x: float, shift_y: float) -> NDArrayf: if ref == "point": return pts_elev[z_name][sub_mask].values - rst_elev_interpolator( (sub_coords[1] + shift_y, sub_coords[0] + shift_x) - ) + ) # Also invert the shift direction on the raster interpolator, so that the shift is the same relative to # the reference (returns the right shift relative to the reference no matter if it is point or raster) else: - return rst_elev_interpolator((sub_coords[1] - shift_y, sub_coords[0] - shift_x)) - pts_elev[z_name][sub_mask].values + return ( + rst_elev_interpolator((sub_coords[1] - shift_y, sub_coords[0] - shift_x)) + - pts_elev[z_name][sub_mask].values + ) # Interpolate arrays of bias variables to the subsample point coordinates if aux_vars is not None: From 8f8805d1604b959704b3fa711b482710d61dcd69 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Tue, 10 Sep 2024 18:38:35 -0800 Subject: [PATCH 3/6] Homogenize all tests in test_affine --- tests/test_coreg/test_affine.py | 426 +++++++++++++++++--------------- xdem/coreg/base.py | 4 + 2 files changed, 228 insertions(+), 202 deletions(-) diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 11aff0dc..e36a111b 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -1,23 +1,22 @@ """Functions to test the affine coregistrations.""" from __future__ import annotations -import copy - import geopandas as gpd import numpy as np import pytest import rasterio as rio + +import geoutils from geoutils import Raster, Vector from geoutils._typing import NDArrayNum from geoutils.raster import RasterType from geoutils.raster.raster import _shift_transform from scipy.ndimage import binary_dilation +import pytransform3d -import xdem from xdem import coreg, examples from xdem.coreg.affine import ( AffineCoreg, - CoregDict, _reproject_horizontal_shift_samecrs, ) @@ -101,24 +100,28 @@ class TestAffineCoreg: # Raster-Raster fit_args_rst_rst = dict( reference_elev=ref, - to_be_aligned_elev=ref, + to_be_aligned_elev=tba, + inlier_mask=inlier_mask, verbose=True, ) # Convert DEMs to points with a bit of subsampling for speed-up ref_pts = ref.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + tba_pts = ref.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds # Raster-Point fit_args_rst_pts = dict( reference_elev=ref, - to_be_aligned_elev=ref_pts, + to_be_aligned_elev=tba_pts, + inlier_mask=inlier_mask, verbose=True, ) # Point-Raster fit_args_pts_rst = dict( reference_elev=ref_pts, - to_be_aligned_elev=ref, + to_be_aligned_elev=tba, + inlier_mask=inlier_mask, verbose=True, ) @@ -192,10 +195,30 @@ def test_from_classmethods(self) -> None: if "non-finite values" not in str(exception): raise exception + def test_raise_all_nans(self) -> None: + """Check that the coregistration approaches fail gracefully when given only nans.""" + + dem1 = np.ones((50, 50), dtype=float) + dem2 = dem1.copy() + np.nan + affine = rio.transform.from_origin(0, 0, 1, 1) + crs = rio.crs.CRS.from_epsg(4326) + + vshiftcorr = coreg.VerticalShift() + icp = coreg.ICP() + + pytest.raises(ValueError, vshiftcorr.fit, dem1, dem2, transform=affine) + pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) + + dem2[[3, 20, 40], [2, 21, 41]] = 1.2 + + vshiftcorr.fit(dem1, dem2, transform=affine, crs=crs) + + pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) + @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore @pytest.mark.parametrize("shifts", [(20, 5, 2), (-50, 100, 2)]) # type: ignore @pytest.mark.parametrize("coreg_method", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore - def test_shift_coreg__synthetic(self, fit_args, shifts, coreg_method) -> None: + def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> None: """ Test the horizontal/vertical shift coregistrations with synthetic shifted data. These tests include NuthKaab, ICP and GradientDescending. @@ -204,12 +227,16 @@ def test_shift_coreg__synthetic(self, fit_args, shifts, coreg_method) -> None: We verify that the shifts found by the coregistration are within 1% of the synthetic shifts with opposite sign of the ones introduced, and that applying the coregistration to the shifted elevations corrects more than - 99% of the variance from the initial elevation differences. + 99% of the variance from the initial elevation differences (hence, that the direction of coregistration has + to be the right one; and that there is no other errors introduced in the process). """ - # Try default "fit" parameters instantiation + # Initiate coregistration horizontal_coreg = coreg_method() + + # Copy dictionary and remove inlier mask elev_fit_args = fit_args.copy() + elev_fit_args.pop("inlier_mask") # Create synthetic translation from the reference DEM ref = self.ref @@ -220,252 +247,247 @@ def test_shift_coreg__synthetic(self, fit_args, shifts, coreg_method) -> None: elev_fit_args["to_be_aligned_elev"] = ref_shifted # Run coregistration - horizontal_coreg.fit( - **elev_fit_args, - subsample=40000, - random_state=42, - ) + coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) # Check all fit parameters are the opposite of those used above, within a relative 1% fit_shifts = [-horizontal_coreg.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] assert np.allclose(shifts, fit_shifts, rtol=10e-2) - # Run apply and check that 99% of the variance was corrected - coreg_dem = horizontal_coreg.apply(elev_fit_args["to_be_aligned_elev"]) - # For a point cloud output, need to interpolate with the other DEM to get dh if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): init_dh = ( ref.interp_points((ref_shifted.geometry.x.values, ref_shifted.geometry.y.values)) - ref_shifted["z"] ) - dh = ref.interp_points((coreg_dem.geometry.x.values, coreg_dem.geometry.y.values)) - coreg_dem["z"] + dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] else: init_dh = ref - ref_shifted.reproject(ref) - dh = ref - coreg_dem.reproject(ref) + dh = ref - coreg_elev.reproject(ref) # Plots for debugging - # if isinstance(dh, geoutils.Raster): - # import matplotlib.pyplot as plt - # init_dh.plot() - # plt.show() - # dh.plot() - # plt.show() + PLOT = False + if PLOT and isinstance(dh, geoutils.Raster): + import matplotlib.pyplot as plt + init_dh.plot() + plt.show() + dh.plot() + plt.show() # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity assert np.nanvar(dh / np.nanstd(init_dh)) < 0.01 - def test_vertical_shift(self) -> None: + @pytest.mark.parametrize("coreg_method__shift", + [(coreg.NuthKaab, (9.202739, 2.735573, -1.97733)), + (coreg.GradientDescending, (10.0, 2.5, -1.964539)), + (coreg.ICP, (8.73833, 1.584255, -1.943957))] + ) # type: ignore + def test_coreg_translations__example(self, + coreg_method__shift: tuple[type[AffineCoreg], tuple[float, float, float]], + verbose: bool = False) -> None: + """ + Test that the translation co-registration outputs are always exactly the same on the real example data. + """ - # Create a vertical shift correction instance - vshiftcorr = coreg.VerticalShift() - # Fit the vertical shift model to the data - vshiftcorr.fit(**self.fit_params) - - res = self.ref.res[0] - - # Check that a vertical shift was found. - assert vshiftcorr.meta["outputs"]["affine"].get("shift_z") is not None - assert vshiftcorr.meta["outputs"]["affine"]["shift_z"] != 0.0 - - # Copy the vertical shift to see if it changes in the test (it shouldn't) - vshift = copy.copy(vshiftcorr.meta["outputs"]["affine"]["shift_z"]) - - # Check that the to_matrix function works as it should - matrix = vshiftcorr.to_matrix() - assert matrix[2, 3] == vshift, matrix - - # Check that the first z coordinate is now the vertical shift - assert all(vshiftcorr.apply(self.points)["z"].values == vshiftcorr.meta["outputs"]["affine"]["shift_z"]) - - # Apply the model to correct the DEM - tba_unshifted, _ = vshiftcorr.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) - - # Create a new vertical shift correction model - vshiftcorr2 = coreg.VerticalShift() - # Check that this is indeed a new object - assert vshiftcorr is not vshiftcorr2 - # Fit the corrected DEM to see if the vertical shift will be close to or at zero - vshiftcorr2.fit( - reference_elev=self.ref.data, - to_be_aligned_elev=tba_unshifted, - transform=self.ref.transform, - crs=self.ref.crs, - inlier_mask=self.inlier_mask, - ) - # Test the vertical shift - newmeta: CoregDict = vshiftcorr2.meta - new_vshift = newmeta["outputs"]["affine"]["shift_z"] - assert np.abs(new_vshift) * res < 0.01 + # Use entire DEMs here (to compare to original values from older package versions) + ref, tba = load_examples(crop=False)[0:2] + inlier_mask = ~self.outlines.create_mask(ref) - # Check that the original model's vertical shift has not changed - # (that the _.meta dicts are two different objects) - assert vshiftcorr.meta["outputs"]["affine"]["shift_z"] == vshift + # Get the coregistration method and expected shifts from the inputs + coreg_method, expected_shifts = coreg_method__shift - def test_all_nans(self) -> None: - """Check that the coregistration approaches fail gracefully when given only nans.""" - dem1 = np.ones((50, 50), dtype=float) - dem2 = dem1.copy() + np.nan - affine = rio.transform.from_origin(0, 0, 1, 1) - crs = rio.crs.CRS.from_epsg(4326) + # Run co-registration + c = coreg_method(subsample=50000) + c.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) + + # Check the output translations match the exact values + shifts = [c.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] + assert shifts == pytest.approx(expected_shifts) + @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore + @pytest.mark.parametrize("vshift", [0.2, 10., 1000.]) # type: ignore + def test_coreg_vertical_translation__synthetic(self, fit_args, vshift) -> None: + """ + Test the vertical shift coregistration with synthetic shifted data. These tests include VerticalShift. + + We test all combinaison of inputs: raster-raster, point-raster and raster-point. + """ + + # Create a vertical shift correction instance vshiftcorr = coreg.VerticalShift() - icp = coreg.ICP() - pytest.raises(ValueError, vshiftcorr.fit, dem1, dem2, transform=affine) - pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) + # Copy dictionary and remove inlier mask + elev_fit_args = fit_args.copy() + elev_fit_args.pop("inlier_mask") - dem2[[3, 20, 40], [2, 21, 41]] = 1.2 + # Create synthetic vertical shift from the reference DEM + ref = self.ref + ref_vshifted = ref + vshift - vshiftcorr.fit(dem1, dem2, transform=affine, crs=crs) + # Convert to point cloud if input was point cloud + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + ref_vshifted = ref_vshifted.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + elev_fit_args["to_be_aligned_elev"] = ref_vshifted - pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) + # Fit the vertical shift model to the data + coreg_elev = vshiftcorr.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) + + # Check that the right vertical shift was found + assert vshiftcorr.meta["outputs"]["affine"]["shift_z"] == pytest.approx(-vshift, rel=10e-2) + + # For a point cloud output, need to interpolate with the other DEM to get dh + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + init_dh = ( + ref.interp_points((ref_vshifted.geometry.x.values, ref_vshifted.geometry.y.values)) - ref_vshifted["z"] + ) + dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] + else: + init_dh = ref - ref_vshifted + dh = ref - coreg_elev - def test_coreg_example(self, verbose: bool = False) -> None: + # Plots for debugging + PLOT = False + if PLOT and isinstance(dh, geoutils.Raster): + import matplotlib.pyplot as plt + init_dh.plot() + plt.show() + dh.plot() + plt.show() + + # Check that the median difference is zero, and that no additional variance + # was introduced, so that the variance is also close to zero (no variance for a constant vertical shift) + assert np.nanmedian(dh) == pytest.approx(0, abs=10e-6) + assert np.nanvar(dh) == pytest.approx(0, abs=10e-6) + + @pytest.mark.parametrize("coreg_method__vshift", + [(coreg.VerticalShift, -2.305015)]) # type: ignore + def test_coreg_vertical_translation__example(self, + coreg_method__vshift: tuple[type[AffineCoreg], tuple[float, float, float]], + verbose: bool = False) -> None: """ - Test the co-registration outputs performed on the example are always the same. This overlaps with the test in - test_examples.py, but helps identify from where differences arise. + Test that the vertical translation co-registration output is always exactly the same on the real example data. """ - # Use full DEMs here (to compare to original values from older package versions) + # Use entire DEMs here (to compare to original values from older package versions) ref, tba = load_examples(crop=False)[0:2] inlier_mask = ~self.outlines.create_mask(ref) + # Get the coregistration method and expected shifts from the inputs + coreg_method, expected_vshift = coreg_method__vshift + # Run co-registration - nuth_kaab = xdem.coreg.NuthKaab(offset_threshold=0.005) - nuth_kaab.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) - - # Check the output .metadata is always the same - shifts = ( - nuth_kaab.meta["outputs"]["affine"]["shift_x"], - nuth_kaab.meta["outputs"]["affine"]["shift_y"], - nuth_kaab.meta["outputs"]["affine"]["shift_z"], - ) - assert shifts == pytest.approx((-9.198341, -2.786257, -1.981793)) + c = coreg_method(subsample=50000) + c.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) - def test_gradientdescending(self, subsample: int = 10000, verbose: bool = False) -> None: - """ - Test the co-registration outputs performed on the example are always the same. This overlaps with the test in - test_examples.py, but helps identify from where differences arise. + # Check the output translations match the exact values + vshift = c.meta["outputs"]["affine"]["shift_z"] + assert vshift == pytest.approx(expected_vshift) - It also implicitly tests the z_name kwarg and whether a geometry column can be provided instead of E/N cols. + @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore + @pytest.mark.parametrize("shifts_rotations", [(20, 5, 2, 0.02, 0.05, 0.01), (-50, 100, 20, 10, 5, 4)]) # type: ignore + @pytest.mark.parametrize("coreg_method", [coreg.ICP]) # type: ignore + def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) -> None: """ - # Use full DEMs here (to compare to original values from older package versions) - ref, tba = load_examples(crop=False)[0:2] - inlier_mask = ~self.outlines.create_mask(ref) - - # Run co-registration - gds = xdem.coreg.GradientDescending(subsample=subsample) - gds.fit( - ref.to_pointcloud(data_column_name="z").ds, - tba, - inlier_mask=inlier_mask, - verbose=verbose, - random_state=42, - ) + Test the rigid coregistrations with synthetic misaligned (shifted and rotatedà data. These tests include ICP. - shifts = ( - gds.meta["outputs"]["affine"]["shift_x"], - gds.meta["outputs"]["affine"]["shift_y"], - gds.meta["outputs"]["affine"]["shift_z"], - ) - assert shifts == pytest.approx((-10.625, -2.65625, 1.940031), abs=10e-5) + We test all combinaison of inputs: raster-raster, point-raster and raster-point. - @pytest.mark.parametrize("shift_px", [(1, 1), (2, 2)]) # type: ignore - @pytest.mark.parametrize("coreg_class", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore - @pytest.mark.parametrize("points_or_raster", ["raster", "points"]) - def test_coreg_example_shift(self, shift_px, coreg_class, points_or_raster, verbose=False, subsample=5000): + We verify that the matrix found by the coregistration is within 1% of the synthetic matrix, and inverted from + the one introduced, and that applying the coregistration to the misaligned elevations corrects more than + 99% of the variance from the initial elevation differences (hence, that the direction of coregistration has + to be the right one; and that there is no other errors introduced in the process). """ - For comparison of coreg algorithms: - Shift a ref_dem on purpose, e.g. shift_px = (1,1), and then applying coreg to shift it back. - """ - res = self.ref.res[0] - # shift DEM by shift_px - shifted_ref = self.ref.copy() - shifted_ref.translate(shift_px[0] * res, shift_px[1] * res, inplace=True) + # Initiate coregistration + horizontal_coreg = coreg_method() - shifted_ref_points = shifted_ref.to_pointcloud(subsample=subsample, random_state=42).ds - shifted_ref_points.rename(columns={"b1": "z"}, inplace=True) + # Copy dictionary and remove inlier mask + elev_fit_args = fit_args.copy() + elev_fit_args.pop("inlier_mask") - kwargs = {} if coreg_class.__name__ != "GradientDescending" else {"subsample": subsample} + ref = self.ref - coreg_obj = coreg_class(**kwargs) + # Create synthetic rigid transformation (translation and rotation) from the reference DEM + sr = np.array(shifts_rotations) + shifts = sr[:3] + rotations = sr[3:6] + e = np.deg2rad(rotations) + # Derive is a 3x3 rotation matrix + rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True) + matrix = np.diag(np.ones(4, float)) + matrix[:3, :3] = rot_matrix + matrix[:3, 3] = shifts + + # Pass a centroid + centroid = [ref.bounds.left, ref.bounds.bottom, np.nanmean(ref)] + ref_shifted_rotated = coreg.apply_matrix(ref, matrix=matrix, centroid=centroid) - if points_or_raster == "raster": - coreg_obj.fit(shifted_ref, self.ref, verbose=verbose, random_state=42) - elif points_or_raster == "points": - coreg_obj.fit(shifted_ref_points, self.ref, verbose=verbose, random_state=42) + # Convert to point cloud if input was point cloud + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + ref_shifted_rotated = ref_shifted_rotated.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + elev_fit_args["to_be_aligned_elev"] = ref_shifted_rotated - if coreg_class.__name__ == "ICP": - matrix = coreg_obj.to_matrix() - # The ICP fit only creates a matrix and doesn't normally show the alignment in pixels - # Since the test is formed to validate pixel shifts, these calls extract the approximate pixel shift - # from the matrix (it's not perfect since rotation/scale can change it). - coreg_obj.meta["outputs"]["affine"]["shift_x"] = -matrix[0][3] - coreg_obj.meta["outputs"]["affine"]["shift_y"] = -matrix[1][3] + # Run coregistration + coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) + + # Check that fit matrix is the invert of those used above, within a relative 10% for rotations, and within + # a large margin for shifts, as ICP has relative difficulty shifts with large rotations + fit_matrix = horizontal_coreg.meta["outputs"]["affine"]["matrix"] + invert_fit_matrix = coreg.invert_matrix(fit_matrix) + invert_fit_shifts = invert_fit_matrix[:3, 3] + invert_fit_rotations = pytransform3d.rotations.euler_from_matrix(invert_fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True) + invert_fit_rotations = np.rad2deg(invert_fit_rotations) + assert np.allclose(shifts, invert_fit_shifts, rtol=1) + assert np.allclose(rotations, invert_fit_rotations, rtol=10e-1) - # ICP can never be expected to be much better than 1px on structured data, as its implementation often finds a - # minimum between two grid points. This is clearly warned for in the documentation. - precision = 1e-2 if coreg_class.__name__ != "ICP" else 1 + # For a point cloud output, need to interpolate with the other DEM to get dh + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + init_dh = ( + ref.interp_points((ref_shifted_rotated.geometry.x.values, ref_shifted_rotated.geometry.y.values)) - ref_shifted_rotated["z"] + ) + dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] + else: + init_dh = ref - ref_shifted_rotated + dh = ref - coreg_elev - assert coreg_obj.meta["outputs"]["affine"]["shift_x"] == pytest.approx(-shift_px[0] * res, rel=precision) - assert coreg_obj.meta["outputs"]["affine"]["shift_y"] == pytest.approx(-shift_px[0] * res, rel=precision) + # Plots for debugging + PLOT = False + if PLOT and isinstance(dh, geoutils.Raster): + import matplotlib.pyplot as plt + init_dh.plot() + plt.show() + dh.plot() + plt.show() - def test_nuth_kaab(self) -> None: + # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity + assert np.nanvar(dh / np.nanstd(init_dh)) < 0.01 - nuth_kaab = coreg.NuthKaab(max_iterations=10) - # Synthesize a shifted and vertically offset DEM - pixel_shift = 2 - vshift = 5 - shifted_dem = self.ref.data.squeeze().copy() - shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift] - shifted_dem[:, :pixel_shift] = np.nan - shifted_dem += vshift - - # Fit the synthesized shifted DEM to the original - nuth_kaab.fit( - self.ref.data.squeeze(), - shifted_dem, - transform=self.ref.transform, - crs=self.ref.crs, - verbose=self.fit_params["verbose"], - ) + @pytest.mark.parametrize("coreg_method__shifts_rotations", + [(coreg.ICP, (8.738332, 1.584255, -1.943957, 0.0069004, -0.00703, -0.0119733))]) # type: ignore + def test_coreg_rigid__example(self, + coreg_method__shifts_rotations: tuple[type[AffineCoreg], tuple[float, float, float]], + verbose: bool = False) -> None: + """ + Test that the rigid co-registration outputs is always exactly the same on the real example data. + """ - # Make sure that the estimated offsets are similar to what was synthesized. - res = self.ref.res[0] - assert nuth_kaab.meta["outputs"]["affine"]["shift_x"] == pytest.approx(pixel_shift * res, abs=0.03) - assert nuth_kaab.meta["outputs"]["affine"]["shift_y"] == pytest.approx(0, abs=0.03) - assert nuth_kaab.meta["outputs"]["affine"]["shift_z"] == pytest.approx(-vshift, 0.03) - - # Apply the estimated shift to "revert the DEM" to its original state. - unshifted_dem, _ = nuth_kaab.apply(shifted_dem, transform=self.ref.transform, crs=self.ref.crs) - # Measure the difference (should be more or less zero) - diff = self.ref.data.squeeze() - unshifted_dem - diff = diff.compressed() # turn into a 1D array with only unmasked values - - # Check that the median is very close to zero - assert np.abs(np.median(diff)) < 0.01 - # Check that the RMSE is low - assert np.sqrt(np.mean(np.square(diff))) < 1 - - # Transform some arbitrary points. - transformed_points = nuth_kaab.apply(self.points) - - # Check that the x shift is close to the pixel_shift * image resolution - assert all( - abs((transformed_points.geometry.x.values - self.points.geometry.x.values) + pixel_shift * self.ref.res[0]) - < 0.1 - ) - # Check that the z shift is close to the original vertical shift. - assert all(abs((transformed_points["z"].values - self.points["z"].values) + vshift) < 0.1) + # Use entire DEMs here (to compare to original values from older package versions) + ref, tba = load_examples(crop=False)[0:2] + inlier_mask = ~self.outlines.create_mask(ref) + + # Get the coregistration method and expected shifts from the inputs + coreg_method, expected_shifts_rots = coreg_method__shifts_rotations + + # Run co-registration + c = coreg_method(subsample=50000) + c.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) - def test_icp_opencv(self) -> None: + # Check the output translations match the exact values + fit_matrix = c.meta["outputs"]["affine"]["matrix"] + fit_shifts = fit_matrix[:3, 3] + fit_rotations = pytransform3d.rotations.euler_from_matrix(fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True) + fit_rotations = np.rad2deg(fit_rotations) + fit_shifts_rotations = tuple(np.concatenate((fit_shifts, fit_rotations))) - # Do a fast and dirty 3 iteration ICP just to make sure it doesn't error out. - icp = coreg.ICP(max_iterations=3) - icp.fit(**self.fit_params) + assert fit_shifts_rotations == pytest.approx(expected_shifts_rots, abs=10e-6) - aligned_dem, _ = icp.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) - assert aligned_dem.shape == self.ref.data.squeeze().shape diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index aa7942ee..4bf96f5c 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -467,8 +467,12 @@ def _postprocess_coreg_apply_rst( # Resample the array on the original grid if resample: + + # TODO: Use this function for a translation only, for consistency with the rest of Coreg? + # (would require checking transform difference is only a translation) # applied_elev = _reproject_horizontal_shift_samecrs(raster_arr=applied_elev, src_transform=out_transform, # dst_transform=transform) + # Reproject the DEM from its out_transform onto the transform applied_rst = gu.Raster.from_array(applied_elev, out_transform, crs=crs, nodata=nodata) if not isinstance(elev, gu.Raster): From f7ea5af1a5f1b6194a3a04c73e5607479cb32f21 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Tue, 10 Sep 2024 18:39:24 -0800 Subject: [PATCH 4/6] Linting --- tests/test_coreg/test_affine.py | 79 ++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 35 deletions(-) diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index e36a111b..c0541e9e 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -2,23 +2,19 @@ from __future__ import annotations import geopandas as gpd +import geoutils import numpy as np import pytest +import pytransform3d import rasterio as rio - -import geoutils from geoutils import Raster, Vector from geoutils._typing import NDArrayNum from geoutils.raster import RasterType from geoutils.raster.raster import _shift_transform from scipy.ndimage import binary_dilation -import pytransform3d from xdem import coreg, examples -from xdem.coreg.affine import ( - AffineCoreg, - _reproject_horizontal_shift_samecrs, -) +from xdem.coreg.affine import AffineCoreg, _reproject_horizontal_shift_samecrs def load_examples(crop: bool = True) -> tuple[RasterType, RasterType, Vector]: @@ -267,6 +263,7 @@ def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> PLOT = False if PLOT and isinstance(dh, geoutils.Raster): import matplotlib.pyplot as plt + init_dh.plot() plt.show() dh.plot() @@ -275,14 +272,17 @@ def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity assert np.nanvar(dh / np.nanstd(init_dh)) < 0.01 - @pytest.mark.parametrize("coreg_method__shift", - [(coreg.NuthKaab, (9.202739, 2.735573, -1.97733)), - (coreg.GradientDescending, (10.0, 2.5, -1.964539)), - (coreg.ICP, (8.73833, 1.584255, -1.943957))] - ) # type: ignore - def test_coreg_translations__example(self, - coreg_method__shift: tuple[type[AffineCoreg], tuple[float, float, float]], - verbose: bool = False) -> None: + @pytest.mark.parametrize( + "coreg_method__shift", + [ + (coreg.NuthKaab, (9.202739, 2.735573, -1.97733)), + (coreg.GradientDescending, (10.0, 2.5, -1.964539)), + (coreg.ICP, (8.73833, 1.584255, -1.943957)), + ], + ) # type: ignore + def test_coreg_translations__example( + self, coreg_method__shift: tuple[type[AffineCoreg], tuple[float, float, float]], verbose: bool = False + ) -> None: """ Test that the translation co-registration outputs are always exactly the same on the real example data. """ @@ -299,11 +299,11 @@ def test_coreg_translations__example(self, c.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) # Check the output translations match the exact values - shifts = [c.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] + shifts = [c.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] # type: ignore assert shifts == pytest.approx(expected_shifts) @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore - @pytest.mark.parametrize("vshift", [0.2, 10., 1000.]) # type: ignore + @pytest.mark.parametrize("vshift", [0.2, 10.0, 1000.0]) # type: ignore def test_coreg_vertical_translation__synthetic(self, fit_args, vshift) -> None: """ Test the vertical shift coregistration with synthetic shifted data. These tests include VerticalShift. @@ -336,7 +336,7 @@ def test_coreg_vertical_translation__synthetic(self, fit_args, vshift) -> None: # For a point cloud output, need to interpolate with the other DEM to get dh if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): init_dh = ( - ref.interp_points((ref_vshifted.geometry.x.values, ref_vshifted.geometry.y.values)) - ref_vshifted["z"] + ref.interp_points((ref_vshifted.geometry.x.values, ref_vshifted.geometry.y.values)) - ref_vshifted["z"] ) dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] else: @@ -347,6 +347,7 @@ def test_coreg_vertical_translation__synthetic(self, fit_args, vshift) -> None: PLOT = False if PLOT and isinstance(dh, geoutils.Raster): import matplotlib.pyplot as plt + init_dh.plot() plt.show() dh.plot() @@ -357,11 +358,10 @@ def test_coreg_vertical_translation__synthetic(self, fit_args, vshift) -> None: assert np.nanmedian(dh) == pytest.approx(0, abs=10e-6) assert np.nanvar(dh) == pytest.approx(0, abs=10e-6) - @pytest.mark.parametrize("coreg_method__vshift", - [(coreg.VerticalShift, -2.305015)]) # type: ignore - def test_coreg_vertical_translation__example(self, - coreg_method__vshift: tuple[type[AffineCoreg], tuple[float, float, float]], - verbose: bool = False) -> None: + @pytest.mark.parametrize("coreg_method__vshift", [(coreg.VerticalShift, -2.305015)]) # type: ignore + def test_coreg_vertical_translation__example( + self, coreg_method__vshift: tuple[type[AffineCoreg], tuple[float, float, float]], verbose: bool = False + ) -> None: """ Test that the vertical translation co-registration output is always exactly the same on the real example data. """ @@ -382,7 +382,9 @@ def test_coreg_vertical_translation__example(self, assert vshift == pytest.approx(expected_vshift) @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore - @pytest.mark.parametrize("shifts_rotations", [(20, 5, 2, 0.02, 0.05, 0.01), (-50, 100, 20, 10, 5, 4)]) # type: ignore + @pytest.mark.parametrize( + "shifts_rotations", [(20, 5, 2, 0.02, 0.05, 0.01), (-50, 100, 20, 10, 5, 4)] + ) # type: ignore @pytest.mark.parametrize("coreg_method", [coreg.ICP]) # type: ignore def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) -> None: """ @@ -422,7 +424,9 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) # Convert to point cloud if input was point cloud if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): - ref_shifted_rotated = ref_shifted_rotated.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + ref_shifted_rotated = ref_shifted_rotated.to_pointcloud( + data_column_name="z", subsample=50000, random_state=42 + ).ds elev_fit_args["to_be_aligned_elev"] = ref_shifted_rotated # Run coregistration @@ -433,7 +437,9 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) fit_matrix = horizontal_coreg.meta["outputs"]["affine"]["matrix"] invert_fit_matrix = coreg.invert_matrix(fit_matrix) invert_fit_shifts = invert_fit_matrix[:3, 3] - invert_fit_rotations = pytransform3d.rotations.euler_from_matrix(invert_fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True) + invert_fit_rotations = pytransform3d.rotations.euler_from_matrix( + invert_fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True + ) invert_fit_rotations = np.rad2deg(invert_fit_rotations) assert np.allclose(shifts, invert_fit_shifts, rtol=1) assert np.allclose(rotations, invert_fit_rotations, rtol=10e-1) @@ -441,7 +447,8 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) # For a point cloud output, need to interpolate with the other DEM to get dh if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): init_dh = ( - ref.interp_points((ref_shifted_rotated.geometry.x.values, ref_shifted_rotated.geometry.y.values)) - ref_shifted_rotated["z"] + ref.interp_points((ref_shifted_rotated.geometry.x.values, ref_shifted_rotated.geometry.y.values)) + - ref_shifted_rotated["z"] ) dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] else: @@ -452,6 +459,7 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) PLOT = False if PLOT and isinstance(dh, geoutils.Raster): import matplotlib.pyplot as plt + init_dh.plot() plt.show() dh.plot() @@ -460,12 +468,15 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity assert np.nanvar(dh / np.nanstd(init_dh)) < 0.01 - - @pytest.mark.parametrize("coreg_method__shifts_rotations", - [(coreg.ICP, (8.738332, 1.584255, -1.943957, 0.0069004, -0.00703, -0.0119733))]) # type: ignore - def test_coreg_rigid__example(self, - coreg_method__shifts_rotations: tuple[type[AffineCoreg], tuple[float, float, float]], - verbose: bool = False) -> None: + @pytest.mark.parametrize( + "coreg_method__shifts_rotations", + [(coreg.ICP, (8.738332, 1.584255, -1.943957, 0.0069004, -0.00703, -0.0119733))], + ) # type: ignore + def test_coreg_rigid__example( + self, + coreg_method__shifts_rotations: tuple[type[AffineCoreg], tuple[float, float, float]], + verbose: bool = False, + ) -> None: """ Test that the rigid co-registration outputs is always exactly the same on the real example data. """ @@ -489,5 +500,3 @@ def test_coreg_rigid__example(self, fit_shifts_rotations = tuple(np.concatenate((fit_shifts, fit_rotations))) assert fit_shifts_rotations == pytest.approx(expected_shifts_rots, abs=10e-6) - - From a5591e3a8cf5545c8d019cea9007b615df6da3f6 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Wed, 11 Sep 2024 13:55:41 -0800 Subject: [PATCH 5/6] Exception in tolerances for ICP --- tests/test_coreg/test_affine.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index c0541e9e..2f61f4c0 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -245,9 +245,12 @@ def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> # Run coregistration coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) - # Check all fit parameters are the opposite of those used above, within a relative 1% + # Check all fit parameters are the opposite of those used above, within a relative 1% (10% for ICP) fit_shifts = [-horizontal_coreg.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] - assert np.allclose(shifts, fit_shifts, rtol=10e-2) + + # ICP can be less precise than other methods + rtol = 10e-2 if coreg_method != coreg.ICP else 10e-1 + assert np.allclose(shifts, fit_shifts, rtol=rtol) # For a point cloud output, need to interpolate with the other DEM to get dh if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): @@ -269,8 +272,11 @@ def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> dh.plot() plt.show() + # Check applying the coregistration removes 99% of the variance (95% for ICP) # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity - assert np.nanvar(dh / np.nanstd(init_dh)) < 0.01 + tol = 0.01 if coreg_method != coreg.ICP else 0.05 + assert np.nanvar(dh / np.nanstd(init_dh)) < tol + @pytest.mark.parametrize( "coreg_method__shift", @@ -383,7 +389,7 @@ def test_coreg_vertical_translation__example( @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore @pytest.mark.parametrize( - "shifts_rotations", [(20, 5, 2, 0.02, 0.05, 0.01), (-50, 100, 20, 10, 5, 4)] + "shifts_rotations", [(20, 5, 0, 0.02, 0.05, 0.1), (-50, 100, 0, 10, 5, 4)] ) # type: ignore @pytest.mark.parametrize("coreg_method", [coreg.ICP]) # type: ignore def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) -> None: @@ -394,7 +400,7 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) We verify that the matrix found by the coregistration is within 1% of the synthetic matrix, and inverted from the one introduced, and that applying the coregistration to the misaligned elevations corrects more than - 99% of the variance from the initial elevation differences (hence, that the direction of coregistration has + 95% of the variance from the initial elevation differences (hence, that the direction of coregistration has to be the right one; and that there is no other errors introduced in the process). """ @@ -433,7 +439,7 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) # Check that fit matrix is the invert of those used above, within a relative 10% for rotations, and within - # a large margin for shifts, as ICP has relative difficulty shifts with large rotations + # a large 100% margin for shifts, as ICP has relative difficulty resolving shifts with large rotations fit_matrix = horizontal_coreg.meta["outputs"]["affine"]["matrix"] invert_fit_matrix = coreg.invert_matrix(fit_matrix) invert_fit_shifts = invert_fit_matrix[:3, 3] @@ -466,7 +472,8 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) plt.show() # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity - assert np.nanvar(dh / np.nanstd(init_dh)) < 0.01 + # Only 95% of variance as ICP cannot always resolve the last shifts + assert np.nanvar(dh / np.nanstd(init_dh)) < 0.05 @pytest.mark.parametrize( "coreg_method__shifts_rotations", From 0c038d7146837d6485dc5772afbfe9aa26341ff2 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Wed, 11 Sep 2024 13:55:55 -0800 Subject: [PATCH 6/6] Linting --- tests/test_coreg/test_affine.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 2f61f4c0..a57c2bd9 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -277,7 +277,6 @@ def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> tol = 0.01 if coreg_method != coreg.ICP else 0.05 assert np.nanvar(dh / np.nanstd(init_dh)) < tol - @pytest.mark.parametrize( "coreg_method__shift", [ @@ -388,9 +387,7 @@ def test_coreg_vertical_translation__example( assert vshift == pytest.approx(expected_vshift) @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore - @pytest.mark.parametrize( - "shifts_rotations", [(20, 5, 0, 0.02, 0.05, 0.1), (-50, 100, 0, 10, 5, 4)] - ) # type: ignore + @pytest.mark.parametrize("shifts_rotations", [(20, 5, 0, 0.02, 0.05, 0.1), (-50, 100, 0, 10, 5, 4)]) # type: ignore @pytest.mark.parametrize("coreg_method", [coreg.ICP]) # type: ignore def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) -> None: """