Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicletz committed Mar 14, 2024
1 parent 25a57f1 commit d4b8bf3
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 24 deletions.
14 changes: 10 additions & 4 deletions lib/chain/node_proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ defmodule Chain.NodeProxy do
}
|> Poison.encode!()

{:ok, binary_frame} = WebSockex.Frame.encode_frame({:text, request})
WebSockex.Conn.socket_send(conn, binary_frame)
{:noreply, %{state | req: id}, requests: Map.put(state.requests, id, from)}
WebSockex.cast(conn, {:send_request, request})
{:noreply, %{state | req: id, requests: Map.put(state.requests, id, from)}}
end

@security_level 2
@security_level 1
@impl true
def handle_info(
{:new_block, ws_url, block_number},
Expand All @@ -57,6 +56,13 @@ defmodule Chain.NodeProxy do
{:noreply, %{state | lastblocks: lastblocks}}
end

def handle_info({:DOWN, _ref, :process, down_pid, _reason}, state) do
connections =
state.connections |> Enum.filter(fn {_, pid} -> pid != down_pid end) |> Map.new()

{:noreply, ensure_connections(%{state | connections: connections})}
end

defp ensure_connections(state = %NodeProxy{chain: chain, connections: connections})
when map_size(connections) < @security_level do
urls = MapSet.new(chain.ws_endpoints())
Expand Down
2 changes: 1 addition & 1 deletion lib/chain/sup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Chain.Sup do
use Supervisor

def start_link(chain) do
Supervisor.start_link(__MODULE__, chain, name: __MODULE__)
Supervisor.start_link(__MODULE__, chain)
end

def init(chain) do
Expand Down
46 changes: 35 additions & 11 deletions lib/chain/ws_conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ defmodule Chain.WSConn do
:chain,
:ws_url,
:conn,
lastblock_at: DateTime.utc_now(),
:lastblock_at,
lastblock_number: 0
]

def start(owner, chain, ws_url) do
state = %__MODULE__{owner: owner, chain: chain, ws_url: ws_url}
state = %__MODULE__{
owner: owner,
chain: chain,
ws_url: ws_url,
lastblock_at: DateTime.utc_now()
}

{:ok, pid} = WebSockex.start(ws_url, __MODULE__, state, async: true)
:timer.send_interval(chain.expected_block_intervall() * 2, pid, :ping)
pid
Expand All @@ -44,28 +50,46 @@ defmodule Chain.WSConn do
end

@impl true
def handle_frame({:text, json}, state = %{ws_url: ws_url}) do
def handle_frame({:text, json}, state = %{ws_url: ws_url, chain: chain}) do
if chain == Chains.Diode do
IO.inspect(Poison.decode!(json), label: "#{chain}")
end

case Poison.decode!(json) do
%{"id" => 1, "result" => subscription_id} when is_binary(subscription_id) ->
{:ok, state}

%{"id" => _} = other ->
send(state.owner, {:response, ws_url, other})
{:ok, state}

%{"params" => %{"result" => %{"number" => hex_number}}} ->
block_number = String.to_integer(hex_number, 16)
send(state.owner, {:new_block, ws_url, block_number})
{:ok, %{state | lastblock_at: DateTime.utc_now(), lastblock_number: block_number}}

other ->
send(state.owner, {:response, ws_url, other})
{:ok, state}
end
end

@impl true
def handle_info(:ping, %WSConn{chain: chain, lastblock_at: lastblock_at} = state) do
if DateTime.diff(DateTime.utc_now(), lastblock_at, :millisecond) >
chain.expected_block_intervall() * 2 do
# Chain.RPC.block_number(Chains.Diode)
def handle_cast({:send_request, request}, state) do
IO.inspect("Sending frame: #{inspect(request)}")
{:ok, frame} = WebSockex.Frame.encode_frame({:text, request})
WebSockex.Conn.socket_send(state.conn, frame)
{:noreply, state}
end

@impl true
def handle_info(
:ping,
%WSConn{chain: chain, lastblock_at: lastblock_at, ws_url: ws_url} = state
) do
age = DateTime.diff(DateTime.utc_now(), lastblock_at, :second)

if age > chain.expected_block_intervall() * 2 do
# IO.inspect({lastblock_at, DateTime.utc_now(), age, chain.expected_block_intervall() * 2})
Logger.warning(
"WSConn did not receive a block from (#{chain}) since double block interval. Restarting..."
"WSConn did not receive a block from #{chain} (#{ws_url}) since double block interval. Restarting..."
)

{:close, state}
Expand Down
12 changes: 6 additions & 6 deletions lib/diode.ex
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ defmodule Diode do
case get_env("SEED") do
nil ->
[
"diode://[email protected]:51054",
"diode://[email protected]:443",
"diode://[email protected]:51054",
"diode://[email protected]:443",
"diode://[email protected]:51054",
"diode://[email protected]:443"
# "diode://[email protected]:51054",
# "diode://[email protected]:443",
# "diode://[email protected]:51054",
# "diode://[email protected]:443",
# "diode://[email protected]:51054",
# "diode://[email protected]:443"
]

other ->
Expand Down
2 changes: 1 addition & 1 deletion lib/network/edge_v2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ defmodule Network.EdgeV2 do
def handle_msg(msg, state) do
case msg do
["hello", vsn | flags] when is_binary(vsn) ->
if to_num(vsn) != 1_000 do
if to_num(vsn) != 2_000 do
{error("version not supported"), state}
else
state1 =
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Diode.Mixfile do
use Mix.Project

@vsn "1.1.0"
@full_vsn "v1.1.0-5-g6b9baab-dirty"
@full_vsn "v1.1.0-6-g25a57f1-dirty"
@url "https://github.com/diodechain/diode_server"

def project do
Expand Down

0 comments on commit d4b8bf3

Please sign in to comment.