Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove most of the fd related FHC code #11436

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions deps/amqp_client/src/amqp_network_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ do_connect({Addr, Family},
connection_timeout = Timeout,
socket_options = ExtraOpts},
SIF, State) ->
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
Expand All @@ -134,7 +133,6 @@ do_connect({Addr, Family},
SIF, State) ->
{ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options),
app_utils:start_applications([asn1, crypto, public_key, ssl]),
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
Expand Down Expand Up @@ -379,11 +377,5 @@ handshake_recv(Expecting) ->
end
end.

obtain() ->
case code:is_loaded(file_handle_cache) of
false -> ok;
_ -> file_handle_cache:obtain()
end.

get_reason(#'connection.close'{reply_code = ErrCode}) ->
?PROTOCOL:amqp_exception(ErrCode).
15 changes: 13 additions & 2 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1656,8 +1656,9 @@ config_files() ->
start_fhc() ->
ok = rabbit_sup:start_restartable_child(
file_handle_cache,
[fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]),
ensure_working_fhc().
[fun(_) -> ok end, fun(_) -> ok end]),
ensure_working_fhc(),
maybe_warn_low_fd_limit().

ensure_working_fhc() ->
%% To test the file handle cache, we simply read a file we know it
Expand Down Expand Up @@ -1697,6 +1698,16 @@ ensure_working_fhc() ->
throw({ensure_working_fhc, {timeout, TestPid}})
end.

maybe_warn_low_fd_limit() ->
case file_handle_cache:ulimit() of
%% unknown is included as atom() > integer().
L when L > 1024 ->
ok;
L ->
rabbit_log:warning("Available file handles: ~tp. "
"Please consider increasing system limits", [L])
end.

%% Any configuration that
%% 1. is not allowed to change while RabbitMQ is running, and
%% 2. is read often
Expand Down
27 changes: 0 additions & 27 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

-module(rabbit_amqqueue).

-export([warn_file_limit/0]).
-export([recover/1, stop/1, start/1, declare/6, declare/7,
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
Expand Down Expand Up @@ -74,7 +73,6 @@

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_maximum_since_use/2,
emit_consumers_local/3, internal_delete/3]).

-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -119,21 +117,6 @@
active, activity_status, arguments]).
-define(KILL_QUEUE_DELAY_INTERVAL, 100).

warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
L = length(DurableQueues),

%% if there are not enough file handles, the server might hang
%% when trying to recover queues, warn the user:
case file_handle_cache:get_limit() < L of
true ->
rabbit_log:warning(
"Recovering ~tp queues, available file handles: ~tp. Please increase max open file handles limit to at least ~tp!",
lhoguin marked this conversation as resolved.
Show resolved Hide resolved
[L, file_handle_cache:get_limit(), L]);
false ->
ok
end.

-spec recover(rabbit_types:vhost()) ->
{Recovered :: [amqqueue:amqqueue()],
Failed :: [amqqueue:amqqueue()]}.
Expand Down Expand Up @@ -183,11 +166,6 @@ find_local_durable_queues(VHostName) ->
rabbit_queue_type:is_recoverable(Q)
end).

find_recoverable_queues() ->
rabbit_db_queue:filter_all_durable(fun(Q) ->
rabbit_queue_type:is_recoverable(Q)
end).

-spec declare(name(),
boolean(),
boolean(),
Expand Down Expand Up @@ -1840,11 +1818,6 @@ forget_node_for_queue(Q) ->
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).

-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.

set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).

-spec is_replicated(amqqueue:amqqueue()) -> boolean().

is_replicated(Q) when ?amqqueue_is_classic(Q) ->
Expand Down
7 changes: 0 additions & 7 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@ init_it2(Recover, From, State = #q{q = Q,
(Res == created orelse Res == existing) ->
case matches(Recover, Q, Q1) of
true ->
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
BQ = backing_queue_module(),
BQS = bq_init(BQ, Q, TermsOrNew),
send_reply(From, {new, Q}),
Expand Down Expand Up @@ -1189,7 +1187,6 @@ prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
Expand Down Expand Up @@ -1499,10 +1496,6 @@ handle_cast({deactivate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(),
ChPid, State));

handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);

