Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

block/allow interfaces #57

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/aioice/ice.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import enum
import ipaddress
import fnmatch
import logging
import random
import secrets
Expand Down Expand Up @@ -61,13 +62,20 @@ def candidate_pair_priority(
D = ice_controlling and remote.priority or local.priority
return (1 << 32) * min(G, D) + 2 * max(G, D) + (G > D and 1 or 0)


def get_host_addresses(use_ipv4: bool, use_ipv6: bool) -> List[str]:
def get_host_addresses(use_ipv4: bool, use_ipv6: bool, block_list: Optional[List[str]] = None,
allow_list: Optional[List[str]] = None) -> List[str]:
"""
Get local IP addresses.
"""
addresses = []
for interface in netifaces.interfaces():
interfaces = netifaces.interfaces()
if block_list is not None:
blocked_interfaces = [item for blocked in block_list for item in fnmatch.filter(interfaces, blocked)]
interfaces = list(filter(lambda iface: iface not in blocked_interfaces, interfaces))
if allow_list is not None:
interfaces = [item for allowed in allow_list for item in fnmatch.filter(interfaces, allowed)]

for interface in interfaces:
ifaddresses = netifaces.ifaddresses(interface)
for address in ifaddresses.get(socket.AF_INET, []):
if use_ipv4 and address["addr"] != "127.0.0.1":
Expand Down Expand Up @@ -282,6 +290,8 @@ class Connection:
:param turn_transport: The transport for TURN server, `"udp"` or `"tcp"`.
:param use_ipv4: Whether to use IPv4 candidates.
:param use_ipv6: Whether to use IPv6 candidates.
:param block_interfaces: do not allow listed network interfaces
:param allow_interfaces: allow only listed network interfaces
"""

def __init__(
Expand All @@ -296,7 +306,11 @@ def __init__(
turn_transport: str = "udp",
use_ipv4: bool = True,
use_ipv6: bool = True,
block_interfaces: Optional[List[str]] = None,
allow_interfaces: Optional[List[str]] = None,
) -> None:
self.allow_interfaces = allow_interfaces
self.block_interfaces = block_interfaces
self.ice_controlling = ice_controlling
#: Local username, automatically set to a random value.
self.local_username = random_string(4)
Expand Down Expand Up @@ -419,7 +433,9 @@ async def gather_candidates(self) -> None:
if not self._local_candidates_start:
self._local_candidates_start = True
addresses = get_host_addresses(
use_ipv4=self._use_ipv4, use_ipv6=self._use_ipv6
use_ipv4=self._use_ipv4, use_ipv6=self._use_ipv6,
block_list=self.block_interfaces,
allow_list=self.allow_interfaces
)
coros = [
self.get_component_candidates(component=component, addresses=addresses)
Expand Down
41 changes: 41 additions & 0 deletions tests/test_ice.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,47 @@ def test_get_host_addresses(self, mock_ifaddresses, mock_interfaces):
addresses, ["1.2.3.4", "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"]
)

@mock.patch("netifaces.interfaces")
@mock.patch("netifaces.ifaddresses")
def test_block_allow_interfaces(self, mock_ifaddresses, mock_interfaces):
mock_interfaces.return_value = ["eth0", "docker0", "docker1"]
mock_ifaddresses.return_value = {
socket.AF_INET: [{"addr": "127.0.0.1"}, {"addr": "1.2.3.4"}],
socket.AF_INET6: [
{"addr": "::1"},
{"addr": "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"},
{"addr": "fe80::1234:5678:9abc:def0%eth0"},
],
}

# IPv4 only
addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=False, block_list=["docker*"])
self.assertEqual(addresses, ["1.2.3.4"])

# IPv6 only
addresses = ice.get_host_addresses(use_ipv4=False, use_ipv6=True, block_list=["docker*"])
self.assertEqual(addresses, ["2a02:0db8:85a3:0000:0000:8a2e:0370:7334"])

# both
addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=True, block_list=["docker*"])
self.assertEqual(
addresses, ["1.2.3.4", "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"]
)

# IPv4 only
addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=False, allow_list=["eth*"])
self.assertEqual(addresses, ["1.2.3.4"])

# IPv6 only
addresses = ice.get_host_addresses(use_ipv4=False, use_ipv6=True, allow_list=["eth*"])
self.assertEqual(addresses, ["2a02:0db8:85a3:0000:0000:8a2e:0370:7334"])

# both
addresses = ice.get_host_addresses(use_ipv4=True, use_ipv6=True, allow_list=["eth*"])
self.assertEqual(
addresses, ["1.2.3.4", "2a02:0db8:85a3:0000:0000:8a2e:0370:7334"]
)

@asynctest
async def test_close(self):
conn_a = ice.Connection(ice_controlling=True)
Expand Down