diff --git a/lib/chain/node_proxy.ex b/lib/chain/node_proxy.ex index 943a04a..4ccfcb3 100644 --- a/lib/chain/node_proxy.ex +++ b/lib/chain/node_proxy.ex @@ -37,12 +37,11 @@ defmodule Chain.NodeProxy do } |> Poison.encode!() - {:ok, binary_frame} = WebSockex.Frame.encode_frame({:text, request}) - WebSockex.Conn.socket_send(conn, binary_frame) - {:noreply, %{state | req: id}, requests: Map.put(state.requests, id, from)} + WebSockex.cast(conn, {:send_request, request}) + {:noreply, %{state | req: id, requests: Map.put(state.requests, id, from)}} end - @security_level 2 + @security_level 1 @impl true def handle_info( {:new_block, ws_url, block_number}, @@ -57,6 +56,13 @@ defmodule Chain.NodeProxy do {:noreply, %{state | lastblocks: lastblocks}} end + def handle_info({:DOWN, _ref, :process, down_pid, _reason}, state) do + connections = + state.connections |> Enum.filter(fn {_, pid} -> pid != down_pid end) |> Map.new() + + {:noreply, ensure_connections(%{state | connections: connections})} + end + defp ensure_connections(state = %NodeProxy{chain: chain, connections: connections}) when map_size(connections) < @security_level do urls = MapSet.new(chain.ws_endpoints()) diff --git a/lib/chain/sup.ex b/lib/chain/sup.ex index 9327f9f..dccda33 100644 --- a/lib/chain/sup.ex +++ b/lib/chain/sup.ex @@ -6,7 +6,7 @@ defmodule Chain.Sup do use Supervisor def start_link(chain) do - Supervisor.start_link(__MODULE__, chain, name: __MODULE__) + Supervisor.start_link(__MODULE__, chain) end def init(chain) do diff --git a/lib/chain/ws_conn.ex b/lib/chain/ws_conn.ex index bfc0c0f..aa6ee72 100644 --- a/lib/chain/ws_conn.ex +++ b/lib/chain/ws_conn.ex @@ -14,12 +14,18 @@ defmodule Chain.WSConn do :chain, :ws_url, :conn, - lastblock_at: DateTime.utc_now(), + :lastblock_at, lastblock_number: 0 ] def start(owner, chain, ws_url) do - state = %__MODULE__{owner: owner, chain: chain, ws_url: ws_url} + state = %__MODULE__{ + owner: owner, + chain: chain, + ws_url: ws_url, + lastblock_at: DateTime.utc_now() + } + {:ok, pid} = WebSockex.start(ws_url, __MODULE__, state, async: true) :timer.send_interval(chain.expected_block_intervall() * 2, pid, :ping) pid @@ -44,28 +50,46 @@ defmodule Chain.WSConn do end @impl true - def handle_frame({:text, json}, state = %{ws_url: ws_url}) do + def handle_frame({:text, json}, state = %{ws_url: ws_url, chain: chain}) do + if chain == Chains.Diode do + IO.inspect(Poison.decode!(json), label: "#{chain}") + end + case Poison.decode!(json) do %{"id" => 1, "result" => subscription_id} when is_binary(subscription_id) -> {:ok, state} + %{"id" => _} = other -> + send(state.owner, {:response, ws_url, other}) + {:ok, state} + %{"params" => %{"result" => %{"number" => hex_number}}} -> block_number = String.to_integer(hex_number, 16) send(state.owner, {:new_block, ws_url, block_number}) {:ok, %{state | lastblock_at: DateTime.utc_now(), lastblock_number: block_number}} - - other -> - send(state.owner, {:response, ws_url, other}) - {:ok, state} end end @impl true - def handle_info(:ping, %WSConn{chain: chain, lastblock_at: lastblock_at} = state) do - if DateTime.diff(DateTime.utc_now(), lastblock_at, :millisecond) > - chain.expected_block_intervall() * 2 do + # Chain.RPC.block_number(Chains.Diode) + def handle_cast({:send_request, request}, state) do + IO.inspect("Sending frame: #{inspect(request)}") + {:ok, frame} = WebSockex.Frame.encode_frame({:text, request}) + WebSockex.Conn.socket_send(state.conn, frame) + {:noreply, state} + end + + @impl true + def handle_info( + :ping, + %WSConn{chain: chain, lastblock_at: lastblock_at, ws_url: ws_url} = state + ) do + age = DateTime.diff(DateTime.utc_now(), lastblock_at, :second) + + if age > chain.expected_block_intervall() * 2 do + # IO.inspect({lastblock_at, DateTime.utc_now(), age, chain.expected_block_intervall() * 2}) Logger.warning( - "WSConn did not receive a block from (#{chain}) since double block interval. Restarting..." + "WSConn did not receive a block from #{chain} (#{ws_url}) since double block interval. Restarting..." ) {:close, state} diff --git a/lib/diode.ex b/lib/diode.ex index 07b3cda..f99009a 100644 --- a/lib/diode.ex +++ b/lib/diode.ex @@ -281,12 +281,12 @@ defmodule Diode do case get_env("SEED") do nil -> [ - "diode://0xceca2f8cf1983b4cf0c1ba51fd382c2bc37aba58@us1.prenet.diode.io:51054", - "diode://0x7e4cd38d266902444dc9c8f7c0aa716a32497d0b@us2.prenet.diode.io:443", - "diode://0x68e0bafdda9ef323f692fc080d612718c941d120@as1.prenet.diode.io:51054", - "diode://0x1350d3b501d6842ed881b59de4b95b27372bfae8@as2.prenet.diode.io:443", - "diode://0x937c492a77ae90de971986d003ffbc5f8bb2232c@eu1.prenet.diode.io:51054", - "diode://0xae699211c62156b8f29ce17be47d2f069a27f2a6@eu2.prenet.diode.io:443" + # "diode://0xceca2f8cf1983b4cf0c1ba51fd382c2bc37aba58@us1.prenet.diode.io:51054", + # "diode://0x7e4cd38d266902444dc9c8f7c0aa716a32497d0b@us2.prenet.diode.io:443", + # "diode://0x68e0bafdda9ef323f692fc080d612718c941d120@as1.prenet.diode.io:51054", + # "diode://0x1350d3b501d6842ed881b59de4b95b27372bfae8@as2.prenet.diode.io:443", + # "diode://0x937c492a77ae90de971986d003ffbc5f8bb2232c@eu1.prenet.diode.io:51054", + # "diode://0xae699211c62156b8f29ce17be47d2f069a27f2a6@eu2.prenet.diode.io:443" ] other -> diff --git a/lib/network/edge_v2.ex b/lib/network/edge_v2.ex index f59dd1b..c9dfa9b 100644 --- a/lib/network/edge_v2.ex +++ b/lib/network/edge_v2.ex @@ -298,7 +298,7 @@ defmodule Network.EdgeV2 do def handle_msg(msg, state) do case msg do ["hello", vsn | flags] when is_binary(vsn) -> - if to_num(vsn) != 1_000 do + if to_num(vsn) != 2_000 do {error("version not supported"), state} else state1 = diff --git a/mix.exs b/mix.exs index f3854c8..89d90e3 100644 --- a/mix.exs +++ b/mix.exs @@ -5,7 +5,7 @@ defmodule Diode.Mixfile do use Mix.Project @vsn "1.1.0" - @full_vsn "v1.1.0-5-g6b9baab-dirty" + @full_vsn "v1.1.0-6-g25a57f1-dirty" @url "https://github.com/diodechain/diode_server" def project do