Skip to content

Commit

Permalink
Improve mDNS client implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mickel8 committed May 13, 2024
1 parent e408a4a commit cfcc949
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 88 deletions.
64 changes: 59 additions & 5 deletions lib/ex_ice/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ defmodule ExICE.ICEAgent do
@impl true
def init(opts) do
ice_agent = ExICE.Priv.ICEAgent.new(opts)
{:ok, %{ice_agent: ice_agent}}
{:ok, %{ice_agent: ice_agent, pending_eoc: false, pending_remote_cands: MapSet.new()}}
end

@impl true
Expand Down Expand Up @@ -317,14 +317,29 @@ defmodule ExICE.ICEAgent do

@impl true
def handle_cast({:add_remote_candidate, remote_cand}, state) do
ice_agent = ExICE.Priv.ICEAgent.add_remote_candidate(state.ice_agent, remote_cand)
{:noreply, %{state | ice_agent: ice_agent}}
task =
Task.async(fn ->
Logger.debug("Unmarshaling remote candidate: #{remote_cand}")

case ExICE.Priv.ICEAgent.unmarshal_remote_candidate(remote_cand) do
{:ok, cand} -> {:unmarshal_task, {:ok, cand, remote_cand}}
{:error, reason} -> {:unmarshal_task, {:error, reason, remote_cand}}
end
end)

pending_remote_cands = MapSet.put(state.pending_remote_cands, task.ref)
state = %{state | pending_remote_cands: pending_remote_cands}
{:noreply, state}
end

@impl true
def handle_cast(:end_of_candidates, state) do
ice_agent = ExICE.Priv.ICEAgent.end_of_candidates(state.ice_agent)
{:noreply, %{state | ice_agent: ice_agent}}
if MapSet.size(state.pending_remote_cands) == 0 do
ice_agent = ExICE.Priv.ICEAgent.end_of_candidates(state.ice_agent)
{:noreply, %{state | ice_agent: ice_agent}}
else
{:noreply, %{state | pending_eoc: true}}
end
end

@impl true
Expand Down Expand Up @@ -375,6 +390,45 @@ defmodule ExICE.ICEAgent do
{:noreply, %{state | ice_agent: ice_agent}}
end

@impl true
def handle_info({_ref, {:unmarshal_task, {:ok, %Candidate{} = cand, raw_cand}}}, state) do
Logger.debug("""
Successfully unmarshaled candidate.
Raw candidate: #{raw_cand}.
Unmarshaled candidate: #{inspect(cand)}
""")

ice_agent = ExICE.Priv.ICEAgent.add_remote_candidate(state.ice_agent, cand)
{:noreply, %{state | ice_agent: ice_agent}}
end

@impl true
def handle_info({_ref, {:unmarshal_task, {:error, reason, raw_cand}}}, state) do
Logger.warning("""
Couldn't unmarshal candidate, reason: #{inspect(reason)}.
Candidate: #{raw_cand}
""")

{:noreply, state}
end

@impl true
def handle_info({:DOWN, ref, _, _, _}, state) do
if MapSet.member?(state.pending_remote_cands, ref) do
pending_remote_cands = MapSet.delete(state.pending_remote_cands, ref)
state = %{state | pending_remote_cands: pending_remote_cands}

if MapSet.size(state.pending_remote_cands) == 0 and state.pending_eoc == true do
ice_agent = ExICE.Priv.ICEAgent.end_of_candidates(state.ice_agent)
{:noreply, %{state | ice_agent: ice_agent, pending_eoc: false}}
else
{:noreply, state}
end
else
{:noreply, state}
end
end

@impl true
def handle_info(msg, state) do
Logger.warning("Got unexpected msg: #{inspect(msg)}")
Expand Down
1 change: 1 addition & 0 deletions lib/ex_ice/priv/dns/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ defmodule ExICE.Priv.DNS.Message do
defp decode_type(<<14::16, data::binary>>), do: {:ok, :minfo, data}
defp decode_type(<<15::16, data::binary>>), do: {:ok, :mx, data}
defp decode_type(<<16::16, data::binary>>), do: {:ok, :txt, data}
defp decode_type(<<47::16, data::binary>>), do: {:ok, :nsec, data}
defp decode_type(<<252::16, data::binary>>), do: {:ok, :afxr, data}
defp decode_type(<<253::16, data::binary>>), do: {:ok, :mailb, data}
defp decode_type(<<254::16, data::binary>>), do: {:ok, :maila, data}
Expand Down
92 changes: 47 additions & 45 deletions lib/ex_ice/priv/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,36 @@ defmodule ExICE.Priv.ICEAgent do
packets_received: 0
]

@spec unmarshal_remote_candidate(String.t()) :: {:ok, Candidate.t()} | {:error, term()}
def unmarshal_remote_candidate(remote_cand_str) do
resolve_address = fn
remote_cand when is_binary(remote_cand.address) ->
Logger.debug("Trying to resolve addr: #{remote_cand.address}")

