-
Notifications
You must be signed in to change notification settings - Fork 0
/
ext_trigger.py
executable file
·62 lines (51 loc) · 1.44 KB
/
ext_trigger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python
# Copyright (C) 2015 Vladimir Nadvornik
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
import numpy as np
import threading
import subprocess
import atexit
import time
import sys
import logging
from cmd import cmdQueue
log = logging.getLogger()
class ExtTrigger(threading.Thread):
    """Daemon thread that runs the external trigger helper and forwards its output.

    The helper binary (``self.bin_name``) is launched as a child process with
    piped stdin/stdout.  Every line it writes to stdout is stripped and put on
    ``cmdQueue``.  When the helper exits it is launched again, unless
    ``terminate()`` has been called, in which case the thread ends.
    """

    # Seconds to back off after EOF or a read error, so that a dead or
    # misbehaving helper cannot turn the loop into a busy spin.
    _RETRY_DELAY = 0.1

    def __init__(self):
        """Initialise the thread state and start the thread immediately."""
        threading.Thread.__init__(self)
        self.daemon = True
        # Set by terminate(); tells run() not to relaunch the helper.
        self.terminating = False
        self.bin_name = "./ext_trigger"
        # subprocess.Popen handle of the helper; None until the first launch.
        self.cmd = None
        self.start()

    def run(self):
        """Supervise the helper process and forward its stdout lines.

        Loop forever:
          * (re)launch the helper if it was never started or has exited;
            return instead when ``self.terminating`` is set;
          * read one line from the helper and put the stripped line on
            ``cmdQueue``.

        An exception raised by ``subprocess.Popen`` (e.g. missing binary) is
        not caught here and therefore ends the thread.
        """
        while True:
            if self.cmd is None or self.cmd.poll() is not None:
                restart = self.cmd is not None
                if restart:
                    log.error("ext_trigger exited with %d\n", self.cmd.poll())
                if self.terminating:
                    return
                self.cmd = subprocess.Popen(
                    [self.bin_name],
                    close_fds=True,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    bufsize=1,
                )
                if not restart:
                    # Register the cleanup hook only once, on the first launch.
                    atexit.register(self.terminate)

            try:
                line = self.cmd.stdout.readline()
            except Exception:
                # Keep the reader alive (best effort), but leave a trace
                # instead of silently swallowing the failure.
                log.exception("ext_trigger: reading helper output failed")
                time.sleep(self._RETRY_DELAY)
                continue

            if not line:
                # An empty result means EOF (the helper closed stdout or
                # exited); it is not a command, so do not enqueue it.  A blank
                # line sent by the helper still arrives as "\n" and is
                # forwarded below as before.  Back off briefly, then let the
                # poll() check at the top of the loop handle the exit.
                time.sleep(self._RETRY_DELAY)
                continue

            try:
                cmdQueue.put(line.strip())
                log.info("ext trigger %s", line)
            except Exception:
                log.exception("ext_trigger: forwarding helper output failed")

    def terminate(self):
        """Stop the helper process and wait for the thread to finish.

        Sets ``self.terminating`` so that ``run()`` returns instead of
        relaunching the helper, terminates the helper if it is still running,
        then joins the thread.  ``run()`` registers this method with
        ``atexit`` on the first launch.
        """
        self.terminating = True
        if self.cmd is not None and self.cmd.poll() is None:
            self.cmd.terminate()
        self.join()
if __name__ == "__main__":
    # Standalone mode: constructing ExtTrigger also starts its reader thread.
    trigger = ExtTrigger()
    # Keep the main thread alive by sleeping in fixed intervals forever.
    idle_seconds = 10
    while True:
        time.sleep(idle_seconds)
#usb.close()