diff --git a/libs/module/DjangoBackendRfidAuth.py b/libs/module/DjangoBackendRfidAuth.py index cd30c09..fbd78c6 100644 --- a/libs/module/DjangoBackendRfidAuth.py +++ b/libs/module/DjangoBackendRfidAuth.py @@ -76,7 +76,7 @@ def get_timestamps_now_begin_end(self, precision=None): datetime.datetime.fromtimestamp(t_end, tz=datetime.timezone.utc), ) - def add(self, key, known_key): + def add(self, key, known_key, meta_data={}): # get privacy friendly timestamps, now , begin, end t_now, t_begin, t_end = self.get_timestamps_now_begin_end() @@ -98,8 +98,12 @@ def add(self, key, known_key): ) item["count"] += r["count"] item["timestamp"] = r["timestamp"] + item["meta_data_json"] = json.dumps( + {**meta_data, **json.loads(item["meta_data_json"])} + ) except: # add new + r["meta_data_json"] = json.dumps(meta_data) self.table_unknown_key.append(r) # Key -> LastSeen table with obfuscated timestamp @@ -563,8 +567,9 @@ def acl_has_access(self, hwid): # no condition return true return False, "no condition in ruleset returned true" - def lookup(self, key): + def lookup(self, key, target, nfc_tools, *args, **kwargs): key = key.lower() # lowercase this key + meta_data = {} # placeholder for meta_data if self.lock_disabled is True: msg = "Warning: lock disabled or never synchronised, lookup() and last-seen logging ignored." @@ -577,13 +582,15 @@ def lookup(self, key): if key not in self.keys: has_access, msg = False, "Not found" known_key = False + meta_data = nfc_tools.collect_meta() + logger.info(f"card meta info: {meta_data}") else: has_access, msg = self.acl_has_access(key) known_key = True # keep last_seen list if known_key or (not known_key and self.log_unknownkeys): - self.log_stats.add(key, known_key) + self.log_stats.add(key, known_key, meta_data) else: logger.debug("Logging is disabled by log_unknownkeys.") @@ -755,10 +762,10 @@ def disable(self): def teardown(self): self.api.cleanup() - def has_access(self, hwid_str): + def has_access(self, hwid_str, target, nfc_tools, *args, **kwargs): """lookup detected hwid,""" # lookup hwid in db - access, msg = self.api.lookup(hwid_str) + access, msg = self.api.lookup(hwid_str, target, nfc_tools, *args, **kwargs) logger.debug( f"'{self.api.lockname}' RFID KEY lookup({hwid_str}): access={access} : {msg}" ) diff --git a/libs/module/PN532.py b/libs/module/PN532.py index 2de6574..f7c31da 100644 --- a/libs/module/PN532.py +++ b/libs/module/PN532.py @@ -19,6 +19,12 @@ import time import threading +import datetime + + +class TargetGoneWhileReadingError(Exception): + pass + class PN532(module.BaseModule): @@ -153,15 +159,18 @@ def io_wait_for_tag_detected(self): self.callback_tag_detected(target) def callback_tag_detected(self, target): - # print("DEBUG: ", target) - # print("DEBUG: ", target.identifier) - # print("DEBUG: ", hwid2hexstr(target.identifier)) - # hwid_str = hwid2hexstr( target.identifier ) # make hwid in hex lowercase string format - if dc.rfid_auth.has_access(hwid_str): + try: + has_access = dc.rfid_auth.has_access( + hwid_str, target=target, nfc_tools=NfcTools(target) + ) + except TargetGoneWhileReadingError: + has_access = None + + if has_access is True: logger.info("hwid ({:s}) access alowed.".format(hwid_str)) self.event_bus.raise_event( "rfid_access_allowed" @@ -172,7 +181,7 @@ def callback_tag_detected(self, target): self.event, {}, wait=True ) # raise configured trigger_action for rfid_action - else: + elif has_access is False: logger.info("hwid ({:s}) access denied.".format(hwid_str)) self.event_bus.raise_event( "rfid_access_denied" @@ -182,6 +191,11 @@ def callback_tag_detected(self, target): "rfid_access_denied_fin" ) # raise when rfid is access_denied after sleeping x seconds. + else: + # has_access is None or anythng else + logger.info("hwid ({:s}) gone while reading.".format(hwid_str)) + time.sleep(2) + def start_thread(self): if not (self.thread and self.thread.is_alive()): self.thread = threading.Thread(target=self.run, args=()) @@ -204,3 +218,158 @@ def stop_thread(self): # join thread to wait for stop: if self.thread: self.thread.join() + + +class NfcTools: + """Set of tools to communicate with nfc tags or collect various tag data.""" + + def __init__(self, target): + self.target = target + + def _authenticate( + self, secret=bytearray([0x0, 0x0, 0x0, 0x0, 0x0, 0x0]), page=0, timeout=0.005 + ): + # + # only for target.type Type2Tag + if self.target.type != "Type2Tag": + raise Exception("authenticate only for for type Type2Tag") + + send = bytearray([0x60, page]) + secret + self.target._nfcid + logger.debug(f"authenticate using: target.transceive({send.hex()}, {timeout})") + # target.transceive(send, timeout) + with self.target.clf.lock: + return self.target.clf.device.chipset.in_data_exchange(send, timeout) + + def _read(self, page, max_retries=3): + """wrapper for target.read(page), but then with retries for timeout + Type2TagCommandError(nfc.tag.TIMEOUT_ERROR) + """ + n = 0 + while n != max_retries: + n = n + 1 + try: + logger.debug( + f"attemp..... while read({page}), attemp ({n}/{max_retries})" + ) + return self.target.read(page) + except nfc.tag.tt2.Type2TagCommandError as e: + if repr(e) != repr( + nfc.tag.tt2.Type2TagCommandError(nfc.tag.TIMEOUT_ERROR) + ): + raise (e) + else: + logger.debug( + f"TIMEOUT_ERROR while read({page}), attemp ({n}/{max_retries})" + ) + + # max_retries reached, raise timeout error: + # raise nfc.tag.tt2.Type2TagCommandError(nfc.tag.TIMEOUT_ERROR) + raise TargetGoneWhileReadingError + + # TODO: how to respond here?, raise exception and ignore this NFC comunication like it never happened ? or add this message to the meta dict so the hwid still showsup as "UnknownKey". + # Idea TargetGoneWhileReadingError: + # - UI Leds keep shows something stuck in communication , so that enduser whill understand he/she needs to keep the rfid tag longer in front of the reading device to succeed reading. + # we raise this exception anywhere in the dc.rfid_auth.has_access() callback + # raise TargetGoneWhileReadingError + + def parse_ovchipkaart(self, data): + """Parse NL OV-Chipkaart, we now only read validuntil date.""" + + def getbits(data, start, end): + """Return number at bit positions of bytestring data (msb first)""" + val = 0 + # verbose = (start == 4 and end == 6) + for byte in range(start // 8, (end + 7) // 8): + bits = 8 # to chop off excess bits on the right + if byte * 8 > end - 8: + bits = end - byte * 8 + # if verbose: print "byte =", byte, "bits =", bits + mask = 0xFF # to chop off excess bits on the left + if byte == start // 8: + mask = (1 << (8 - start % 8)) - 1 + # if verbose: print "mask = %02x" % mask + val = (val << bits) + ((data[byte] & mask) >> (8 - bits)) + # if verbose: print "data[byte] = %02x val = %02x" % (ord(data[byte]), val) + return val + + cardtypes = {0: "anonymous", 2: "personal"} + + cardid = getbits(data[0:4], 0, 4 * 8) + cardtype = cardtypes[getbits(data[0x10:0x36], 18 * 8 + 4, 19 * 8)] + validuntildays = getbits(data[0x10:0x36], 11 * 8 + 6, 13 * 8 + 4) + validuntil = datetime.date(1997, 1, 1) + validuntil += datetime.timedelta(validuntildays) + + logger.debug(f"RAW DATA: {data.hex()}") + + s = "OV-Chipkaart id %d, %s, valid until %s" % (cardid, cardtype, validuntil) + logger.info(s) + + # return {'cardid': cardid, 'cardtype': cardtype, 'validuntil': str(validuntil)} + return {"validuntil": str(validuntil)} + + def collect_ovchipkaart(self): + # OV chipcards use Mifare Classic (SAK == 0x18), looks like we can check this on .clf.target.sen_sel + if self.target.clf.target.sel_res != b"\x18": + logger.debug( + f"No OV Chipkaart, No Mifare classic 4k. SAK (.clf.target.sel_res ({self.target.clf.target.sel_res})) is not b'\x18'" + ) + return {} + + # we use our own authenticate method (was not implemented in nfcpy) + try: + self._authenticate() + except Exception as e: + logger.debug(e, exc_info=True) + return {} + + # is OV Chip kaart, match page2 vs known string. + # read(1)[0:11] match: bytearray.fromhex('840000000603a00013aee4') + data = [None, None, None, None] + + data[1] = self._read(1) + # data[1] = self.target.read(1) + if data[1][0:11] != bytearray.fromhex("840000000603a00013aee4"): + logger.debug("collect_ovchipkaart: Not an OV Chipkaart.") + return {} + + # read other pages + data[0] = self._read(0) + data[2] = self._read(2) + data[3] = self._read(3) + + # parse meta info + meta = self.parse_ovchipkaart(data[0] + data[1] + data[2] + data[3]) + return {"ovchipkaart": meta} + + def collect_meta(self): + meta = {} + + # add nfc.tag.Tag type, product, exact class (Type 1/2/3/4 etc..): + meta.update( + { + "tag": { + "class": str(type(self.target)), + "type": self.target.type, + "product": self.target.product, + } + } + ) + + # add nfc.clf.RemoteTarget: sel_res, sens_res, sdd_res and brty + meta.update( + { + "target": { + "brty": self.target.clf.target.brty, + "sdd_res": self.target.clf.target.sdd_res.hex(), + "sel_res": self.target.clf.target.sel_res.hex(), + "sens_res": self.target.clf.target.sens_res.hex(), + } + } + ) + + # try OV chiptkaart: + meta.update(self.collect_ovchipkaart()) + + # return all meta info + return meta diff --git a/libs/module/SimpleRfidAuth.py b/libs/module/SimpleRfidAuth.py index fc5f89e..0692b92 100644 --- a/libs/module/SimpleRfidAuth.py +++ b/libs/module/SimpleRfidAuth.py @@ -36,7 +36,7 @@ def disable(self): def teardown(self): pass - def has_access(self, hwid_str): + def has_access(self, hwid_str, *args, **kwargs): """lookup detected hwid,""" # lookup hwid in db access = hwid_str in self.access_list