Skip to content

Commit

Permalink
Merge pull request #920 from BCDA-APS/919-lineup2-fixes
Browse files Browse the repository at this point in the history
lineup2() should work with low intensity peaks
  • Loading branch information
prjemian authored Feb 20, 2024
2 parents c3796ea + 251bc1e commit 0d7647f
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 71 deletions.
14 changes: 12 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,20 @@ Project `milestones <https://github.com/BCDA-APS/apstools/milestones>`_
describe future plans.

..
1.6.19
1.6.20
******
release expected by 2024-02-29
release expected by 2024-04-30

1.6.19
******

release expected by 2024-02-29

Fixes
-----

* lineup2() should work with low intensity peaks

1.6.18
******
Expand Down
2 changes: 1 addition & 1 deletion apstools/callbacks/scan_signal_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SignalStatsCallback:
from bluesky import plans as bp
from bluesky import preprocessors as bpp
signal_stats = SignalStats()
signal_stats = SignalStatsCallback()
def my_plan(detectors, mover, rel_start, rel_stop, points, md={}):
Expand Down
8 changes: 4 additions & 4 deletions apstools/plans/alignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,12 @@ def principal_signal_stats() -> str:
def strong_peak(stats) -> bool:
"""Determine if the peak is strong."""
try:
value = (stats.max_y - stats.min_y) / stats.sigma
return value > peak_factor
value = (stats.max_y - stats.min_y) / stats.stddev_y
return abs(value) > peak_factor
except ZeroDivisionError: # not enough samples
try:
value = abs(stats.max_y / stats.min_y)
return value > peak_factor
value = stats.max_y / stats.min_y
return abs(value) > peak_factor
except ZeroDivisionError:
return False

