-
Notifications
You must be signed in to change notification settings - Fork 0
/
lpfk.py
executable file
·91 lines (75 loc) · 2.46 KB
/
lpfk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python
import os
import sys
import serial
from time import sleep
import struct
class lpfk:
    """Serial driver for an LPFK lighted key panel.

    The panel is reached through a serial port (9600 baud, 8 data bits, odd
    parity, 1 stop bit).  The instance tracks 32 light states in ``keys``
    and the five most recent key codes in ``history``.

    Instances can be used as context managers; the port is closed on exit.
    """

    # Default serial device; override with the LIGHTS_PORT environment
    # variable or the ``port`` argument of ``__init__``.
    PORT = os.getenv('LIGHTS_PORT', '/dev/serial/by-id/usb-Silicon_Labs_IBM-LPFK_75BB67F3-A9AA8F7A-if00-port0')

    # Single-byte commands sent to the device / replies read from it.
    RESET = b'\x01'
    READCONF = b'\x06'
    CONFOK = b'\x03'       # expected reply to READCONF
    ENABLE = b'\x08'
    SET = b'\x94'          # followed by four bytes of light state
    RETRANSMIT = b'\x80'   # reply: device asks for the frame again
    OK = b'\x81'           # reply: frame accepted

    # Key-code sequence that quitcheck() treats as the quit sentinel.
    QUIT_SEQUENCE = [0, 1, 2, 3, 31]

    def __init__(self, port=None):
        """Open the serial port, reset the device and enable it.

        Args:
            port: Serial device path.  Defaults to ``PORT``.

        Exits the process with status 1 (after closing the port) when the
        device does not answer READCONF with CONFOK.
        """
        self.keys = [0] * 32
        self.history = [0] * 5
        self.sp = serial.Serial(port=self.PORT if port is None else port,
                                baudrate=9600,
                                bytesize=serial.EIGHTBITS,
                                parity=serial.PARITY_ODD,
                                stopbits=serial.STOPBITS_ONE,
                                xonxoff=False,
                                rtscts=False,
                                dsrdtr=False)
        # reset: read (and discard) one possibly pending byte with a short
        # timeout, then switch to the long timeout used for real replies.
        self.sp.timeout = 1
        self.get()
        self.sp.timeout = 10
        self.sp.write(self.RESET)
        sleep(0.8)
        # check ID
        self.sp.write(self.READCONF)
        if self.get() == self.CONFOK:
            print("LPFK detected")
        else:
            print("uh oh, LPFK not detected")
            self.sp.close()
            sys.exit(1)
        # enable
        self.sp.write(self.ENABLE)
        sleep(0.1)

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial port; exceptions are not suppressed."""
        self.close()
        return False

    def quitcheck(self, cmd):
        """Record ``cmd`` and report whether the quit sentinel was entered.

        ``history`` always holds the five most recent codes.  When it equals
        ``QUIT_SEQUENCE`` all lights are switched off, the device is updated
        and True is returned; otherwise False is returned.
        """
        # did we get the sentinel sequence?
        self.history.append(cmd)
        self.history.pop(0)
        if self.history == self.QUIT_SEQUENCE:
            self.keys = [0] * 32
            self.update_lights()
            return True
        return False

    def _pack_keys(self):
        """Return ``keys[0:32]`` packed into four bytes.

        Within each group of eight, the first key is the most significant
        bit.  Each state is converted with ``int()``, so 0/1, booleans and
        the strings '0'/'1' are all accepted; any non-zero value is "on".
        """
        packed = bytearray()
        for start in range(0, 32, 8):
            value = 0
            for state in self.keys[start:start + 8]:
                value = (value << 1) | (1 if int(state) else 0)
            packed.append(value)
        return bytes(packed)

    def update_lights(self, retries=0):
        """Send the current light states to the device.

        The frame is SET followed by the four packed bytes.  While the
        device replies RETRANSMIT the frame is sent again; once more than
        two retries have already been used the port is closed and the
        process exits with status 1.  Any reply other than OK or RETRANSMIT
        (including an empty read on timeout) prints ``huh??``.

        Args:
            retries: Number of retries already consumed before this call.
        """
        frame = self.SET + self._pack_keys()
        while True:
            self.sp.write(frame)
            resp = self.get()
            if resp != self.RETRANSMIT:
                break
            print('error, retry')
            if retries > 2:
                print('{} retries and still failing so aborting'.format(retries))
                # Release the port before exiting, as __init__ does.
                self.sp.close()
                sys.exit(1)
            retries += 1
        if resp != self.OK:
            print('huh??')

    def get(self):
        """Read one byte from the port (empty bytes if the read times out)."""
        return self.sp.read(1)

    def close(self):
        """Close the serial port."""
        self.sp.close()