Skip to content

Commit

Permalink
Adds reading of tag meta data (OV Chipkaart). Requires api v1.1
Browse files Browse the repository at this point in the history
WARNING: This client version is not compatible with older backend versions!!
requires Doorlockd-backend with API version v1.1.0

- has_access(hwid, target=target, nfc_tools=nfc_tools)
  target + nfc_tools are added, other implementations should atleast add (*args, **kwargs)

- NfcTools: Set of tools to communicate with nfc tags or collect various tag data.
  .parse_ovchipkaart(): read valid date of NL OV Chipkaart.
  .collect_meta(): read various meta data like nfc-chip type and version.

- Requires doorlockd api v 1.1.0 (doorlockd-backend)
  • Loading branch information
wie-niet committed Nov 28, 2024
1 parent 3d9d395 commit fdba2af
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 12 deletions.
17 changes: 12 additions & 5 deletions libs/module/DjangoBackendRfidAuth.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_timestamps_now_begin_end(self, precision=None):
datetime.datetime.fromtimestamp(t_end, tz=datetime.timezone.utc),
)

def add(self, key, known_key):
def add(self, key, known_key, meta_data={}):
# get privacy friendly timestamps, now , begin, end
t_now, t_begin, t_end = self.get_timestamps_now_begin_end()

Expand All @@ -98,8 +98,12 @@ def add(self, key, known_key):
)
item["count"] += r["count"]
item["timestamp"] = r["timestamp"]
item["meta_data_json"] = json.dumps(
{**meta_data, **json.loads(item["meta_data_json"])}
)
except:
# add new
r["meta_data_json"] = json.dumps(meta_data)
self.table_unknown_key.append(r)

# Key -> LastSeen table with obfuscated timestamp
Expand Down Expand Up @@ -563,8 +567,9 @@ def acl_has_access(self, hwid):
# no condition return true
return False, "no condition in ruleset returned true"

def lookup(self, key):
def lookup(self, key, target, nfc_tools, *args, **kwargs):
key = key.lower() # lowercase this key
meta_data = {} # placeholder for meta_data

if self.lock_disabled is True:
msg = "Warning: lock disabled or never synchronised, lookup() and last-seen logging ignored."
Expand All @@ -577,13 +582,15 @@ def lookup(self, key):
if key not in self.keys:
has_access, msg = False, "Not found"
known_key = False
meta_data = nfc_tools.collect_meta()
logger.info(f"card meta info: {meta_data}")
else:
has_access, msg = self.acl_has_access(key)
known_key = True

# keep last_seen list
if known_key or (not known_key and self.log_unknownkeys):
self.log_stats.add(key, known_key)
self.log_stats.add(key, known_key, meta_data)
else:
logger.debug("Logging is disabled by log_unknownkeys.")

Expand Down Expand Up @@ -755,10 +762,10 @@ def disable(self):
def teardown(self):
self.api.cleanup()

def has_access(self, hwid_str):
def has_access(self, hwid_str, target, nfc_tools, *args, **kwargs):
"""lookup detected hwid,"""
# lookup hwid in db
access, msg = self.api.lookup(hwid_str)
access, msg = self.api.lookup(hwid_str, target, nfc_tools, *args, **kwargs)
logger.debug(
f"'{self.api.lockname}' RFID KEY lookup({hwid_str}): access={access} : {msg}"
)
Expand Down
181 changes: 175 additions & 6 deletions libs/module/PN532.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
import time
import threading

import datetime


class TargetGoneWhileReadingError(Exception):
pass


class PN532(module.BaseModule):

Expand Down Expand Up @@ -153,15 +159,18 @@ def io_wait_for_tag_detected(self):
self.callback_tag_detected(target)

def callback_tag_detected(self, target):
# print("DEBUG: ", target)
# print("DEBUG: ", target.identifier)
# print("DEBUG: ", hwid2hexstr(target.identifier))
#
hwid_str = hwid2hexstr(
target.identifier
) # make hwid in hex lowercase string format

if dc.rfid_auth.has_access(hwid_str):
try:
has_access = dc.rfid_auth.has_access(
hwid_str, target=target, nfc_tools=NfcTools(target)
)
except TargetGoneWhileReadingError:
has_access = None

if has_access is True:
logger.info("hwid ({:s}) access alowed.".format(hwid_str))
self.event_bus.raise_event(
"rfid_access_allowed"
Expand All @@ -172,7 +181,7 @@ def callback_tag_detected(self, target):
self.event, {}, wait=True
) # raise configured trigger_action for rfid_action

else:
elif has_access is False:
logger.info("hwid ({:s}) access denied.".format(hwid_str))
self.event_bus.raise_event(
"rfid_access_denied"
Expand All @@ -182,6 +191,11 @@ def callback_tag_detected(self, target):
"rfid_access_denied_fin"
) # raise when rfid is access_denied after sleeping x seconds.

else:
# has_access is None or anythng else
logger.info("hwid ({:s}) gone while reading.".format(hwid_str))
time.sleep(2)

def start_thread(self):
if not (self.thread and self.thread.is_alive()):
self.thread = threading.Thread(target=self.run, args=())
Expand All @@ -204,3 +218,158 @@ def stop_thread(self):
# join thread to wait for stop:
if self.thread:
self.thread.join()


class NfcTools:
"""Set of tools to communicate with nfc tags or collect various tag data."""

def __init__(self, target):
self.target = target

def _authenticate(
self, secret=bytearray([0x0, 0x0, 0x0, 0x0, 0x0, 0x0]), page=0, timeout=0.005
):
#
# only for target.type Type2Tag
if self.target.type != "Type2Tag":
raise Exception("authenticate only for for type Type2Tag")

