Skip to content

Commit

Permalink
TST #919 refactor tests with namedtuple
Browse files Browse the repository at this point in the history
  • Loading branch information
prjemian committed Feb 20, 2024
1 parent 32e40df commit 6b092d2
Showing 1 changed file with 166 additions and 107 deletions.
273 changes: 166 additions & 107 deletions apstools/plans/tests/test_alignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,105 +100,160 @@ def test_SynPseudoVoigt_randomize():
assert signal.noise_multiplier == 1


@pytest.mark.parametrize(
"signal, mover, start, finish, npts, feature",
[
[noisy, m1, -1.2, 1.2, 11, "max"], # slower
[pvoigt, axis, -1.2, 1.2, 11, "max"], # faster
[pvoigt, axis, -1.2, 1.2, 11, "cen"],
[pvoigt, axis, -1.2, 1.2, 51, "com"],
],
TestParameters = collections.namedtuple(
"TestParameters",
"signal mover start finish npts feature"
)
def test_direct_implementation_with_rel_scan(signal, mover, start, finish, npts, feature):
RE(bps.mv(mover, 0))
assert get_position(mover) == 0.0
parms_slower = TestParameters(noisy, m1, -1.2, 1.2, 11, "max")
parms_faster = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max")
parms_cen = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen")
parms_com = TestParameters(pvoigt, axis, -1.2, 1.2, 51, "com")


if isinstance(signal, SynPseudoVoigt):
signal.randomize_parameters(scale=250_000)
@pytest.mark.parametrize("parms", [parms_slower, parms_faster, parms_cen, parms_com])
def test_direct_implementation_with_rel_scan(parms: TestParameters):
RE(bps.mv(parms.mover, 0))
assert get_position(parms.mover) == 0.0

RE(bp.rel_scan([signal], mover, start, finish, npts))
if isinstance(parms.signal, SynPseudoVoigt):
parms.signal.randomize_parameters(scale=250_000)

if feature in ("max", "min"):
center = bec.peaks[feature][signal.name][0]
RE(bp.rel_scan([parms.signal], parms.mover, parms.start, parms.finish, parms.npts))

if parms.feature in ("max", "min"):
center = bec.peaks[parms.feature][parms.signal.name][0]
else:
center = bec.peaks[feature][signal.name]
center = bec.peaks[parms.feature][parms.signal.name]
# fwhm = bec.peaks["fwhm"][signal.name]

# confirm center will be within scan range
assert min(start, finish) <= center
assert center <= max(start, finish)
assert min(parms.start, parms.finish) <= center
assert center <= max(parms.start, parms.finish)

RE(bps.mv(mover, center)) # move to the center position
position = get_position(mover)
RE(bps.mv(parms.mover, center)) # move to the center position
position = get_position(parms.mover)
assert math.isclose(position, center, abs_tol=0.001)


TestParameters = collections.namedtuple(
"TestParameters",
"signals mover start finish npts feature rescan"
)
parms_motor_slower = TestParameters(noisy, m1, -1.2, 1.2, 11, "max", True)
parms_ao_faster = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", True)
parms_ao_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", True)
parms_det_list_of_1 = TestParameters([pvoigt], axis, -1.2, 1.2, 11, "max", True)
parms_det_list_of_3 = TestParameters([pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", True)
parms_det_tuple = TestParameters((pvoigt), axis, -1.2, 1.2, 11, "max", True)
parms_cen = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", False)
parms_com = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "com", False)
parms_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", False)
parms_min___pathological = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "min", False)


@pytest.mark.parametrize(
"signals, mover, start, finish, npts, feature, rescan",
"parms",
[
[noisy, m1, -1.2, 1.2, 11, "max", True], # slower, is motor
[pvoigt, axis, -1.2, 1.2, 11, "cen", True], # faster, is ao (float)
[pvoigt, axis, -1.2, 1.2, 11, "max", True],
[[pvoigt], axis, -1.2, 1.2, 11, "max", True], # list
[[pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", True], # more than one detector
[(pvoigt), axis, -1.2, 1.2, 11, "max", True], # tuple
[pvoigt, axis, -1.2, 1.2, 11, "cen", False],
[pvoigt, axis, -1.2, 1.2, 11, "com", False],
[pvoigt, axis, -1.2, 1.2, 11, "max", False],
[pvoigt, axis, -1.2, 1.2, 11, "min", False], # pathological
parms_motor_slower,
parms_ao_faster,
parms_ao_max,
parms_det_list_of_1,
parms_det_list_of_3,
parms_det_tuple,
parms_cen,
parms_com,
parms_max,
parms_min___pathological,
],
)
def test_lineup(signals, mover, start, finish, npts, feature, rescan):
if isinstance(signals, SynPseudoVoigt):
signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
def test_lineup(parms : TestParameters):
if isinstance(parms.signals, SynPseudoVoigt):
parms.signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
else:
change_noisy_parameters()

RE(bps.mv(mover, 0))
assert get_position(mover) == 0.0

RE(alignment.lineup(signals, mover, start, finish, npts, feature=feature, rescan=rescan, bec=bec))
RE(bps.mv(parms.mover, 0))
assert get_position(parms.mover) == 0.0

RE(
alignment.lineup(
parms.signals,
parms.mover,
parms.start,
parms.finish,
parms.npts,
feature=parms.feature,
rescan=parms.rescan,
bec=bec
)
)

# if rescan and feature in "max cen".split():
# if parms.rescan and parms.feature in "max cen".split():
# # Test if mover position is within width of center.
# if isinstance(signals, SynPseudoVoigt):
# center = signals.center
# width = signals.sigma * 2.355 # FWHM approx.
# if isinstance(parms.signals, SynPseudoVoigt):
# center = parms.signals.center
# width = parms.signals.sigma * 2.355 # FWHM approx.
# else: # FIXME: multiple signals
# center = swait.channels.B.input_value.get()
# width = swait.channels.C.input_value.get()
# # FIXME: fails to find the bec analysis in lineup()
# position = get_position(mover)
# position = get_position(parms.mover)
# lo = center-width
# hi = center+width
# # assert lo <= position <= hi, f"{bec=} {bec.peaks=} {position=} {center=} {width=}"


TestParameters = collections.namedtuple(
"TestParameters",
"signals, mover, start, finish, npts, feature, nscans"
)
parms_motor_slower = TestParameters(noisy, m1, -1.2, 1.2, 11, "max", 2)
parms_ao_faster = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", 2)
parms_ao_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", 2)
parms_det_list_of_1 = TestParameters([pvoigt], axis, -1.2, 1.2, 11, "max", 2)
parms_det_list_of_3 = TestParameters([pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", 2)
parms_det_tuple = TestParameters((pvoigt), axis, -1.2, 1.2, 11, "max", 2)
parms_cen = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", 1)
parms_com = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "com", 1)
parms_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", 1)
parms_min___pathological = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "min", 1)


@pytest.mark.parametrize(
"signals, mover, start, finish, npts, feature, nscans",
"parms",
[
[noisy, m1, -1.2, 1.2, 11, "max", 2], # slower, is motor
[pvoigt, axis, -1.2, 1.2, 11, "cen", 2], # faster, is ao (float)
[pvoigt, axis, -1.2, 1.2, 11, "max", 2],
[[pvoigt], axis, -1.2, 1.2, 11, "max", 2], # list
[[pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", 2], # more than one detector
[(pvoigt), axis, -1.2, 1.2, 11, "max", 2], # tuple
[pvoigt, axis, -1.2, 1.2, 11, "cen", 1],
[pvoigt, axis, -1.2, 1.2, 11, "com", 1],
[pvoigt, axis, -1.2, 1.2, 11, "max", 1],
[pvoigt, axis, -1.2, 1.2, 11, "min", 1], # pathological
parms_motor_slower,
parms_ao_faster,
parms_ao_max,
parms_det_list_of_1,
parms_det_list_of_3,
parms_det_tuple,
parms_cen,
parms_com,
parms_max,
parms_min___pathological,
],
)
def test_lineup2(signals, mover, start, finish, npts, feature, nscans):
if isinstance(signals, SynPseudoVoigt):
signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
def test_lineup2(parms : TestParameters):
if isinstance(parms.signals, SynPseudoVoigt):
parms.signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
else:
change_noisy_parameters()

RE(bps.mv(mover, 0))
assert get_position(mover) == 0.0

RE(alignment.lineup2(signals, mover, start, finish, npts, feature=feature, nscans=nscans))
RE(bps.mv(parms.mover, 0))
assert get_position(parms.mover) == 0.0

RE(
alignment.lineup2(
parms.signals,
parms.mover,
parms.start,
parms.finish,
parms.npts,
feature=parms.feature,
nscans=parms.nscans
)
)


def test_TuneAxis():
Expand Down Expand Up @@ -292,74 +347,78 @@ class ImitatorForTesting(Device):
assert results.report() is None


Parameters = collections.namedtuple(
"Parameters",
TestParameters = collections.namedtuple(
"TestParameters",
"peak base noise center sigma xlo xhi npts nscans tol outcome"
)
parms_signal_but_high_background = Parameters(1e5, 1e6, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_model_peak = Parameters(1e5, 0, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_high_background_poor_resolution = Parameters(1e5, 1e4, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.1, True)
parms_not_much_better = Parameters(1e5, 1e4, 10, 0.1, 0.2, -0.7, 0.5, 11, 2, 0.05, True)
parms_neg_peak_1x_base = Parameters(-1e5, -1e4, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.1, True)
parms_neg_base = Parameters(1e5, -10, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_small_signal_zero_base = Parameters(1e-5, 0, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_neg_small_signal_zero_base = Parameters(-1e5, 0, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_small_signal_finite_base = Parameters(1e-5, 1e-7, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_no_signal_only_noise = Parameters(0, 0, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_bkg_plus_noise = Parameters(0, 1, 0.1, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_bkg_plus_big_noise = Parameters(0, 1, 100, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_no_signal__ZeroDivisionError = Parameters(0, 0, 0, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.005, None)
parms_bkg_only__ZeroDivisionError = Parameters(0, 1, 0, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.005, None)
parms_signal_but_high_background = TestParameters(1e5, 1e6, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_model_peak = TestParameters(1e5, 0, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_high_background_poor_resolution = TestParameters(1e5, 1e4, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.1, True)
parms_not_much_better = TestParameters(1e5, 1e4, 10, 0.1, 0.2, -0.7, 0.5, 11, 2, 0.05, True)
parms_neg_peak_1x_base = TestParameters(-1e5, -1e4, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.1, True)
parms_neg_base = TestParameters(1e5, -10, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_small_signal_zero_base = TestParameters(1e-5, 0, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_neg_small_signal_zero_base = TestParameters(-1e5, 0, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_small_signal_finite_base = TestParameters(1e-5, 1e-7, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_no_signal_only_noise = TestParameters(0, 0, 1e-8, 0.1, 0.2, -1.0, 0.5, 11, 1, 0.05, False)
parms_bkg_plus_noise = TestParameters(0, 1, 0.1, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_bkg_plus_big_noise = TestParameters(0, 1, 100, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_no_signal__ZeroDivisionError = TestParameters(0, 0, 0, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.005, None)
parms_bkg_only__ZeroDivisionError = TestParameters(0, 1, 0, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.005, None)


@pytest.mark.parametrize(
parms_signal_but_high_background._fields,
"parms",
[
list(parms) for parms in [
parms_signal_but_high_background,
parms_model_peak,
parms_high_background_poor_resolution,
parms_not_much_better,
parms_neg_peak_1x_base,
parms_neg_base,
parms_small_signal_zero_base,
parms_neg_small_signal_zero_base,
parms_small_signal_finite_base,
parms_no_signal_only_noise,
parms_bkg_plus_noise,
parms_bkg_plus_big_noise,
parms_no_signal__ZeroDivisionError,
parms_bkg_only__ZeroDivisionError,
]
parms_signal_but_high_background,
parms_model_peak,
parms_high_background_poor_resolution,
parms_not_much_better,
parms_neg_peak_1x_base,
parms_neg_base,
parms_small_signal_zero_base,
parms_neg_small_signal_zero_base,
parms_small_signal_finite_base,
parms_no_signal_only_noise,
parms_bkg_plus_noise,
parms_bkg_plus_big_noise,
parms_no_signal__ZeroDivisionError,
parms_bkg_only__ZeroDivisionError,
],
)
def test_lineup2_signal_permutations(peak, base, noise, center, sigma, xlo, xhi, npts, nscans, tol, outcome):
def test_lineup2_signal_permutations(parms: TestParameters):
starting_position = 0.0
m1.move(starting_position)
time.sleep(1) # without this, the test IOC crashes, sometimes
swait.reset()
swait.description.put("CI test lineup2()")
swait.channels.A.input_pv.put(m1.user_readback.pvname)
swait.channels.B.input_value.put(center)
swait.channels.C.input_value.put(sigma)
swait.channels.D.input_value.put(peak)
swait.channels.E.input_value.put(base)
swait.channels.F.input_value.put(noise)
swait.channels.B.input_value.put(parms.center)
swait.channels.C.input_value.put(parms.sigma)
swait.channels.D.input_value.put(parms.peak)
swait.channels.E.input_value.put(parms.base)
swait.channels.F.input_value.put(parms.noise)
swait.calculation.put("E+D/(1+((A-B)/C)^2)+F*RNDM")
swait.scanning_rate.put("I/O Intr")

stats = SignalStatsCallback()

detector = noisy
RE(alignment.lineup2(detector, m1, xlo, xhi, npts, nscans=nscans, signal_stats=stats))
# fmt: off
RE(
alignment.lineup2(
detector, m1, parms.xlo, parms.xhi, parms.npts, nscans=parms.nscans, signal_stats=stats
)
)
# fmt: on
change_noisy_parameters()

try:
centroid = stats._registers[detector.name].centroid
if outcome:
assert math.isclose(m1.position, centroid, abs_tol=tol)
assert math.isclose(center, centroid, abs_tol=tol)
if parms.outcome:
assert math.isclose(m1.position, centroid, abs_tol=parms.tol)
assert math.isclose(parms.center, centroid, abs_tol=parms.tol)
else:
assert not math.isclose(center, centroid, abs_tol=tol)
assert not math.isclose(parms.center, centroid, abs_tol=parms.tol)
except ZeroDivisionError:
assert math.isclose(m1.position, starting_position, abs_tol=tol)
assert math.isclose(m1.position, starting_position, abs_tol=parms.tol)

0 comments on commit 6b092d2

Please sign in to comment.