-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
184 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import os, sys, io, re, contextlib as cl | ||
|
||
ep_map_default = ''' | ||
# "<prefix/net/addr> <replacement>" pairs go here, newline/comma separated | ||
# Exact-match full address should end with "/". Example: 1.2 mynet, 1.2.3.4/ myaddr | ||
127.0.0 lo4, :: lo6. | ||
''' | ||
|
||
def main(args=None): | ||
import argparse, textwrap | ||
dd = lambda text: re.sub( r' \t+', ' ', | ||
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ') | ||
parser = argparse.ArgumentParser( | ||
formatter_class=argparse.RawTextHelpFormatter, | ||
usage='%(prog)s [opts] [lines]', | ||
description=dd(''' | ||
Regexp-replacer for addrs/nets in tcpdump output for regular packets. | ||
Expects "tcpdump -ln" output piped to stdin, passing it through to stdout. | ||
Only matches and tweaks regular TCP/UDP/ICMP packet lines, | ||
replacing networks/addresses there, passing unmatched lines through as-is.''')) | ||
parser.add_argument('lines', nargs='*', help=dd(''' | ||
Specific tcpdump lines to parse and exit, instead of default stdin-mode. | ||
Intended for spot-checks of lines in already-collected output or testing.''')) | ||
parser.add_argument('-m', '--map-file', metavar='file', help=dd(''' | ||
File with a list of prefixes, networks or addresses or replace/translate. | ||
%%-prefixed file descriptor number can be used to read map from there (e.g. %%3). | ||
Format is newline/comma separated "<prefix/net/addr> <replacement>" pairs, | ||
with any empty lines or #-comments ignored. | ||
Each src/dst address from tcpdump is matched as "<address>/" against | ||
all <prefix/net/addr> values, in longest-first order, with first match replacing | ||
the matched part with <replacement>. | ||
Example patterns: | ||
127.0.0 lo4 # 127.0.0.1 -> lo4.1, 127.0.0.37 -> lo4.37, etc | ||
:: lo6. # ::1 -> lo6.1, ::37 -> lo6.37, fe80::123 - no match, doesn't start with :: | ||
1.2.3.4/ me # 1.2.3.4 -> me, but won't match 1.2.3.44 due to / at the end''')) | ||
parser.add_argument('-f', '--filter', action='store_true', help=dd(''' | ||
Print only lines that match TCP/UDP/ICMP packets and some pattern in -m/--map-file. | ||
Intended for filtering tcpdump output by a list of known addrs/nets easily.''')) | ||
parser.add_argument('-e', '--err-fail', action='store_true', help=dd(''' | ||
Fail on first <translate-fail> error, instead of passing those through with suffix.''')) | ||
parser.add_argument('-t', '--truncate', type=int, metavar='chars', help=dd(''' | ||
Truncate all passed-through lines to specified length. Default: terminal width.''')) | ||
opts = parser.parse_args(sys.argv[1:] if args is None else args) | ||
|
||
@cl.contextmanager | ||
def in_file(path): | ||
if not path or path == '-': return (yield sys.stdin) | ||
if path[0] == '%': path = int(path[1:]) | ||
with open(path) as src: yield src | ||
|
||
if opts.map_file: | ||
with in_file(opts.map_file) as src: ep_map = src.read() | ||
else: ep_map = ep_map_default | ||
ep_map = ep_map.split('\n') | ||
for n, line in enumerate(ep_map): | ||
ls = ep_map[n] = line.strip() | ||
if (m := ls.find('#')) >= 0: ep_map[n] = ls[:m].strip() | ||
ep_map = '\n'.join(filter(None, ep_map)).replace(',', '\n').split('\n') | ||
for n, ls in enumerate(ep_map): | ||
if not ls: continue | ||
try: pre, repl = ls.split() | ||
except: parser.error(f'Invalid "<addr/net> <repl>" line in -m/--map-file: {ls!r}') | ||
ep_map[n] = pre, repl | ||
ep_map.sort(key=lambda ab: (len(ab[0]), ab), reverse=True) | ||
|
||
if not opts.lines: tcpdump = sys.stdin.buffer | ||
else: tcpdump = io.BytesIO('\n'.join(opts.lines).encode()); tcpdump.seek(0) | ||
|
||
trunc = opts.truncate | ||
if not trunc: trunc = os.get_terminal_size().columns | ||
|
||
def _ep(addr, has_port=False): | ||
if has_port: addr, port = addr.rsplit('.', 1) | ||
addr += '/' | ||
for pre, repl in ep_map: | ||
if addr.startswith(pre): addr = repl + addr[len(pre):]; break | ||
addr = addr.rstrip('/') | ||
if has_port: addr += f'.{port}' | ||
return addr | ||
|
||
re_pkt = re.compile( r'(?P<ts>\d\d:\d\d:\d\d\.\d+)' | ||
r'(?P<any> +(?P<iface>\S+) +(?P<op>P|In|Out))?' | ||
r'(?P<addrs> +(?P<af>IP|IP6) +(?P<src>[\da-f:.]+) +\> +(?P<dst>[\da-f:.]+):)' | ||
r'(?P<p> (?P<proto>UDP|TCP|ICMP|ICMP6|Flags))(?P<tail>.*)' ) | ||
re_icmp = re.compile(r'\b(who has|tgt is) (?P<addr>[\da-f:.]+),') | ||
|
||
while line := tcpdump.readline(): | ||
line = line.decode(errors='surrogateescape').rstrip() | ||
if m := re_pkt.fullmatch(line): | ||
src0 = src = m['src']; dst0 = dst = m['dst'] | ||
suff, has_port = '', m['proto'] in ['TCP', 'UDP'] | ||
try: src, dst = (_ep(m[k], has_port) for k in ['src', 'dst']) | ||
except: | ||
if opts.err_fail: raise | ||
suff = ' <translate-fail>' | ||
ep_match = not (src == src0 and dst == dst0) | ||
addrs = f' {m["af"]} {src} > {dst}:' | ||
line = m.expand(fr'\g<ts>\g<any>{addrs}\g<p>\g<tail>') | ||
if m['proto'] in ['ICMP', 'ICMP6'] and (m2 := re_icmp.search(line)): | ||
try: addr = _ep(m2['addr']) | ||
except: | ||
if opts.err_fail: raise | ||
suff = ' <icmp-translate-fail>' | ||
ep_match = ep_match or addr != m2['addr'] | ||
line = line[:m2.start()] + m2.expand(f'{m2[1]} {addr},') + line[m2.end():] | ||
if suff: line += suff; ep_match = True | ||
if opts.filter and not ep_match: continue | ||
print(line[:trunc]) | ||
|
||
if __name__ == '__main__': | ||
try: sys.exit(main()) | ||
except KeyboardInterrupt: sys.exit(1) | ||
except BrokenPipeError: # stdout pipe closed | ||
os.dup2(os.open(os.devnull, os.O_WRONLY), sys.stdout.fileno()) | ||
sys.exit(1) |