Expand Down
255 changes: 191 additions & 64 deletions apstools/plans/tests/test_alignment.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Test the alignment plans."""

import collections
import math
import time

import bluesky.plan_stubs as bps
import bluesky.plans as bp
Expand All @@ -14,6 +16,7 @@
from ophyd import Device
from ophyd import Signal

from ...callbacks.scan_signal_statistics import SignalStatsCallback
from ...devices import SynPseudoVoigt
from ...synApps import SwaitRecord
from ...synApps import setup_lorentzian_swait
Expand All @@ -28,20 +31,22 @@
bec.enable_plots()

axis = ophyd.EpicsSignal(f"{IOC_GP}gp:float1", name="axis")
calcs_enable = ophyd.EpicsSignal(f"{IOC_GP}userCalcEnable", name="calcs_enable", string=True)
m1 = ophyd.EpicsMotor(f"{IOC_GP}m1", name="m1")
m2 = ophyd.EpicsMotor(f"{IOC_GP}m2", name="m2")
noisy = ophyd.EpicsSignalRO(f"{IOC_GP}userCalc1.VAL", name="noisy")
scaler1 = ophyd.scaler.ScalerCH(f"{IOC_GP}scaler1", name="scaler1")
swait = SwaitRecord(f"{IOC_GP}userCalc1", name="swait")

for obj in (axis, m1, m2, noisy, scaler1, swait):
for obj in (axis, m1, m2, noisy, scaler1, swait, calcs_enable):
obj.wait_for_connection()

scaler1.select_channels()

# First, must be connected to m1.
pvoigt = SynPseudoVoigt(name="pvoigt", motor=axis, motor_field=axis.name)
pvoigt.kind = "hinted"
calcs_enable.put("Enable")


def change_noisy_parameters(fwhm=0.15, peak=10000, noise=0.08):
Expand Down Expand Up @@ -95,105 +100,151 @@ def test_SynPseudoVoigt_randomize():
assert signal.noise_multiplier == 1


@pytest.mark.parametrize(
"signal, mover, start, finish, npts, feature",
[
[noisy, m1, -1.2, 1.2, 11, "max"], # slower
[pvoigt, axis, -1.2, 1.2, 11, "max"], # faster
[pvoigt, axis, -1.2, 1.2, 11, "cen"],
[pvoigt, axis, -1.2, 1.2, 51, "com"],
],
)
def test_direct_implementation_with_rel_scan(signal, mover, start, finish, npts, feature):
RE(bps.mv(mover, 0))
assert get_position(mover) == 0.0
TestParameters = collections.namedtuple("TestParameters", "signal mover start finish npts feature")
parms_slower = TestParameters(noisy, m1, -1.2, 1.2, 11, "max")
parms_faster = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max")
parms_cen = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen")
parms_com = TestParameters(pvoigt, axis, -1.2, 1.2, 51, "com")

if isinstance(signal, SynPseudoVoigt):
signal.randomize_parameters(scale=250_000)

RE(bp.rel_scan([signal], mover, start, finish, npts))
@pytest.mark.parametrize("parms", [parms_slower, parms_faster, parms_cen, parms_com])
def test_direct_implementation_with_rel_scan(parms: TestParameters):
RE(bps.mv(parms.mover, 0))
assert get_position(parms.mover) == 0.0

if feature in ("max", "min"):
center = bec.peaks[feature][signal.name][0]
if isinstance(parms.signal, SynPseudoVoigt):
parms.signal.randomize_parameters(scale=250_000)

RE(bp.rel_scan([parms.signal], parms.mover, parms.start, parms.finish, parms.npts))

if parms.feature in ("max", "min"):
center = bec.peaks[parms.feature][parms.signal.name][0]
else:
center = bec.peaks[feature][signal.name]
center = bec.peaks[parms.feature][parms.signal.name]
# fwhm = bec.peaks["fwhm"][signal.name]

# confirm center will be within scan range
assert min(start, finish) <= center
assert center <= max(start, finish)
assert min(parms.start, parms.finish) <= center
assert center <= max(parms.start, parms.finish)

RE(bps.mv(mover, center)) # move to the center position
position = get_position(mover)
RE(bps.mv(parms.mover, center)) # move to the center position
position = get_position(parms.mover)
assert math.isclose(position, center, abs_tol=0.001)


TestParameters = collections.namedtuple("TestParameters", "signals mover start finish npts feature rescan")
parms_motor_slower = TestParameters(noisy, m1, -1.2, 1.2, 11, "max", True)
parms_ao_faster = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", True)
parms_ao_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", True)
parms_det_list_of_1 = TestParameters([pvoigt], axis, -1.2, 1.2, 11, "max", True)
parms_det_list_of_3 = TestParameters([pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", True)
parms_det_tuple = TestParameters((pvoigt), axis, -1.2, 1.2, 11, "max", True)
parms_cen = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", False)
parms_com = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "com", False)
parms_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", False)
parms_min___pathological = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "min", False)


@pytest.mark.parametrize(
"signals, mover, start, finish, npts, feature, rescan",
"parms",
[
[noisy, m1, -1.2, 1.2, 11, "max", True], # slower, is motor
[pvoigt, axis, -1.2, 1.2, 11, "cen", True], # faster, is ao (float)
[pvoigt, axis, -1.2, 1.2, 11, "max", True],
[[pvoigt], axis, -1.2, 1.2, 11, "max", True], # list
[[pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", True], # more than one detector
[(pvoigt), axis, -1.2, 1.2, 11, "max", True], # tuple
[pvoigt, axis, -1.2, 1.2, 11, "cen", False],
[pvoigt, axis, -1.2, 1.2, 11, "com", False],
[pvoigt, axis, -1.2, 1.2, 11, "max", False],
[pvoigt, axis, -1.2, 1.2, 11, "min", False], # pathological
parms_motor_slower,
parms_ao_faster,
parms_ao_max,
parms_det_list_of_1,
parms_det_list_of_3,
parms_det_tuple,
parms_cen,
parms_com,
parms_max,
parms_min___pathological,
],
)
def test_lineup(signals, mover, start, finish, npts, feature, rescan):
if isinstance(signals, SynPseudoVoigt):
signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
def test_lineup(parms: TestParameters):
if isinstance(parms.signals, SynPseudoVoigt):
parms.signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
else:
change_noisy_parameters()

RE(bps.mv(mover, 0))
assert get_position(mover) == 0.0

RE(alignment.lineup(signals, mover, start, finish, npts, feature=feature, rescan=rescan, bec=bec))
RE(bps.mv(parms.mover, 0))
assert get_position(parms.mover) == 0.0

RE(
alignment.lineup(
parms.signals,
parms.mover,
parms.start,
parms.finish,
parms.npts,
feature=parms.feature,
rescan=parms.rescan,
bec=bec,
)
)

# if rescan and feature in "max cen".split():
# if parms.rescan and parms.feature in "max cen".split():
# # Test if mover position is within width of center.
# if isinstance(signals, SynPseudoVoigt):
# center = signals.center
# width = signals.sigma * 2.355 # FWHM approx.
# if isinstance(parms.signals, SynPseudoVoigt):
# center = parms.signals.center
# width = parms.signals.sigma * 2.355 # FWHM approx.
# else: # FIXME: multiple signals
# center = swait.channels.B.input_value.get()
# width = swait.channels.C.input_value.get()
# # FIXME: fails to find the bec analysis in lineup()
# position = get_position(mover)
# position = get_position(parms.mover)
# lo = center-width
# hi = center+width
# # assert lo <= position <= hi, f"{bec=} {bec.peaks=} {position=} {center=} {width=}"


TestParameters = collections.namedtuple("TestParameters", "signals, mover, start, finish, npts, feature, nscans")
parms_motor_slower = TestParameters(noisy, m1, -1.2, 1.2, 11, "max", 2)
parms_ao_faster = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", 2)
parms_ao_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", 2)
parms_det_list_of_1 = TestParameters([pvoigt], axis, -1.2, 1.2, 11, "max", 2)
parms_det_list_of_3 = TestParameters([pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", 2)
parms_det_tuple = TestParameters((pvoigt), axis, -1.2, 1.2, 11, "max", 2)
parms_cen = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "cen", 1)
parms_com = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "com", 1)
parms_max = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "max", 1)
parms_min___pathological = TestParameters(pvoigt, axis, -1.2, 1.2, 11, "min", 1)


@pytest.mark.parametrize(
"signals, mover, start, finish, npts, feature, nscans",
"parms",
[
[noisy, m1, -1.2, 1.2, 11, "max", 2], # slower, is motor
[pvoigt, axis, -1.2, 1.2, 11, "cen", 2], # faster, is ao (float)
[pvoigt, axis, -1.2, 1.2, 11, "max", 2],
[[pvoigt], axis, -1.2, 1.2, 11, "max", 2], # list
[[pvoigt, noisy, scaler1], axis, -1.2, 1.2, 11, "max", 2], # more than one detector
[(pvoigt), axis, -1.2, 1.2, 11, "max", 2], # tuple
[pvoigt, axis, -1.2, 1.2, 11, "cen", 1],
[pvoigt, axis, -1.2, 1.2, 11, "com", 1],
[pvoigt, axis, -1.2, 1.2, 11, "max", 1],
[pvoigt, axis, -1.2, 1.2, 11, "min", 1], # pathological
parms_motor_slower,
parms_ao_faster,
parms_ao_max,
parms_det_list_of_1,
parms_det_list_of_3,
parms_det_tuple,
parms_cen,
parms_com,
parms_max,
parms_min___pathological,
],
)
def test_lineup2(signals, mover, start, finish, npts, feature, nscans):
if isinstance(signals, SynPseudoVoigt):
signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
def test_lineup2(parms: TestParameters):
if isinstance(parms.signals, SynPseudoVoigt):
parms.signals.randomize_parameters(scale=250_000, bkg=0.000_000_000_1)
else:
change_noisy_parameters()

RE(bps.mv(mover, 0))
assert get_position(mover) == 0.0

RE(alignment.lineup2(signals, mover, start, finish, npts, feature=feature, nscans=nscans))
RE(bps.mv(parms.mover, 0))
assert get_position(parms.mover) == 0.0

RE(
alignment.lineup2(
parms.signals,
parms.mover,
parms.start,
parms.finish,
parms.npts,
feature=parms.feature,
nscans=parms.nscans,
)
)


def test_TuneAxis():
Expand Down Expand Up @@ -285,3 +336,79 @@ class ImitatorForTesting(Device):
getattr(peaks, k).put(v)
results.set_stats(peaks)
assert results.report() is None


TestParameters = collections.namedtuple(
"TestParameters", "peak base noise center sigma xlo xhi npts nscans tol outcome"
)
parms_signal_but_high_background = TestParameters(1e5, 1e6, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_model_peak = TestParameters(1e5, 0, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_high_background_poor_resolution = TestParameters(1e5, 1e4, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.1, True)
parms_not_much_better = TestParameters(1e5, 1e4, 10, 0.1, 0.2, -0.7, 0.5, 11, 2, 0.05, True)
parms_neg_peak_1x_base = TestParameters(-1e5, -1e4, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.1, True)
parms_neg_base = TestParameters(1e5, -10, 10, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_small_signal_zero_base = TestParameters(1e-5, 0, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_neg_small_signal_zero_base = TestParameters(-1e5, 0, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_small_signal_finite_base = TestParameters(1e-5, 1e-7, 1e-8, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, True)
parms_no_signal_only_noise = TestParameters(0, 0, 1e-8, 0.1, 0.2, -1.0, 0.5, 11, 1, 0.05, False)
parms_bkg_plus_noise = TestParameters(0, 1, 0.1, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_bkg_plus_big_noise = TestParameters(0, 1, 100, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.05, False)
parms_no_signal__ZeroDivisionError = TestParameters(0, 0, 0, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.005, None)
parms_bkg_only__ZeroDivisionError = TestParameters(0, 1, 0, 0.1, 0.2, -0.7, 0.5, 11, 1, 0.005, None)


@pytest.mark.parametrize(
"parms",
[
parms_signal_but_high_background,
parms_model_peak,
parms_high_background_poor_resolution,
parms_not_much_better,
parms_neg_peak_1x_base,
parms_neg_base,
parms_small_signal_zero_base,
parms_neg_small_signal_zero_base,
parms_small_signal_finite_base,
parms_no_signal_only_noise,
parms_bkg_plus_noise,
parms_bkg_plus_big_noise,
parms_no_signal__ZeroDivisionError,
parms_bkg_only__ZeroDivisionError,
],
)
def test_lineup2_signal_permutations(parms: TestParameters):
starting_position = 0.0
m1.move(starting_position)
time.sleep(1) # without this, the test IOC crashes, sometimes
swait.reset()
swait.description.put("CI test lineup2()")
swait.channels.A.input_pv.put(m1.user_readback.pvname)
swait.channels.B.input_value.put(parms.center)
swait.channels.C.input_value.put(parms.sigma)
swait.channels.D.input_value.put(parms.peak)
swait.channels.E.input_value.put(parms.base)
swait.channels.F.input_value.put(parms.noise)
swait.calculation.put("E+D/(1+((A-B)/C)^2)+F*RNDM")
swait.scanning_rate.put("I/O Intr")

stats = SignalStatsCallback()

detector = noisy
# fmt: off
RE(
alignment.lineup2(
detector, m1, parms.xlo, parms.xhi, parms.npts, nscans=parms.nscans, signal_stats=stats
)
)
# fmt: on
change_noisy_parameters()

try:
centroid = stats._registers[detector.name].centroid
if parms.outcome:
assert math.isclose(m1.position, centroid, abs_tol=parms.tol)
assert math.isclose(parms.center, centroid, abs_tol=parms.tol)
else:
assert not math.isclose(parms.center, centroid, abs_tol=parms.tol)
except ZeroDivisionError:
assert math.isclose(m1.position, starting_position, abs_tol=parms.tol)

0 comments on commit 0d7647f

Please sign in to comment.