-
Notifications
You must be signed in to change notification settings - Fork 0
/
pxparser.py
376 lines (329 loc) · 15.9 KB
/
pxparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import struct
import xlsxwriter
from io import TextIOWrapper
from xlsxwriter.worksheet import Worksheet
from pathlib import Path
class PxParser:
    """ Convert a binary message log into a text, CSV or XLSX table.

    Every message in the log starts with the two magic bytes ``MSG_HEAD1`` and
    ``MSG_HEAD2`` followed by a one-byte message type.  Messages of type
    ``MSG_TYPE_FORMAT`` describe the layout (length, name, field format and
    field labels) of another message type; every other message is decoded with
    the previously seen description of its type.

    NOTE(review): the layout looks like the PX4 "sdlog2" format - confirm.

    Typical use::

        parser = PxParser()
        parser.set_time_msg("TIME_StartTime")
        parser.set_output_file("flight", "csv")
        parser.process("flight.bin")
    """

    BLOCK_SIZE = 8192               # bytes read from the log per iteration
    MSG_HEADER_LEN = 3              # two magic bytes + message type
    MSG_HEAD1 = 0xA3
    MSG_HEAD2 = 0x95
    MSG_FORMAT_PACKET_LEN = 89      # header + payload of a FORMAT message
    MSG_FORMAT_STRUCT = "BB4s16s64s"  # type, length, name, format, labels
    MSG_TYPE_FORMAT = 0x80
    # Log format character -> (struct format, multiplier or None)
    FORMAT_TO_STRUCT = {
        # NOTE(review): "a" is not a valid `struct` format character, so a
        # message using it will fail to unpack - confirm the intended mapping.
        "a": ("a", None),
        "b": ("b", None),
        "B": ("B", None),
        "h": ("h", None),
        "H": ("H", None),
        "i": ("i", None),
        "I": ("I", None),
        "f": ("f", None),
        "d": ("d", None),
        "n": ("4s", None),
        "N": ("16s", None),
        "Z": ("64s", None),
        "c": ("h", 0.01),
        "C": ("H", 0.01),
        "e": ("i", 0.01),
        "E": ("I", 0.01),
        "L": ("i", 0.0000001),
        "M": ("b", None),
        "q": ("q", None),
        "Q": ("Q", None),
    }

    def __init__(self):
        """ Create a parser with its own, unshared parsing state """
        # Mutable containers are created per instance: as class attributes
        # they would be shared (and polluted) between parsers.
        self.__msg_filter = []        # [(message name, "*" or field labels)]
        self.__msg_ignore = []        # column labels excluded from interpolation
        self.__msg_labels = {}        # message name -> field labels
        self.__msg_descrs = {}        # message type -> description tuple
        self.__msg_names = []         # message names in order of first FORMAT
        self.__msg_filter_map = {}    # message name -> fields to output
        self.__txt_columns = []       # output column labels ("MSG_field")
        self.__txt_data = {}          # column label -> latest value
        self.__namespace = {}         # column label -> custom header
        self.__prev_data = []         # interpolation: row before the current one
        self.__next_data = []         # interpolation: most recently built row
        self.__buffer = bytearray()   # unparsed bytes of the log
        self.__msg_id_ignore = set()  # column indexes excluded from interpolation
        self.__pointer = 0            # read position inside __buffer
        self.__delim_char = '\t'
        self.__null_char = ''
        self.__time_msg = ""
        self.__file = None            # text file or worksheet; None -> stdout
        self.__txt_updated = False
        self.__time_msg_id = None     # index of the time column, if any
        self.__debug_out = False
        self.__correct_errors = False
        self.__interpolation = False
        self.__workbook = None
        self.completed = 0            # progress of process() in percent
        self.msg_count = 0            # number of rows written so far

    def set_namespace(self, namespace):
        """ Set custom names for column headers (column label -> header) """
        self.__namespace = namespace

    def set_null_char(self, csv_null):
        """ Set the text written for columns that have no value yet """
        self.__null_char = csv_null

    def set_msg_filter(self, msg_filter):
        """ Set whitelist of messages: list of (name, "*" or field labels) """
        self.__msg_filter = msg_filter

    def set_time_msg(self, time_msg):
        """ Set label of the column to get time from, or None """
        self.__time_msg = time_msg

    def enable_debug_out(self):
        """ Enable debug output to stdout """
        self.__debug_out = True

    def enable_err_correct(self):
        """ Enable error correction: skip unparsable bytes instead of raising """
        self.__correct_errors = True

    def enable_interpolation(self):
        """ Enable interpolation of rows onto a constant clock """
        self.__interpolation = True

    def set_msg_ignore(self, msg_ignore):
        """ Set column labels that must not be interpolated """
        self.__msg_ignore = msg_ignore

    def set_output_file(self, file_name, file_type):
        """ Create output file `file_name`.`file_type` ('txt', 'csv' or 'xlsx').

        Any other `file_type` leaves the output unchanged (stdout by default).
        The file is closed when process() finishes.
        """
        if file_type in ('txt', 'csv'):
            self.__file = open(file_name + '.' + file_type, "w+")
            self.__delim_char = ',' if file_type == 'csv' else '\t'
        elif file_type == 'xlsx':
            self.__workbook = xlsxwriter.Workbook(
                file_name + '.' + file_type, {'nan_inf_to_errors': True})
            self.__file = self.__workbook.add_worksheet()

    def __to_utf8(self, cstr):
        """ Convert a NUL-terminated ASCII byte string to str """
        # Cut at the terminator first so padding garbage behind it can never
        # break decoding; undecodable bytes are replaced rather than raising.
        return bytes(cstr).split(b'\0', 1)[0].decode('ascii', errors='replace')

    def process(self, fn):
        """ Main function. Converts provided .bin file to human-readable text format

        Raises Exception on an invalid header or an unknown message type
        unless error correction is enabled, in which case parsing resumes one
        byte further.  The output file (if any) is closed on return.
        """
        file_size = Path(fn).stat().st_size
        if self.__debug_out:
            for msg_name, show_fields in self.__msg_filter:
                self.__msg_filter_map[msg_name] = show_fields
        first_data_msg = True
        bytes_read = 0  # offset of the start of __buffer within the file
        try:
            with open(fn, "rb") as log:
                while True:
                    chunk = log.read(self.BLOCK_SIZE)
                    if not chunk:
                        break
                    # Keep the unparsed tail of the previous chunk
                    self.__buffer = self.__buffer[self.__pointer:] + chunk
                    self.__pointer = 0
                    while self.__bytesLeft() >= self.MSG_HEADER_LEN:
                        head1 = self.__buffer[self.__pointer]
                        head2 = self.__buffer[self.__pointer + 1]
                        if head1 != self.MSG_HEAD1 or head2 != self.MSG_HEAD2:
                            if self.__correct_errors:
                                self.__pointer += 1  # resynchronise
                                continue
                            raise Exception("Invalid header at %i (0x%X): %02X %02X, must be %02X %02X" % (
                                bytes_read + self.__pointer, bytes_read + self.__pointer, head1, head2, self.MSG_HEAD1, self.MSG_HEAD2))
                        msg_type = self.__buffer[self.__pointer + 2]
                        if msg_type == self.MSG_TYPE_FORMAT:
                            if self.__bytesLeft() < self.MSG_FORMAT_PACKET_LEN:
                                break  # wait for the next chunk
                            self.__parseMsgDescr()
                            continue
                        msg_descr = self.__msg_descrs.get(msg_type)
                        if msg_descr is None:
                            if self.__correct_errors:
                                self.__pointer += 1  # resynchronise
                                continue
                            raise Exception("Unknown msg type: %i" % msg_type)
                        msg_length = msg_descr[0]
                        if self.__bytesLeft() < msg_length:
                            break  # wait for the next chunk
                        if first_data_msg:
                            # Columns are built from the formats seen so far
                            if not self.__debug_out:
                                self.__initOutput()
                            first_data_msg = False
                        self.__parseMsg(msg_descr)
                    bytes_read += self.__pointer
                    if file_size:
                        self.completed = bytes_read / file_size * 100
                    # A row is emitted once per processed chunk
                    if not self.__debug_out and self.__time_msg is not None and self.__txt_updated:
                        self.__processData()
        finally:
            self.__closeOutput()

    def __closeOutput(self):
        """ Close the text file and/or workbook opened by set_output_file """
        if isinstance(self.__file, TextIOWrapper):
            self.__file.close()
        if self.__workbook is not None:
            self.__workbook.close()
        self.__file = None
        self.__workbook = None

    def __bytesLeft(self):
        """ Get amount of unparsed bytes left in the buffer """
        return len(self.__buffer) - self.__pointer

    def __filterMsg(self, msg_name):
        """ Get fields to show for a message: "*" without filter, None if excluded """
        show_fields = "*"
        if self.__msg_filter_map:
            show_fields = self.__msg_filter_map.get(msg_name)
        return show_fields

    def __initOutput(self):
        """ Build the output columns and write the column headers

        Raises ValueError when interpolation is enabled but the time column
        is not among the output columns.
        """
        if not self.__msg_filter:  # no filter: enable all known messages
            self.__msg_filter = [(name, "*") for name in self.__msg_names]
        for msg_name, show_fields in self.__msg_filter:
            if show_fields == "*":
                show_fields = self.__msg_labels.get(msg_name, [])
            self.__msg_filter_map[msg_name] = show_fields
            for field in show_fields:
                full_label = msg_name + "_" + field
                self.__txt_columns.append(full_label)
                self.__txt_data[full_label] = None
        ignored = self.__msg_ignore or ()
        for idx, col in enumerate(self.__txt_columns):
            if col in ignored:
                self.__msg_id_ignore.add(idx)
        if self.__time_msg in self.__txt_columns:
            self.__time_msg_id = self.__txt_columns.index(self.__time_msg)
            # The time column is rewritten, never interpolated
            self.__msg_id_ignore.add(self.__time_msg_id)
        else:
            self.__time_msg_id = None
            if self.__interpolation:
                raise ValueError(
                    "Interpolation requires the time column %r, which is not "
                    "among the output columns" % (self.__time_msg,))
        custom = self.__namespace or {}
        headers = [custom[col] if col in custom else col
                   for col in self.__txt_columns]
        if isinstance(self.__file, TextIOWrapper):
            print(self.__delim_char.join(headers), file=self.__file)
        elif self.__workbook is not None:
            for col, header in enumerate(headers):
                self.__file.write(0, col, header)
        else:
            print(self.__delim_char.join(headers))

    def __processData(self):
        """ Build a row from the latest values and output it """
        data = []
        for full_label in self.__txt_columns:
            val = self.__txt_data[full_label]
            if val is None:
                val = self.__null_char
            elif not isinstance(val, (int, float)):
                val = str(val)
            data.append(val)
        if self.__interpolation:
            self.__interpolate(data)
        else:
            self.__printData(data)

    def __interpolate(self, data):
        """ Output the previously built row plus interpolated rows towards `data`

        NOTE(review): presumably the time column is in microseconds and the
        output clock is 100 ms - confirm.  The arithmetic is kept exactly as
        it was written.
        """
        if not self.__next_data:
            self.__next_data = data[:]  # first row: nothing to output yet
            return
        time_id = self.__time_msg_id
        curr_data = self.__next_data[:]
        prev_data = curr_data[:]  # untouched copy, becomes __prev_data
        self.__next_data = data[:]
        if self.__prev_data:
            time_diff = (
                (curr_data[time_id] - self.__prev_data[time_id]) // 1000)
            # Multiplier for curr_data to reach a round digit time
            to_round_time = (100 - (time_diff % 100)) * 0.0001
            extra_msg_count = time_diff // 100
            for count in range(extra_msg_count):
                # First extra message and time is not a round number
                if count == 0 and round(to_round_time, 2) > 0:
                    for i in range(len(curr_data)):
                        try:
                            if type(curr_data[i]) is str or i in self.__msg_id_ignore or curr_data[i] == self.__prev_data[i]:
                                continue
                            elif self.__next_data[i] > curr_data[i]:
                                curr_data[i] += curr_data[i] * to_round_time
                            else:
                                curr_data[i] -= curr_data[i] * to_round_time
                        except TypeError:
                            print(
                                curr_data[i], self.__next_data[i] if self.__next_data[i] else 'null')
                    curr_data[time_id] = self.msg_count * 100
                    self.__printData(curr_data)
                tmp = curr_data[:]
                for idx in range(len(curr_data)):
                    if type(tmp[idx]) is str or idx in self.__msg_id_ignore or self.__next_data[idx] == curr_data[idx]:
                        continue
                    elif self.__next_data[idx] > curr_data[idx]:
                        tmp[idx] = curr_data[idx] + (
                            ((self.__next_data[idx] - curr_data[idx]) / extra_msg_count) * count)
                    else:
                        tmp[idx] = curr_data[idx] - (
                            ((curr_data[idx] - self.__next_data[idx]) / extra_msg_count) * count)
                tmp[time_id] = self.msg_count * 100
                self.__printData(tmp)
        else:
            # Very first output row starts the clock at zero
            curr_data[time_id] = 0
            self.__printData(curr_data)
        self.__prev_data = prev_data[:]

    def __parseMsgDescr(self):
        """ Unpack a FORMAT message at the pointer and store the description

        Raises Exception when the format uses an unsupported character.
        """
        data = struct.unpack(
            self.MSG_FORMAT_STRUCT, self.__buffer[self.__pointer + 3: self.__pointer + self.MSG_FORMAT_PACKET_LEN])
        msg_type = data[0]
        if msg_type != self.MSG_TYPE_FORMAT:  # FORMAT describing itself is skipped
            msg_length = data[1]
            msg_name = self.__to_utf8(data[2])
            msg_format = self.__to_utf8(data[3])
            msg_labels = self.__to_utf8(data[4]).split(",")
            msg_struct = ""
            msg_mults = []
            for c in msg_format:
                try:
                    f = self.FORMAT_TO_STRUCT[c]
                except KeyError as e:
                    raise Exception("Unsupported format char: %s in message %s (%i)" % (
                        c, msg_name, msg_type)) from e
                msg_struct += f[0]
                msg_mults.append(f[1])
            msg_struct = "<" + msg_struct  # force little-endian
            self.__msg_descrs[msg_type] = (
                msg_length, msg_name, msg_format, msg_labels, msg_struct, msg_mults)
            # A repeated descriptor must not create duplicate columns
            if msg_name not in self.__msg_labels:
                self.__msg_names.append(msg_name)
            self.__msg_labels[msg_name] = msg_labels
            if self.__debug_out:
                if self.__filterMsg(msg_name) is not None:
                    print("MSG FORMAT: type = %i, length = %i, name = %s, format = %s, labels = %s, struct = %s, mults = %s" % (
                        msg_type, msg_length, msg_name, msg_format, str(msg_labels), msg_struct, msg_mults))
        self.__pointer += self.MSG_FORMAT_PACKET_LEN

    def __parseMsg(self, msg_descr):
        """ Decode the data message at the pointer into the latest-value table """
        msg_length, msg_name, msg_format, msg_labels, msg_struct, msg_mults = msg_descr
        show_fields = self.__filterMsg(msg_name)
        if show_fields:
            # msg_length includes the header
            payload = self.__buffer[self.__pointer + self.MSG_HEADER_LEN:self.__pointer + msg_length]
            values = struct.unpack(msg_struct, payload)
            for label, value, mult in zip(msg_labels, values, msg_mults):
                if isinstance(value, bytes):  # "s" fields unpack to bytes
                    value = self.__to_utf8(value)
                if mult:
                    value *= mult
                if label in show_fields:
                    self.__txt_data[msg_name + "_" + label] = value
                    # NOTE(review): __time_msg is matched against column
                    # labels elsewhere but against the message name here -
                    # confirm which one is intended.
                    if self.__time_msg is not None and msg_name != self.__time_msg:
                        self.__txt_updated = True
            if self.__time_msg is None:
                self.__processData()
        self.__pointer += msg_length

    def __printData(self, data):
        """ Write one row to the output file, worksheet or stdout """
        if isinstance(self.__file, TextIOWrapper):
            print(self.__delim_char.join(map(str, data)), file=self.__file)
        elif self.__workbook is not None:
            row = self.msg_count + 1  # row 0 holds the headers
            for col, value in enumerate(data):
                self.__file.write(row, col, value)
        else:  # no output file set
            print(self.__delim_char.join(map(str, data)))
        self.msg_count += 1