handle_cast({credit, SessionPid, CTag, Credit, Drain},
#q{q = Q,
backing_queue = BQ,
Expand Down
27 changes: 2 additions & 25 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,6 @@
-define(HEADER_SIZE, 64). %% bytes
-define(ENTRY_SIZE, 32). %% bytes

%% The file_handle_cache module tracks reservations at
%% the level of the process. This means we cannot
%% handle them independently in the store and index.
%% Because the index may reserve more FDs than the
%% store the index becomes responsible for this and
%% will always reserve at least 2 FDs, and release
%% everything when terminating.
-define(STORE_FD_RESERVATIONS, 2).

-include_lib("rabbit_common/include/rabbit.hrl").
%% Set to true to get an awful lot of debug logs.
-if(false).
Expand Down Expand Up @@ -538,7 +529,6 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
ok = file:sync(Fd),
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
_ = rabbit_recovery_terms:store(VHost,
filename:basename(rabbit_file:binary_to_filename(Dir)),
Expand All @@ -555,7 +545,6 @@ delete_and_terminate(State = #qi { dir = Dir,
_ = maps:map(fun(_, Fd) ->
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Erase the data on disk.
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
State#qi{ segments = #{},
Expand Down Expand Up @@ -626,18 +615,9 @@ new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments })
%% using too many FDs when the consumer lags a lot. We
%% limit at 4 because we try to keep up to 2 for reading
%% and 2 for writing.
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds })
reduce_fd_usage(_SegmentToOpen, State = #qi{ fds = OpenFds })
when map_size(OpenFds) < 4 ->
%% The only case where we need to update reservations is
%% when we are opening a segment that wasn't already open,
%% and we are not closing another segment at the same time.
case OpenFds of
#{SegmentToOpen := _} ->
State;
_ ->
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds) + 1),
State
end;
State;
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
case OpenFds0 of
#{SegmentToOpen := _} ->
Expand Down Expand Up @@ -719,7 +699,6 @@ flush_buffer(State0 = #qi { write_buffer = WriteBuffer0,
{Fd, FoldState} = get_fd_for_segment(Segment, FoldState1),
LocBytes = flush_buffer_consolidate(lists:sort(LocBytes0), 1),
ok = file:pwrite(Fd, LocBytes),
file_handle_cache_stats:update(queue_index_write),
FoldState
end, State0, Writes),
%% Update the cache. If we are flushing the entire write buffer,
Expand Down Expand Up @@ -868,7 +847,6 @@ delete_segment(Segment, State0 = #qi{ fds = OpenFds0 }) ->
State = case maps:take(Segment, OpenFds0) of
{Fd, OpenFds} ->
ok = file:close(Fd),
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds)),
State0#qi{ fds = OpenFds };
error ->
State0
Expand Down Expand Up @@ -982,7 +960,6 @@ read_from_disk(SeqIdsToRead0, State0 = #qi{ write_buffer = WriteBuffer }, Acc0)
ReadSize = (LastSeqId - FirstSeqId + 1) * ?ENTRY_SIZE,
case get_fd(FirstSeqId, State0) of
{Fd, OffsetForSeqId, State} ->
file_handle_cache_stats:update(queue_index_read),
%% When reading further than the end of a partial file,
%% file:pread/3 will return what it could read.
case file:pread(Fd, OffsetForSeqId, ReadSize) of
Expand Down
5 changes: 0 additions & 5 deletions deps/rabbit/src/rabbit_classic_queue_store_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@
%% need to look into the store to discard them. Messages on disk
%% will be dropped at the same time as the index deletes the
%% corresponding segment file.
%%
%% The file_handle_cache reservations are done by the v2 index
%% because they are handled at a pid level. Since we are using
%% up to 2 FDs in this module we make the index reserve 2 extra
%% FDs.

-module(rabbit_classic_queue_store_v2).

Expand Down
11 changes: 2 additions & 9 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -894,10 +894,8 @@ state_enter0(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue,
file_handle_leader_reservation, [Resource]}],
NotifyDecs = notify_decorators_startup(Resource),
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
case BLH of
undefined ->
Effects;
Expand All @@ -914,12 +912,7 @@ state_enter0(eol, #?MODULE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []} | Effects];
state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}, Effects)
when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation | Effects];
[{aux, eol} | Effects];
state_enter0(_, _, Effects) ->
%% catch all as not handling all states
Effects.
Expand Down
10 changes: 2 additions & 8 deletions deps/rabbit/src/rabbit_fifo_v0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
Expand All @@ -559,8 +558,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
Expand All @@ -575,11 +573,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
#{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))];
state_enter(_, _) ->
%% catch all as not handling all states
[].
Expand Down
10 changes: 2 additions & 8 deletions deps/rabbit/src/rabbit_fifo_v1.erl
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
Expand All @@ -687,8 +686,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
Expand All @@ -704,11 +702,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
[{aux, eol}];
state_enter(_, _) ->
%% catch all as not handling all states
[].
Expand Down
Loading
Loading