Skip to content

Commit

Permalink
Merge pull request #487 from rabbitmq/opt-read-planning
Browse files Browse the repository at this point in the history
Optimise read planning
  • Loading branch information
kjnilsson authored Dec 12, 2024
2 parents 8960ad7 + 9eaf822 commit bb89b61
Show file tree
Hide file tree
Showing 8 changed files with 504 additions and 193 deletions.
24 changes: 15 additions & 9 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@
-define(WAL_RESEND_TIMEOUT, 5000).

-type ra_meta_key() :: atom().
-type segment_ref() :: {From :: ra_index(), To :: ra_index(),
File :: file:filename_all()}.
-type segment_ref() :: {ra_range:range(), File :: file:filename_all()}.
-type event_body() :: {written, ra_term(), ra:range()} |
{segments, [{ets:tid(), ra:range()}], [segment_ref()]} |
{resend_write, ra_index()} |
Expand Down Expand Up @@ -279,7 +278,7 @@ init(#{uid := UId,
LastSegRefIdx = case SegRefs of
[] ->
-1;
[{_, L, _} | _] ->
[{{_, L}, _} | _] ->
L
end,
LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
Expand Down Expand Up @@ -969,8 +968,14 @@ should_snapshot(snapshot, Idx,
% We should take a snapshot if the new snapshot index would allow us
% to discard any segments or if the we've handled enough commands
% since the last snapshot.
CanFreeSegments = lists:any(fun({_, To, _}) -> To =< Idx end,
ra_log_reader:segment_refs(Reader)),
CanFreeSegments = case ra_log_reader:range(Reader) of
undefined ->
false;
{Start, _End} ->
%% this isn't 100% guaranteed to free a segment
%% but there is a good chance
Idx > Start
end,
CanFreeSegments orelse Idx > SnapLimit;
should_snapshot(checkpoint, Idx,
#?MODULE{cfg = #cfg{min_checkpoint_interval = CheckpointInter},
Expand Down Expand Up @@ -1029,7 +1034,8 @@ overview(#?MODULE{last_index = LastIndex,
last_term => LastTerm,
first_index => FirstIndex,
last_written_index_term => LWIT,
num_segments => length(ra_log_reader:segment_refs(Reader)),
num_segments => ra_log_reader:segment_ref_count(Reader),
segments_range => ra_log_reader:range(Reader),
open_segments => ra_log_reader:num_open_segments(Reader),
snapshot_index => case CurrSnap of
undefined -> undefined;
Expand Down Expand Up @@ -1166,9 +1172,9 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
ok = ra_log_segment_writer:truncate_segments(SegWriter,
UId, Pivot)
end),
Active = ra_log_reader:segment_refs(Reader),
NumActive = ra_log_reader:segment_ref_count(Reader),
?DEBUG("~ts: ~b obsolete segments at ~b - remaining: ~b, pivot ~0p",
[LogId, length(Obsolete), SnapIdx, length(Active), Pivot]),
[LogId, length(Obsolete), SnapIdx, NumActive, Pivot]),
State = State0#?MODULE{reader = Reader},
{State, log_update_effects(Readers, Pid, State)}
end.
Expand Down Expand Up @@ -1331,7 +1337,7 @@ recover_ranges(UId, MtRange, SegWriter) ->
[SegRef | Acc]
end
end, [], SegFiles),
SegRanges = [{F, L} || {F, L, _} <- SegRefs],
SegRanges = [Range || {Range, _} <- SegRefs],
Ranges = [MtRange | SegRanges],
{pick_range(Ranges, undefined), SegRefs}.

Expand Down
Loading

0 comments on commit bb89b61

Please sign in to comment.