Skip to content

Commit

Permalink
Corrected some utility data conversion routines and added unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
bobfox committed Feb 13, 2018
1 parent f7c02e1 commit 5242ea9
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 37 deletions.
128 changes: 128 additions & 0 deletions sunspec/core/test/test_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@

"""
Copyright (C) 2018 SunSpec Alliance
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
"""

import sys
import os
import time
import unittest

import sunspec.core.util as util

class TestUtil(unittest.TestCase):

def test_data_to_s16(self):
self.assertEqual(util.data_to_s16(b'\x12\x34'), int(4660))
self.assertEqual(util.data_to_s16(b'\x92\x34'), int(-28108))

def test_data_to_u16(self):
self.assertEqual(util.data_to_u16(b'\x12\x34'), int(4660))
self.assertEqual(util.data_to_u16(b'\x92\x34'), int(37428))

def test_data_to_s32(self):
self.assertEqual(util.data_to_s32(b'\x12\x34\x56\x78'), int(305419896))
self.assertEqual(util.data_to_s32(b'\x92\x34\x56\x78'), int(-1842063752))

def test_data_to_u32(self):
self.assertEqual(util.data_to_u32(b'\x12\x34\x56\x78'), int(305419896))
self.assertEqual(util.data_to_u32(b'\x92\x34\x56\x78'), int(2452903544))

def test_data_to_s64(self):
self.assertEqual(util.data_to_s64(b'\x12\x34\x56\x78\x12\x34\x56\x78'), int(1311768465173141112))
self.assertEqual(util.data_to_s64(b'\x92\x34\x56\x78\x12\x34\x56\x78'), int(-7911603571681634696))

def test_data_to_u64(self):
self.assertEqual(util.data_to_u64(b'\x12\x34\x56\x78\x12\x34\x56\x78'), int(1311768465173141112))
self.assertEqual(util.data_to_u64(b'\x92\x34\x56\x78\x12\x34\x56\x78'), int(10535140502027916920))

def test_data_to_ipv6addr(self):
self.assertEqual(util.data_to_ipv6addr(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), None)
self.assertEqual(util.data_to_ipv6addr(b'\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF0'), '12345678:9ABCDEF0:12345678:9ABCDEF0')
self.assertEqual(util.data_to_ipv6addr(b'\x01\x00\x00\x00\x00\x00\x00\x00'), None)

def test_data_to_eui48(self):
self.assertEqual(util.data_to_eui48(b'\x00\x00\x00\x00\x00\x00\x00\x00'), None)
self.assertEqual(util.data_to_eui48(b'\x00\x00\x12\x34\x56\x78\x9A\xBC'), '12:34:56:78:9A:BC')

def test_data_to_float(self):
self.assertEqual(util.data_to_float(b'\x7f\xc0\x00\x00'), None)
self.assertEqual(util.data_to_float(b'\x44\x7a\x00\x00'), float(1000))
self.assertEqual(util.data_to_float(b'\xc4\x7a\x00\x00'), float(-1000))

def test_data_to_double(self):
self.assertEqual(util.data_to_double(b'\x7F\xF8\x00\x00\x00\x00\x00\x00'), None)
self.assertEqual(util.data_to_double(b'\x40\x8f\x40\x00\x00\x00\x00\x00'), float(1000))
self.assertEqual(util.data_to_double(b'\xc0\x8f\x40\x00\x00\x00\x00\x00'), float(-1000))

def test_data_to_str(self):
self.assertEqual(util.data_to_str(b'\x53\x75\x6e\x53\x70\x65\x63\x20\x54\x65\x73\x74\x00'), 'SunSpec Test')


def test_s16_to_data(self):
self.assertEqual(util.s16_to_data(int(4660)), b'\x12\x34')
self.assertEqual(util.s16_to_data(int(-28108)), b'\x92\x34')

def test_u16_to_data(self):
self.assertEqual(util.u16_to_data(int(4660)), b'\x12\x34')
self.assertEqual(util.u16_to_data(int(37428)), b'\x92\x34')

def test_s32_to_data(self):
self.assertEqual(util.s32_to_data(int(305419896)), b'\x12\x34\x56\x78')
self.assertEqual(util.s32_to_data(int(-1842063752)), b'\x92\x34\x56\x78')

def test_u32_to_data(self):
self.assertEqual(util.u32_to_data(int(305419896)), b'\x12\x34\x56\x78')
self.assertEqual(util.u32_to_data(int(2452903544)), b'\x92\x34\x56\x78')

def test_s64_to_data(self):
self.assertEqual(util.s64_to_data(int(1311768465173141112)), b'\x12\x34\x56\x78\x12\x34\x56\x78')
self.assertEqual(util.s64_to_data(int(-7911603571681634696)), b'\x92\x34\x56\x78\x12\x34\x56\x78')

def test_u64_to_data(self):
self.assertEqual(util.u64_to_data(int(1311768465173141112)), b'\x12\x34\x56\x78\x12\x34\x56\x78')
self.assertEqual(util.u64_to_data(int(10535140502027916920)), b'\x92\x34\x56\x78\x12\x34\x56\x78')

