Skip to content

Commit

Permalink
Remove pmacCSmotor
Browse files Browse the repository at this point in the history
  • Loading branch information
tomtrafford committed Jul 12, 2024
1 parent 631bb22 commit a6ff99b
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/ophyd_async/epics/motion/motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, prefix: str, name="") -> None:
self.motor_done_move = epics_signal_r(int, prefix + ".DMOV")
self.low_limit_travel = epics_signal_rw(float, prefix + ".LLM")
self.high_limit_travel = epics_signal_rw(float, prefix + ".HLM")

self.output_link = epics_signal_r(str, prefix + ".OUT")
self.motor_stop = epics_signal_x(prefix + ".STOP")
# Whether set() should complete successfully or not
self._set_success = True
Expand Down
3 changes: 1 addition & 2 deletions src/ophyd_async/epics/pmac/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from ._pmacIO import Pmac
from ._pmacMotor import PmacCSMotor
from ._pmacTrajectory import PmacTrajectory

__all__ = ["Pmac", "PmacCSMotor", "PmacTrajectory"]
__all__ = ["Pmac", "PmacTrajectory"]
23 changes: 0 additions & 23 deletions src/ophyd_async/epics/pmac/_pmacMotor.py

This file was deleted.

61 changes: 37 additions & 24 deletions src/ophyd_async/epics/pmac/_pmacTrajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from ophyd_async.core.async_status import AsyncStatus, WatchableAsyncStatus
from ophyd_async.core.signal import observe_value
from ophyd_async.core.utils import WatcherUpdate
from ophyd_async.epics.pmac import Pmac, PmacCSMotor
from ophyd_async.epics.motion import Motor
from ophyd_async.epics.pmac import Pmac

TICK_S = 0.000001

Expand All @@ -21,7 +22,7 @@ def __init__(self, prefix: str, cs: int, name="") -> None:
super().__init__(prefix, cs, name=name)

