Skip to content

Commit

Permalink
Merge pull request #44 from ferd/fix-otel-usage-and-others
Browse files Browse the repository at this point in the history
Rework tracing, bump deps, turn disk cache on
  • Loading branch information
ferd authored Oct 1, 2024
2 parents 1b83d5f + 7aea86b commit bcc0ec8
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 252 deletions.
256 changes: 108 additions & 148 deletions apps/revault/src/revault_fsm.erl

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions apps/revault/src/revault_otel.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-module(revault_otel).
-export([start_active_span/1, start_active_span/2,
end_active_span/0,
extract_propagation_data/0, apply_propagation_data/1]).

-include_lib("opentelemetry_api/include/otel_tracer.hrl").

-define(KEY, {self(), ?MODULE}).

start_active_span(Name) ->
start_active_span(Name, #{}).

start_active_span(Name, StartOpts) ->
SpanCtx = otel_tracer:start_span(?current_tracer, Name, StartOpts),
Ctx = otel_tracer:set_current_span(otel_ctx:get_current(), SpanCtx),
Token = otel_ctx:attach(Ctx),
Stack = case erlang:get(?KEY) of
undefined -> [];
Val -> Val
end,
erlang:put(?KEY, [{SpanCtx, Token}|Stack]),
ok.

end_active_span() ->
case erlang:get(?KEY) of
undefined ->
ok;
[] ->
ok;
[{SpanCtx, Token} | Stack] ->
_ = otel_tracer:set_current_span(otel_ctx:get_current(),
otel_span:end_span(SpanCtx, undefined)),
otel_ctx:detach(Token),
erlang:put(?KEY, Stack),
ok
end.

extract_propagation_data() ->
otel_propagator_text_map:inject([]).

apply_propagation_data(Data) when is_list(Data) ->
otel_propagator_text_map:extract(Data).
14 changes: 14 additions & 0 deletions apps/revault/src/revault_otel.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-define(start_active_span(Name),
revault_otel:start_active_span(Name)).

-define(start_active_span(Name, StartOpts),
revault_otel:start_active_span(Name, StartOpts)).

-define(end_active_span(),
revault_otel:end_active_span()).

-define(extract_propagation_data(),
revault_otel:extract_propagation_data()).

-define(apply_propagation_data(Data),
revault_otel:apply_propagation_data(Data)).
69 changes: 23 additions & 46 deletions apps/revault/src/revault_tls_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

-include("revault_tls.hrl").
-include_lib("opentelemetry_api/include/otel_tracer.hrl").
-include("revault_otel.hrl").
-define(str(T), unicode:characters_to_binary(io_lib:format("~tp", [T]))).
-define(attrs(T), [{<<"module">>, ?MODULE},
{<<"function">>, ?FUNCTION_NAME},
Expand Down Expand Up @@ -66,7 +67,11 @@ handle_event({call, From}, {connect, {Peer,Auth}, Msg}, disconnected, Data) ->
{ok, ConnData} ->
NewData = case introspect(Msg) of
{peer, Dir, Attrs} ->
maybe_add_ctx(Attrs, ConnData#client{dir=Dir});
?apply_propagation_data(maps:get(ctx, Attrs, [])),
?start_active_span(<<"tls_client">>),
TmpConnData=ConnData#client{dir=Dir},
?set_attributes(?attrs(TmpConnData)),
TmpConnData;
_ ->
ConnData
end,
Expand All @@ -78,32 +83,34 @@ handle_event({call, From}, disconnect, disconnected, Data) ->
{keep_state, Data, [{reply, From, ok}]};
handle_event({call, From}, disconnect, connected, Data=#client{sock=Sock}) ->
ssl:close(Sock),
?end_active_span(),
otel_ctx:clear(),
{next_state, disconnected, Data#client{sock=undefined, dir=undefined},
[{reply, From, ok}]};
handle_event({call, From}, Msg, disconnected, Data=#client{peer=Peer, auth=Auth}) ->
case connect(Data, Peer, Auth) of
{ok, TmpData} ->
NewData = start_span(<<"tls_client">>, TmpData),
set_attributes(?attrs(NewData)),
{ok, NewData} ->
?start_active_span(<<"tls_client">>),
?set_attributes(?attrs(NewData)),
handle_event({call, From}, Msg, connected, NewData);
{error, Reason} ->
%% TODO: backoffs & retry, maybe add idle -> disconnected -> connected
exit({error, Reason})
end;
handle_event({call, From}, {revault, Marker, _Msg}=Msg, connected, TmpData=#client{sock=Sock}) ->
Data = start_span(<<"fwd">>, TmpData),
set_attributes([{<<"msg">>, ?str(type(Msg))} | ?attrs(Data)]),
handle_event({call, From}, {revault, Marker, _Msg}=Msg, connected, Data=#client{sock=Sock}) ->
?start_active_span(<<"fwd">>),
?set_attributes([{<<"msg">>, ?str(type(Msg))} | ?attrs(Data)]),
Payload = revault_tls:wrap(Msg),
Res = ssl:send(Sock, Payload),
NewData = end_span(Data),
?end_active_span(),
case Res of
ok ->
{next_state, connected, Data, [{reply, From, {ok, Marker}}]};
{error, Reason} ->
_ = ssl:close(Sock),
?end_active_span(),
otel_ctx:clear(),
{next_state, disconnected, NewData#client{sock=undefined, dir=undefined},
{next_state, disconnected, Data#client{sock=undefined, dir=undefined},
[{reply, From, {error, Reason}}]}
end;
handle_event(info, {ssl_passive, Sock}, connected, Data=#client{name=Name, sock=Sock}) ->
Expand All @@ -118,19 +125,20 @@ handle_event(info, {pong, T}, connected, Data=#client{sock=Sock, active=Active})
ssl:setopts(Sock, [{active, NewActive}]),
{keep_state, Data#client{active=NewActive}, []};
handle_event(info, {ssl, Sock, Bin}, connected, Data=#client{name=Name, sock=Sock, buf=Buf0}) ->
TmpData = maybe_start_unique_span(<<"recv">>, Data#client.recv, Data#client{recv=true}),
Data#client.recv orelse ?start_active_span(<<"recv">>, #{attributes => ?attrs(Data)}),
TmpData = Data#client{recv=true},
Buf1 = revault_tls:buf_add(Bin, Buf0),
{Unwrapped, IncompleteBuf} = revault_tls:unwrap_all(Buf1),
[revault_tls:send_local(Name, Msg) || Msg <- Unwrapped],
NewData = case length(Unwrapped) of
0 ->
TmpData;
MsgCount ->
set_attributes([{<<"msgs">>, MsgCount},
{<<"buf">>, revault_tls:buf_size(Buf1)},
{<<"buf_trail">>, revault_tls:buf_size(IncompleteBuf)}
| ?attrs(TmpData)]),
end_span(TmpData#client{recv=false})
?set_attributes([{<<"msgs">>, MsgCount},
{<<"buf">>, revault_tls:buf_size(Buf1)},
{<<"buf_trail">>, revault_tls:buf_size(IncompleteBuf)}]),
?end_active_span(),
TmpData#client{recv=false}
end,
{next_state, connected, NewData#client{buf=IncompleteBuf}};
handle_event(info, {ssl_error, Sock, _Reason}, connected, Data=#client{sock=Sock}) ->
Expand Down Expand Up @@ -213,37 +221,6 @@ sock_attrs(Sock) ->
[]
end.


start_span(SpanName, Data=#client{ctx=Stack}) ->
SpanCtx = otel_tracer:start_span(?current_tracer, SpanName, #{}),
Ctx = otel_tracer:set_current_span(otel_ctx:get_current(), SpanCtx),
Token = otel_ctx:attach(Ctx),
set_attributes(attrs(Data)),
Data#client{ctx=[{SpanCtx,Token}|Stack]}.

maybe_start_unique_span(SpanName, false, Data) ->
start_span(SpanName, Data);
maybe_start_unique_span(_, true, Data) ->
Data.

maybe_add_ctx(#{ctx:=SpanCtx}, Data=#client{ctx=Stack}) ->
Ctx = otel_tracer:set_current_span(otel_ctx:get_current(), SpanCtx),
Token = otel_ctx:attach(Ctx),
set_attributes(attrs(Data)),
Data#client{ctx=[{SpanCtx,Token}|Stack]};
maybe_add_ctx(_, Data) ->
Data.

set_attributes(Attrs) ->
SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()),
otel_span:set_attributes(SpanCtx, Attrs).

end_span(Data=#client{ctx=[{SpanCtx,Token}|Stack]}) ->
_ = otel_tracer:set_current_span(otel_ctx:get_current(),
otel_span:end_span(SpanCtx, undefined)),
otel_ctx:detach(Token),
Data#client{ctx=Stack}.

type({revault, _, T}) when is_tuple(T) ->
element(1,T);
type(_) ->
Expand Down
58 changes: 22 additions & 36 deletions apps/revault/src/revault_tls_serv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

-include("revault_tls.hrl").
-include_lib("opentelemetry_api/include/otel_tracer.hrl").
-include("revault_otel.hrl").

-define(str(T), unicode:characters_to_binary(io_lib:format("~tp", [T]))).
-define(attrs(T), [{<<"module">>, ?MODULE},
{<<"function">>, ?FUNCTION_NAME},
Expand Down Expand Up @@ -210,18 +212,25 @@ worker_dispatch(Names, C=#conn{sock=Sock, dirs=Dirs, buf=Buf}) ->
#{<<"authorized">> := #{<<"sync">> := DirNames}} = Dirs,
case Msg of
{revault, Marker, {peer, Dir, Attrs}} ->
Cn = maybe_add_ctx(Attrs, C),
?apply_propagation_data(maps:get(ctx, Attrs, [])),
?start_active_span(<<"tls_serv">>),
case lists:member(Dir, DirNames) of
true ->
#{Dir := Name} = Names,
ssl:setopts(Sock, [{active, 5}]),
revault_tls:send_local(Name, {revault, {self(),Marker},
{peer, self(), Attrs}}),
worker_loop(Dir, Cn#conn{localname=Name, buf=NewBuf});
NewC = C#conn{localname=Name, buf=NewBuf},
?set_attributes(?attrs(NewC)),
worker_loop(Dir, NewC);
false ->
?set_attributes([{<<"error">>, <<"eperm">>}, {<<"dir">>, Dir}
| sock_attrs(Sock) ++ pid_attrs()]),
ssl:send(Sock, revault_tls:wrap({revault, Marker, {error, eperm}})),
ssl:close(Sock)
end;
end,
?end_active_span(),
ok;
_ ->
Msg = {revault, internal, revault_data_wrapper:error(protocol)},
ssl:send(Sock, revault_tls:wrap(Msg)),
Expand All @@ -241,6 +250,7 @@ worker_loop(Dir, C=#conn{localname=Name, sock=Sock, buf=Buf0, active=Active}) ->
disconnect ->
?with_span(<<"disconnect">>, #{attributes => ?attrs(C)},
fun(_SpanCtx) -> ssl:close(Sock) end),
?end_active_span(),
exit(normal);
{ssl_passive, Sock} ->
revault_fsm:ping(Name, self(), erlang:monotonic_time(millisecond)),
Expand All @@ -254,7 +264,9 @@ worker_loop(Dir, C=#conn{localname=Name, sock=Sock, buf=Buf0, active=Active}) ->
ssl:setopts(Sock, [{active, NewActive}]),
worker_loop(Dir, C#conn{active=NewActive});
{ssl, Sock, Data} ->
TmpC = maybe_start_unique_span(<<"recv">>, C#conn.recv, C#conn{recv=true}),
C#conn.recv orelse ?start_active_span(<<"recv">>),
?set_attributes(attrs(C)),
TmpC = C#conn{recv=true},
Buf1 = revault_tls:buf_add(Data, Buf0),
{Unwrapped, IncompleteBuf} = revault_tls:unwrap_all(Buf1),
[revault_tls:send_local(Name, {revault, {self(), Marker}, Msg})
Expand All @@ -263,16 +275,20 @@ worker_loop(Dir, C=#conn{localname=Name, sock=Sock, buf=Buf0, active=Active}) ->
0 ->
TmpC;
MsgCount ->
set_attributes([{<<"msgs">>, MsgCount},
?set_attributes([{<<"msgs">>, MsgCount},
{<<"buf">>, revault_tls:buf_size(Buf1)},
{<<"buf_trail">>, revault_tls:buf_size(IncompleteBuf)}
| ?attrs(C)]),
end_span(TmpC#conn{recv=false})
?end_active_span(),
TmpC#conn{recv=false}
end,
worker_loop(Dir, NewC#conn{buf = IncompleteBuf});
{ssl_error, Sock, Reason} ->
?set_attribute(<<"error">>, ?str(Reason)),
?end_active_span(),
exit(Reason);
{ssl_closed, Sock} ->
?end_active_span(),
exit(normal)
end.

Expand Down Expand Up @@ -347,36 +363,6 @@ sock_attrs(Sock) ->
[]
end.

start_span(SpanName, Data=#conn{ctx=Stack}) ->
SpanCtx = otel_tracer:start_span(?current_tracer, SpanName, #{}),
Ctx = otel_tracer:set_current_span(otel_ctx:get_current(), SpanCtx),
Token = otel_ctx:attach(Ctx),
set_attributes(attrs(Data)),
Data#conn{ctx=[{SpanCtx,Token}|Stack]}.

maybe_start_unique_span(SpanName, false, Data) ->
start_span(SpanName, Data);
maybe_start_unique_span(_, true, Data) ->
Data.

maybe_add_ctx(#{ctx:=SpanCtx}, Data=#conn{ctx=Stack}) ->
Ctx = otel_tracer:set_current_span(otel_ctx:get_current(), SpanCtx),
Token = otel_ctx:attach(Ctx),
set_attributes(attrs(Data)),
Data#conn{ctx=[{SpanCtx,Token}|Stack]};
maybe_add_ctx(_, Data) ->
Data.

set_attributes(Attrs) ->
SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()),
otel_span:set_attributes(SpanCtx, Attrs).

end_span(Data=#conn{ctx=[{SpanCtx,Token}|Stack]}) ->
_ = otel_tracer:set_current_span(otel_ctx:get_current(),
otel_span:end_span(SpanCtx, undefined)),
otel_ctx:detach(Token),
Data#conn{ctx=Stack}.

type({revault, _, T}) when is_tuple(T) ->
element(1,T);
type(_) ->
Expand Down
6 changes: 2 additions & 4 deletions config/sys.config
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
[
{revault, [
{s3_tmpdir, ".tmp"},
%% Turn this on at your own risk, some updates
%% may be missed since this cache uses a mix of modification
%% times and file sizes to avoid re-hashing files.
{disk_hash_cache, false}
{disk_hash_cache, true}
]},
{maestro, []},
{opentelemetry,
[{span_processor, batch},
{text_map_propagators, [trace_context, baggage]},
{traces_exporter, otlp}]}
].
6 changes: 6 additions & 0 deletions config/tests.sys.config
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
[
{revault, [
{s3_tmpdir, ".tmp"},
%% keep the cache off for most testing purposes
%% since it has an effect on invalidation efforts
%% and specific tests are geared to see it is
%% equivalent (when properly invalidated) to working
%% without it.
{disk_hash_cache, false}
]},
{maestro, []},
{opentelemetry,
[{span_processor, batch},
{text_map_propagators, [trace_context, baggage]},
{traces_exporter, {otel_exporter_stdout, []}}]}
].
Loading

0 comments on commit bcc0ec8

Please sign in to comment.