Skip to content

Commit

Permalink
Merge pull request #11436 from rabbitmq/loic-remove-fhc
Browse files Browse the repository at this point in the history
Remove most of the fd related FHC code
  • Loading branch information
michaelklishin authored Jun 26, 2024
2 parents de866b8 + 2a64a0f commit 0838008
Show file tree
Hide file tree
Showing 39 changed files with 99 additions and 573 deletions.
8 changes: 0 additions & 8 deletions deps/amqp_client/src/amqp_network_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ do_connect({Addr, Family},
connection_timeout = Timeout,
socket_options = ExtraOpts},
SIF, State) ->
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
Expand All @@ -134,7 +133,6 @@ do_connect({Addr, Family},
SIF, State) ->
{ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options),
app_utils:start_applications([asn1, crypto, public_key, ssl]),
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
Expand Down Expand Up @@ -379,11 +377,5 @@ handshake_recv(Expecting) ->
end
end.

obtain() ->
case code:is_loaded(file_handle_cache) of
false -> ok;
_ -> file_handle_cache:obtain()
end.

get_reason(#'connection.close'{reply_code = ErrCode}) ->
?PROTOCOL:amqp_exception(ErrCode).
15 changes: 13 additions & 2 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1656,8 +1656,9 @@ config_files() ->
start_fhc() ->
ok = rabbit_sup:start_restartable_child(
file_handle_cache,
[fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]),
ensure_working_fhc().
[fun(_) -> ok end, fun(_) -> ok end]),
ensure_working_fhc(),
maybe_warn_low_fd_limit().

ensure_working_fhc() ->
%% To test the file handle cache, we simply read a file we know it
Expand Down Expand Up @@ -1697,6 +1698,16 @@ ensure_working_fhc() ->
throw({ensure_working_fhc, {timeout, TestPid}})
end.

maybe_warn_low_fd_limit() ->
case file_handle_cache:ulimit() of
%% unknown is included as atom() > integer().
L when L > 1024 ->
ok;
L ->
rabbit_log:warning("Available file handles: ~tp. "
"Please consider increasing system limits", [L])
end.

%% Any configuration that
%% 1. is not allowed to change while RabbitMQ is running, and
%% 2. is read often
Expand Down
27 changes: 0 additions & 27 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

-module(rabbit_amqqueue).

-export([warn_file_limit/0]).
-export([recover/1, stop/1, start/1, declare/6, declare/7,
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
Expand Down Expand Up @@ -74,7 +73,6 @@

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_maximum_since_use/2,
emit_consumers_local/3, internal_delete/3]).

-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -119,21 +117,6 @@
active, activity_status, arguments]).
-define(KILL_QUEUE_DELAY_INTERVAL, 100).

warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
L = length(DurableQueues),

%% if there are not enough file handles, the server might hang
%% when trying to recover queues, warn the user:
case file_handle_cache:get_limit() < L of
true ->
rabbit_log:warning(
"Recovering ~tp queues, available file handles: ~tp. Please increase max open file handles limit to at least ~tp!",
[L, file_handle_cache:get_limit(), L]);
false ->
ok
end.

-spec recover(rabbit_types:vhost()) ->
{Recovered :: [amqqueue:amqqueue()],
Failed :: [amqqueue:amqqueue()]}.
Expand Down Expand Up @@ -183,11 +166,6 @@ find_local_durable_queues(VHostName) ->
rabbit_queue_type:is_recoverable(Q)
end).

find_recoverable_queues() ->
rabbit_db_queue:filter_all_durable(fun(Q) ->
rabbit_queue_type:is_recoverable(Q)
end).

-spec declare(name(),
boolean(),
boolean(),
Expand Down Expand Up @@ -1840,11 +1818,6 @@ forget_node_for_queue(Q) ->
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).

-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.

set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).

-spec is_replicated(amqqueue:amqqueue()) -> boolean().

is_replicated(Q) when ?amqqueue_is_classic(Q) ->
Expand Down
7 changes: 0 additions & 7 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@ init_it2(Recover, From, State = #q{q = Q,
(Res == created orelse Res == existing) ->
case matches(Recover, Q, Q1) of
true ->
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
BQ = backing_queue_module(),
BQS = bq_init(BQ, Q, TermsOrNew),
send_reply(From, {new, Q}),
Expand Down Expand Up @@ -1187,7 +1185,6 @@ prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
Expand Down Expand Up @@ -1497,10 +1494,6 @@ handle_cast({deactivate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(),
ChPid, State));

handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);

