Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor and combine for EPICS scan_id PV #280

Merged
merged 2 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions bluesky/instrument/epics_signal_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@

import logging

from ophyd.signal import EpicsSignalBase

logger = logging.getLogger(__name__)
logger.info(__file__)

from . import iconfig
from ophyd.signal import EpicsSignal
from ophyd.signal import EpicsSignalBase
from . import iconfig # noqa


# set default timeout for all EpicsSignal connections & communications
# always first, before ANY ophyd EPICS-based signals are created
# Set default timeout for all EpicsSignal connections & communications.
# Always call first, before ANY ophyd EPICS-based signals are created.
TIMEOUT = 60
if not EpicsSignalBase._EpicsSignalBase__any_instantiated:
EpicsSignalBase.set_defaults(
Expand All @@ -33,25 +33,32 @@
logger.info("Using RunEngine metadata for scan_id")
scan_id_epics = None
else:
from ophyd.signal import EpicsSignal

logger.info("Using EPICS PV %s for scan_id", pvname)
# Must not call _before_ default timeouts are set.
scan_id_epics = EpicsSignal(pvname, name="scan_id_epics")


def epics_scan_id_source(*args, **kwargs):
"""
Callback function for RunEngine. Returns *next* scan_id to be used.

* Ignore args and kwargs.
* Get current scan_id from PV.
* Apply lower limit of zero.
* Increment.
* Set PV with new value.
* Return new value.

Exception will be raised if PV is not connected when next
``bluesky.plan_stubs.open_run()`` is called.
"""
if scan_id_epics is None:
raise RuntimeError(
"epics_scan_id_source() called when"
" 'RUN_ENGINE_SCAN_ID_PV' is"
"undefined in 'iconfig.yml' file."
" undefined in 'iconfig.yml' file."
)
new_scan_id = max(scan_id_epics.get(), 0) + 1
scan_id_epics.put(new_scan_id)
Expand Down
45 changes: 6 additions & 39 deletions bluesky/instrument/framework/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,47 +109,14 @@ def get_md_path():
# diagnostics
# RE.msg_hook = ts_msg_hook

# set default timeout for all EpicsSignal connections & communications
TIMEOUT = 60
if not EpicsSignalBase._EpicsSignalBase__any_instantiated:
EpicsSignalBase.set_defaults(
auto_monitor=True,
timeout=iconfig.get("PV_READ_TIMEOUT", TIMEOUT),
write_timeout=iconfig.get("PV_WRITE_TIMEOUT", TIMEOUT),
connection_timeout=iconfig.get("PV_CONNECTION_TIMEOUT", TIMEOUT),
)

# Create a registry of ophyd devices
registry = Registry(auto_register=True)

_pv = iconfig.get("RUN_ENGINE_SCAN_ID_PV")
if _pv is None:
logger.info("Using RunEngine metadata for scan_id")
else:
from ophyd import EpicsSignal

logger.info("Using EPICS PV %s for scan_id", _pv)
scan_id_epics = EpicsSignal(_pv, name="scan_id_epics")

def epics_scan_id_source(_md):
"""
Callback function for RunEngine. Returns *next* scan_id to be used.

* Ignore metadata dictionary passed as argument.
* Get current scan_id from PV.
* Apply lower limit of zero.
* Increment (so that scan_id numbering starts from 1).
* Set PV with new value.
* Return new value.

Exception will be raised if PV is not connected when next
``bps.open_run()`` is called.
"""
new_scan_id = max(scan_id_epics.get(), 0) + 1
scan_id_epics.put(new_scan_id)
return new_scan_id
if iconfig.get("RUN_ENGINE_SCAN_ID_PV") is not None:
from ..epics_signal_config import epics_scan_id_source
from ..epics_signal_config import scan_id_epics

# tell RunEngine to use the EPICS PV to provide the scan_id.
RE.scan_id_source = epics_scan_id_source
scan_id_epics.wait_for_connection()
RE.md["scan_id"] = scan_id_epics.get()

# Create a registry of ophyd devices
registry = Registry(auto_register=True)
Loading