case ExICE.Priv.MDNS.Resolver.gethostbyname(remote_cand.address) do
{:ok, addr} ->
Logger.debug("Successfully resolved #{remote_cand.address} to #{inspect(addr)}")
remote_cand = %ExICE.Candidate{remote_cand | address: addr}
{:ok, remote_cand}

{:error, reason} = err ->
Logger.debug("Couldn't resolve #{remote_cand.address}, reason: #{reason}")
err
end

remote_cand ->
{:ok, remote_cand}
end

with {_, {:ok, remote_cand}} <- {:unmarshal, ExICE.Candidate.unmarshal(remote_cand_str)},
{_, {:ok, remote_cand}} <- {:resolve_address, resolve_address.(remote_cand)} do
{:ok, remote_cand}
else
{operation, {:error, reason}} ->
{:error, {operation, reason}}
end
end

@spec new(Keyword.t()) :: t()
def new(opts) do
{stun_servers, turn_servers} = parse_ice_servers(opts[:ice_servers] || [])
Expand Down Expand Up @@ -278,94 +308,66 @@ defmodule ExICE.Priv.ICEAgent do
|> update_ta_timer()
end

@spec add_remote_candidate(t(), String.t()) :: t()
def add_remote_candidate(%__MODULE__{eoc: true} = ice_agent, remote_cand_str) do
@spec add_remote_candidate(t(), Candidate.t()) :: t()
def add_remote_candidate(%__MODULE__{eoc: true} = ice_agent, remote_cand) do
Logger.warning("""
Received remote candidate after end-of-candidates. Ignoring.
Candidate: #{remote_cand_str}\
Candidate: #{inspect(remote_cand)}\
""")

ice_agent
end

def add_remote_candidate(
%__MODULE__{remote_ufrag: nil, remote_pwd: nil} = ice_agent,
remote_cand_str
remote_cand
) do
Logger.warning("""
Received remote candidate but there are no remote credentials. Ignoring.
Candidate: #{remote_cand_str}\
Candidate: #{inspect(inspect(remote_cand))}\
""")

ice_agent
end

def add_remote_candidate(ice_agent, remote_cand_str) do
Logger.debug("New remote candidate: #{remote_cand_str}")

resolve_address = fn
remote_cand when is_binary(remote_cand.address) ->
Logger.debug("Trying to resolve addr: #{remote_cand.address}")

case ExICE.Priv.MDNS.Resolver.gethostbyname(remote_cand.address) do
{:ok, addr} ->
Logger.debug("Successfully resolved #{remote_cand.address} to #{inspect(addr)}")
remote_cand = %ExICE.Candidate{remote_cand | address: addr}
{:ok, remote_cand}

{:error, reason} = err ->
Logger.debug("Couldn't resolve #{remote_cand.address}, reason: #{reason}")
err
end

remote_cand ->
{:ok, remote_cand}
end
def add_remote_candidate(ice_agent, remote_cand) do
Logger.debug("New remote candidate: #{inspect(remote_cand)}")

uniq? = fn remote_cands, remote_cand ->
not Enum.any?(remote_cands, fn cand ->
cand.address == remote_cand.address and cand.port == remote_cand.port
end)
end

with {_, {:ok, remote_cand}} <- {:unmarshal, ExICE.Candidate.unmarshal(remote_cand_str)},
{_, {:ok, remote_cand}} <- {:resolve_address, resolve_address.(remote_cand)},
{_, true} <- {:uniq, uniq?.(Map.values(ice_agent.remote_cands), remote_cand)} do
if uniq?.(Map.values(ice_agent.remote_cands), remote_cand) do
ice_agent = do_add_remote_candidate(ice_agent, remote_cand)
Logger.debug("Successfully added remote candidate.")

ice_agent
|> update_connection_state()
|> update_ta_timer()
else
{:uniq, false} ->
# This is pretty common case (we can get conn-check
# before getting a remote candidate), hence debug.
Logger.debug("""
Duplicated remote candidate. Ignoring.
Candidate: #{remote_cand_str}\
""")

ice_agent

{operation, {:error, reason}} ->
Logger.warning("""
Invalid remote candidate. Couldn't #{operation}, reason: #{inspect(reason)}. Ignoring.
Candidate: #{remote_cand_str}\
""")
# This is pretty common case (we can get conn-check
# before getting a remote candidate), hence debug.
Logger.debug("""
Duplicated remote candidate. Ignoring.
Candidate: #{inspect(remote_cand)}\
""")

ice_agent
ice_agent
end
end

@spec end_of_candidates(t()) :: t()
def end_of_candidates(%__MODULE__{role: :controlled} = ice_agent) do
Logger.debug("Setting end-of-candidates flag.")
ice_agent = %{ice_agent | eoc: true}
# we might need to move to the completed state
update_connection_state(ice_agent)
end

def end_of_candidates(%__MODULE__{role: :controlling} = ice_agent) do
Logger.debug("Setting end-of-candidates flag.")
ice_agent = %{ice_agent | eoc: true}
# check wheter it's time to nominate and if yes, try noimnate
maybe_nominate(ice_agent)
Expand Down
103 changes: 90 additions & 13 deletions lib/ex_ice/priv/mdns/resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule ExICE.Priv.MDNS.Resolver do

