Skip to content

Commit

Permalink
Fix sending and handling keepalives (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
mickel8 authored Sep 23, 2024
1 parent 4c51906 commit 9a4eeac
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 68 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ as WebRTC multiplexes traffic on a single socket but PRs are welcomed
```elixir
def deps do
[
{:ex_ice, "~> 0.8.2"}
{:ex_ice, "~> 0.8.3"}
]
end
```
Expand Down
4 changes: 2 additions & 2 deletions lib/ex_ice/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ defmodule ExICE.ICEAgent do
end

@impl true
def handle_info({:keepalive, id}, state) do
ice_agent = ExICE.Priv.ICEAgent.handle_keepalive(state.ice_agent, id)
def handle_info({:keepalive_timeout, id}, state) do
ice_agent = ExICE.Priv.ICEAgent.handle_keepalive_timeout(state.ice_agent, id)
{:noreply, %{state | ice_agent: ice_agent}}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/ex_ice/priv/candidate_pair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule ExICE.Priv.CandidatePair do
end

def schedule_keepalive(pair, dest) do
ref = Process.send_after(dest, {:keepalive, pair.id}, @tr_timeout)
ref = Process.send_after(dest, {:keepalive_timeout, pair.id}, @tr_timeout)
%{pair | keepalive_timer: ref}
end

Expand Down
168 changes: 111 additions & 57 deletions lib/ex_ice/priv/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -673,24 +673,24 @@ defmodule ExICE.Priv.ICEAgent do
end
end

@spec handle_keepalive(t(), integer()) :: t()
def handle_keepalive(%__MODULE__{selected_pair_id: id} = ice_agent, id) do
@spec handle_keepalive_timeout(t(), integer()) :: t()
def handle_keepalive_timeout(%__MODULE__{selected_pair_id: id} = ice_agent, id) do
# if pair was selected, send keepalives only on that pair
s_pair = Map.fetch!(ice_agent.checklist, id)
pair = CandidatePair.schedule_keepalive(s_pair)
ice_agent = %__MODULE__{ice_agent | checklist: Map.put(ice_agent.checklist, id, pair)}
send_keepalive(ice_agent, ice_agent.checklist[id])
end

def handle_keepalive(%__MODULE__{selected_pair_id: s_pair_id} = ice_agent, _id)
def handle_keepalive_timeout(%__MODULE__{selected_pair_id: s_pair_id} = ice_agent, _id)
when not is_nil(s_pair_id) do
# note: current implementation assumes that, if selected pair exists, none of the already existing
# valid pairs will ever become selected (only new appearing valid pairs)
# that's why there's no call to `CandidatePair.schedule_keepalive/1`
ice_agent
end

def handle_keepalive(ice_agent, id) do
def handle_keepalive_timeout(ice_agent, id) do
# TODO: keepalives should be sent only if no data has been sent for @tr_timeout
# atm, we send keepalives anyways, also it might be better to pace them with ta_timer
# TODO: candidates not in a valid pair also should be kept alive (RFC 8445, sect 5.1.1.4)
Expand Down Expand Up @@ -1246,16 +1246,7 @@ defmodule ExICE.Priv.ICEAgent do
%Type{class: class, method: :binding}
when is_response(class) and is_map_key(ice_agent.keepalives, msg.transaction_id) ->
# TODO: this a good basis to implement consent freshness
Logger.debug("""
Received keepalive response from from #{inspect({src_ip, src_port})}, \
on: #{inspect({local_cand.base.base_address, local_cand.base.base_port})} \
""")

{pair_id, ice_agent} = pop_in(ice_agent.keepalives[msg.transaction_id])

pair = Map.fetch!(ice_agent.checklist, pair_id)
pair = %CandidatePair{pair | last_seen: now()}
put_in(ice_agent.checklist[pair.id], pair)
handle_keepalive_response(ice_agent, local_cand, src_ip, src_port, msg)

%Type{class: class, method: :binding} when is_response(class) ->
Logger.warning("""
Expand Down Expand Up @@ -1475,7 +1466,6 @@ defmodule ExICE.Priv.ICEAgent do
end

## BINDING RESPONSE HANDLING ##

defp handle_conn_check_response(ice_agent, local_cand, src_ip, src_port, msg) do
{%{pair_id: pair_id}, conn_checks} = Map.pop!(ice_agent.conn_checks, msg.transaction_id)
ice_agent = %__MODULE__{ice_agent | conn_checks: conn_checks}
Expand All @@ -1492,7 +1482,7 @@ defmodule ExICE.Priv.ICEAgent do
cc_local_cand = Map.fetch!(ice_agent.local_cands, conn_check_pair.local_cand_id)
cc_remote_cand = Map.fetch!(ice_agent.remote_cands, conn_check_pair.remote_cand_id)

Logger.warning("""
Logger.debug("""
Ignoring conn check response, non-symmetric src and dst addresses.
Sent from: #{inspect({cc_local_cand.base.base_address, cc_local_cand.base.base_port})}, \
to: #{inspect({cc_remote_cand.address, cc_remote_cand.port})}
Expand Down Expand Up @@ -1647,6 +1637,66 @@ defmodule ExICE.Priv.ICEAgent do
ice_agent
end

defp handle_keepalive_response(
ice_agent,
local_cand,
src_ip,
src_port,
%Message{type: %Type{class: :success_response}} = msg
) do
{pair_id, ice_agent} = pop_in(ice_agent.keepalives[msg.transaction_id])
pair = Map.fetch!(ice_agent.checklist, pair_id)

with true <- symmetric?(ice_agent, local_cand.base.socket, {src_ip, src_port}, pair),
:ok <- authenticate_msg(msg, ice_agent.remote_pwd) do
Logger.debug("Received keepalive success response on: #{pair_info(ice_agent, pair)}")
pair = %CandidatePair{pair | last_seen: now()}
put_in(ice_agent.checklist[pair.id], pair)
else
false ->
ka_local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id)
ka_remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id)

Logger.debug("""
Ignoring keepalive success response, non-symmetric src and dst addresses.
Sent from: #{inspect({ka_local_cand.base.base_address, ka_local_cand.base.base_port})}, \
to: #{inspect({ka_remote_cand.address, ka_remote_cand.port})}
Recv from: #{inspect({src_ip, src_port})}, on: #{inspect({local_cand.base.base_address, local_cand.base.base_port})} \
Not refreshing last_seen time. \
""")

ice_agent

{:error, reason} ->
Logger.debug("""
Couldn't authenticate keepalive success response, reason: #{reason}. \
Not refreshing last_seen time.\
""")

ice_agent
end
end

defp handle_keepalive_response(
ice_agent,
local_cand,
src_ip,
src_port,
%Message{type: %Type{class: :error_response}} = msg
) do
{pair_id, ice_agent} = pop_in(ice_agent.keepalives[msg.transaction_id])
pair = Map.fetch!(ice_agent.checklist, pair_id)

Logger.debug("""
Received keepalive error response from #{inspect({src_ip, src_port})}, \
on: #{inspect({local_cand.base.base_address, local_cand.base.base_port})}. \
pair: #{pair_info(ice_agent, pair)} \
Not refreshing last_seen time. \
""")

ice_agent
end

# Adds valid pair according to sec 7.2.5.3.2
# TODO sec. 7.2.5.3.3
# The agent MUST set the states for all other Frozen candidate pairs in
Expand Down Expand Up @@ -2090,8 +2140,19 @@ defmodule ExICE.Priv.ICEAgent do
{ufrag, pwd}
end

defp authenticate_msg(msg, local_pwd) do
with :ok <- Message.authenticate(msg, local_pwd),
defp pair_info(ice_agent, pair) do
local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id)
remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id)

"""
#{pair.id} \
l: #{:inet.ntoa(local_cand.base.address)}:#{local_cand.base.port} \
r: #{:inet.ntoa(remote_cand.address)}:#{remote_cand.port} \
"""
end

defp authenticate_msg(msg, pwd) do
with :ok <- Message.authenticate(msg, pwd),
:ok <- Message.check_fingerprint(msg) do
:ok
else
Expand Down Expand Up @@ -2402,17 +2463,11 @@ defmodule ExICE.Priv.ICEAgent do
end

defp send_keepalive(ice_agent, pair) do
Logger.debug("Sending keepalive")
Logger.debug("Sending keepalive on #{pair_info(ice_agent, pair)}")
local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id)
remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id)

