#!/usr/bin/env python3
import os, sys
def Adafruit_DHT_read_func(st, pin):
# XXX: untested leftover code from old python2 version, likely needs to be updated
import Adafruit_DHT.common as c
driver = c.get_platform().driver
dht_err_names = dict((v,k[4:]) for k,v in vars(c).items() if k.startswith('DHT_'))
def read_func():
res, rh, t = driver.read(st, pin)
if res == c.DHT_SUCCESS: read_err = None
else:
read_err = dht_err_names.get(res) or f'Unknown [{res}]'
rh = t = None
return read_err, rh, t
return read_func
def Adafruit_DHT_read_proc(args):
# XXX: untested leftover code from old python2 version, likely needs to be updated
import Adafruit_DHT, json, select
poller_timeout, = args
poller_timeout = float(poller_timeout)
poller = select.epoll()
poller.register(sys.stdin, select.EPOLLIN)
sys.stdout.write('!')
sys.stdout.flush()
read_funcs = dict()
while True:
try:
if not poller.poll(poller_timeout): msg = ''
else:
msg = sys.stdin.read(3)
if msg: msg = json.loads(sys.stdin.read(int(msg)))
if not msg: raise IOError
except KeyboardInterrupt: continue
except OSError: return
res = None
if msg['cmd'] == 'read':
st, pin = k = msg['st'], msg['pin']
if k not in read_funcs:
st = getattr(Adafruit_DHT, st or 'DHT22')
read_funcs[k] = Adafruit_DHT_read_func(st, pin)
res = read_funcs[k]()
else: raise ValueError(msg)
msg = json.dumps(res)
sys.stdout.write(f'{len(msg):03d}{msg}')
sys.stdout.flush()
# Special case for Adafruit_DHT module,
# which fails in the main pid for some reason.
if __name__ == '__main__' and sys.argv[1:2] == ['--dht-read-subproc']:
sys.exit(Adafruit_DHT_read_proc(sys.argv[2:]))
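# Wire protocol for the subprocess above: the child prints "!" once on start, then
# loops reading 3-digit-length-prefixed JSON commands like {"cmd": "read", "st": null, "pin": 9}
# from stdin, replying with a length-prefixed JSON [read_err, rh, t] list on stdout,
# and exits if no command arrives within poller_timeout.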
import collections as cs, pathlib as pl, datetime as dt, operator as op, functools as ft
import io, re, string, base64, time, math, json, stat, errno, tempfile
import glob, subprocess, signal, struct, contextlib, hashlib, logging
import rrdtool # should be installed with rrdtool binary/package
import yaml # https://pyyaml.org/
class LayeredAttrDict(cs.ChainMap):
def __init__(self, *maps, **map0):
super().__init__(*filter(None, [map0] + list(maps)))
def __getitem__(self, k, _err=KeyError):
k_maps = list()
for m in self.maps:
if k in m: k_maps.append(m[k])
if not k_maps: raise _err(k)
if not isinstance(k_maps[0], dict): return k_maps[0]
return LayeredAttrDict(*(m for m in k_maps if isinstance(m, dict)))
def __getattr__(self, k): return self.__getitem__(k, _err=AttributeError)
def __setattr__(self, k, v):
if k in ['maps']: return super().__setattr__(k, v)
self[k] = v
yaml_load = yaml.safe_load
yaml_dump = lambda data, dst:\
yaml.safe_dump(data, dst, allow_unicode=True, default_flow_style=False)
yaml.SafeDumper.add_representer(LayeredAttrDict, yaml.SafeDumper.represent_dict)
### systemd unit file:
#
# [Unit]
# After=time-sync.target
#
# [Service]
# Type=notify
# WatchdogSec=30
# Restart=always
# RestartSec=30
# StartLimitInterval=10min
# StartLimitBurst=10
# ExecStart=/usr/local/bin/rrd-sensors-logger --systemd daemon --http-listen
#
# [Install]
# WantedBy=multi-user.target
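# (Assuming a standard systemd setup, the unit above would go into e.g.
#  /etc/systemd/system/rrd-sensors-logger.service, followed by
#  "systemctl enable --now rrd-sensors-logger".)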
conf_base_str = '''
### rrd-sensors-logger configuration file (format: YAML)
### Place this file into ~/.rrd-sensors-logger.yaml or specify explicitly with --conf option.
rrd:
# Parameters for the created rrd databases
# Most are translated directly to values for "rrdtool create":
# http://oss.oetiker.ch/rrdtool/doc/rrdcreate.en.html
storage_dir: /var/lib/rrd_logger
name: rrd_logger.0 # can be used to encode schema version
cleanup_after: 14d # remove unused rrd files older than that
# Symlink that gets updated to point to a currently-used rrd file
current_symlink: rrd_logger.current.rrd
# See http://oss.oetiker.ch/rrdtool/doc/rrdcreate.en.html#IThe_HEARTBEAT_and_the_STEP
step: 30s # finest granularity of stored data
heartbeat: 60s # used for all DS'es, must be at least 20% greater than measurement interval
aggregate:
# RRA parameters for rrdtool
cf: average
xff: 0.8
keep: 30d
# "aggregate.step" gets converted to a number
# of datapoints, so it must be a multiple of "step" above
step: 5min
# "timesync_file" will contain current posix timestamp and uptime values
# It should be checked on system boot to make sure clock doesn't get reset to
# some insanely-backwards value, as it will make datapoints completely meaningless
	# The best way to use the file is some script run early on boot with root privileges,
	# which detects an uptime wrap-around, i.e. a reboot (aborting if there wasn't one),
	# then checks whether time was already synced via NTP (aborting if it was),
	# and then sets the system time to the stored timestamp + new uptime + update_interval.
	# See the hedged example sketch in the comments right after this config template.
	# File will be created under "storage_dir" (can be set above)
timesync_file:
enabled: true
offset: 5s # roughly the time for RPi to boot
path: last_write.timestamps
check: true
check_fatal: true
meas:
interval: 20
sensors:
## Common keys for all sensors:
## type: type_name # used to pick the code to query the sensor, mandatory
		## disabled: false # do not actually read stuff from the sensor (only return errors), optional
## Below are examples for implemented sensors.
# rpi_soc: # sensor name (will correspond to rrd DS name)
# type: rpi_soc
# # path: /sys/class/thermal/thermal_zoneX/temp
# mb:
# type: lm
# chip: atk0110-acpi-0 # see "sensors -u" output
# feature: temp1 # either this or "feature_type" can be specified to remove ambiguity
# # feature_type: t # v, rpm, t
# cpu:
# type: lm
# chip: k10temp-pci-00c3
# # only one feature here, so no need for other specs
# dht22_indoors:
# type: adafruit_dht # requires github.com/adafruit/Adafruit_Python_DHT
# sysfs_gpio_pin: 9 # must be specified
# # sensor_type: DHT22 # as defined in Adafruit_DHT module, DHT22 is the default
# # gpio_init: false # null/false, { in | high | low }[-{ up | down | off }]
# # retry: [6, 0.2, 1.5] # [attempts, initial_delay, delay_factor], optional
			# # subprocess: true # query each sensor in its own subprocess, seems to be an Adafruit_DHT issue
# sht75_outside:
# type: sht # requires github.com/mk-fg/sht-sensor
# sysfs_gpio_pin_sck: 30 # must be specified
# sysfs_gpio_pin_data: 60 # must be specified
# # sensor_voltage_spec: 3.5V # either 3.5V or 5V, 3.5V (recommended) is the default
graph:
opts:
full-size-mode: true
width: 1000
height: 500
start: -5h
end:
left-axis-format: '%0.1lf'
'''.replace('\t', ' ')
conf_base = yaml_load(conf_base_str)
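# A minimal sketch of the boot-time clock-restore script described in the
# timesync_file comments above. Assumes the default storage_dir/path values and
# that NTP sync state is checked by other means - names and the "date -s" call
# here are illustrative, not part of this logger:
#
#   import os
#   ts_file = '/var/lib/rrd_logger/last_write.timestamps'
#   with open(ts_file) as src: saved_uptime, saved_ts = map(float, src.read().split())
#   with open('/proc/uptime') as src: uptime = float(src.readline().split()[0])
#   if uptime >= saved_uptime: raise SystemExit # no uptime wrap-around, clock is likely fine
#   # ...abort here as well if time was already synced via NTP...
#   os.system(f'date -s @{saved_ts + uptime + 20:.0f}') # 20 = meas.interval from the config above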
### For bencode-like bits below
# Based on an old python2 module by Petru Paler; not an exact bencode format -
# unlike vanilla bencode it also encodes none/bool/float values
# Used only for data_hash below, which itself is only used to generate rrd names from schema
def _ns_class(cls_name, cls_parents, cls_attrs):
func_type = type(lambda: None)
for k, v in cls_attrs.items():
if isinstance(v, func_type): cls_attrs[k] = classmethod(v)
return type(cls_name, cls_parents, cls_attrs)
class BTEError(Exception): pass
class Bencached:
__slots__ = 'bencoded',
def __init__(self, s): self.bencoded = s
class BTE(metaclass=_ns_class):
def decode_int(cls, x, f):
f += 1
newf = x.index(b'e', f)
n = int(x[f:newf])
		if x[f:f+1] == b'-': # indexing bytes yields ints on py3, so compare byte slices
			if x[f+1:f+2] == b'0': raise ValueError
		elif x[f:f+1] == b'0' and newf != f+1: raise ValueError
return n, newf+1
def decode_str(cls, x, f):
colon = x.index(b':', f)
n = int(x[f:colon])
		if x[f:f+1] == b'0' and colon != f+1: raise ValueError
colon += 1
return (x[colon:colon+n], colon+n)
def decode_list(cls, x, f):
r, f = [], f+1
		while x[f:f+1] != b'e':
			v, f = cls.decode_func[x[f:f+1]](cls, x, f)
r.append(v)
return r, f + 1
def decode_dict(cls, x, f):
r, f = {}, f+1
		while x[f:f+1] != b'e':
			k, f = cls.decode_str(x, f)
			r[k], f = cls.decode_func[x[f:f+1]](cls, x, f)
return r, f + 1
def decode_none(cls, x, f): return None, f+1
decode_func = dict(l=decode_list, d=decode_dict, i=decode_int, n=decode_none)
for n in range(10): decode_func[str(n)] = decode_str
decode_func = dict((k.encode(), v) for k,v in decode_func.items())
def encode_bencached(cls, x, r): r.append(x.bencoded)
def encode_int(cls, x, r): r.extend((b'i', str(x), b'e'))
def encode_float(cls, x, r): r.extend((b'f', struct.pack('!d', x), b'e'))
def encode_bool(cls, x, r):
if x: cls.encode_int(1, r)
else: cls.encode_int(0, r)
def encode_str(cls, x, r):
if isinstance(x, str): x = x.encode()
r.extend((str(len(x)), b':', x))
def encode_list(cls, x, r):
r.append(b'l')
for i in x: cls.encode_func[type(i)](cls, i, r)
r.append(b'e')
def encode_dict(cls, x, r):
r.append(b'd')
for k, v in sorted(x.items()):
r.extend((str(len(k)), b':', k))
cls.encode_func[type(v)](cls, v, r)
r.append(b'e')
def encode_none(cls, x, r): r.append(b'n')
encode_func = {
Bencached: encode_bencached,
str: encode_str,
bytes: encode_str,
int: encode_int,
float: encode_float,
list: encode_list,
tuple: encode_list,
dict: encode_dict,
bool: encode_bool,
type(None): encode_none }
def bdecode(cls, x):
		try: r, l = cls.decode_func[x[0:1]](cls, x, 0)
except (IndexError, KeyError, ValueError) as err:
raise BTEError(f'Not a valid bencoded string: {err}')
if l != len(x):
raise BTEError('Invalid bencoded value (data after valid prefix)')
return r
def bencode(cls, x):
r = list()
cls.encode_func[type(x)](cls, x, r)
return b''.join((v.encode() if isinstance(v, str) else v) for v in r)
def b64(data):
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
def get_uid_token(chars=4):
assert chars * 6 % 8 == 0, chars
return b64(os.urandom(chars * 6 // 8))
def data_hash(data, n=20, **kws):
data = _data_hash(data, **kws)
if not n or n <= 0: return data
assert n <= len(data), [n, len(data)]
return data[:n]
def _data_hash(data, as_bencode=False, as_hex=False):
data = BTE.bencode(data)
if as_bencode: return data
data = hashlib.sha256(data)
if as_hex: return data.hexdigest()
return b64(data.digest())
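# Quick sanity-check example of the helpers above:
#   BTE.bencode({'a': 1, 'b': None}) == b'd1:ai1e1:bne'
#   data_hash(...) is the urlsafe-base64 sha256 of that encoding, truncated to
#   "n" characters (n=10 is used for rrd file names in ensure_schema below).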
class TSValueError(Exception): pass
_short_ts_days = dict(y=365.2422, yr=365.2422, mo=30.5, w=7, d=1)
_short_ts_s = dict(h=3600, hr=3600, m=60, min=60, s=1, sec=1)
def _short_ts_regexp():
rex = list('^-?')
for k in [*_short_ts_days, *_short_ts_s]:
rex.append(fr'(?P<{k}>\d+{k}\s*)?')
return re.compile(''.join(rex), re.I | re.U)
_short_ts_regexp = _short_ts_regexp()
def ts_diff_parse(spec, t=float, as_td_obj=False):
try: spec = float(spec)
except ValueError: pass
else: return t(spec)
spec = spec.strip()
if re.search(r'^[\d.]+$', spec): spec = f'{spec}s'
m = _short_ts_regexp.search(spec)
if not m: raise ValueError(spec)
delta, delta_any = list(), False
parse_int = lambda v: int(''.join(c for c in v if c.isdigit()))
for units in [_short_ts_days, _short_ts_s]:
val = 0
for k, v in units.items():
try:
if not m.group(k): continue
n = parse_int(m.group(k))
except IndexError: continue
delta_any, val = True, val + n * v
delta.append(val)
if not delta_any:
raise ValueError(f'Invalid time delta specification: {spec!r}')
td = dt.timedelta(*delta)
return td if as_td_obj else t(td.total_seconds())
def ts_diff_repr( ts_or_delta, ts0=None, ext=None, absolute_diff=False,
_units=dict( h=3600, m=60, s=1,
y=365.2422*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
	if isinstance(ts0, (int, float)): ts0 = dt.datetime.fromtimestamp(ts0)
assert ts0 is None or isinstance(ts0, (int, float, dt.datetime)), ts0
if isinstance(ts_or_delta, (int, float)):
ts_or_delta = dt.timedelta(seconds=ts_or_delta)\
if not ts0 else dt.datetime.fromtimestamp(ts_or_delta)
delta = (ts_or_delta - (ts0 or dt.datetime.now()))\
if not isinstance(ts_or_delta, dt.timedelta) else ts_or_delta
if absolute_diff: delta = abs(delta)
res, s = list(), delta.total_seconds()
for unit, unit_s in sorted(_units.items(), key=op.itemgetter(1), reverse=True):
val = math.floor(s / unit_s)
if not val: continue
res.append(f'{val:.0f}{unit}')
if len(res) >= 2: break
s -= val * unit_s
if not res: return 'now'
else:
if ext: res.append(ext)
return ' '.join(res)
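# Examples of the two time-delta helpers above:
#   ts_diff_parse('1h 30min') -> 5400.0, ts_diff_parse('14d') -> 1209600.0
#   ts_diff_repr(5400) -> '1h 30m', ts_diff_repr(90061) -> '1d 1h'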
def get_uptime():
with open('/proc/uptime', 'rb') as src: return float(src.readline().split()[0])
@contextlib.contextmanager
def safe_replacement(path):
kws = dict( delete=False,
dir=os.path.dirname(path), prefix=os.path.basename(path)+'.' )
with tempfile.NamedTemporaryFile(**kws) as tmp:
try: mode = stat.S_IMODE(os.stat(path).st_mode)
except (OSError, IOError): mode = None
try:
yield tmp
tmp.flush()
if mode is not None: os.fchmod(tmp.fileno(), mode)
os.rename(tmp.name, path)
except safe_replacement.cancel: pass
finally:
try: os.unlink(tmp.name)
except (OSError, IOError): pass
class CancelFileReplacement(Exception): pass
safe_replacement.cancel = CancelFileReplacement
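# Usage pattern for safe_replacement() (as in RRD.timesync_file_write below):
#   with safe_replacement('/some/path') as tmp: tmp.write(b'new contents\n')
# writes to a tempfile in the same dir and renames it over the target on success;
# raise safe_replacement.cancel inside the block to abort without touching the file.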
class RRD:
# DS:ds-name:GAUGE | COUNTER | DERIVE | ABSOLUTE:heartbeat:min:max
# RRA:AVERAGE | MIN | MAX | LAST:xff:steps:rows
ds_types = 'gauge', 'counter', 'derive', 'absolute' # "compute" is different
rra_cfs = 'average', 'min', 'max', 'last'
def __init__(self, storage_dir, name, timesync=None, log=None):
self.storage_dir, self.name = pl.Path(storage_dir), name
self._path = self._dss = None
self.log = log or logging.getLogger(f'rrd.{id(self):x}')
self.timesync = timesync
if timesync:
timesync.path = self.storage_dir / timesync.path
if timesync.offset: timesync.offset = ts_diff_parse(timesync.offset)
self.timesync_file_check()
def ds_for_sensor_metric(self, sensor_name, metric):
assert '__' not in sensor_name + metric, [sensor_name, metric]
return f'{sensor_name}__{metric}'
def sensor_metric_for_ds(self, ds): return tuple(ds.split('__'))
def timesync_file_write(self):
if not self.timesync: return
uptime, ts = get_uptime(), time.time()
if self.timesync.offset: ts += self.timesync.offset
with safe_replacement(self.timesync.path) as dst:
dst.write(f'{uptime!r} {ts!r}\n'.encode())
def timesync_file_check(self):
if not self.timesync or not self.timesync.check: return
try:
with open(self.timesync.path, 'rb') as src:
src_uptime, src_ts = map(float, src.read().split())
except IOError as err:
if err.errno != errno.ENOENT: raise
except Exception as err:
			self.log.warning( 'Failed to process/check timestamp/uptime'
				' values from timesync file %r: %s', self.timesync.path, err )
else:
uptime, ts = get_uptime(), time.time()
if src_uptime > uptime and src_ts > ts:
err_msg = ( f'Detected uptime roll-over ({src_uptime:.1f} -> {uptime:.1f}) with'
f' backwards time (delta: {abs(src_ts - ts):.1f}), this will cause a lot of issues' )
if self.timesync.check_fatal: raise RuntimeError(err_msg)
				else: self.log.error(err_msg)
@property
def path(self):
assert self._path, ( '"ensure_schema" method must'
' be called before any kind of access/manipulation methods' )
return self._path
@staticmethod
def _path_meta(path):
return f'{path}.meta'
@property
def path_meta(self):
return self._path_meta(self.path)
def ensure_schema(self, step, dss, rras, symlink_to=None, cleanup_td=None):
assert dss and rras, [dss, rras]
for ds in dss:
if 'name' not in ds: ds['name'] = self.ds_for_sensor_metric(*ds['src'])
assert ds['t'] in self.ds_types, ds['t']
schema = dict(step=step, dss=dss, rras=rras)
self._path = self.storage_dir / f'{self.name}.{data_hash(schema, n=10)}.rrd'
self._dss = list(ds['name'] for ds in dss)
assert len(set(self._dss)) == len(self._dss), dss
if not self.path.exists():
self.log.debug('Creating new rrd database: %r', self.path)
self.storage_dir.mkdir(parents=True, exist_ok=True)
rrd_clause = lambda t, *a: opts.append(':'.join(map(str, [t.upper()] + list(a))))
opts = [str(self.path), '--step', str(step)]
for ds in dss:
rrd_clause( 'ds', ds['name'], ds['t'].upper(),
int(ds['hb']), ds.get('min') or 'U', ds.get('max') or 'U' )
for rra in rras:
rrd_clause( 'rra', rra['cf'].upper(),
f'{rra["xff"]:.2f}', rra['steps'], rra['rows'] )
rrdtool.create(opts)
with open(self.path_meta, 'w') as dst:
dst.write('### Auto-generated on database creation\n'
'### Only some timestamps are used for dir cleanup purposes\n')
yaml_dump(dict(schema=schema, ts=dict(created=time.time())), dst)
if symlink_to:
symlink_to = self.storage_dir / symlink_to
try: dst = pl.Path(os.readlink(symlink_to))
except OSError: dst = None
if not dst or self.path.resolve() != dst.resolve():
if symlink_to.exists(): os.unlink(symlink_to)
os.symlink(self.path.resolve(), symlink_to)
if cleanup_td is not None:
ts = time.time()
for p in self.storage_dir.glob(f'{self.name}.*.rrd'):
if p.resolve() == self.path.resolve(): continue
p_meta = self._path_meta(p)
try:
ts_p = os.stat(p).st_mtime
try: ts_p = max(ts_p, os.stat(p_meta).st_mtime)
except (OSError, IOError): pass
except (OSError, IOError): continue
if ts_p < ts - cleanup_td:
self.log.debug('Removing old rrd file: %r', p)
for p in p, p_meta:
try: os.unlink(p)
except (OSError, IOError): pass
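	# With the default config above and e.g. the commented-out "rpi_soc" sensor
	# enabled, ensure_schema() would translate to roughly (illustrative only):
	#   rrdtool create /var/lib/rrd_logger/rrd_logger.0.<hash>.rrd --step 30 \
	#     DS:rpi_soc__t:GAUGE:60:0:100 RRA:AVERAGE:0.80:10:8641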
def load_schema(self, path):
self._path = (self.storage_dir / path).resolve()
paths = self.path, self.path_meta
assert all(p.exists() for p in paths), paths
def update(self, vals_dict, ts=None):
'vals_dict should be {(sensor_name, metric): value, ...}.'
if not vals_dict: return
if ts is None: ts = time.time()
entry = dict((self.ds_for_sensor_metric(*k), v) for k, v in vals_dict.items())
entry_str = ':'.join([str(ts), *(str(entry.pop(k, 'U')) for k in self._dss)])
assert not entry, entry # some unknown keys left-over
self.timesync_file_write()
rrdtool.update(str(self.path), entry_str)
def fetch_last_update(self):
'Returns ts and dict with values stored on last db update.'
# There is no way to do it through API atm, sadly
		lines = subprocess.check_output( # check_output returns bytes on py3 - decode for str ops below
			['rrdtool', 'lastupdate', self.path], close_fds=True ).decode().splitlines()
ts, keys = None, list(self.sensor_metric_for_ds(k) for k in lines[0].split())
vals = dict.fromkeys(keys)
for line in lines[1:]:
if not line.strip(): continue
ts, line = line.split(':', 1)
line = line.split()
assert len(line) == len(keys), [keys, line]
for k, v in zip(keys, line):
try: vals[k] = float(v)
except ValueError: pass # assuming unknown
if ts: ts = float(ts)
return ts, vals
def fetch_last_values(self, time_delta, cf='average'):
'''Returns a dict with average value for each metric for specified time delta.
"cf" argument specifies consolidation function to pass to rrdtool, used for
picking appropriate RRAs, but values returned from rrdtool will still be averaged.'''
assert cf in self.rra_cfs, cf
td = int(round(ts_diff_parse(time_delta)))
(rra_ts0, rra_ts1, rra_step), keys, vals = rrdtool.fetch(
self.path, cf.upper(), '--resolution', str(td), '--start', f'-{td}' )
vals = dict(( self.sensor_metric_for_ds(k),
list(v[n] for v in vals if v[n] is not None) ) for n,k in enumerate(keys))
for k, vs in vals.items(): # average what's left there
vals[k] = (sum(vs) / len(vs)) if vs else None
return vals
def get_meta(self):
		with open(self.path_meta) as src: return LayeredAttrDict(yaml_load(src))
def get_info(self):
info_tree = LayeredAttrDict()
for k, v in rrdtool.info(self.path).items():
# Parse stuff like "rra[1].cdp_prep[0].value" and "ds[speed].minimal_heartbeat"
br, keys = info_tree, cs.deque(k.split('.'))
while keys:
kk = keys.popleft()
m = re.search(r'^(?P<n>[\w\d_]+)(?:\[(?P<k>[\w\d_]+)\])?', kk)
if not m: raise KeyError(k)
mn, mk = m.group('n'), m.group('k')
br_prev, br = br, br.setdefault(mn, dict())
if mk:
assert mk not in br, [k, mk, br]
keys.append(mk)
elif not keys: br_prev[mn] = v
return info_tree
color_scale_c10 = ( '#1f77b4 #ff7f0e #2ca02c' # d3.scale.category10
' #d62728 #9467bd #8c564b #e377c2 #7f7f7f #bcbd22 #17becf' ).split()
def graph( self, metrics, sensors, image_path=None,
legend_style='stats-table',
hrules=None, colors=None, units=None, alt_axis=None,
perc=None, perc_direction='+-', defs_table_len=100, **cli_opts ):
# http://oss.oetiker.ch/rrdtool/gallery/index.en.html
# http://oss.oetiker.ch/rrdtool/doc/rrdgraph.en.html#IOPTIONS
# http://oss.oetiker.ch/rrdtool/doc/rrdgraph_graph.en.html
# http://oss.oetiker.ch/rrdtool/doc/rrdgraph_data.en.html
# http://oss.oetiker.ch/rrdtool/doc/rrdgraph_rpn.en.html
# http://oss.oetiker.ch/rrdtool/doc/rrdgraph_examples.en.html
if isinstance(metrics, str): metrics = [metrics]
if image_path is None:
image_path = tempfile.NamedTemporaryFile(
prefix='rrd.chart.', suffix='.png', delete=False ).name
assert legend_style == 'stats-table', legend_style
opts = dict( # sane-ish defaults
width=1000, height=500,
start='-24h', end=None, imgformat='PNG',
pango_markup=True, font='LEGEND:7:monospace' )
if alt_axis:
m = alt_axis['metric']
opts.update(
right_axis_label=f'{m}, {sensors[m][0]}',
right_axis_format=alt_axis.get('format'),
right_axis_formatter=alt_axis.get('formatter'),
right_axis=f'{alt_axis.get("scale", 1)}:{alt_axis.get("shift", 0)}' )
opts.update(cli_opts or dict())
colors = iter(colors or self.color_scale_c10)
assert not set(perc_direction).difference('+-'), perc_direction
defs = ['TEXTALIGN:left']
def def_label(sn, cf=None, length=3, _src=list()):
if not _src: _src.extend([iter(range(2**30)), string.ascii_lowercase])
u, un, us, ul = '', next(_src[0]), _src[1], len(_src[1])
for n in range(length): u, un = us[un % ul] + u, un // ul
return f'{sn}_{cf or ""}_{u}'
def defs_add(sn, cfs, fmt=None, def_t=None):
if isinstance(cfs, str): cfs = [cfs]
if not def_t: def_t = 'cdef' if len(cfs) > 1 else 'vdef'
gprint, sn_base = None, sn
for cf in cfs:
if isinstance(cf, str): cf = [def_t, cf]
(cf_t, cf), k = cf, def_label(sn_base)
if '{sn}' not in cf: cf = '{{sn}},{}'.format(cf.replace('{', '{{').replace('}', '}}'))
defs.append(f'{cf_t.upper()}:{k}={cf}'.format(sn=sn))
if fmt: gprint = 'GPRINT:{k}:{fmt}'.format(k=k, fmt=fmt.replace(':', r'\:'))
sn, k = k, None
if gprint: defs.append(gprint)
defs.append(r'COMMENT:{}\n'.format('-'*defs_table_len))
msusn = list()
for m in metrics:
units, ss = sensors[m]
if 'vertical_label' not in opts: opts['vertical_label'] = f'{m}, {units}'
sns = list(f'{s}_{m}' for s in ss)
for s, sn in zip(ss, sns): msusn.append((m, s, units, sn))
sn_len, u_len = (max(len(v[n]) for v in msusn) for n in [3, 2])
for m, s, units, sn in msusn:
# Line
defs.append(f'DEF:{sn}={self.path}:{s}__{m}:AVERAGE')
defs.append(f'LINE1:{sn}{next(colors)}:{sn}')
# Legend
sn_pad, u_pad = sn_len - len(sn), u_len - len(units)
defs.append('COMMENT:{}{}{}'.format(' '*sn_pad, units, ' '*u_pad))
for label, cf in [
('last', 'LAST'), ('avg', 'AVERAGE'),
('min', 'MINIMUM'), ('max', 'MAXIMUM') ]:
vdef = f'{sn}_{label}'
defs_add(sn, cf, f'<small>{label.title()}:</small> %4.1lf')
for p in perc or list():
if p.endswith('σ'): pn, p = p, {'1σ': 68.2, '2σ': 95.4, '3σ': 99.7}[p]
else: pn = f'{p}%'
if len(perc_direction) == 1:
pd = perc_direction[0]
pn += pd
if pd == '+':
defs_add( sn,
f'{p},PERCENTNAN',
f'<small>{pn}:</small> %4.1lf' )
else:
defs_add( sn,
['-1,*', ['vdef', f'{p},PERCENTNAN']],
f'<small>{pn}:</small> %5.1lf' )
else:
defs_add( sn,
['-1,*', ['vdef', f'{p},PERCENTNAN']],
f'<small>{pn}:</small> %5.1lf' )
defs_add(sn, f'{p},PERCENTNAN', '%4.1lf')
defs.append(r'COMMENT:\n')
defs.append(r'COMMENT:{}\n'.format('-'*defs_table_len))
for hrule in hrules or list():
if not isinstance(hrule, dict): hrule = dict(v=hrule)
hrule = LayeredAttrDict(hrule)
if not hrule.get('c'): hrule['c'] = next(colors)
d = f'HRULE:{hrule["v"]}{hrule["c"]}'
if hrule.get('legend'): d += f':{hrule["legend"]}'
if hrule.get('dashes'):
d += ':dashes{}'.format(
'' if hrule['dashes'] is True else f'={hrule["dashes"]}' )
if hrule.get('dash_offset'): d += f':dash-offset={hrule["dash_offset"]}'
defs.append(d)
args = [str(image_path)]
for k, v in opts.items():
if v in [None, False]: continue
if len(k) != 1:
args.append(''.join(['--', k.replace('_', '-'), '' if v is True else f'={v}']))
else:
args.append(f'-{k}')
if v is not True: args.append(str(v))
rrdtool.graph(*map(str, args + defs))
return image_path
def init_rrd(sensors_interval, sensors, conf):
rrdb_timesync = conf.timesync_file.enabled and conf.timesync_file
rrdb = RRD(conf.storage_dir, conf.name, rrdb_timesync)
ds_step = ts_diff_parse(conf.step, t=int)
ds_hb = ts_diff_parse(conf.heartbeat, t=int)
if ds_hb < sensors_interval * 1.2:
raise AssertionError(
			f'rrd.heartbeat ({ds_hb:.2f}s) must be at least 20%'
f' greater than interval between measurements ({sensors_interval:.2f}s)' )
dss = list()
for k,s in sensors.items():
assert re.search(r'^[a-zA-Z0-9_]+$', k), k
for sk,sv in s.values.items():
dss.append(dict(
min=sv.get('min'), max=sv.get('max'),
src=(k, sk), t=sv.t, hb=ts_diff_parse(conf.heartbeat, t=int) ))
rra_step = ts_diff_parse(conf.aggregate.step, t=int)
if rra_step % ds_step != 0:
raise AssertionError(
f'Aggregation (RRA) step ({conf.aggregate.step!r})'
f' must be a multiple of data source (DS) step ({conf.step!r})' )
rras =[dict(
xff=conf.aggregate.xff, cf='average',
steps=int(round(rra_step / ds_step)),
rows=ts_diff_parse(conf.aggregate.keep, t=int) // rra_step + 1 )]
rrdb.ensure_schema( step=ds_step,
dss=dss, rras=rras, symlink_to=conf.current_symlink,
cleanup_td=conf.cleanup_after and ts_diff_parse(conf.cleanup_after) )
return rrdb
def rrd_conf_graph( conf, rrdb,
metrics=None, ss=None, opts=None, threshold=None, path=None ):
if not metrics: metrics = sensors.metric_default
if isinstance(metrics, str): metrics = [metrics]
ss = dict( (m, (sensors.units[m], ss)) for m, ss in
sensors.filter_by_metrics(metrics, ss or conf.meas.sensors).items() )
alt_axis = dict(metric=metrics[1], scale=1, shift=1)\
if len(metrics) > 1 and metrics[1] in ss else None
graph_opts = conf.graph.opts.copy()
graph_opts.update( perc=['1σ', '2σ'],
title='{} history'.format(' / '.join(sensors.metrics[m].title() for m in metrics)) )
for d in opts or list():
if not d.strip().startswith('{'): d = f'{{{d}}}'
graph_opts.update(yaml_load(d))
hrules = list()
for spec in threshold or list():
desc, v = spec.split(':', 1)
v = float(v)
hrules.append(dict( v=v, dashes=True,
legend=f'{desc} threshold ({sensors.value_repr(v, metrics[0])})' ))
image_path = rrdb.graph( metrics, ss,
image_path=path, hrules=hrules, alt_axis=alt_axis, **graph_opts )
return image_path
class HTTPPollExit(Exception): pass
def httpd_poller(conf, rrdb, bind, http_opts=None, http_opts_allow=False):
import http.server, urllib.parse, socketserver, select, mimetypes, base64
httpd_log = log or logging.getLogger('httpd')
@contextlib.contextmanager
def rrd_graph(*args, **kws):
image_path = rrd_conf_graph(conf, rrdb, *args, **kws)
try: yield image_path
finally:
try: os.unlink(image_path)
except (OSError, IOError): pass
class HTTPReqHandler(http.server.BaseHTTPRequestHandler):
req_id = None
def setup(self):
self.req_id = get_uid_token(4)
super().setup()
def log_message(self, fmt, *args):
httpd_log.debug( 'http-req[%s] :: %s - - [%s] %s', self.req_id,
self.client_address[0], self.log_date_time_string(), fmt % args )
def do_GET(self):
params = self.path.split('/', 3)[1:3]
if self.path == '/favicon.ico': return ''
try: metrics, opts_ext = list(urllib.parse.unquote_plus(p) for p in params)
except ValueError:
try: (metrics,), opts_ext = params, None
except ValueError: metrics = opts_ext = None
opts = list(http_opts or list())
if http_opts_allow and opts_ext: opts.append(opts_ext)
if metrics:
if not re.search(r'^[-a-zA-Z0-9_ ]+$', metrics):
return self.send_error( 400,
f'Bad request - invalid metric name spec: {metrics!r}' )
metrics = metrics.replace('-', ' ').split(' ')
with rrd_graph(metrics=metrics, opts=opts) as image_path:
ct, enc = mimetypes.guess_type(image_path)
if not ct: ct = 'image/png'
with open(image_path, 'rb') as src:
src.seek(0, os.SEEK_END)
self.send_response(200)
self.send_header('Content-Type', ct)
self.send_header('Content-Length', src.tell())
self.end_headers()
src.seek(0)
for chunk in iter(ft.partial(src.read, 2**20), b''): self.wfile.write(chunk)
class ReusePortTCPServer(socketserver.TCPServer): allow_reuse_address = True
httpd, poller = ReusePortTCPServer(bind, HTTPReqHandler), select.epoll()
poller.register(httpd.fileno(), select.EPOLLIN)
def httpd_poller(delay):
ts = time.time()
ts1 = ts + delay
while True:
delay = ts1 - ts
if delay <= 0: break
try: events = poller.poll(delay)
except KeyboardInterrupt: raise HTTPPollExit
for fd, ev in events:
req, addr = httpd.get_request()
httpd.verify_request(req, addr)
try: httpd.process_request(req, addr)
except Exception as err:
httpd_log.exception( 'Error when processing request'
' from %s: [%s] %s', addr, err.__class__.__name__, err )
httpd.shutdown_request(req)
ts = time.time()
return httpd_poller
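# Example requests handled by do_GET above, with hypothetical metric names
# (path format is "/<metrics>/<yaml-opts>", the opts part URL-quoted in practice):
#   GET /t               -> PNG graph of metric "t" with the configured graph opts
#   GET /t-rh/start: -2d -> metrics "t" and "rh"; extra opts only applied when http_opts_allow is set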
class RawMemSlice:
# XXX: untested leftover code from old python2 version, likely needs to be updated
# Read/write always normalize-to/expect LE bytes
# All units are bytes too, not words or pages
wf = 'I'
w = struct.calcsize(wf) # size of a word that will be used for reading/writing
wm = ~(w - 1)
def __init__(self, addr, n):
import mmap
self.addr_base = addr & ~(mmap.PAGESIZE - 1)
self.addr_offset = addr - self.addr_base
self.mem_len = mmap.PAGESIZE * int(math.ceil(
(n + self.addr_offset) / mmap.PAGESIZE ))
self.fd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
self.mem = mmap.mmap( self.fd, self.mem_len,
mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE, offset=self.addr_base )
def __del__(self):
try: os.close(self.fd)
except: pass
def read(self, n=None, offset=0, unpack=False):
if n is None: n = self.w
seek = self.addr_offset & self.wm
offset += self.addr_offset - seek
self.mem.seek(seek)
buff = list()
for i in range(int(math.ceil((offset + n) / self.w))):
word, = struct.unpack(self.wf, self.mem.read(self.w))
buff.append(struct.pack('<I', word))
		buff = b''.join(buff)[offset:offset+n] # join bytes chunks, take exactly n bytes
if unpack:
assert n == self.w, [n, self.w]
buff, = struct.unpack(self.wf, buff)
return buff
def write(self, buff, offset=0):
if isinstance(buff, int): buff = struct.pack(self.wf, buff)
self.mem.seek(self.addr_offset + offset)
self.mem.write(buff)
class InvalidPlatform(Exception): pass
def detect_rpi():
if not hasattr(detect_rpi, 'cache'):
import re
with open('/proc/cpuinfo', 'r') as src: cpuinfo = src.read()
		m = re.search(r'^Revision\s*:\s*([\da-f]{4,8})$', cpuinfo, re.MULTILINE | re.I)
if not m: raise OSError(cpuinfo)
# https://elinux.org/RPi_HardwareHistory
rev, rev_int = m.group(1).lower(), int(m.group(1), 16)
if rev_int <= 0x15 or rev in '900021 900032'.split(): hw = 'rpi1'
elif rev in 'a01040 a01041 a21041 a22042'.split(): hw = 'rpi2'
elif rev in '900092 900093 920093 9000c1'.split(): hw = 'rpi0'
elif rev in 'a02082 a020a0 a22082 a32082 a020d3'.split(): hw = 'rpi3'
else: raise OSError(rev, cpuinfo)
detect_rpi.cache = hw
return detect_rpi.cache
def rpi_set_internal_resistor_state(bcm_gpio_n, state, platform=False):
'Resistor state can be "up", "down" or "off".'
state_val = dict(off=0, down=1, up=2)[state]
addr_bcm_peri_base = dict(rpi1=0x20000000, rpi2=0x3f000000)
if not platform: platform = detect_rpi()
if not platform or platform not in addr_bcm_peri_base:
raise InvalidPlatform( 'Internal pull-up control for'
f' anything but RPi/RPi2 is not supported, platform: {platform!r}' )
addr_gpio_base = addr_bcm_peri_base[platform] + 0x200000
addr_gpio_pull = addr_gpio_base + 37 * RawMemSlice.w
# addr_gpio_pull_clk = addr_gpio_pull + 1 * RawMemSlice.w
mem = RawMemSlice(addr_gpio_pull, RawMemSlice.w * 2)
# print(mem.read(unpack=True))
# print(' '.join(hex(ord(c)) for c in mem.read(4)))
if not isinstance(bcm_gpio_n, (tuple, list)):
bcm_gpio_n = [bcm_gpio_n]
pin_bitmask = ft.reduce( op.or_,
map(lambda n: 1<<n, bcm_gpio_n), 0 )
mem.write(state_val)
time.sleep(2e-6)
mem.write(pin_bitmask, RawMemSlice.w)
time.sleep(2e-6)
mem.write(0)
mem.write(0, RawMemSlice.w)
def gpio_init(sysfs_gpio_n, state, path_gpio=pl.Path('/sys/class/gpio')):
if not state: return
pin_str = str(int(sysfs_gpio_n))
m = re.search(r'^(in|high|low)(?:-(up|down|off))?$', state)
assert m, state
d, pull = m.groups()
log.debug('Initializing gpio pin %s into state: %s (pull: %s)', pin_str, d, pull)
d_path = path_gpio / f'gpio{pin_str}/direction'
if not d_path.exists():
with (path_gpio / 'export').open('wb') as dst: dst.write(pin_str.encode())
with d_path.open('wb') as dst: dst.write(d.encode())
if pull: rpi_set_internal_resistor_state(sysfs_gpio_n, pull)
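# For example, gpio_init(9, 'in-up') exports /sys/class/gpio/gpio9 (if not done
# already), sets its direction to "in" and enables the internal pull-up via
# rpi_set_internal_resistor_state() above.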
class SensorDisabled(Exception): pass
class Sensor:
	values = LayeredAttrDict() # must be overridden
disabled = True
def __init__(self, conf, log=None):
self.conf, self.log = conf, log
if not self.log:
log = self.__class__.__name__
if log.startswith('Sensor'): log = log[6:]
self.log = logging.getLogger(f'sensor.{log}.{id(self):x}')
disabled = self.conf.get('disabled')
if disabled is not None: self.disabled = bool(disabled)
def __repr__(self):
return ( '<{1.__class__.__name__} {0:x},'
' disabled: {1.disabled}, values: {1.values}>' ).format(id(self), self)
def read_keys(self, *ks):
		return op.itemgetter(*ks)(self.read())
def read(self):
if self.disabled: raise SensorDisabled()
data = self.read_raw() or dict()
# self.log.debug('Raw data: %r', data)
		for k, v in list(data.items()): # iterate over a copy - keys can be deleted below
			ka = self.values[k]
			if not isinstance(v, (int, float)):
				del data[k]
				continue
v_min, v_max = (ka.get(k) for k in ['min', 'max'])
if v_min is not None: v = max(v_min, v)
if v_max is not None: v = min(v_max, v)
data[k] = v
return data
def read_raw(self):
raise NotImplementedError
class SensorRPiSoC(Sensor):
# XXX: untested leftover code from old python2 version, likely needs to be updated
disabled = False
values = LayeredAttrDict(t=dict(min=0, max=100, t='gauge'))
path = '/sys/class/thermal/thermal_zone0/temp'
def __init__(self, conf, log=None):