forked from scionproto/scion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.py
395 lines (337 loc) · 11.7 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# Copyright 2014 ETH Zurich
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:mod:`util` --- SCION utilities
===============================
Various utilities for SCION functionality.
"""
# Stdlib
import atexit
import json
import logging
import os
import shutil
import signal
import sys
import time
from binascii import hexlify
from datetime import datetime, timezone
from socket import MSG_DONTWAIT
# External packages
import yaml
from external.stacktracer import trace_start
# SCION
from lib.errors import (
SCIONIOError,
SCIONIndexError,
SCIONJSONError,
SCIONParseError,
SCIONTypeError,
SCIONYAMLError,
)
TRACE_DIR = 'traces'
_SIG_MAP = {
signal.SIGHUP: "SIGHUP",
signal.SIGINT: "SIGINT",
signal.SIGQUIT: "SIGQUIT",
signal.SIGTERM: "SIGTERM",
signal.SIGUSR2: "SIGUSR2"
}
def read_file(file_path):
"""
Read and return contents of a file.
:param str file_path: the path to the file.
:returns: the file's contents.
:rtype: str
:raises:
lib.errors.SCIONIOError: error opening/reading from file.
"""
try:
with open(file_path) as file_handler:
return file_handler.read()
except OSError as e:
raise SCIONIOError("Unable to open '%s': %s" % (
file_path, e.strerror)) from None
def write_file(file_path, text):
"""
Write some text into a temporary file, creating its directory as needed, and
then atomically move to target location.
:param str file_path: the path to the file.
:param str text: the file content.
:raises:
lib.errors.SCIONIOError: IO error occurred
"""
# ":" is an illegal filename char on both windows and OSX, so disallow it globally to prevent
# incompatibility.
assert ":" not in file_path, file_path
dir_ = os.path.dirname(file_path)
try:
os.makedirs(dir_, exist_ok=True)
except OSError as e:
raise SCIONIOError("Error creating '%s' dir: %s" %
(dir_, e.strerror)) from None
tmp_file = file_path + ".new"
try:
with open(tmp_file, 'w') as f:
f.write(text)
except OSError as e:
raise SCIONIOError("Error creating/writing to temp file '%s': %s" %
(file_path, e.strerror)) from None
try:
os.rename(tmp_file, file_path)
except OSError as e:
raise SCIONIOError("Error moving '%s' to '%s': %s" %
(tmp_file, file_path, e.strerror)) from None
def copy_file(src, dst):
dst_dir = os.path.dirname(dst)
try:
os.makedirs(dst_dir, exist_ok=True)
except OSError as e:
raise SCIONIOError("Error creating dir '%s': %s" %
(dst_dir, e.strerror)) from None
try:
shutil.copyfile(src, dst)
except OSError as e:
raise SCIONIOError("Error copying '%s' to '%s': %s" %
(src, dst, e.strerror)) from None
def load_json_file(file_path):
"""
Read and parse a JSON config file.
:param str file_path: the path to the file.
:returns: JSON data
:rtype: dict
:raises:
lib.errors.SCIONIOError: error opening/reading from file.
lib.errors.SCIONJSONError: error parsing file.
"""
try:
with open(file_path) as f:
return json.load(f)
except OSError as e:
raise SCIONIOError("Error opening '%s': %s" %
(file_path, e.strerror)) from None
except (ValueError, KeyError, TypeError) as e:
raise SCIONJSONError("Error parsing '%s': %s" %
(file_path, e)) from None
def load_yaml_file(file_path):
"""
Read and parse a YAML config file.
:param str file_path: the path to the file.
:returns: YAML data
:rtype: dict
:raises:
lib.errors.SCIONIOError: error opening/reading from file.
lib.errors.SCIONYAMLError: error parsing file.
"""
try:
with open(file_path) as f:
return yaml.load(f)
except OSError as e:
raise SCIONIOError("Error opening '%s': %s" %
(file_path, e.strerror)) from None
except (yaml.scanner.ScannerError) as e:
raise SCIONYAMLError("Error parsing '%s': %s" %
(file_path, e)) from None
def update_dict(dictionary, key, values, elem_num=0):
"""
Update dictionary. Used for managing a temporary paths' cache.
"""
if key in dictionary:
dictionary[key].extend(values)
else:
dictionary[key] = values
dictionary[key] = dictionary[key][-elem_num:]
def calc_padding(length, block_size):
"""
Calculate how much padding is needed to bring `length` to a multiple of
`block_size`.
:param int length: The length of the data that needs padding.
:param int block_size: The block size.
"""
if length % block_size:
return block_size - (length % block_size)
else:
return 0
def trace(id_):
path = os.path.join(TRACE_DIR, "%s.trace.html" % id_)
trace_start(path)
def sleep_interval(start, interval, desc, quiet=False):
"""
Sleep until the `interval` seconds have elapsed since `start`.
If the interval is already over, log a warning with `desc` at the start.
:param float start:
Time (in seconds since the Epoch) the current interval started.
:param float interval: Length (in seconds) of an interval.
:param str desc: Description of the operation.
:param bool quiet: If set, don't log warnings.
"""
now = SCIONTime.get_time()
delay = start + interval - now
if delay < 0:
if not quiet:
logging.warning(
"%s took too long: %.3fs (should have been <= %.3fs)",
desc, now - start, interval)
delay = 0
time.sleep(delay)
def handle_signals():
"""Setup basic signal handler for the most common signals."""
# FIXME(kormat): the SIGUSR1 handler is actually silently overridden by
# pycapnp, so we can't use/catch it.
# https://github.com/jparyani/pycapnp/issues/101
for sig in _SIG_MAP.keys():
signal.signal(sig, _signal_handler)
def _signal_handler(signum, _):
"""Basic signal handler function."""
text = "Received %s" % _SIG_MAP[signum]
if signum == signal.SIGTERM:
atexit.register(lambda: logging.info(text))
sys.exit(0)
elif signum == signal.SIGINT:
atexit.register(lambda: logging.info(text))
else:
atexit.register(lambda: logging.error(text))
sys.exit(1)
def iso_timestamp(ts): # pragma: no cover
"""
Format a unix timestamp as a UTC ISO 8601 format string
(YYYY-MM-DD HH:MM:SS.mmmmmm+00:00)
:param float ts: Seconds since the UNIX epoch.
"""
return str(datetime.fromtimestamp(ts, tz=timezone.utc))
def hex_str(raw):
"""Format a byte string as hex characters."""
return hexlify(raw).decode("ascii")
def recv_all(sock, total_len, flags):
barr = bytearray()
while len(barr) < total_len:
# The first recv call must support non-blocking mode to raise an error
# if the socket is not ready. Subsequent calls should be blocking to
# avoid sync problems.
if flags & MSG_DONTWAIT and len(barr) > 0:
flags &= ~MSG_DONTWAIT
try:
buf = sock.recv(total_len - len(barr), flags)
except InterruptedError:
continue
except ConnectionResetError:
# Peer closed the connection without reading
logging.error("socket closed by peer")
return None
if not buf:
if not barr:
logging.debug("recv returned nil, socket closed")
else:
logging.error("socket connection prematurely terminated")
return None
barr += buf
return bytes(barr)
class SCIONTime(object):
"""A class to return current time."""
# Function which would return time upon calling it
# Can be set using set_time_method
_custom_time = None
@classmethod
def get_time(cls):
"""Get current time."""
if cls._custom_time:
return cls._custom_time()
else:
return time.time()
@classmethod
def set_time_method(cls, method=None):
"""Set the method used to get time."""
cls._custom_time = method
class Raw(object):
"""A class to wrap raw bytes objects."""
def __init__(self, data, desc="", len_=None,
min_=False): # pragma: no cover
self._data = data
self._desc = desc
self._len = len_
self._min = min_
self._offset = 0
self.check_type()
self.check_len()
def check_type(self):
"""
Check that the data is a `bytes` instance. If not, raise an exception.
:raises:
lib.errors.SCIONTypeError: data is the wrong type
"""
if not isinstance(self._data, bytes):
raise SCIONTypeError(
"Error parsing raw %s: Expected %s, got %s" %
(self._desc, bytes, type(self._data)))
def check_len(self):
"""
Check that the data is of the expected length. If not, raise an
exception.
:raises:
lib.errors.SCIONTypeError: data is the wrong length
"""
if self._len is None:
return
if self._min:
if len(self._data) >= self._len:
return
else:
op = ">="
elif len(self._data) == self._len:
return
else:
op = "=="
raise SCIONParseError(
"Error parsing raw %s: Expected len %s %s, got %s" %
(self._desc, op, self._len, len(self._data)))
def get(self, n=None, bounds=True):
"""
Return next elements from data.
If `n` is not specified, return all remaining elements of data.
If `n` is 1, return the next element of data (as an int).
If `n` is > 1, return the next `n` elements of data (as bytes).
:param n: How many elements to return (see above)
:param bool bounds: Perform bounds checking on access if True
"""
dlen = len(self._data)
if n and bounds and (self._offset + n) > dlen:
raise SCIONIndexError("%s: Attempted to access beyond end of raw "
"data (len=%d, offset=%d, request=%d)" %
(self._desc, dlen, self._offset, n))
if n is None:
return self._data[self._offset:]
elif n == 1:
return self._data[self._offset]
else:
return self._data[self._offset:self._offset + n]
def pop(self, n=None, bounds=True):
"""
Return next elements from data, and advance the internal offset.
Arguments have the same meaning as for Raw.get
"""
ret = self.get(n, bounds)
dlen = len(self._data)
if n is None:
self._offset = dlen
elif n == 1:
self._offset += 1
else:
self._offset += n
if self._offset > dlen:
self._offset = dlen
return ret
def offset(self): # pragma: no cover
return self._offset
def __len__(self):
return max(0, len(self._data) - self._offset)