-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_pgns.py
138 lines (97 loc) · 3.3 KB
/
get_pgns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
get_pgns.py
prints a list of messages containing the specified PGNs to stdout.
The object used to do this, PGN_Filter, can be imported and used with other things as well
Running options:
1) cmd args
get_pgns.py path_to_logs pgn1 pgn2 pgn3...
2) config file
get_pgns.py (checks for presence of get_pgns.conf and sets the arguments with that)
conf file format:
{
"path": "path/to/file",
"pgns": [
"pgn1",
"pgn2"
]
}
3) interactive session
get_pgns.py (no conf file, no arguments) -> interactive mode
"""
import json
import os
import sys
from collections.abc import Iterator

import chardet

from messages.j1939id import J1939ID
def get_msgs(path: str) -> list:
    """Read a candump-style log and return its messages.

    Each non-blank line is expected to hold three whitespace-separated
    fields: ``(timestamp) channel can_id#data``.

    Args:
        path (str): path to file

    Returns:
        list: one ``(timestamp, channel, J1939ID, data)`` tuple per message,
            in file order

    Raises:
        SystemExit: if the file does not exist
        ValueError: if a non-blank line does not match the expected layout
    """
    messages: list[tuple] = []
    try:
        # Only the first line is sampled for encoding detection.
        with open(path, "rb") as raw:
            encoding = chardet.detect(raw.readline())["encoding"]
        with open(path, "r", encoding=encoding) as log:
            for line in log:
                # split() with no argument tolerates tabs and repeated spaces.
                fields = line.split()
                if not fields:
                    # Blank lines carry no message; skip instead of crashing
                    # on the three-way unpack below.
                    continue
                timestamp, channel, message = fields
                can_id, data = message.split("#")
                messages.append(
                    (float(timestamp.strip("()")), channel, J1939ID(can_id), data)
                )
    except FileNotFoundError:
        # A string argument makes sys.exit write to stderr and exit with
        # status 1, so the failure is visible to callers and stays out of
        # the stdout stream that carries the filtered messages.
        sys.exit("file not found, please try again")
    return messages
def get_args() -> tuple:
    """Resolve the log path and the PGNs of interest.

    The sources are tried in this order:
        1. Command line: a path followed by at least one PGN.
        2. A JSON config file next to this script, named like the script
           but with a ``.conf`` extension, holding ``"path"`` and ``"pgns"``.
        3. Interactive prompts.

    Returns:
        tuple: (path, [pgns]); PGNs read from the config file are returned
            as strings
    """
    if len(sys.argv) > 2:
        return (sys.argv[1], sys.argv[2:])

    # splitext drops whatever extension the script has, rather than assuming
    # the name ends in exactly three characters (".py").
    conf_path = f"{os.path.splitext(__file__)[0]}.conf"
    if os.path.isfile(conf_path):
        with open(conf_path, "r", encoding="utf-8") as f:
            conf = json.load(f)
        # PGNs may be written as bare JSON numbers; PGNs given on the command
        # line or at the prompt are always strings, so convert to match them.
        return (conf["path"], [str(pgn) for pgn in conf["pgns"]])

    path = input("enter path to candump file: ")
    pgn_str = input("enter pgns of interest seperated by spaces: ")
    return (path, pgn_str.split())
class PGN_Filter:
    """Iterable over the messages whose PGN is one of the PGNs of interest.

    Iterating yields one ``"<can_id.hex>#<data>"`` string per matching
    message, in the order the messages were supplied.

    Args:
        interest_pgns (list): PGNs to keep; compared by their string form, so
            ``65265`` and ``"65265"`` select the same messages
        messages: (list[tuple[timestamp, channel, can_id, data]]): list of messages to filter
    """

    def __init__(
        self, interest_pgns: list, messages: "list[tuple[float, str, J1939ID, str]]"
    ) -> None:
        self._interest_pgns: list = interest_pgns
        self._messages: "list[tuple[float, str, J1939ID, str]]" = messages

    def __iter__(self) -> Iterator[str]:
        """Yield the matching messages as ``"<can_id.hex>#<data>"`` strings.

        Yields:
            str: one formatted message per match
        """
        # Build the lookup set once per pass: constant-time membership
        # instead of a list scan per message, and str() lets integer PGNs
        # match too.
        wanted = {str(pgn) for pgn in self._interest_pgns}
        for _, _, can_id, data in self._messages:
            if str(can_id.pgn) in wanted:
                yield f"{can_id.hex}#{data}"
def main():
    """Resolve the arguments, load the log and print every matching message."""
    log_path, wanted_pgns = get_args()
    parsed: list = get_msgs(log_path)
    for line in PGN_Filter(interest_pgns=wanted_pgns, messages=parsed):
        print(line)


# Run only when executed as a script, so importing the module has no side effects.
if __name__ == "__main__":
    main()