diff --git a/src/ophyd_async/epics/pmac/_pmacTrajectory.py b/src/ophyd_async/epics/pmac/_pmacTrajectory.py index f6d2773e6e..d61dca8622 100644 --- a/src/ophyd_async/epics/pmac/_pmacTrajectory.py +++ b/src/ophyd_async/epics/pmac/_pmacTrajectory.py @@ -1,4 +1,5 @@ import time +from typing import Any import numpy as np import numpy.typing as npt @@ -7,11 +8,8 @@ from velocity_profile import velocityprofile as vp from ophyd_async.core import ( - AsyncStatus, TriggerLogic, - WatchableAsyncStatus, - WatcherUpdate, - observe_value, + wait_for_value, ) from ophyd_async.epics import motor from ophyd_async.epics.pmac import Pmac @@ -20,7 +18,7 @@ class PmacTrajInfo(BaseModel): - spec: Spec[motor.Motor] = Field() + spec: Spec = Field(default=None) class PmacTrajectoryTriggerLogic(TriggerLogic[PmacTrajInfo]): @@ -30,7 +28,6 @@ def __init__(self, pmac: Pmac) -> None: # Make a dict of which motors are for which cs axis self.pmac = pmac - @AsyncStatus.wrap async def prepare(self, value: PmacTrajInfo): # initialise use_axis values to False for i in range(len("ABCUVWXYZ")): @@ -46,9 +43,10 @@ async def prepare(self, value: PmacTrajInfo): cs_ports = set() positions: dict[int, npt.NDArray[np.float64]] = {} velocities: dict[int, npt.NDArray[np.float64]] = {} - time_array: npt.NDArray[np.float64] = [] cs_axes: dict[motor.Motor, int] = {} - + time_array: npt.NDArray[np.float64] = np.empty( + 2 * scan_size + (len(gaps) * 4) + 1, dtype=np.float64 + ) # Which Axes are in use? scan_axes = chunk.axes() for axis in scan_axes: @@ -62,9 +60,6 @@ async def prepare(self, value: PmacTrajInfo): velocities[cs_index] = np.empty( 2 * scan_size + (len(gaps) * 4) + 1, dtype=np.float64 ) - time_array = np.empty( - 2 * scan_size + (len(gaps) * 4) + 1, dtype=np.float64 - ) cs_ports.add(cs_port) cs_axes[axis] = cs_index assert len(cs_ports) == 1, "Motors in more than one CS" @@ -104,8 +99,8 @@ async def prepare(self, value: PmacTrajInfo): if gap < scan_size: # Create Position, velocity and time arrays for the gap pos_gap, vel_gap, time_gap = await get_gap_profile(chunk, gap) + len_gap = len(time_gap) for axis in scan_axes: - len_gap = len(time_gap) if axis != "DURATION": positions[cs_axes[axis]][ profile_gap : profile_gap + len_gap @@ -164,17 +159,17 @@ async def prepare(self, value: PmacTrajInfo): for axis in scan_axes: if axis != "DURATION": - self.pmac.profile_cs_name.set(cs_port) - self.pmac.points_to_build.set(profile_length) - self.pmac.use_axis[cs_axes[axis] + 1].set(True) - self.pmac.positions[cs_axes[axis] + 1].set( + await self.pmac.profile_cs_name.set(cs_port) + await self.pmac.points_to_build.set(profile_length) + await self.pmac.use_axis[cs_axes[axis] + 1].set(True) + await self.pmac.positions[cs_axes[axis] + 1].set( positions[cs_axes[axis]][:profile_length], ) - self.pmac.velocities[cs_axes[axis] + 1].set( + await self.pmac.velocities[cs_axes[axis] + 1].set( velocities[cs_axes[axis]][:profile_length] ) else: - self.pmac.time_array.set(time_array[:profile_length]) + await self.pmac.time_array.set(time_array[:profile_length]) # MOVE TO START for axis in scan_axes: @@ -182,31 +177,20 @@ async def prepare(self, value: PmacTrajInfo): await axis.set(self.initial_pos[cs_axes[axis]]) # Set PMAC to use Velocity Array - self.pmac.profile_calc_vel.set(False) - self.pmac.build_profile.set(True) + await self.pmac.profile_calc_vel.set(False) + await self.pmac.build_profile.set(True) self._fly_start = time.monotonic() - @AsyncStatus.wrap async def kickoff(self): - self.status = self.pmac.execute_profile.set(1, timeout=self.scantime + 10) + self.status = await self.pmac.execute_profile.set( + True, timeout=self.scantime + 10 + ) async def stop(self): - await self.pmac.profile_abort.set(1) + await self.pmac.profile_abort.set(True) - @WatchableAsyncStatus.wrap async def complete(self): - async for percent in observe_value(self.scan_percent): - yield WatcherUpdate( - name=self.name, - current=percent, - initial=0, - target=100, - unit="%", - precision=0, - time_elapsed=time.monotonic() - self._fly_start, - ) - if percent >= 100: - break + await wait_for_value(self.status, False, timeout=self.scantime) async def get_cs_info(self, motor: motor.Motor) -> tuple[str, int]: output_link = await motor.output_link.get_value() @@ -220,7 +204,7 @@ async def get_cs_info(self, motor: motor.Motor) -> tuple[str, int]: def _calculate_gaps(self, chunk: Frames[motor.Motor]): inds = np.argwhere(chunk.gap) if len(inds) == 0: - return len(chunk) + return [len(chunk)] else: return inds @@ -229,8 +213,8 @@ async def ramp_up_velocity_pos( motor: motor.Motor, start_velocity: float, end_velocity: float, - ramp_time: int = None, - min_ramp_time: int = None, + ramp_time: float = 0, + min_ramp_time: float = 0, ): # For the given motor return the displacement and time taken to get from one given # velocity to another @@ -238,9 +222,9 @@ async def ramp_up_velocity_pos( max_velocity = await motor.max_velocity.get_value() accl = max_velocity / acceleration_time delta_v = abs(end_velocity - start_velocity) - if ramp_time is None: + if ramp_time == 0: ramp_time = delta_v / accl - if min_ramp_time is not None: + if min_ramp_time: ramp_time = max(ramp_time, min_ramp_time) disp = 0.5 * (start_velocity + end_velocity) * ramp_time return [disp, ramp_time] @@ -380,8 +364,8 @@ async def profile_between_points( async def point_velocities( - chunk: Frames[motor.Motor], index: int, entry: bool = True -) -> dict[str, float]: + chunk: Frames[Any], index: int, entry: bool = True +) -> dict[motor.Motor, float]: """Find the velocities of each axis over the entry/exit of current point""" velocities = {} for axis in chunk.axes(): @@ -419,7 +403,11 @@ async def calculate_profile_from_velocities( time_arrays: dict[motor.Motor, npt.NDArray[np.float64]], velocity_arrays: dict[motor.Motor, npt.NDArray[np.float64]], current_positions: dict[motor.Motor, npt.NDArray[np.float64]], -) -> list[dict[motor.Motor, npt.NDArray[np.float64]]]: +) -> tuple[ + dict[motor.Motor, npt.NDArray[np.float64]], + dict[motor.Motor, npt.NDArray[np.float64]], + list[int], +]: # at this point we have time/velocity arrays with 2-4 values for each # axis. Each time represents a (instantaneous) change in acceleration. # We want to translate this into a move profile (time/position).