type = %Type{class: :request, method: :binding}

req =
type
|> Message.new()
|> Message.with_integrity(ice_agent.remote_pwd)
|> Message.with_fingerprint()
req = binding_request(ice_agent, false)

dst = {remote_cand.address, remote_cand.port}

Expand All @@ -2430,39 +2485,10 @@ defmodule ExICE.Priv.ICEAgent do
local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id)
remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id)

type = %Type{class: :request, method: :binding}

role_attr =
if ice_agent.role == :controlling do
%ICEControlling{tiebreaker: ice_agent.tiebreaker}
else
%ICEControlled{tiebreaker: ice_agent.tiebreaker}
end

# priority sent to the other side has to be
# computed with the candidate type preference of
# peer-reflexive; refer to sec 7.1.1
priority = Candidate.priority(:prflx)

attrs = [
%Username{value: "#{ice_agent.remote_ufrag}:#{ice_agent.local_ufrag}"},
%Priority{priority: priority},
role_attr
]

# we can nominate only when being the controlling agent
# the controlled agent uses nominate? flag according to 7.3.1.5
attrs =
if pair.nominate? and ice_agent.role == :controlling do
attrs ++ [%UseCandidate{}]
else
attrs
end

req =
Message.new(type, attrs)
|> Message.with_integrity(ice_agent.remote_pwd)
|> Message.with_fingerprint()
nominate = pair.nominate? and ice_agent.role == :controlling
req = binding_request(ice_agent, nominate)

raw_req = Message.encode(req)

Expand All @@ -2489,6 +2515,34 @@ defmodule ExICE.Priv.ICEAgent do
end
end

defp binding_request(ice_agent, nominate) do
type = %Type{class: :request, method: :binding}

role_attr =
if ice_agent.role == :controlling do
%ICEControlling{tiebreaker: ice_agent.tiebreaker}
else
%ICEControlled{tiebreaker: ice_agent.tiebreaker}
end

# priority sent to the other side has to be
# computed with the candidate type preference of
# peer-reflexive; refer to sec 7.1.1
priority = Candidate.priority(:prflx)

attrs = [
%Username{value: "#{ice_agent.remote_ufrag}:#{ice_agent.local_ufrag}"},
%Priority{priority: priority},
role_attr
]

attrs = if nominate, do: attrs ++ [%UseCandidate{}], else: attrs

Message.new(type, attrs)
|> Message.with_integrity(ice_agent.remote_pwd)
|> Message.with_fingerprint()
end

defp do_send(ice_agent, %cand_mod{} = local_cand, dst, data, retry \\ true) do
{dst_ip, dst_port} = dst

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule ExICE.MixProject do
use Mix.Project

@version "0.8.2"
@version "0.8.3"
@source_url "https://github.com/elixir-webrtc/ex_ice"

def project do
Expand Down
Loading

0 comments on commit 9a4eeac

Please sign in to comment.