diff --git a/Makefile b/Makefile index ae254c6d..ae674ba4 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ PROJECT = ra ESCRIPT_NAME = ra_fifo_cli ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli -dep_gen_batch_server = hex 0.8.7 +dep_gen_batch_server = hex 0.8.8 dep_aten = hex 0.5.8 DEPS = aten gen_batch_server diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel index df10cbd3..86505a29 100644 --- a/WORKSPACE.bazel +++ b/WORKSPACE.bazel @@ -21,8 +21,7 @@ hex_pm_erlang_app( hex_pm_erlang_app( name = "gen_batch_server", - sha256 = "94a49a528486298b009d2a1b452132c0a0d68b3e89d17d3764cb1ec879b7557a", - version = "0.8.7", + version = "0.8.8", ) http_archive( diff --git a/rebar.config b/rebar.config index ab0a3370..590445be 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,5 @@ {deps, [ - {gen_batch_server, "0.8.7"}, + {gen_batch_server, "0.8.8"}, {aten, "0.5.8"} ]}. diff --git a/rebar.lock b/rebar.lock index 57169413..e291ca20 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,11 +1,11 @@ {"1.2.0", [{<<"aten">>,{pkg,<<"aten">>,<<"0.5.8">>},0}, - {<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.7">>},0}]}. + {<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.8">>},0}]}. [ {pkg_hash,[ {<<"aten">>, <<"B5C97F48517C4F37F26A519AA57A00A31FF1B8EA4324EC1CAE27F818ED5C0DB2">>}, - {<<"gen_batch_server">>, <<"1D91A3605D3110EC791F00D5968E86E62FD4C6979E760796A51D50A7ACC7A40D">>}]}, + {<<"gen_batch_server">>, <<"7840A1FA63EE1EFFC83E8A91D22664847A2BA1192D30EAFFFD914ACB51578068">>}]}, {pkg_hash_ext,[ {<<"aten">>, <<"64D40A8CF0DDFEA4E13AF00B7327F0925147F83612D0627D9506CBFFE90C13EF">>}, - {<<"gen_batch_server">>, <<"94A49A528486298B009D2A1B452132C0A0D68B3E89D17D3764CB1EC879B7557A">>}]} + {<<"gen_batch_server">>, <<"C3E6A1A2A0FB62AEE631A98CFA0FD8903E9562422CBF72043953E2FB1D203017">>}]} ]. diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index 3b5f1dec..eeb697bb 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -205,7 +205,9 @@ force_roll_over(Wal) -> %% corresponding .wal file. -spec start_link(Config :: wal_conf()) -> - {ok, pid()} | {error, {already_started, pid()}}. + {ok, pid()} | + {error, {already_started, pid()}} | + {error, wal_checksum_validation_failure}. start_link(#{name := Name} = Config) when is_atom(Name) -> WalMaxBatchSize = maps:get(max_batch_size, Config, @@ -818,8 +820,8 @@ recover_wal_chunks(Conf, Fd, RecoveryChunkSize) -> recover_records(Conf, Fd, Chunk, #{}, RecoveryChunkSize). % All zeros indicates end of a pre-allocated wal file recover_records(_, _Fd, <<0:1/unsigned, 0:1/unsigned, 0:22/unsigned, - IdDataLen:16/unsigned, _:IdDataLen/binary, - 0:32/integer, 0:32/unsigned, _/binary>>, + IdDataLen:16/unsigned, _:IdDataLen/binary, + 0:32/integer, 0:32/unsigned, _/binary>>, _Cache, _ChunkSize) -> ok; % First record or different UID to last record @@ -832,9 +834,20 @@ recover_records(Conf, Fd, EntryData:EntryDataLen/binary, Rest/binary>>, Cache, RecoveryChunkSize) -> - true = validate_and_update(Conf, UId, Checksum, Idx, Term, EntryData, Trunc), - Cache0 = Cache#{IdRef => {UId, <<1:1/unsigned, IdRef:22/unsigned>>}}, - recover_records(Conf, Fd, Rest, Cache0, RecoveryChunkSize); + + case validate_checksum(Checksum, Idx, Term, EntryData) of + ok -> + true = update_mem_table(Conf, UId, Idx, Term, + binary_to_term(EntryData), Trunc =:= 1), + Cache0 = Cache#{IdRef => {UId, <<1:1/unsigned, IdRef:22/unsigned>>}}, + recover_records(Conf, Fd, Rest, Cache0, RecoveryChunkSize); + error -> + ?DEBUG("WAL: record failed CRC check. If this is the last record" + " recovery can resume", []), + %% if this is the last entry in the wal we can just drop the + %% record; + is_last_record(Fd, Rest) + end; % Same UID as last record recover_records(Conf, Fd, <>, Cache, RecoveryChunkSize) -> #{IdRef := {UId, _}} = Cache, - true = validate_and_update(Conf, UId, Checksum, Idx, Term, EntryData, Trunc), - recover_records(Conf, Fd, Rest, Cache, RecoveryChunkSize); + case validate_checksum(Checksum, Idx, Term, EntryData) of + ok -> + true = update_mem_table(Conf, UId, Idx, Term, + binary_to_term(EntryData), Trunc =:= 1), + recover_records(Conf, Fd, Rest, Cache, RecoveryChunkSize); + error -> + ?DEBUG("WAL: record failed CRC check. If this is the last record" + " recovery can resume", []), + %% if this is the last entry in the wal we can just drop the + %% record; + is_last_record(Fd, Rest) + end; % Not enough remainder to parse another record, need to read recover_records(Conf, Fd, Chunk, Cache, RecoveryChunkSize) -> NextChunk = read_from_wal_file(Fd, RecoveryChunkSize), @@ -860,6 +883,24 @@ recover_records(Conf, Fd, Chunk, Cache, RecoveryChunkSize) -> recover_records(Conf, Fd, Chunk0, Cache, RecoveryChunkSize) end. +is_last_record(_Fd, <<0:104, _/binary>>) -> + ok; +is_last_record(Fd, Rest) -> + case byte_size(Rest) < 13 of + true -> + case read_from_wal_file(Fd, 256) of + <<>> -> + ok; + Next -> + is_last_record(Fd, <>) + end; + false -> + ?ERROR("WAL: record failed CRC check during recovery. " + "Unable to recover WAL data safely", []), + throw({stop, wal_checksum_validation_failure}) + + end. + read_from_wal_file(Fd, Len) -> case file:read(Fd, Len) of {ok, <>} -> @@ -870,11 +911,6 @@ read_from_wal_file(Fd, Len) -> exit({could_not_read_wal_chunk, Reason}) end. -validate_and_update(Conf, UId, Checksum, Idx, Term, EntryData, Trunc) -> - validate_checksum(Checksum, Idx, Term, EntryData), - true = update_mem_table(Conf, UId, Idx, Term, - binary_to_term(EntryData), Trunc =:= 1). - validate_checksum(0, _, _, _) -> % checksum not used ok; @@ -885,7 +921,7 @@ validate_checksum(Checksum, Idx, Term, Data) -> Checksum -> ok; _ -> - exit(wal_checksum_validation_failure) + error end. merge_conf_defaults(Conf) -> diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index 47cd7f23..59f497dc 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -5,6 +5,7 @@ %% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. %% -module(ra_log_wal_SUITE). +-compile(nowarn_export_all). -compile(export_all). -include_lib("common_test/include/ct.hrl"). @@ -41,6 +42,10 @@ all_tests() -> recover_empty, recover_after_roll_over, recover_truncated_write, + recover_with_last_entry_corruption, + recover_with_last_entry_corruption_pre_allocate, + checksum_failure_in_middle_of_file_should_fail, + recover_with_partial_last_entry, sys_get_status ]. @@ -721,6 +726,126 @@ recover_empty(Config) -> meck:unload(), ok. +recover_with_partial_last_entry(Config) -> + ok = logger:set_primary_config(level, all), + #{dir := Dir} = Conf0 = ?config(wal_conf, Config), + WriterId = ?config(writer_id, Config), + Conf = Conf0#{segment_writer => spawn(fun () -> ok end), + pre_allocate => false}, + Data = crypto:strong_rand_bytes(1000), + meck:new(ra_log_segment_writer, [passthrough]), + meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), + {ok, _Wal} = ra_log_wal:start_link(Conf), + [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + || Idx <- lists:seq(1, 100)], + _ = await_written(WriterId, {1, 100, 1}), + empty_mailbox(), + ok = proc_lib:stop(ra_log_wal), + + [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")), + + %% overwrite last few bytes of the file with 0s + {ok, Fd} = file:open(WalFile, [raw, binary, read, write]), + {ok, _Pos} = file:position(Fd, {eof, -10}), + file:truncate(Fd), + file:close(Fd), + + {ok, Pid} = ra_log_wal:start_link(Conf), + ?assert(erlang:is_process_alive(Pid)), + ok = proc_lib:stop(ra_log_wal), + meck:unload(), + ok. + +recover_with_last_entry_corruption(Config) -> + ok = logger:set_primary_config(level, all), + #{dir := Dir} = Conf0 = ?config(wal_conf, Config), + WriterId = ?config(writer_id, Config), + Conf = Conf0#{segment_writer => spawn(fun () -> ok end), + pre_allocate => false}, + Data = crypto:strong_rand_bytes(1000), + meck:new(ra_log_segment_writer, [passthrough]), + meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), + {ok, _Wal} = ra_log_wal:start_link(Conf), + [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + || Idx <- lists:seq(1, 100)], + _ = await_written(WriterId, {1, 100, 1}), + empty_mailbox(), + ok = proc_lib:stop(ra_log_wal), + + [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")), + + %% overwrite last few bytes of the file with 0s + {ok, Fd} = file:open(WalFile, [raw, binary, read, write]), + {ok, _Pos} = file:position(Fd, {eof, -10}), + ok = file:write(Fd, <<0,0,0,0,0,0,0,0,0,0>>), + file:close(Fd), + + {ok, Pid} = ra_log_wal:start_link(Conf), + ?assert(erlang:is_process_alive(Pid)), + ok = proc_lib:stop(ra_log_wal), + meck:unload(), + ok. + +recover_with_last_entry_corruption_pre_allocate(Config) -> + ok = logger:set_primary_config(level, all), + #{dir := Dir} = Conf0 = ?config(wal_conf, Config), + WriterId = ?config(writer_id, Config), + Conf = Conf0#{segment_writer => spawn(fun () -> ok end), + pre_allocate => true}, + Data = crypto:strong_rand_bytes(1000), + meck:new(ra_log_segment_writer, [passthrough]), + meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), + {ok, _Wal} = ra_log_wal:start_link(Conf), + [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + || Idx <- lists:seq(1, 100)], + _ = await_written(WriterId, {1, 100, 1}), + empty_mailbox(), + ok = proc_lib:stop(ra_log_wal), + + [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")), + + %% overwrite last few bytes of the file with 0s + {ok, Fd} = file:open(WalFile, [raw, binary, read, write]), + %% TODO: if the internal WAL format changes this will be wrong + _ = file:position(Fd, 103331), + ok = file:write(Fd, <<0,0,0,0,0,0,0,0,0,0>>), + file:close(Fd), + + {ok, Pid} = ra_log_wal:start_link(Conf), + ?assert(erlang:is_process_alive(Pid)), + ok = proc_lib:stop(ra_log_wal), + meck:unload(), + ok. + +checksum_failure_in_middle_of_file_should_fail(Config) -> + process_flag(trap_exit, true), + ok = logger:set_primary_config(level, all), + #{dir := Dir} = Conf0 = ?config(wal_conf, Config), + WriterId = ?config(writer_id, Config), + Conf = Conf0#{segment_writer => spawn(fun () -> ok end), + pre_allocate => false}, + Data = crypto:strong_rand_bytes(1000), + meck:new(ra_log_segment_writer, [passthrough]), + meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), + {ok, _Wal} = ra_log_wal:start_link(Conf), + [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + || Idx <- lists:seq(1, 100)], + _ = await_written(WriterId, {1, 100, 1}), + empty_mailbox(), + ok = proc_lib:stop(ra_log_wal), + + [WalFile] = filelib:wildcard(filename:join(Dir, "*.wal")), + + %% overwrite last few bytes of the file with 0s + {ok, Fd} = file:open(WalFile, [raw, binary, read, write]), + {ok, _Pos} = file:position(Fd, 1000), + ok = file:write(Fd, <<0,0,0,0,0,0,0,0,0,0>>), + file:close(Fd), + + {error, wal_checksum_validation_failure} = ra_log_wal:start_link(Conf), + empty_mailbox(), + ok. + empty_mailbox() -> receive _ ->