-
Notifications
You must be signed in to change notification settings - Fork 0
/
rdoorlib.py
276 lines (248 loc) · 10.1 KB
/
rdoorlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# vim:ft=python:ai:expandtab:ts=4:
import socket
import re
import time
import select
class GHMUX:
port = 23200
server ='localhost'
mux = None
# re strings to build up patterns from
# matching the third (release) version of HSDC-0.1.0.5.ino
#
# from readACL()
res_acl_header = re.escape(r'Record #, Address, Addribute, Card #, in HEX format ') + r'\r*\n'
# from printACLR()
res_acl_data = r'"(?P<index>[A-F0-9]{2})", "(?P<addr>[A-F0-9]{4})", "(?P<attribute>[A-F0-9]{2})", "(?P<card_num>(?P<facility_code>[A-F0-9]{2})(?P<card_code>[A-F0-9]{4}))"\r*\n'
# v0.1.0.8 or so got rid of the every ACL header line
#res_acl = res_acl_header + res_acl_data
res_acl = res_acl_data
# from printAllAclRecords()
res_list_header_a = r'(?:[ ]\r*\n){2} Start printing of ACL List \r*\n \r*\n'
res_list_header_b = re.escape(r' Format in Hex = Record #, EEProm Address, Attribute, Card code ')
res_list_header = res_list_header_a + res_list_header_b + r'\r*\n[ ]\r*\n'
res_list_footer = r'[ ]\r*\n End printing of ACL List \r*\n'
res_list = res_list_header + r'(?P<acl_list>(?:' + res_acl + r')*)' + res_list_footer
# from aclAtt()
res_att_header = r'Set Attribute of Record \d+ at address \d+\r*\n'
res_att = res_att_header + res_acl
# from eeMemory()
res_byte = r'(?P<byte>[A-F0-9]{2}) '
res_memory_header = r'Address (?P<address>[A-F0-9]{1,}) -HEX- '
res_memory = res_memory_header + r'(?P<byte_list>(?:' + res_byte + r')*) \r*\n'
# re to match the various command return values
letter_re = {}
letter_re['k'] = re.compile(res_acl)
letter_re['j'] = letter_re['k']
letter_re['v'] = letter_re['k']
letter_re['o'] = re.compile(res_memory)
letter_re['q'] = re.compile(res_att)
letter_re['s'] = re.compile(res_list)
def __init__(self, port=None, server=None):
if port is not None:
self.port = port
if server is not None:
self.server = server
def connect(self):
server_address = (self.server, self.port)
self.mux = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# this should be connecting to localhost, so aggressive timeout
self.mux.settimeout(1.0)
self.mux.connect(server_address)
def close(self):
self.mux.shutdown(socket.SHUT_RDWR)
self.mux.close()
def run(self, wait=0.1, letter='k', options='00', cycle=3, timeout=0.1):
self.connect()
# set a (normally smaller) timeout for all the send/recv we will do
self.mux.settimeout(timeout)
# format for command is (A###) where ### is a options string
# based on what letter command is being used
command = '(' + letter + options + ')'
command = command.upper()
self.mux.send(command.encode())
result = ''
for count in range(cycle):
rlist, _, _ = select.select([self.mux], [], [], 0.1)
if rlist:
# do what if recv raises a timeout, catch it?, not?
recieved = self.mux.recv(2048)
result += recieved.decode()
# NOTE: we are explicitly not caring about any trailing stuff
# it might be garbage or the output from the next whatever
# all the letter_re should be designed to match (start anchored)
# open ended regular expressions
matched = self.letter_re[letter].match(result)
if matched:
self.close()
return matched
time.sleep(wait)
self.close()
return None
def set_attribute(self, index, attribute):
# TODO check input
match = self.run(letter='q', options= index+attribute)
# TODO error check
return match
def add(self, attribute, facility_code, card_code):
# TODO check input
match = self.run(letter='v', options= attribute+facility_code+card_code)
# TODO error check
return match
def read_address(self, addr):
address = str(addr)
if not re.match(r'\A[0-9a-fA-F]{4}\Z', address):
raise(Exception("read_address() address is not a 4 digit hexadecimal string"))
match = self.run(letter='o', options=address)
byte_list = match.group('byte_list')
out = []
for byte in re.finditer(self.res_byte, byte_list):
out.append( byte.group('byte') )
return out
def acl_list(self):
match = self.run(letter='s', options='', wait=0.5, cycle=30)
acls = match.group('acl_list')
out = []
for acl in re.finditer(self.res_acl, acls):
out.append({
'index': acl.group('index'),
'addr': acl.group('addr'),
'attribute': acl.group('attribute'),
'facility_code': acl.group('facility_code'),
'card_code': acl.group('card_code'),
'card_num': acl.group('card_num'),
})
# parts of card_num:
# facility_code
# card_code
return out
class GHCard:
# This is the attribute byte, but only item defined is bit 0 == 0 is access allowed, 1 is denied
attribute = '01' # 2 digit hex number as string
allowed = False # this is boolean from bit 0 of attribute
# facility code (FC) 2 digit hex number as string
fc = None
# card code (CC) 4 digit hex number as string
cc = None
# location in memory on the nano
loc = None
# index of ACL in memory on the nano
index = None
# name so we can log better, not stored on Arduino
name = ''
# if new ACL list says we are denied
set_denied = False
# if new ACL list says we are allowed
set_allowed = False
# if matches new ACL list entry
no_change = False
# if processed in new ACL list
done = False
def __init__( self, facilityCode, cardCode, location=None, allowed=None,
attribute=None, name=None, index = None ):
if index is not None:
if re.match(r'\A[0-9a-fA-F]{2}\Z', str(index)):
self.index = index.upper()
else:
raise(Exception("index was not a two character hexadecimal string"))
self.index = index
if name is not None:
# TODO strip name down to 'safe' characters set for clean logging?
# e.g. [a-zA-Z0-9 ._'-]
# this is silly, but I don't want newlines (for example) to end up in log lines
self.name = name
if attribute is not None:
if re.match(r'\A[0-9a-fA-F]{2}\Z', str(attribute)):
self.attribute = attribute.upper()
else:
raise(Exception("attribute was not a two character hexadecimal string"))
self.attribute = attribute
if self.attribute == '00':
self.allowed = True
if re.match(r'\A[0-9a-fA-F]{2}\Z', str(facilityCode)):
self.fc = facilityCode.upper()
else:
raise(Exception("facilityCode was not a two character hexadecimal string"))
if re.match(r'\A[0-9a-fA-F]{4}\Z', str(cardCode)):
self.cc = cardCode.upper()
else:
raise(Exception("cardCode was not a four character hexadecimal string"))
self.loc = location
if allowed is not None:
if allowed:
self.allowed = True
self.attribute = '00'; # this will need to change if we define more bits in attribute
def __eq__( self, other ):
if type(self) is type(other):
return self.fc == other.fc and self.cc == other.cc
else:
return False
def __hash__( self ):
return hash((self.fc,self.cc))
class GHACL:
acl = None
# start location fo the ACL list on nan0
start = None
# end location of the ACL list on nano
# XXX verify this, but I think this is the first address not used by
# the ACL list (e.i. pointer just after the last record)
end = None
def __init__( self, acl=None ):
if acl is None:
acl = []
self.acl = acl
def add( self, card ):
self.acl.append( card )
def delta_list_to( self, gold, door=None ):
s_card = {k:k for k in self.acl}
seen = {}
add = []
to_deny = []
to_allow = []
for new in gold.acl:
if new in seen:
pass # WTF? duplicate new card data in gold? XXX
seen[new] = 1
new_card = True
if new in s_card:
old = s_card[new]
new_card = False
if new.allowed != old.allowed:
if new.allowed:
old.set_allowed = True
to_allow.append(old.index)
else:
old.set_denied = True
to_deny.append(old.index)
old.done = True
else:
# not neccessarily will it have allowed set
# we also keep track of cards that may not get in
add.append(new)
new.done = True
for old in self.acl:
if old.done:
continue
if old.allowed:
# no card matched in new ACL list, disable it
old.set_denied = True
to_deny.append(old.index)
if not to_allow and not to_deny and not add:
# no changes are needed
print('No changes needed')
return None
print(len(to_allow))
print(len(to_deny))
print(len(add))
if door is None:
raise(Exception('No door to update with acl delta'))
## loop on to_allow for set attribute.0 = 0
for index in to_allow:
door.set_attribute(index,'00')
## loop on to_deny for set attribute.0 = 1
for index in to_deny:
door.set_attribute(index,'01')
# ignore any pre-loading of space in the ACL list, just use "V" to add at end
## loop on add list to create calls for adding them
for card in add:
door.add(card.attribute, card.fc, card.cc)