def test_ipv6addr_to_data(self):
self.assertEqual(util.ipv6addr_to_data('12345678:9ABCDEF0:12345678:9ABCDEF0'), b'\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF0')

def test_float_to_data32(self):
self.assertEqual(util.float_to_data32(float(1000)), b'\x44\x7a\x00\x00')
self.assertEqual(util.float_to_data32(float(-1000)), b'\xc4\x7a\x00\x00')

def test_float32_to_data(self):
self.assertEqual(util.float32_to_data(float(1000)), b'\x44\x7a\x00\x00')
self.assertEqual(util.float32_to_data(float(-1000)), b'\xc4\x7a\x00\x00')

def test_float_to_data(self):
self.assertEqual(util.float_to_data(float(1000)), b'\x40\x8f\x40\x00\x00\x00\x00\x00')
self.assertEqual(util.float_to_data(float(-1000)), b'\xc0\x8f\x40\x00\x00\x00\x00\x00')

def test_str_to_data(self):
self.assertEqual(util.str_to_data('SunSpec Test'), b'\x53\x75\x6e\x53\x70\x65\x63\x20\x54\x65\x73\x74\x00')

def test_eui48_to_data(self):
self.assertEqual(util.eui48_to_data('12:34:56:78:9A:BC'), b'\x00\x00\x12\x34\x56\x78\x9A\xBC')


if __name__ == "__main__":

unittest.main()
79 changes: 42 additions & 37 deletions sunspec/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import sys
import zipfile
import array
import base64

class SunSpecError(Exception):
pass
Expand Down Expand Up @@ -58,41 +59,40 @@ def data_to_u64(data):
return u64[0]

def data_to_ipv6addr(data):
addr = struct.unpack('16s', data)
return addr[0]
if sys.version_info < (3,):
data = [ord(x) for x in data]

value = False
for i in data:
if i != 0:
value = True
break
if value and len(data) == 16:
return '%02X%02X%02X%02X:%02X%02X%02X%02X:%02X%02X%02X%02X:%02X%02X%02X%02X' % (
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15])

def data_to_eui48(data):
return '%02X:%02X:%02X:%02X:%02X:%02X' % (ord(data[2]), ord(data[3]), ord(data[4]), ord(data[5]), ord(data[6]), ord(data[7]))

try:
float('nan')

def data_to_float(data):
f = struct.unpack('>f', data)
return f[0]

def data_to_double(data):
d = struct.unpack('>d', data)
return d[0]

except Exception:
# earlier python version - nan not supported
def data_to_float(data):
e = struct.unpack('>L', data)
# if all exponent bits are '1' it is nan or inf, set to None
if (e[0] & 0x7f800000) == 0x7f800000:
return None
else:
f = struct.unpack('>f', data)
if sys.version_info < (3,):
data = [ord(x) for x in data]

value = False
for i in data:
if i != 0:
value = True
break
if value and len(data) == 8:
return '%02X:%02X:%02X:%02X:%02X:%02X' % (
data[2], data[3], data[4], data[5], data[6], data[7])

def data_to_float(data):
f = struct.unpack('>f', data)
if str(f[0]) != str(float('nan')):
return f[0]

def data_to_double(data):
e = struct.unpack('>Q', data)
# if all exponent bits are '1' it is nan or inf, set to None
if (e[0] & 0x7ff0000000000000) == 0x7ff0000000000000:
return None
else:
d = struct.unpack('>d', data)
def data_to_double(data):
d = struct.unpack('>d', data)
if str(d[0]) != str(float('nan')):
return d[0]

def data_to_str(data):
Expand Down Expand Up @@ -124,13 +124,14 @@ def s64_to_data(s64, len=None):
def u64_to_data(u64, len=None):
return struct.pack('>Q', u64)

def ipv6addr_to_data(addr, len=None):
return struct.pack('16s', addr)
def ipv6addr_to_data(addr, slen=None):
s = base64.b16decode(addr.replace(':', ''))
if slen is None:
slen = len(s)
return struct.pack(str(slen) + 's', s)

def float_to_data32(f, len=None):
# convert python float (float64) to float32 before packing
fa = array('f', f)
return struct.pack('>f', fa[0])
return struct.pack('>f', f)

def float32_to_data(f, len=None):
return struct.pack('>f', f)
Expand All @@ -144,10 +145,14 @@ def str_to_data(s, slen=None):
slen = len(s)
if sys.version_info > (3,):
s = bytes(s, 'latin-1')
if slen < 16:
s += b'\x00'
slen += 1
return struct.pack(str(slen) + 's', s)

def eui48_to_data(eui48):
return ('0000' + eui48.replace(':', '')).decode('hex')
return (b'\x00\x00' + base64.b16decode(eui48.replace(':', '')))


""" Simple XML pretty print support function
Expand Down

0 comments on commit 5242ea9

Please sign in to comment.