handle_cast({credit, SessionPid, CTag, Credit, Drain},
#q{q = Q,
backing_queue = BQ,
Expand Down
27 changes: 2 additions & 25 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,6 @@
-define(HEADER_SIZE, 64). %% bytes
-define(ENTRY_SIZE, 32). %% bytes

%% The file_handle_cache module tracks reservations at
%% the level of the process. This means we cannot
%% handle them independently in the store and index.
%% Because the index may reserve more FDs than the
%% store the index becomes responsible for this and
%% will always reserve at least 2 FDs, and release
%% everything when terminating.
-define(STORE_FD_RESERVATIONS, 2).

-include_lib("rabbit_common/include/rabbit.hrl").
%% Set to true to get an awful lot of debug logs.
-if(false).
Expand Down Expand Up @@ -538,7 +529,6 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
ok = file:sync(Fd),
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
_ = rabbit_recovery_terms:store(VHost,
filename:basename(rabbit_file:binary_to_filename(Dir)),
Expand All @@ -555,7 +545,6 @@ delete_and_terminate(State = #qi { dir = Dir,
_ = maps:map(fun(_, Fd) ->
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Erase the data on disk.
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
State#qi{ segments = #{},
Expand Down Expand Up @@ -626,18 +615,9 @@ new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments })
%% using too many FDs when the consumer lags a lot. We
%% limit at 4 because we try to keep up to 2 for reading
%% and 2 for writing.
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds })
reduce_fd_usage(_SegmentToOpen, State = #qi{ fds = OpenFds })
when map_size(OpenFds) < 4 ->
%% The only case where we need to update reservations is
%% when we are opening a segment that wasn't already open,
%% and we are not closing another segment at the same time.
case OpenFds of
#{SegmentToOpen := _} ->
State;
_ ->
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds) + 1),
State
end;
State;
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
case OpenFds0 of
#{SegmentToOpen := _} ->
Expand Down Expand Up @@ -719,7 +699,6 @@ flush_buffer(State0 = #qi { write_buffer = WriteBuffer0,
{Fd, FoldState} = get_fd_for_segment(Segment, FoldState1),
LocBytes = flush_buffer_consolidate(lists:sort(LocBytes0), 1),
ok = file:pwrite(Fd, LocBytes),
file_handle_cache_stats:update(queue_index_write),
FoldState
end, State0, Writes),
%% Update the cache. If we are flushing the entire write buffer,
Expand Down Expand Up @@ -868,7 +847,6 @@ delete_segment(Segment, State0 = #qi{ fds = OpenFds0 }) ->
State = case maps:take(Segment, OpenFds0) of
{Fd, OpenFds} ->
ok = file:close(Fd),
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds)),
State0#qi{ fds = OpenFds };
error ->
State0
Expand Down Expand Up @@ -982,7 +960,6 @@ read_from_disk(SeqIdsToRead0, State0 = #qi{ write_buffer = WriteBuffer }, Acc0)
ReadSize = (LastSeqId - FirstSeqId + 1) * ?ENTRY_SIZE,
case get_fd(FirstSeqId, State0) of
{Fd, OffsetForSeqId, State} ->
file_handle_cache_stats:update(queue_index_read),
%% When reading further than the end of a partial file,
%% file:pread/3 will return what it could read.
case file:pread(Fd, OffsetForSeqId, ReadSize) of
Expand Down
5 changes: 0 additions & 5 deletions deps/rabbit/src/rabbit_classic_queue_store_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@
%% need to look into the store to discard them. Messages on disk
%% will be dropped at the same time as the index deletes the
%% corresponding segment file.
%%
%% The file_handle_cache reservations are done by the v2 index
%% because they are handled at a pid level. Since we are using
%% up to 2 FDs in this module we make the index reserve 2 extra
%% FDs.

-module(rabbit_classic_queue_store_v2).

Expand Down
11 changes: 2 additions & 9 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -894,10 +894,8 @@ state_enter0(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue,
file_handle_leader_reservation, [Resource]}],
NotifyDecs = notify_decorators_startup(Resource),
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
case BLH of
undefined ->
Effects;
Expand All @@ -914,12 +912,7 @@ state_enter0(eol, #?MODULE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []} | Effects];
state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}, Effects)
when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation | Effects];
[{aux, eol} | Effects];
state_enter0(_, _, Effects) ->
%% catch all as not handling all states
Effects.
Expand Down
10 changes: 2 additions & 8 deletions deps/rabbit/src/rabbit_fifo_v0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
Expand All @@ -559,8 +558,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
Expand All @@ -575,11 +573,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
#{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))];
state_enter(_, _) ->
%% catch all as not handling all states
[].
Expand Down
10 changes: 2 additions & 8 deletions deps/rabbit/src/rabbit_fifo_v1.erl
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
Expand All @@ -687,8 +686,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
Expand All @@ -704,11 +702,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
[{aux, eol}];
state_enter(_, _) ->
%% catch all as not handling all states
[].
Expand Down
Loading

0 comments on commit 0838008

Please sign in to comment.