@mdns_port 5353
@multicast_addr {{224, 0, 0, 251}, @mdns_port}
@response_timeout_ms 300
@response_timeout_ms 3000
@rtx_timeout_ms 500

@spec start_link(module()) :: GenServer.on_start()
def start_link(transport_module \\ :gen_udp) do
Expand All @@ -28,7 +29,7 @@ defmodule ExICE.Priv.MDNS.Resolver do
@impl true
def init(transport_module) do
Logger.debug("Starting MDNS Resolver")
{:ok, %{transport_module: transport_module}, {:continue, nil}}
{:ok, %{transport_module: transport_module, cache: %{}}, {:continue, nil}}
end

@impl true
Expand Down Expand Up @@ -75,6 +76,12 @@ defmodule ExICE.Priv.MDNS.Resolver do
end
end

@impl true
def handle_call({:gethostbyname, addr}, _from, %{cache: cache} = state)
when is_map_key(cache, addr) do
{:reply, {:ok, Map.fetch!(cache, addr)}, state}
end

@impl true
def handle_call({:gethostbyname, addr}, from, state) do
query =
Expand All @@ -92,9 +99,19 @@ defmodule ExICE.Priv.MDNS.Resolver do

case state.transport_module.send(state.socket, @multicast_addr, query) do
:ok ->
state = put_in(state, [:queries, addr], from)
Process.send_after(self(), {:response_timeout, addr}, @response_timeout_ms)
{:noreply, state}
rtx_timer = Process.send_after(self(), {:rtx, addr}, @rtx_timeout_ms)

if Map.has_key?(state.queries, addr) do
query_info = Map.fetch!(state.queries, addr)
requesters = [from | query_info.requesters]
query_info = %{query_info | requesters: requesters}
state = put_in(state, [:queries, addr], query_info)
{:noreply, state}
else
state = put_in(state, [:queries, addr], %{requesters: [from], rtx_timer: rtx_timer})
{:noreply, state}
end

{:error, reason} ->
{:reply, {:error, reason}, state}
Expand All @@ -107,15 +124,29 @@ defmodule ExICE.Priv.MDNS.Resolver do
# Only accept query response with one resource record.
# See https://datatracker.ietf.org/doc/html/draft-ietf-mmusic-mdns-ice-candidates#section-3.2.2
{:ok, %{qr: true, aa: true, answer: [%{type: :a, class: :in, rdata: <<a, b, c, d>>} = rr]}} ->
{from, state} = pop_in(state, [:queries, rr.name])

if from do
addr = {a, b, c, d}
GenServer.reply(from, {:ok, addr})
uuid4 = ice_name?(rr.name)
{query_info, state} = pop_in(state, [:queries, rr.name])
addr = {a, b, c, d}

case {uuid4, query_info} do
# Name is in the form of uuid4 and we didn't ask for it.
# This should be an annoucement - save it in the cache.
# See: https://issues.chromium.org/issues/339829283
{true, nil} ->
state = put_in(state, [:cache, rr.name], addr)
{:noreply, state}

{false, nil} ->
{:noreply, state}

{true, %{requesters: requesters}} ->
Process.cancel_timer(query_info.rtx_timer)
for requester <- requesters, do: :ok = GenServer.reply(requester, {:ok, addr})
Process.send_after(self(), {:ttl_expired, rr.name}, rr.ttl * 1000)
state = put_in(state, [:cache, rr.name], addr)
{:noreply, state}
end

{:noreply, state}

_other ->
{:noreply, state}
end
Expand All @@ -127,9 +158,55 @@ defmodule ExICE.Priv.MDNS.Resolver do
{nil, state} ->
{:noreply, state}

{from, state} ->
GenServer.reply(from, {:error, :timeout})
{%{requesters: requesters}, state} ->
for requester <- requesters, do: :ok = GenServer.reply(requester, {:error, :timeout})
{:noreply, state}
end
end

@impl true
def handle_info({:rtx, addr}, %{queries: queries} = state) when is_map_key(queries, addr) do
# rtx messages should be casual QM questions - no unicast-response flag set
query =
%ExICE.Priv.DNS.Message{
question: [
%{
qname: addr,
qtype: :a,
qclass: :in,
unicast_response: false
}
]
}
|> ExICE.Priv.DNS.Message.encode()

state.transport_module.send(state.socket, @multicast_addr, query)
rtx_timer = Process.send_after(self(), {:rtx, addr}, @rtx_timeout_ms)
state = put_in(state, [:queries, addr, :rtx_timer], rtx_timer)
{:noreply, state}
end

@impl true
def handle_info({:rtx, _addr}, state) do
{:noreply, state}
end

@impl true
def handle_info({:ttl_expired, addr}, state) do
{_, state} = pop_in(state, [:cache, addr])
{:noreply, state}
end

defp ice_name?(name) do
name
|> String.trim_trailing(".local")
|> uuid4?()
end

defp uuid4?(uuid) do
case UUID.info(uuid) do
{:ok, info} -> info[:version] == 4
_ -> false
end
end
end
Loading

0 comments on commit cfcc949

Please sign in to comment.