From 056a0dd7e25f1cd85856e336e8229710eb58c37b Mon Sep 17 00:00:00 2001 From: Alex Zhukov Date: Mon, 6 Jun 2022 20:17:19 -0700 Subject: [PATCH] block/allow interfaces --- src/aioice/ice.py | 24 ++++++++++++++++++++---- tests/test_ice.py | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/src/aioice/ice.py b/src/aioice/ice.py index f33a8dc..b057cfd 100644 --- a/src/aioice/ice.py +++ b/src/aioice/ice.py @@ -2,6 +2,7 @@ import copy import enum import ipaddress +import fnmatch import logging import random import secrets @@ -61,13 +62,20 @@ def candidate_pair_priority( D = ice_controlling and remote.priority or local.priority return (1 << 32) * min(G, D) + 2 * max(G, D) + (G > D and 1 or 0) - -def get_host_addresses(use_ipv4: bool, use_ipv6: bool) -> List[str]: +def get_host_addresses(use_ipv4: bool, use_ipv6: bool, block_list: Optional[List[str]] = None, + allow_list: Optional[List[str]] = None) -> List[str]: """ Get local IP addresses. """ addresses = [] - for interface in netifaces.interfaces(): + interfaces = netifaces.interfaces() + if block_list is not None: + blocked_interfaces = [item for blocked in block_list for item in fnmatch.filter(interfaces, blocked)] + interfaces = list(filter(lambda iface: iface not in blocked_interfaces, interfaces)) + if allow_list is not None: + interfaces = [item for allowed in allow_list for item in fnmatch.filter(interfaces, allowed)] + + for interface in interfaces: ifaddresses = netifaces.ifaddresses(interface) for address in ifaddresses.get(socket.AF_INET, []): if use_ipv4 and address["addr"] != "127.0.0.1": @@ -282,6 +290,8 @@ class Connection: :param turn_transport: The transport for TURN server, `"udp"` or `"tcp"`. :param use_ipv4: Whether to use IPv4 candidates. :param use_ipv6: Whether to use IPv6 candidates. + :param block_interfaces: do not allow listed network interfaces + :param allow_interfaces: allow only listed network interfaces """ def __init__( @@ -296,7 +306,11 @@ def __init__( turn_transport: str = "udp", use_ipv4: bool = True, use_ipv6: bool = True, + block_interfaces: Optional[List[str]] = None, + allow_interfaces: Optional[List[str]] = None, ) -> None: + self.allow_interfaces = allow_interfaces + self.block_interfaces = block_interfaces self.ice_controlling = ice_controlling #: Local username, automatically set to a random value. self.local_username = random_string(4) @@ -419,7 +433,9 @@ async def gather_candidates(self) -> None: if not self._local_candidates_start: self._local_candidates_start = True addresses = get_host_addresses( - use_ipv4=self._use_ipv4, use_ipv6=self._use_ipv6 + use_ipv4=self._use_ipv4, use_ipv6=self._use_ipv6, + block_list=self.block_interfaces, + allow_list=self.allow_interfaces ) coros = [ self.get_component_candidates(component=component, addresses=addresses) diff --git a/tests/test_ice.py b/tests/test_ice.py index fb5e308..209ef44 100644 --- a/tests/test_ice.py +++ b/tests/test_ice.py @@ -163,6 +163,47 @@ def test_get_host_addresses(self, mock_ifaddresses, mock_interfaces): addresses, ["1.2.3.4", "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"] ) + @mock.patch("netifaces.interfaces") + @mock.patch("netifaces.ifaddresses") + def test_block_allow_interfaces(self, mock_ifaddresses, mock_interfaces): + mock_interfaces.return_value = ["eth0", "docker0", "docker1"] + mock_ifaddresses.return_value = { + socket.AF_INET: [{"addr": "127.0.0.1"}, {"addr": "1.2.3.4"}], + socket.AF_INET6: [ + {"addr": "::1"}, + {"addr": "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"}, + {"addr": "fe80::1234:5678:9abc:def0%eth0"}, + ], + } + + # IPv4 only + addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=False, block_list=["docker*"]) + self.assertEqual(addresses, ["1.2.3.4"]) + + # IPv6 only + addresses = ice.get_host_addresses(use_ipv4=False, use_ipv6=True, block_list=["docker*"]) + self.assertEqual(addresses, ["2a02:0db8:85a3:0000:0000:8a2e:0370:7334"]) + + # both + addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=True, block_list=["docker*"]) + self.assertEqual( + addresses, ["1.2.3.4", "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"] + ) + + # IPv4 only + addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=False, allow_list=["eth*"]) + self.assertEqual(addresses, ["1.2.3.4"]) + + # IPv6 only + addresses = ice.get_host_addresses(use_ipv4=False, use_ipv6=True, allow_list=["eth*"]) + self.assertEqual(addresses, ["2a02:0db8:85a3:0000:0000:8a2e:0370:7334"]) + + # both + addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=True, allow_list=["eth*"]) + self.assertEqual( + addresses, ["1.2.3.4", "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"] + ) + @asynctest async def test_close(self): conn_a = ice.Connection(ice_controlling=True)