Skip to content

Commit

Permalink
Merge pull request #284 from rabbitmq/wal-handle-corrupt-last-record
Browse files Browse the repository at this point in the history
WAL: handle corrupt last record
  • Loading branch information
kjnilsson authored May 30, 2022
2 parents e819c76 + d00042b commit 01334fb
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ PROJECT = ra
ESCRIPT_NAME = ra_fifo_cli
ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli

dep_gen_batch_server = hex 0.8.7
dep_gen_batch_server = hex 0.8.8
dep_aten = hex 0.5.8
DEPS = aten gen_batch_server

Expand Down
3 changes: 1 addition & 2 deletions WORKSPACE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ hex_pm_erlang_app(

hex_pm_erlang_app(
name = "gen_batch_server",
sha256 = "94a49a528486298b009d2a1b452132c0a0d68b3e89d17d3764cb1ec879b7557a",
version = "0.8.7",
version = "0.8.8",
)

http_archive(
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{deps, [
{gen_batch_server, "0.8.7"},
{gen_batch_server, "0.8.8"},
{aten, "0.5.8"}
]}.

Expand Down
6 changes: 3 additions & 3 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{"1.2.0",
[{<<"aten">>,{pkg,<<"aten">>,<<"0.5.8">>},0},
{<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.7">>},0}]}.
{<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.8">>},0}]}.
[
{pkg_hash,[
{<<"aten">>, <<"B5C97F48517C4F37F26A519AA57A00A31FF1B8EA4324EC1CAE27F818ED5C0DB2">>},
{<<"gen_batch_server">>, <<"1D91A3605D3110EC791F00D5968E86E62FD4C6979E760796A51D50A7ACC7A40D">>}]},
{<<"gen_batch_server">>, <<"7840A1FA63EE1EFFC83E8A91D22664847A2BA1192D30EAFFFD914ACB51578068">>}]},
{pkg_hash_ext,[
{<<"aten">>, <<"64D40A8CF0DDFEA4E13AF00B7327F0925147F83612D0627D9506CBFFE90C13EF">>},
{<<"gen_batch_server">>, <<"94A49A528486298B009D2A1B452132C0A0D68B3E89D17D3764CB1EC879B7557A">>}]}
{<<"gen_batch_server">>, <<"C3E6A1A2A0FB62AEE631A98CFA0FD8903E9562422CBF72043953E2FB1D203017">>}]}
].
64 changes: 50 additions & 14 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ force_roll_over(Wal) ->
%% corresponding .wal file.

-spec start_link(Config :: wal_conf()) ->
{ok, pid()} | {error, {already_started, pid()}}.
{ok, pid()} |
{error, {already_started, pid()}} |
{error, wal_checksum_validation_failure}.
start_link(#{name := Name} = Config)
when is_atom(Name) ->
WalMaxBatchSize = maps:get(max_batch_size, Config,
Expand Down Expand Up @@ -818,8 +820,8 @@ recover_wal_chunks(Conf, Fd, RecoveryChunkSize) ->
recover_records(Conf, Fd, Chunk, #{}, RecoveryChunkSize).
% All zeros indicates end of a pre-allocated wal file
recover_records(_, _Fd, <<0:1/unsigned, 0:1/unsigned, 0:22/unsigned,
IdDataLen:16/unsigned, _:IdDataLen/binary,
0:32/integer, 0:32/unsigned, _/binary>>,
IdDataLen:16/unsigned, _:IdDataLen/binary,
0:32/integer, 0:32/unsigned, _/binary>>,
_Cache, _ChunkSize) ->
ok;
% First record or different UID to last record
Expand All @@ -832,9 +834,20 @@ recover_records(Conf, Fd,
EntryData:EntryDataLen/binary,
Rest/binary>>,
Cache, RecoveryChunkSize) ->
true = validate_and_update(Conf, UId, Checksum, Idx, Term, EntryData, Trunc),
Cache0 = Cache#{IdRef => {UId, <<1:1/unsigned, IdRef:22/unsigned>>}},
recover_records(Conf, Fd, Rest, Cache0, RecoveryChunkSize);

case validate_checksum(Checksum, Idx, Term, EntryData) of
ok ->
true = update_mem_table(Conf, UId, Idx, Term,
binary_to_term(EntryData), Trunc =:= 1),
Cache0 = Cache#{IdRef => {UId, <<1:1/unsigned, IdRef:22/unsigned>>}},
recover_records(Conf, Fd, Rest, Cache0, RecoveryChunkSize);
error ->
?DEBUG("WAL: record failed CRC check. If this is the last record"
" recovery can resume", []),
%% if this is the last entry in the wal we can just drop the
%% record;
is_last_record(Fd, Rest)
end;
% Same UID as last record
recover_records(Conf, Fd,
<<Trunc:1/unsigned, 1:1/unsigned, IdRef:22/unsigned,
Expand All @@ -845,8 +858,18 @@ recover_records(Conf, Fd,
Rest/binary>>,
Cache, RecoveryChunkSize) ->
#{IdRef := {UId, _}} = Cache,
true = validate_and_update(Conf, UId, Checksum, Idx, Term, EntryData, Trunc),
recover_records(Conf, Fd, Rest, Cache, RecoveryChunkSize);
case validate_checksum(Checksum, Idx, Term, EntryData) of
ok ->
true = update_mem_table(Conf, UId, Idx, Term,
binary_to_term(EntryData), Trunc =:= 1),
recover_records(Conf, Fd, Rest, Cache, RecoveryChunkSize);
error ->
?DEBUG("WAL: record failed CRC check. If this is the last record"
" recovery can resume", []),
%% if this is the last entry in the wal we can just drop the
%% record;
is_last_record(Fd, Rest)
end;
% Not enough remainder to parse another record, need to read
recover_records(Conf, Fd, Chunk, Cache, RecoveryChunkSize) ->
NextChunk = read_from_wal_file(Fd, RecoveryChunkSize),
Expand All @@ -860,6 +883,24 @@ recover_records(Conf, Fd, Chunk, Cache, RecoveryChunkSize) ->
recover_records(Conf, Fd, Chunk0, Cache, RecoveryChunkSize)
end.

%% Decides whether a record that failed its checksum was the last one in
%% the WAL, by inspecting the bytes that follow it.
%% Returns `ok' when what follows starts with 13 zero bytes (104 bits) or
%% when the file is exhausted before 13 bytes are available; otherwise
%% logs an error and throws `{stop, wal_checksum_validation_failure}'.
%% NOTE(review): 13 bytes appears to be the size of an all-zero record
%% header with an empty id - confirm against the WAL record layout.
is_last_record(_Fd, <<0:104, _/binary>>) ->
    %% nothing but zeros after the bad record: treat it as the last one
    ok;
is_last_record(Fd, Tail) when byte_size(Tail) < 13 ->
    %% not enough bytes to decide yet, pull more data from the file
    case read_from_wal_file(Fd, 256) of
        <<>> ->
            %% end of file reached, the bad record was the last one
            ok;
        More ->
            is_last_record(Fd, <<Tail/binary, More/binary>>)
    end;
is_last_record(_Fd, _Tail) ->
    %% non-zero data follows the bad record, recovery cannot continue
    ?ERROR("WAL: record failed CRC check during recovery. "
           "Unable to recover WAL data safely", []),
    throw({stop, wal_checksum_validation_failure}).

read_from_wal_file(Fd, Len) ->
case file:read(Fd, Len) of
{ok, <<Data/binary>>} ->
Expand All @@ -870,11 +911,6 @@ read_from_wal_file(Fd, Len) ->
exit({could_not_read_wal_chunk, Reason})
end.

%% Validates the checksum of a recovered record, then decodes the entry
%% with binary_to_term/1 and inserts it via update_mem_table/6, passing
%% `Trunc =:= 1' as the final argument.
%% NOTE(review): the result of validate_checksum/4 is not matched here, so
%% a checksum mismatch is only detected if validate_checksum/4 exits
%% instead of returning a value - confirm against its current definition.
validate_and_update(Conf, UId, Checksum, Idx, Term, EntryData, Trunc) ->
validate_checksum(Checksum, Idx, Term, EntryData),
true = update_mem_table(Conf, UId, Idx, Term,
binary_to_term(EntryData), Trunc =:= 1).

validate_checksum(0, _, _, _) ->
% checksum not used
ok;
Expand All @@ -885,7 +921,7 @@ validate_checksum(Checksum, Idx, Term, Data) ->
Checksum ->
ok;
_ ->
exit(wal_checksum_validation_failure)
error
end.

merge_conf_defaults(Conf) ->
Expand Down
125 changes: 125 additions & 0 deletions test/ra_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(ra_log_wal_SUITE).
-compile(nowarn_export_all).
-compile(export_all).

-include_lib("common_test/include/ct.hrl").
Expand Down Expand Up @@ -41,6 +42,10 @@ all_tests() ->
recover_empty,
recover_after_roll_over,
recover_truncated_write,
recover_with_last_entry_corruption,
recover_with_last_entry_corruption_pre_allocate,
checksum_failure_in_middle_of_file_should_fail,
recover_with_partial_last_entry,
sys_get_status
].

Expand Down Expand Up @@ -721,6 +726,126 @@ recover_empty(Config) ->
meck:unload(),
ok.

%% The WAL must start successfully when the tail of the file has been cut
%% off (e.g. a partially written last entry): 100 entries are written, the
%% WAL is stopped, the last 10 bytes of the file are removed and the WAL is
%% started again on the same directory.
recover_with_partial_last_entry(Config) ->
    ok = logger:set_primary_config(level, all),
    #{dir := Dir} = Conf0 = ?config(wal_conf, Config),
    WriterId = ?config(writer_id, Config),
    Conf = Conf0#{segment_writer => spawn(fun () -> ok end),
                  pre_allocate => false},
    Data = crypto:strong_rand_bytes(1000),
    meck:new(ra_log_segment_writer, [passthrough]),
    meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
    {ok, _Wal} = ra_log_wal:start_link(Conf),
    [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data)
     || Idx <- lists:seq(1, 100)],
    _ = await_written(WriterId, {1, 100, 1}),
    empty_mailbox(),
    ok = proc_lib:stop(ra_log_wal),

    [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")),

    %% cut the last 10 bytes off the file; file:truncate/1 truncates at the
    %% current position, so the seek must succeed first
    {ok, Fd} = file:open(WalFile, [raw, binary, read, write]),
    {ok, _Pos} = file:position(Fd, {eof, -10}),
    ok = file:truncate(Fd),
    ok = file:close(Fd),

    %% recovery is expected to cope with the incomplete tail
    {ok, Pid} = ra_log_wal:start_link(Conf),
    ?assert(erlang:is_process_alive(Pid)),
    ok = proc_lib:stop(ra_log_wal),
    meck:unload(),
    ok.

%% The WAL must start successfully when the end of a non pre-allocated WAL
%% file is corrupt: 100 entries are written, the WAL is stopped, the last
%% 10 bytes of the file are overwritten with zeros and the WAL is started
%% again on the same directory.
recover_with_last_entry_corruption(Config) ->
    ok = logger:set_primary_config(level, all),
    #{dir := Dir} = Conf0 = ?config(wal_conf, Config),
    WriterId = ?config(writer_id, Config),
    Conf = Conf0#{segment_writer => spawn(fun () -> ok end),
                  pre_allocate => false},
    Data = crypto:strong_rand_bytes(1000),
    meck:new(ra_log_segment_writer, [passthrough]),
    meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
    {ok, _Wal} = ra_log_wal:start_link(Conf),
    [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data)
     || Idx <- lists:seq(1, 100)],
    _ = await_written(WriterId, {1, 100, 1}),
    empty_mailbox(),
    ok = proc_lib:stop(ra_log_wal),

    [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")),

    %% overwrite the last 10 bytes of the file with 0s
    {ok, Fd} = file:open(WalFile, [raw, binary, read, write]),
    {ok, _Pos} = file:position(Fd, {eof, -10}),
    ok = file:write(Fd, <<0,0,0,0,0,0,0,0,0,0>>),
    %% make sure the corruption was really flushed before restarting
    ok = file:close(Fd),

    %% recovery is expected to tolerate the corrupt tail
    {ok, Pid} = ra_log_wal:start_link(Conf),
    ?assert(erlang:is_process_alive(Pid)),
    ok = proc_lib:stop(ra_log_wal),
    meck:unload(),
    ok.

%% Same scenario as the non pre-allocated corruption test but with
%% `pre_allocate => true': 100 entries are written, the WAL is stopped,
%% 10 bytes at a fixed offset are overwritten with zeros and the WAL is
%% started again on the same directory.
recover_with_last_entry_corruption_pre_allocate(Config) ->
    ok = logger:set_primary_config(level, all),
    #{dir := Dir} = Conf0 = ?config(wal_conf, Config),
    WriterId = ?config(writer_id, Config),
    Conf = Conf0#{segment_writer => spawn(fun () -> ok end),
                  pre_allocate => true},
    Data = crypto:strong_rand_bytes(1000),
    meck:new(ra_log_segment_writer, [passthrough]),
    meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
    {ok, _Wal} = ra_log_wal:start_link(Conf),
    [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data)
     || Idx <- lists:seq(1, 100)],
    _ = await_written(WriterId, {1, 100, 1}),
    empty_mailbox(),
    ok = proc_lib:stop(ra_log_wal),

    [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")),

    %% overwrite 10 bytes at a hard-coded offset with 0s; the offset is
    %% presumably inside the last written record of the pre-allocated file
    {ok, Fd} = file:open(WalFile, [raw, binary, read, write]),
    %% TODO: if the internal WAL format changes this will be wrong
    %% the seek must succeed or the zeros land at the wrong offset
    {ok, _Pos} = file:position(Fd, 103331),
    ok = file:write(Fd, <<0,0,0,0,0,0,0,0,0,0>>),
    ok = file:close(Fd),

    %% recovery is expected to tolerate the corrupt last record
    {ok, Pid} = ra_log_wal:start_link(Conf),
    ?assert(erlang:is_process_alive(Pid)),
    ok = proc_lib:stop(ra_log_wal),
    meck:unload(),
    ok.

%% Corruption that is not at the end of the WAL must not be tolerated:
%% 100 entries are written, the WAL is stopped, 10 bytes at offset 1000
%% are overwritten with zeros and restarting the WAL is expected to return
%% `{error, wal_checksum_validation_failure}'.
checksum_failure_in_middle_of_file_should_fail(Config) ->
    %% presumably needed so that the expected startup failure of the linked
    %% WAL process does not take the test process down with it
    process_flag(trap_exit, true),
    ok = logger:set_primary_config(level, all),
    #{dir := Dir} = Conf0 = ?config(wal_conf, Config),
    WriterId = ?config(writer_id, Config),
    Conf = Conf0#{segment_writer => spawn(fun () -> ok end),
                  pre_allocate => false},
    Data = crypto:strong_rand_bytes(1000),
    meck:new(ra_log_segment_writer, [passthrough]),
    meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
    {ok, _Wal} = ra_log_wal:start_link(Conf),
    [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data)
     || Idx <- lists:seq(1, 100)],
    _ = await_written(WriterId, {1, 100, 1}),
    empty_mailbox(),
    ok = proc_lib:stop(ra_log_wal),

    [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")),

    %% overwrite 10 bytes at offset 1000 with 0s; with 100 entries of
    %% 1000 bytes each this is far from the end of the file
    {ok, Fd} = file:open(WalFile, [raw, binary, read, write]),
    {ok, _Pos} = file:position(Fd, 1000),
    ok = file:write(Fd, <<0,0,0,0,0,0,0,0,0,0>>),
    ok = file:close(Fd),

    {error, wal_checksum_validation_failure} = ra_log_wal:start_link(Conf),
    %% drop any 'EXIT' messages produced by the failed start
    empty_mailbox(),
    %% remove the ra_log_segment_writer mock like the sibling tests do
    meck:unload(),
    ok.

empty_mailbox() ->
receive
_ ->
Expand Down

0 comments on commit 01334fb

Please sign in to comment.