forked from mxcube/HardwareObjects
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MicrodiffInOutMockup.py
executable file
·105 lines (90 loc) · 3.63 KB
/
MicrodiffInOutMockup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import logging
from HardwareRepository.BaseHardwareObjects import Device
import time
"""
Use the exporter to set different MD2 actuators in/out.
If private_state not specified, True will be send to set in and False for out.
Example xml file:
<device class="MicrodiffInOut">
<username>Scintilator</username>
<exporter_address>wid30bmd2s:9001</exporter_address>
<cmd_name>ScintillatorPosition</cmd_name>
<private_state>{"PARK":"out", "SCINTILLATOR":"in"}</private_state>
<use_hwstate>True</use_hwstate>
</device>
"""
class MicrodiffInOutMockup(Device):
def __init__(self, name):
Device.__init__(self, name)
self.actuatorState = "unknown"
self.username = "unknown"
#default timeout - 3 sec
self.timeout = 3
self.hwstate_attr = None
def init(self):
self.cmdname = self.getProperty("cmd_name")
self.username = self.getProperty("username")
self.states = {True:"in", False:"out"}
self.state_attr = False
self.offset = self.getProperty("offset")
if self.offset > 0:
self.states = {self.offset:"out", self.offset-1:"in",}
states = self.getProperty("private_state")
if states:
import ast
self.states = ast.literal_eval(states)
try:
tt = float(self.getProperty("timeout"))
self.timeout = tt
except:
pass
self.moves = dict((self.states[k], k) for k in self.states)
def connectNotify(self, signal):
if signal=='actuatorStateChanged':
self.valueChanged(self.state_attr)
def valueChanged(self, value):
self.actuatorState = self.states.get(value, "unknown")
self.emit('actuatorStateChanged', (self.actuatorState, ))
def _ready(self):
return True
def _wait_ready(self, timeout=None):
timeout = timeout or self.timeout
tt1 = time.time()
while time.time() - tt1 < timeout:
if self._ready():
break
else:
time.sleep(0.5)
def getActuatorState(self, read=False):
if read is True:
value = self.state_attr
self.actuatorState = self.states.get(value, "unknown")
self.connectNotify("actuatorStateChanged")
else:
if self.actuatorState == "unknown":
self.connectNotify("actuatorStateChanged")
return self.actuatorState
def actuatorIn(self, wait=True, timeout=None):
if self._ready():
try:
self.state_attr = self.moves["in"]
if wait:
timeout = timeout or self.timeout
self._wait_ready(timeout)
self.valueChanged(self.state_attr)
except:
logging.getLogger('user_level_log').error("Cannot put %s in", self.username)
else:
logging.getLogger('user_level_log').error("Microdiff is not ready, will not put %s in" , self.username)
def actuatorOut(self, wait=True, timeout=None):
if self._ready():
try:
self.state_attr = self.moves["out"]
if wait:
timeout = timeout or self.timeout
self._wait_ready(timeout)
self.valueChanged(self.state_attr)
except:
logging.getLogger('user_level_log').error("Cannot put %s out", self.username)
else:
logging.getLogger('user_level_log').error("Microdiff is not ready, will not put %s out" , self.username)