Skip to content

Commit

Permalink
[parser] zmq is sadly not thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
Sec42 committed Nov 20, 2024
1 parent 1320f6c commit a0ad8f1
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions iridium-parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ def __call__(self, parser, ns, values, option):
cl=[]
sl=[]

poller = None

if args.output == "zmq":
import zmq

Expand All @@ -187,10 +189,13 @@ def __call__(self, parser, ns, values, option):
socket.bind(url)

stats['clients']=0
def zmq_thread(socket, stats):
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

def zmq_xpub(poller, stats):
try:
while True:
event = socket.recv()
while len(rv:=poller.poll(0))>0:
event = rv[0][0].recv()
# Event is one byte 0=unsub or 1=sub, followed by topic
if event[0] == 1:
log("new subscriber for", event[1:])
Expand All @@ -205,10 +210,6 @@ def log(*msg):
s=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
print("%s:"%s,*msg, end=eolnl, file=statsfile)

from threading import Thread
zthread = Thread(target = zmq_thread, args = [socket, stats], daemon= True, name='zmq')
zthread.start()

def stats_thread(stats):
ltime=time.time()
lline=0
Expand Down Expand Up @@ -278,12 +279,14 @@ def do_input():
stats['files']=len(args.remainder)
stats['fileno']=0
for line in fileinput.input(args.remainder, openhook=openhook):
if args.do_stats:
if args.do_stats or poller is not None:
if fileinput.isfirstline():
stats['fileno']+=1
stat=os.fstat(fileinput.fileno())
stats['size']=stat.st_size
stats['in']+=1
if poller is not None and len(poller.poll(0))>0:
zmq_xpub(poller, stats)
if args.min_confidence is not None:
q=bitsparser.Message(line.strip())
try:
Expand Down

0 comments on commit a0ad8f1

Please sign in to comment.