send = bytearray([0x60, page]) + secret + self.target._nfcid
logger.debug(f"authenticate using: target.transceive({send.hex()}, {timeout})")
# target.transceive(send, timeout)
with self.target.clf.lock:
return self.target.clf.device.chipset.in_data_exchange(send, timeout)

def _read(self, page, max_retries=3):
"""wrapper for target.read(page), but then with retries for timeout
Type2TagCommandError(nfc.tag.TIMEOUT_ERROR)
"""
n = 0
while n != max_retries:
n = n + 1
try:
logger.debug(
f"attemp..... while read({page}), attemp ({n}/{max_retries})"
)
return self.target.read(page)
except nfc.tag.tt2.Type2TagCommandError as e:
if repr(e) != repr(
nfc.tag.tt2.Type2TagCommandError(nfc.tag.TIMEOUT_ERROR)
):
raise (e)
else:
logger.debug(
f"TIMEOUT_ERROR while read({page}), attemp ({n}/{max_retries})"
)

# max_retries reached, raise timeout error:
# raise nfc.tag.tt2.Type2TagCommandError(nfc.tag.TIMEOUT_ERROR)
raise TargetGoneWhileReadingError

# TODO: how to respond here?, raise exception and ignore this NFC comunication like it never happened ? or add this message to the meta dict so the hwid still showsup as "UnknownKey".
# Idea TargetGoneWhileReadingError:
# - UI Leds keep shows something stuck in communication , so that enduser whill understand he/she needs to keep the rfid tag longer in front of the reading device to succeed reading.
# we raise this exception anywhere in the dc.rfid_auth.has_access() callback
# raise TargetGoneWhileReadingError

def parse_ovchipkaart(self, data):
"""Parse NL OV-Chipkaart, we now only read validuntil date."""

def getbits(data, start, end):
"""Return number at bit positions of bytestring data (msb first)"""
val = 0
# verbose = (start == 4 and end == 6)
for byte in range(start // 8, (end + 7) // 8):
bits = 8 # to chop off excess bits on the right
if byte * 8 > end - 8:
bits = end - byte * 8
# if verbose: print "byte =", byte, "bits =", bits
mask = 0xFF # to chop off excess bits on the left
if byte == start // 8:
mask = (1 << (8 - start % 8)) - 1
# if verbose: print "mask = %02x" % mask
val = (val << bits) + ((data[byte] & mask) >> (8 - bits))
# if verbose: print "data[byte] = %02x val = %02x" % (ord(data[byte]), val)
return val

cardtypes = {0: "anonymous", 2: "personal"}

cardid = getbits(data[0:4], 0, 4 * 8)
cardtype = cardtypes[getbits(data[0x10:0x36], 18 * 8 + 4, 19 * 8)]
validuntildays = getbits(data[0x10:0x36], 11 * 8 + 6, 13 * 8 + 4)
validuntil = datetime.date(1997, 1, 1)
validuntil += datetime.timedelta(validuntildays)

logger.debug(f"RAW DATA: {data.hex()}")

s = "OV-Chipkaart id %d, %s, valid until %s" % (cardid, cardtype, validuntil)
logger.info(s)

# return {'cardid': cardid, 'cardtype': cardtype, 'validuntil': str(validuntil)}
return {"validuntil": str(validuntil)}

def collect_ovchipkaart(self):
# OV chipcards use Mifare Classic (SAK == 0x18), looks like we can check this on .clf.target.sen_sel
if self.target.clf.target.sel_res != b"\x18":
logger.debug(
f"No OV Chipkaart, No Mifare classic 4k. SAK (.clf.target.sel_res ({self.target.clf.target.sel_res})) is not b'\x18'"
)
return {}

# we use our own authenticate method (was not implemented in nfcpy)
try:
self._authenticate()
except Exception as e:
logger.debug(e, exc_info=True)
return {}

# is OV Chip kaart, match page2 vs known string.
# read(1)[0:11] match: bytearray.fromhex('840000000603a00013aee4')
data = [None, None, None, None]

data[1] = self._read(1)
# data[1] = self.target.read(1)
if data[1][0:11] != bytearray.fromhex("840000000603a00013aee4"):
logger.debug("collect_ovchipkaart: Not an OV Chipkaart.")
return {}

# read other pages
data[0] = self._read(0)
data[2] = self._read(2)
data[3] = self._read(3)

# parse meta info
meta = self.parse_ovchipkaart(data[0] + data[1] + data[2] + data[3])
return {"ovchipkaart": meta}

def collect_meta(self):
meta = {}

# add nfc.tag.Tag type, product, exact class (Type 1/2/3/4 etc..):
meta.update(
{
"tag": {
"class": str(type(self.target)),
"type": self.target.type,
"product": self.target.product,
}
}
)

# add nfc.clf.RemoteTarget: sel_res, sens_res, sdd_res and brty
meta.update(
{
"target": {
"brty": self.target.clf.target.brty,
"sdd_res": self.target.clf.target.sdd_res.hex(),
"sel_res": self.target.clf.target.sel_res.hex(),
"sens_res": self.target.clf.target.sens_res.hex(),
}
}
)

# try OV chiptkaart:
meta.update(self.collect_ovchipkaart())

# return all meta info
return meta
2 changes: 1 addition & 1 deletion libs/module/SimpleRfidAuth.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def disable(self):
def teardown(self):
pass

def has_access(self, hwid_str):
def has_access(self, hwid_str, *args, **kwargs):
"""lookup detected hwid,"""
# lookup hwid in db
access = hwid_str in self.access_list
Expand Down

0 comments on commit fdba2af

Please sign in to comment.