async def _ramp_up_velocity_pos(
self, velocity: float, motor: PmacCSMotor, end_velocity: float
self, velocity: float, motor: Motor, end_velocity: float
):
# Assuming ramping to or from 0
max_velocity_acceleration_time = await motor.acceleration_time.get_value()
Expand All @@ -32,34 +33,37 @@ async def _ramp_up_velocity_pos(
return [disp, accl_time]

@AsyncStatus.wrap
async def prepare(self, stack: list[Frames[PmacCSMotor]]):
async def prepare(self, stack: list[Frames[Motor]]):
# Which Axes are in use?

scanSize = len(stack[0])
scanAxes = stack[0].axes()

cs_ports = set()
self.profile = {}
cs_axes: dict[Motor, str] = {}
for axis in scanAxes:
if axis != "DURATION":
await axis.get_cs_info()
self.profile[axis.cs_axis] = []
self.profile[axis.cs_axis + "_velocity"] = []
cs_ports.add(axis.cs_port)
cs_port, cs_axis = await self.get_cs_info(axis)
self.profile[cs_axis] = []
self.profile[cs_axis + "_velocity"] = []
cs_ports.add(cs_port)
cs_axes[axis] = cs_axis
else:
self.profile[axis.lower()] = []
assert len(cs_ports) == 1, "Motors in more than one CS"
cs_port = cs_ports.pop()

# Calc Velocity

for axis in scanAxes:
for i in range(scanSize):
if axis != "DURATION":
self.profile[axis.cs_axis + "_velocity"].append(
self.profile[cs_axes[axis] + "_velocity"].append(
(stack[0].upper[axis][i] - stack[0].lower[axis][i])
/ (stack[0].midpoints["DURATION"][i])
)
self.profile[axis.cs_axis].append(stack[0].midpoints[axis][i])
self.profile[cs_axes[axis]].append(stack[0].midpoints[axis][i])
else:
self.profile[axis.lower()].append(
int(stack[0].midpoints[axis][i] / TICK_S)
Expand All @@ -74,28 +78,28 @@ async def prepare(self, stack: list[Frames[PmacCSMotor]]):
run_up_disp, run_up_t = await self._ramp_up_velocity_pos(
0,
axis,
self.profile[axis.cs_axis + "_velocity"][0],
self.profile[cs_axes[axis] + "_velocity"][0],
)
self.initial_pos[axis.cs_axis] = (
self.profile[axis.cs_axis][0] - run_up_disp
self.initial_pos[cs_axes[axis]] = (
self.profile[cs_axes[axis]][0] - run_up_disp
)
# trail off position and time
if (
self.profile[axis.cs_axis + "_velocity"][0]
== self.profile[axis.cs_axis + "_velocity"][-1]
self.profile[cs_axes[axis] + "_velocity"][0]
== self.profile[cs_axes[axis] + "_velocity"][-1]
):
final_pos = self.profile[axis.cs_axis][-1] + run_up_disp
final_pos = self.profile[cs_axes[axis]][-1] + run_up_disp
final_time = run_up_t
else:
ramp_down_disp, ramp_down_time = await self._ramp_up_velocity_pos(
self.profile[axis.cs_axis + "_velocity"][-1],
self.profile[cs_axes[axis] + "_velocity"][-1],
axis,
0,
)
final_pos = self.profile[axis.cs_axis][-1] + ramp_down_disp
final_pos = self.profile[cs_axes[axis]][-1] + ramp_down_disp
final_time = max(ramp_down_time, final_time)
self.profile[axis.cs_axis].append(final_pos)
self.profile[axis.cs_axis + "_velocity"].append(0)
self.profile[cs_axes[axis]].append(final_pos)
self.profile[cs_axes[axis] + "_velocity"].append(0)
run_up_time = max(run_up_time, run_up_t)

self.profile["duration"][0] += run_up_time / TICK_S
Expand All @@ -106,18 +110,18 @@ async def prepare(self, stack: list[Frames[PmacCSMotor]]):
if axis != "DURATION":
self.profile_cs_name.set(cs_port)
self.points_to_build.set(scanSize + 1)
getattr(self, "use_" + axis.cs_axis).set(True)
getattr(self, axis.cs_axis).set(self.profile[axis.cs_axis])
getattr(self, axis.cs_axis + "_vel").set(
self.profile[axis.cs_axis + "_velocity"]
getattr(self, "use_" + cs_axes[axis]).set(True)
getattr(self, cs_axes[axis]).set(self.profile[cs_axes[axis]])
getattr(self, cs_axes[axis] + "_vel").set(
self.profile[cs_axes[axis] + "_velocity"]
)
else:
self.timeArray.set(self.profile["duration"])

# MOVE TO START
for axis in scanAxes:
if axis != "DURATION":
await axis.set(self.initial_pos[axis.cs_axis])
await axis.set(self.initial_pos[cs_axes[axis]])

# Set No Of Points

Expand All @@ -140,3 +144,12 @@ async def complete(self):
precision=0,
time_elapsed=time.monotonic() - self._fly_start,
)

async def get_cs_info(self, motor: Motor):
output_link = await motor.output_link.get_value()
# Split "@asyn(PORT,num)" into ["PORT", "num"]
split = output_link.split("(")[1].rstrip(")").split(",")
cs_port = split[0].strip()
assert "CS" in cs_port, f"{self.name} not in a CS. It is not a compound motor."
cs_axis = "abcuvwxyz"[int(split[1].strip()) - 1]
return cs_port, cs_axis
5 changes: 3 additions & 2 deletions tests/epics/pmac/test_pmac.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from scanspec.specs import Line, fly

from ophyd_async.core import DeviceCollector, set_mock_value
from ophyd_async.epics.pmac import PmacCSMotor, PmacTrajectory
from ophyd_async.epics.motion import Motor
from ophyd_async.epics.pmac import PmacTrajectory


@pytest.fixture
async def sim_x_motor():
async with DeviceCollector(mock=True):
sim_motor = PmacCSMotor("BLxxI-MO-STAGE-01:X", name="sim_x_motor")
sim_motor = Motor("BLxxI-MO-STAGE-01:X", name="sim_x_motor")

set_mock_value(sim_motor.motor_egu, "mm")
set_mock_value(sim_motor.precision, 3)
Expand Down

0 comments on commit a6ff99b

Please sign in to comment.