diff --git a/deps/amqp_client/src/amqp_network_connection.erl b/deps/amqp_client/src/amqp_network_connection.erl index 3ce107545e8f..a5ef739ea0f3 100644 --- a/deps/amqp_client/src/amqp_network_connection.erl +++ b/deps/amqp_client/src/amqp_network_connection.erl @@ -118,7 +118,6 @@ do_connect({Addr, Family}, connection_timeout = Timeout, socket_options = ExtraOpts}, SIF, State) -> - ok = obtain(), case gen_tcp:connect(Addr, Port, [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts, Timeout) of @@ -134,7 +133,6 @@ do_connect({Addr, Family}, SIF, State) -> {ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options), app_utils:start_applications([asn1, crypto, public_key, ssl]), - ok = obtain(), case gen_tcp:connect(Addr, Port, [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts, Timeout) of @@ -379,11 +377,5 @@ handshake_recv(Expecting) -> end end. -obtain() -> - case code:is_loaded(file_handle_cache) of - false -> ok; - _ -> file_handle_cache:obtain() - end. - get_reason(#'connection.close'{reply_code = ErrCode}) -> ?PROTOCOL:amqp_exception(ErrCode). diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 1d98ee52ef30..0f201f3b4cfa 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1656,8 +1656,9 @@ config_files() -> start_fhc() -> ok = rabbit_sup:start_restartable_child( file_handle_cache, - [fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]), - ensure_working_fhc(). + [fun(_) -> ok end, fun(_) -> ok end]), + ensure_working_fhc(), + maybe_warn_low_fd_limit(). ensure_working_fhc() -> %% To test the file handle cache, we simply read a file we know it @@ -1697,6 +1698,16 @@ ensure_working_fhc() -> throw({ensure_working_fhc, {timeout, TestPid}}) end. +maybe_warn_low_fd_limit() -> + case file_handle_cache:ulimit() of + %% unknown is included as atom() > integer(). + L when L > 1024 -> + ok; + L -> + rabbit_log:warning("Available file handles: ~tp. " + "Please consider increasing system limits", [L]) + end. + %% Any configuration that %% 1. is not allowed to change while RabbitMQ is running, and %% 2. is read often diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index a477b0ccfd68..5b76ba41685d 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -7,7 +7,6 @@ -module(rabbit_amqqueue). --export([warn_file_limit/0]). -export([recover/1, stop/1, start/1, declare/6, declare/7, delete_immediately/1, delete_exclusive/2, delete/4, purge/1, forget_all_durable/1]). @@ -74,7 +73,6 @@ %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, - set_maximum_since_use/2, emit_consumers_local/3, internal_delete/3]). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -119,21 +117,6 @@ active, activity_status, arguments]). -define(KILL_QUEUE_DELAY_INTERVAL, 100). -warn_file_limit() -> - DurableQueues = find_recoverable_queues(), - L = length(DurableQueues), - - %% if there are not enough file handles, the server might hang - %% when trying to recover queues, warn the user: - case file_handle_cache:get_limit() < L of - true -> - rabbit_log:warning( - "Recovering ~tp queues, available file handles: ~tp. Please increase max open file handles limit to at least ~tp!", - [L, file_handle_cache:get_limit(), L]); - false -> - ok - end. - -spec recover(rabbit_types:vhost()) -> {Recovered :: [amqqueue:amqqueue()], Failed :: [amqqueue:amqqueue()]}. @@ -183,11 +166,6 @@ find_local_durable_queues(VHostName) -> rabbit_queue_type:is_recoverable(Q) end). 
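For context on the maybe_warn_low_fd_limit/0 guard added in rabbit.erl above: it leans on Erlang's standard term order, in which every number sorts before every atom, so a file_handle_cache:ulimit() result of `unknown` satisfies `L > 1024` and is treated as "nothing to warn about". A minimal sketch of that idiom; the module and function names are illustrative only, not part of the patch:

-module(fd_limit_check_example).
-export([maybe_warn/1]).

%% In Erlang's term order, number < atom, so the atom 'unknown'
%% always passes the `Limit > 1024` guard and takes the quiet clause.
maybe_warn(Limit) when Limit > 1024 ->
    ok;
maybe_warn(Limit) ->
    {warn, Limit}.

%% 1> fd_limit_check_example:maybe_warn(unknown).
%% ok
%% 2> fd_limit_check_example:maybe_warn(256).
%% {warn,256}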
-find_recoverable_queues() -> - rabbit_db_queue:filter_all_durable(fun(Q) -> - rabbit_queue_type:is_recoverable(Q) - end). - -spec declare(name(), boolean(), boolean(), @@ -1840,11 +1818,6 @@ forget_node_for_queue(Q) -> run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). --spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. - -set_maximum_since_use(QPid, Age) -> - gen_server2:cast(QPid, {set_maximum_since_use, Age}). - -spec is_replicated(amqqueue:amqqueue()) -> boolean(). is_replicated(Q) when ?amqqueue_is_classic(Q) -> diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index a9ecefb3f9f4..a9ab603c7398 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -210,8 +210,6 @@ init_it2(Recover, From, State = #q{q = Q, (Res == created orelse Res == existing) -> case matches(Recover, Q, Q1) of true -> - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), BQ = backing_queue_module(), BQS = bq_init(BQ, Q, TermsOrNew), send_reply(From, {new, Q}), @@ -1187,7 +1185,6 @@ prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {delete_exclusive, _Pid} -> 8; - {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; {ack, _AckTags, _ChPid} -> 4; %% [1] {resume, _ChPid} -> 3; @@ -1497,10 +1494,6 @@ handle_cast({deactivate_limit, ChPid}, State) -> noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(), ChPid, State)); -handle_cast({set_maximum_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - handle_cast({credit, SessionPid, CTag, Credit, Drain}, #q{q = Q, backing_queue = BQ, diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 14aaf3d4d09a..32111ca9651f 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -41,15 +41,6 @@ -define(HEADER_SIZE, 64). %% bytes -define(ENTRY_SIZE, 32). %% bytes -%% The file_handle_cache module tracks reservations at -%% the level of the process. This means we cannot -%% handle them independently in the store and index. -%% Because the index may reserve more FDs than the -%% store the index becomes responsible for this and -%% will always reserve at least 2 FDs, and release -%% everything when terminating. --define(STORE_FD_RESERVATIONS, 2). - -include_lib("rabbit_common/include/rabbit.hrl"). %% Set to true to get an awful lot of debug logs. -if(false). @@ -538,7 +529,6 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir, ok = file:sync(Fd), ok = file:close(Fd) end, OpenFds), - file_handle_cache:release_reservation(), %% Write recovery terms for faster recovery. _ = rabbit_recovery_terms:store(VHost, filename:basename(rabbit_file:binary_to_filename(Dir)), @@ -555,7 +545,6 @@ delete_and_terminate(State = #qi { dir = Dir, _ = maps:map(fun(_, Fd) -> ok = file:close(Fd) end, OpenFds), - file_handle_cache:release_reservation(), %% Erase the data on disk. ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)), State#qi{ segments = #{}, @@ -626,18 +615,9 @@ new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) %% using too many FDs when the consumer lags a lot. We %% limit at 4 because we try to keep up to 2 for reading %% and 2 for writing. 
-reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds }) +reduce_fd_usage(_SegmentToOpen, State = #qi{ fds = OpenFds }) when map_size(OpenFds) < 4 -> - %% The only case where we need to update reservations is - %% when we are opening a segment that wasn't already open, - %% and we are not closing another segment at the same time. - case OpenFds of - #{SegmentToOpen := _} -> - State; - _ -> - file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds) + 1), - State - end; + State; reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) -> case OpenFds0 of #{SegmentToOpen := _} -> @@ -719,7 +699,6 @@ flush_buffer(State0 = #qi { write_buffer = WriteBuffer0, {Fd, FoldState} = get_fd_for_segment(Segment, FoldState1), LocBytes = flush_buffer_consolidate(lists:sort(LocBytes0), 1), ok = file:pwrite(Fd, LocBytes), - file_handle_cache_stats:update(queue_index_write), FoldState end, State0, Writes), %% Update the cache. If we are flushing the entire write buffer, @@ -868,7 +847,6 @@ delete_segment(Segment, State0 = #qi{ fds = OpenFds0 }) -> State = case maps:take(Segment, OpenFds0) of {Fd, OpenFds} -> ok = file:close(Fd), - file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds)), State0#qi{ fds = OpenFds }; error -> State0 @@ -982,7 +960,6 @@ read_from_disk(SeqIdsToRead0, State0 = #qi{ write_buffer = WriteBuffer }, Acc0) ReadSize = (LastSeqId - FirstSeqId + 1) * ?ENTRY_SIZE, case get_fd(FirstSeqId, State0) of {Fd, OffsetForSeqId, State} -> - file_handle_cache_stats:update(queue_index_read), %% When reading further than the end of a partial file, %% file:pread/3 will return what it could read. case file:pread(Fd, OffsetForSeqId, ReadSize) of diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 90fe79d29a95..a98e666f853c 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -41,11 +41,6 @@ %% need to look into the store to discard them. Messages on disk %% will be dropped at the same time as the index deletes the %% corresponding segment file. -%% -%% The file_handle_cache reservations are done by the v2 index -%% because they are handled at a pid level. Since we are using -%% up to 2 FDs in this module we make the index reserve 2 extra -%% FDs. -module(rabbit_classic_queue_store_v2). 
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index ebe29cd6116b..e843595586c2 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -894,10 +894,8 @@ state_enter0(leader, #?MODULE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - FHReservation = [{mod_call, rabbit_quorum_queue, - file_handle_leader_reservation, [Resource]}], NotifyDecs = notify_decorators_startup(Resource), - Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs], + Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs], case BLH of undefined -> Effects; @@ -914,12 +912,7 @@ state_enter0(eol, #?MODULE{enqueuers = Enqs, AllConsumers = maps:merge(Custs, WaitingConsumers1), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ - [{aux, eol}, - {mod_call, rabbit_quorum_queue, file_handle_release_reservation, []} | Effects]; -state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}, Effects) - when State =/= leader -> - FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, - [FHReservation | Effects]; + [{aux, eol} | Effects]; state_enter0(_, _, Effects) -> %% catch all as not handling all states Effects. diff --git a/deps/rabbit/src/rabbit_fifo_v0.erl b/deps/rabbit/src/rabbit_fifo_v0.erl index 2161f9e4b011..3ada7f56b23f 100644 --- a/deps/rabbit/src/rabbit_fifo_v0.erl +++ b/deps/rabbit/src/rabbit_fifo_v0.erl @@ -548,7 +548,6 @@ state_enter(leader, #?STATE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, - resource = Resource, become_leader_handler = BLH}, prefix_msgs = {0, [], 0, []} }) -> @@ -559,8 +558,7 @@ state_enter(leader, #?STATE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}], - Effects = Mons ++ Nots ++ NodeMons ++ FHReservation, + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; @@ -575,11 +573,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs, #{}, WaitingConsumers0), AllConsumers = maps:merge(Custs, WaitingConsumers1), [{send_msg, P, eol, ra_event} - || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ - [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; -state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> - FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, - [FHReservation]; + || P <- maps:keys(maps:merge(Enqs, AllConsumers))]; state_enter(_, _) -> %% catch all as not handling all states []. 
diff --git a/deps/rabbit/src/rabbit_fifo_v1.erl b/deps/rabbit/src/rabbit_fifo_v1.erl index a6f4c7868d17..98b762b08520 100644 --- a/deps/rabbit/src/rabbit_fifo_v1.erl +++ b/deps/rabbit/src/rabbit_fifo_v1.erl @@ -676,7 +676,6 @@ state_enter(leader, #?STATE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, - resource = Resource, become_leader_handler = BLH}, prefix_msgs = {0, [], 0, []} }) -> @@ -687,8 +686,7 @@ state_enter(leader, #?STATE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}], - Effects = Mons ++ Nots ++ NodeMons ++ FHReservation, + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; @@ -704,11 +702,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs, AllConsumers = maps:merge(Custs, WaitingConsumers1), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ - [{aux, eol}, - {mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; -state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> - FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, - [FHReservation]; + [{aux, eol}]; state_enter(_, _) -> %% catch all as not handling all states []. diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl index a21fea3a7f75..1f7182611992 100644 --- a/deps/rabbit/src/rabbit_file.erl +++ b/deps/rabbit/src/rabbit_file.erl @@ -18,8 +18,6 @@ -export([filename_as_a_directory/1]). -export([filename_to_binary/1, binary_to_filename/1]). --import(file_handle_cache, [with_handle/1, with_handle/2]). - -define(TMP_EXT, ".tmp"). %%---------------------------------------------------------------------------- @@ -56,7 +54,7 @@ file_size(File) -> -spec ensure_dir((file:filename())) -> ok_or_error(). -ensure_dir(File) -> with_handle(fun () -> ensure_dir_internal(File) end). +ensure_dir(File) -> ensure_dir_internal(File). ensure_dir_internal("/") -> ok; @@ -81,16 +79,17 @@ wildcard(Pattern, Dir) -> -spec list_dir(file:filename()) -> rabbit_types:ok_or_error2([file:filename()], any()). -list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end). +list_dir(Dir) -> prim_file:list_dir(Dir). read_file_info(File) -> - with_handle(fun () -> file:read_file_info(File, [raw]) end). + file:read_file_info(File, [raw]). -spec read_term_file (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()). read_term_file(File) -> try + %% @todo OTP-27+ has file:read_file(File, [raw]). 
F = fun() -> {ok, FInfo} = file:read_file_info(File, [raw]), {ok, Fd} = file:open(File, [read, raw, binary]), @@ -100,7 +99,7 @@ read_term_file(File) -> file:close(Fd) end end, - {ok, Data} = with_handle(F), + {ok, Data} = F(), {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), TokenGroups = group_tokens(Tokens), {ok, [begin @@ -166,22 +165,19 @@ with_synced_copy(Path, Modes, Fun) -> true -> {error, append_not_supported, Path}; false -> - with_handle( - fun () -> - Bak = Path ++ ?TMP_EXT, - case prim_file:open(Bak, Modes) of - {ok, Hdl} -> - try - Result = Fun(Hdl), - ok = prim_file:sync(Hdl), - ok = prim_file:rename(Bak, Path), - Result - after - prim_file:close(Hdl) - end; - {error, _} = E -> E - end - end) + Bak = Path ++ ?TMP_EXT, + case prim_file:open(Bak, Modes) of + {ok, Hdl} -> + try + Result = Fun(Hdl), + ok = prim_file:sync(Hdl), + ok = prim_file:rename(Bak, Path), + Result + after + prim_file:close(Hdl) + end; + {error, _} = E -> E + end end. %% TODO the semantics of this function are rather odd. But see bug 25021. @@ -198,16 +194,12 @@ append_file(File, Suffix) -> append_file(_, _, "") -> ok; append_file(File, 0, Suffix) -> - with_handle(fun () -> - case prim_file:open([File, Suffix], [append]) of - {ok, Fd} -> prim_file:close(Fd); - Error -> Error - end - end); + case prim_file:open([File, Suffix], [append]) of + {ok, Fd} -> prim_file:close(Fd); + Error -> Error + end; append_file(File, _, Suffix) -> - case with_handle(2, fun () -> - file:copy(File, {[File, Suffix], [append]}) - end) of + case file:copy(File, {[File, Suffix], [append]}) of {ok, _BytesCopied} -> ok; Error -> Error end. @@ -223,21 +215,19 @@ ensure_parent_dirs_exist(Filename) -> -spec rename(file:filename(), file:filename()) -> ok_or_error(). -rename(Old, New) -> with_handle(fun () -> prim_file:rename(Old, New) end). +rename(Old, New) -> prim_file:rename(Old, New). -spec delete([file:filename()]) -> ok_or_error(). -delete(File) -> with_handle(fun () -> prim_file:delete(File) end). +delete(File) -> prim_file:delete(File). -spec recursive_delete([file:filename()]) -> rabbit_types:ok_or_error({file:filename(), any()}). recursive_delete(Files) -> - with_handle( - fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path); - (_Path, {error, _Err} = Error) -> Error - end, ok, Files) - end). + lists:foldl(fun (Path, ok) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). recursive_delete1(Path) -> case is_dir_no_handle(Path) and not(is_symlink_no_handle(Path)) of @@ -315,10 +305,8 @@ recursive_copy(Src, Dest) -> lock_file(Path) -> case is_file(Path) of true -> {error, eexist}; - false -> with_handle( - fun () -> {ok, Lock} = prim_file:open(Path, [write]), - ok = prim_file:close(Lock) - end) + false -> {ok, Lock} = prim_file:open(Path, [write]), + ok = prim_file:close(Lock) end. -spec filename_as_a_directory(file:filename()) -> file:filename(). 
diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index cd251c35e299..c5ce4843c8d7 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -817,12 +817,8 @@ execute_mnesia_transaction(TxFun) -> Res = mnesia:sync_transaction(TxFun), DiskLogAfter = mnesia_dumper:get_log_writes(), case DiskLogAfter == DiskLogBefore of - true -> file_handle_cache_stats:update( - mnesia_ram_tx), - Res; - false -> file_handle_cache_stats:update( - mnesia_disk_tx), - {sync, Res} + true -> Res; + false -> {sync, Res} end; true -> mnesia:sync_transaction(TxFun) end diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index 8a0427f838f3..c5b02f6eb9c4 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -463,7 +463,6 @@ write(MsgRef, MsgId, Msg, CState) -> client_write(MsgRef, MsgId, Msg, noflow, CS read(MsgId, CState = #client_msstate { index_ets = IndexEts, cur_file_cache_ets = CurFileCacheEts }) -> - file_handle_cache_stats:update(msg_store_read), %% Check the cur file cache case ets:lookup(CurFileCacheEts, MsgId) of [] -> @@ -480,7 +479,6 @@ read(MsgId, CState = #client_msstate { index_ets = IndexEts, -> {#{rabbit_types:msg_id() => msg()}, client_msstate()}. read_many(MsgIds, CState) -> - file_handle_cache_stats:inc(msg_store_read, length(MsgIds)), %% We receive MsgIds in rouhgly the younger->older order so %% we can look for messages in the cache directly. read_many_cache(MsgIds, CState, #{}). @@ -592,7 +590,6 @@ client_write(MsgRef, MsgId, Msg, Flow, CState = #client_msstate { flying_ets = FlyingEts, cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> - file_handle_cache_stats:update(msg_store_write), %% We are guaranteed that the insert will succeed. %% This is true even for queue crashes because CRef will change. true = ets:insert_new(FlyingEts, {{CRef, MsgRef}, ?FLYING_WRITE}), @@ -803,7 +800,6 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_cast(Msg, _Len, _State) -> case Msg of {compacted_file, _File} -> 8; - {set_maximum_since_use, _Age} -> 8; {client_dying, _Pid} -> 7; _ -> 0 end. diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index da242d0f6d87..508e0a0e2b9f 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -576,7 +576,7 @@ handshake(Ref, ProxyProtocolEnabled) -> {'EXIT', normal} -> {error, handshake_failed}; {ok, Sock} -> - setup_socket(Sock), + ok = tune_buffer_size(Sock), {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} end end; @@ -585,15 +585,11 @@ handshake(Ref, ProxyProtocolEnabled) -> {'EXIT', normal} -> {error, handshake_failed}; {ok, Sock} -> - setup_socket(Sock), + ok = tune_buffer_size(Sock), {ok, Sock} end end. -setup_socket(Sock) -> - ok = tune_buffer_size(Sock), - ok = file_handle_cache:obtain(). 
- tune_buffer_size(Sock) -> case tune_buffer_size1(Sock) of ok -> ok; diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl index 249e870af775..77c47c42df2d 100644 --- a/deps/rabbit/src/rabbit_queue_index.erl +++ b/deps/rabbit/src/rabbit_queue_index.erl @@ -922,8 +922,6 @@ append_journal_to_segment(#segment { journal_entries = JEntries, case array:sparse_size(JEntries) of 0 -> Segment; _ -> - file_handle_cache_stats:update(queue_index_write), - {ok, Hdl} = file_handle_cache:open_with_absolute_path( Path, ?WRITE_MODE, [{write_buffer, infinity}]), @@ -1172,7 +1170,6 @@ load_segment(KeepAcked, #segment { path = Path }) -> case rabbit_file:is_file(Path) of false -> Empty; true -> Size = rabbit_file:file_size(Path), - file_handle_cache_stats:update(queue_index_read), {ok, Hdl} = file_handle_cache:open_with_absolute_path( Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index f7042359f184..581ea95d544c 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -51,9 +51,6 @@ grow/4, grow/5]). -export([transfer_leadership/2, get_replicas/1, queue_length/1]). --export([file_handle_leader_reservation/1, - file_handle_other_reservation/0]). --export([file_handle_release_reservation/0]). -export([list_with_minimum_quorum/0, list_with_local_promotable/0, list_with_local_promotable_for_cli/0, @@ -1415,24 +1412,6 @@ matches_strategy(even, Members) -> is_match(Subj, E) -> nomatch /= re:run(Subj, E). -file_handle_leader_reservation(QName) -> - try - {ok, Q} = rabbit_amqqueue:lookup(QName), - ClusterSize = length(get_nodes(Q)), - file_handle_cache:set_reservation(2 + ClusterSize) - catch Class:Err -> - rabbit_log:warning("~s:~s/~b failed with ~w ~w", - [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, - Class, Err]) - end. - - -file_handle_other_reservation() -> - file_handle_cache:set_reservation(2). - -file_handle_release_reservation() -> - file_handle_cache:release_reservation(). - -spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}. reclaim_memory(Vhost, QueueName) -> QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index f85e92b42e4b..9b805502741d 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -374,11 +374,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> after %% We don't call gen_tcp:close/1 here since it waits for %% pending output to be sent, which results in unnecessary - %% delays. We could just terminate - the reader is the - %% controlling process and hence its termination will close - %% the socket. However, to keep the file_handle_cache - %% accounting as accurate as possible we ought to close the - %% socket w/o delay before termination. + %% delays. rabbit_net:fast_close(RealSocket), rabbit_networking:unregister_connection(self()), rabbit_core_metrics:connection_closed(self()), diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index ccb9a41a3a80..192c94ce8520 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -36,8 +36,6 @@ recover() -> %% faster than other nodes handled DOWN messages from us. 
rabbit_amqqueue:on_node_down(node()), - rabbit_amqqueue:warn_file_limit(), - %% Prepare rabbit_semi_durable_route table {Time, _} = timer:tc(fun() -> rabbit_binding:recover() diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 672bafacbd47..10129201b9dc 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -97,8 +97,7 @@ init_per_group(Group, Config) -> rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps() ++ [ - fun(C) -> init_per_group1(Group, C) end, - fun setup_file_handle_cache/1 + fun(C) -> init_per_group1(Group, C) end ]); false -> rabbit_ct_helpers:run_steps(Config, [ @@ -137,17 +136,6 @@ init_per_group1(from_cluster_node2, Config) -> init_per_group1(_, Config) -> Config. -setup_file_handle_cache(Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, setup_file_handle_cache1, []), - Config. - -setup_file_handle_cache1() -> - %% FIXME: Why are we doing this? - application:set_env(rabbit, file_handles_high_watermark, 100), - ok = file_handle_cache:set_limit(100), - ok. - end_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 9dee5660361f..15b75fac4a69 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -82,8 +82,6 @@ groups() -> reject_after_leader_transfer, shrink_all, rebalance, - file_handle_reservations, - file_handle_reservations_above_limit, node_removal_is_not_quorum_critical, leader_locator_client_local, leader_locator_balanced, @@ -2058,61 +2056,6 @@ node_removal_is_not_quorum_critical(Config) -> ?assertEqual([], Qs). -file_handle_reservations(Config) -> - case rabbit_ct_helpers:is_mixed_versions() of - true -> - {skip, "file_handle_reservations tests isn't mixed version compatible"}; - false -> - file_handle_reservations0(Config) - end. - -file_handle_reservations0(Config) -> - Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - {ok, _, {_, Leader}} = ra:members({RaName, Server1}), - [Follower1, Follower2] = Servers -- [Leader], - ?assertEqual([{files_reserved, 5}], - rpc:call(Leader, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])), - force_leader_change(Servers, QQ), - {ok, _, {_, Leader0}} = ra:members({RaName, Server1}), - [Follower01, Follower02] = Servers -- [Leader0], - ?assertEqual([{files_reserved, 5}], - rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])). 
- -file_handle_reservations_above_limit(Config) -> - [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, S1), - QQ = ?config(queue_name, Config), - QQ2 = ?config(alt_queue_name, Config), - - Limit = rpc:call(S1, file_handle_cache, get_limit, []), - - ok = rpc:call(S1, file_handle_cache, set_limit, [3]), - ok = rpc:call(S2, file_handle_cache, set_limit, [3]), - ok = rpc:call(S3, file_handle_cache, set_limit, [3]), - - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, - declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]), - ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]), - ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]). - cleanup_data_dir(Config) -> %% With Khepri this test needs to run in a 3-node cluster, otherwise the queue can't %% be deleted in minority diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 7eeb31daaa6a..80f6093129eb 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -725,29 +725,6 @@ duplicate_delivery_test(C) -> ?assertEqual(1, lqueue:len(Messages)), ok. -state_enter_file_handle_leader_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), - become_leader_handler => {m, f, [a]}}), - - Resource = {resource, <<"/">>, queue, <<"test">>}, - Effects = rabbit_fifo:state_enter(leader, S0), - ?assertMatch([{mod_call, m, f, [a, the_name]}, - _Timer, - {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} - | _], Effects), - ok. - -state_enter_file_handle_other_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), - Effects = rabbit_fifo:state_enter(other, S0), - ?assertEqual([ - {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} - ], - Effects), - ok. - state_enter_monitors_and_notifications_test(C) -> Oth = spawn(fun () -> ok end), {State0, _} = enq(C, 1, 1, first, test_init(test)), @@ -1251,8 +1228,7 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(C) -> Effects = rabbit_fifo:state_enter(leader, State1), %% 2 effects for each consumer process (channel process), 1 effect for the node, - %% 1 effect for file handle reservation - ?assertEqual(2 * 3 + 1 + 1 + 1 + 1, length(Effects)). + ?assertEqual(2 * 3 + 1 + 1 + 1, length(Effects)). single_active_consumer_state_enter_eol_include_waiting_consumers_test(C) -> Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), @@ -1282,9 +1258,8 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(C) -> Effects = rabbit_fifo:state_enter(eol, State1), %% 1 effect for each consumer process (channel process), - %% 1 effect for file handle reservation %% 1 effect for eol to handle rabbit_fifo_usage entries - ?assertEqual(5, length(Effects)). + ?assertEqual(4, length(Effects)). 
query_consumers_test(C) -> State0 = init(#{name => ?FUNCTION_NAME, diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index cc5844076982..787b60a30d00 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -63,8 +63,6 @@ end_per_group(_, Config) -> init_per_testcase(TestCase, Config) -> meck:new(rabbit_quorum_queue, [passthrough]), meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end), - meck:expect(rabbit_quorum_queue, file_handle_leader_reservation, fun (_) -> ok end), - meck:expect(rabbit_quorum_queue, file_handle_other_reservation, fun () -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), ra_server_sup_sup:remove_all(?RA_SYSTEM), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), diff --git a/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl b/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl index ff0778235c12..a35100f4e53b 100644 --- a/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl @@ -541,29 +541,6 @@ duplicate_delivery_test(_) -> ?assertEqual(1, maps:size(Messages)), ok. -state_enter_file_handle_leader_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), - become_leader_handler => {m, f, [a]}}), - - Resource = {resource, <<"/">>, queue, <<"test">>}, - Effects = rabbit_fifo_v0:state_enter(leader, S0), - ?assertEqual([ - {mod_call, m, f, [a, the_name]}, - {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} - ], Effects), - ok. - -state_enter_file_handle_other_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), - Effects = rabbit_fifo_v0:state_enter(other, S0), - ?assertEqual([ - {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} - ], - Effects), - ok. - state_enter_monitors_and_notifications_test(_) -> Oth = spawn(fun () -> ok end), {State0, _} = enq(1, 1, first, test_init(test)), @@ -998,8 +975,7 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> Effects = rabbit_fifo_v0:state_enter(leader, State1), %% 2 effects for each consumer process (channel process), 1 effect for the node, - %% 1 effect for file handle reservation - ?assertEqual(2 * 3 + 1 + 1, length(Effects)). + ?assertEqual(2 * 3 + 1, length(Effects)). single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), @@ -1029,8 +1005,7 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> Effects = rabbit_fifo_v0:state_enter(eol, State1), %% 1 effect for each consumer process (channel process), - %% 1 effect for file handle reservation - ?assertEqual(4, length(Effects)). + ?assertEqual(3, length(Effects)). query_consumers_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, diff --git a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl b/deps/rabbit/test/unit_file_handle_cache_SUITE.erl index 7e8406e27fef..5ba89315989b 100644 --- a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl +++ b/deps/rabbit/test/unit_file_handle_cache_SUITE.erl @@ -25,9 +25,7 @@ groups() -> [ {non_parallel_tests, [], [ file_handle_cache, %% Change FHC limit. 
- file_handle_cache_reserve, file_handle_cache_reserve_release, - file_handle_cache_reserve_above_limit, file_handle_cache_reserve_monitor, file_handle_cache_reserve_open_file_above_limit ]} @@ -135,47 +133,6 @@ file_handle_cache1(_Config) -> ok = file_handle_cache:set_limit(Limit), passed. -file_handle_cache_reserve(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, file_handle_cache_reserve1, [Config]). - -file_handle_cache_reserve1(_Config) -> - Limit = file_handle_cache:get_limit(), - ok = file_handle_cache:set_limit(5), - %% Reserves are always accepted, even if above the limit - %% These are for special processes such as quorum queues - ok = file_handle_cache:set_reservation(7), - - Self = self(), - spawn(fun () -> ok = file_handle_cache:obtain(), - Self ! obtained - end), - - Props = file_handle_cache:info([files_reserved, sockets_used]), - ?assertEqual(7, proplists:get_value(files_reserved, Props)), - ?assertEqual(0, proplists:get_value(sockets_used, Props)), - - %% The obtain should still be blocked, as there are no file handles - %% available - receive - obtained -> - throw(error_file_obtained) - after 1000 -> - %% Let's release 5 file handles, that should leave - %% enough free for the `obtain` to go through - file_handle_cache:set_reservation(2), - Props0 = file_handle_cache:info([files_reserved, sockets_used]), - ?assertEqual(2, proplists:get_value(files_reserved, Props0)), - ?assertEqual(1, proplists:get_value(sockets_used, Props0)), - receive - obtained -> - ok = file_handle_cache:set_limit(Limit), - passed - after 5000 -> - throw(error_file_not_released) - end - end. - file_handle_cache_reserve_release(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, file_handle_cache_reserve_release1, [Config]). @@ -189,27 +146,6 @@ file_handle_cache_reserve_release1(_Config) -> ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])), passed. -file_handle_cache_reserve_above_limit(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, file_handle_cache_reserve_above_limit1, [Config]). - -file_handle_cache_reserve_above_limit1(_Config) -> - Limit = file_handle_cache:get_limit(), - ok = file_handle_cache:set_limit(5), - %% Reserves are always accepted, even if above the limit - %% These are for special processes such as quorum queues - ok = file_handle_cache:obtain(5), - ?assertEqual([{file_descriptor_limit, []}], rabbit_alarm:get_alarms()), - - ok = file_handle_cache:set_reservation(7), - - Props = file_handle_cache:info([files_reserved, sockets_used]), - ?assertEqual(7, proplists:get_value(files_reserved, Props)), - ?assertEqual(5, proplists:get_value(sockets_used, Props)), - - ok = file_handle_cache:set_limit(Limit), - passed. - file_handle_cache_reserve_open_file_above_limit(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, file_handle_cache_reserve_open_file_above_limit1, [Config]). diff --git a/deps/rabbit/test/unit_gen_server2_SUITE.erl b/deps/rabbit/test/unit_gen_server2_SUITE.erl index efb6f3b65e36..08d2ed905e23 100644 --- a/deps/rabbit/test/unit_gen_server2_SUITE.erl +++ b/deps/rabbit/test/unit_gen_server2_SUITE.erl @@ -65,8 +65,8 @@ gen_server2_with_state(Config) -> ?MODULE, gen_server2_with_state1, [Config]). gen_server2_with_state1(_Config) -> - fhc_state = gen_server2:with_state(file_handle_cache, - fun (S) -> element(1, S) end), + state = gen_server2:with_state(background_gc, + fun (S) -> element(1, S) end), passed. 
diff --git a/deps/rabbit_common/app.bzl b/deps/rabbit_common/app.bzl index 8b45db401c68..66bd9371fdb4 100644 --- a/deps/rabbit_common/app.bzl +++ b/deps/rabbit_common/app.bzl @@ -29,7 +29,6 @@ def all_beam_files(name = "all_beam_files"): "src/delegate.erl", "src/delegate_sup.erl", "src/file_handle_cache.erl", - "src/file_handle_cache_stats.erl", "src/mirrored_supervisor_locks.erl", "src/mnesia_sync.erl", "src/pmon.erl", @@ -124,7 +123,6 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/delegate.erl", "src/delegate_sup.erl", "src/file_handle_cache.erl", - "src/file_handle_cache_stats.erl", "src/mirrored_supervisor_locks.erl", "src/mnesia_sync.erl", "src/pmon.erl", @@ -211,7 +209,6 @@ def all_srcs(name = "all_srcs"): "src/delegate.erl", "src/delegate_sup.erl", "src/file_handle_cache.erl", - "src/file_handle_cache_stats.erl", "src/gen_server2.erl", "src/mirrored_supervisor_locks.erl", "src/mnesia_sync.erl", diff --git a/deps/rabbit_common/src/file_handle_cache.erl b/deps/rabbit_common/src/file_handle_cache.erl index 34247f2a9a5c..4e5c7901a30c 100644 --- a/deps/rabbit_common/src/file_handle_cache.erl +++ b/deps/rabbit_common/src/file_handle_cache.erl @@ -660,19 +660,16 @@ get_client_state(Pid) -> %%---------------------------------------------------------------------------- prim_file_read(Hdl, Size) -> - file_handle_cache_stats:update( - io_read, Size, fun() -> prim_file:read(Hdl, Size) end). + prim_file:read(Hdl, Size). prim_file_write(Hdl, Bytes) -> - file_handle_cache_stats:update( - io_write, iolist_size(Bytes), fun() -> prim_file:write(Hdl, Bytes) end). + prim_file:write(Hdl, Bytes). prim_file_sync(Hdl) -> - file_handle_cache_stats:update(io_sync, fun() -> prim_file:sync(Hdl) end). + prim_file:sync(Hdl). prim_file_position(Hdl, NewOffset) -> - file_handle_cache_stats:update( - io_seek, fun() -> prim_file:position(Hdl, NewOffset) end). + prim_file:position(Hdl, NewOffset). is_reader(Mode) -> lists:member(read, Mode). @@ -766,8 +763,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> Mode = case NewOrReopen of new -> Mode0; - reopen -> file_handle_cache_stats:update(io_reopen), - [read | Mode0] + reopen -> [read | Mode0] end, case prim_file:open(Path, Mode) of {ok, Hdl} -> @@ -1087,9 +1083,8 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(total_limit, #fhc_state{limit = Limit}) -> Limit; i(total_used, State) -> used(State); -i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; -i(sockets_used, #fhc_state{obtain_count_socket = Count, - reserve_count_socket = RCount}) -> Count + RCount; +i(sockets_limit, _) -> 0; +i(sockets_used, _) -> 0; i(files_reserved, #fhc_state{reserve_count_file = RCount}) -> RCount; i(Item, _) -> throw({bad_argument, Item}). @@ -1104,7 +1099,6 @@ used(#fhc_state{open_count = C1, %%---------------------------------------------------------------------------- init([AlarmSet, AlarmClear]) -> - _ = file_handle_cache_stats:init(), Limit = case application:get_env(file_handles_high_watermark) of {ok, Watermark} when (is_integer(Watermark) andalso Watermark > 0) -> diff --git a/deps/rabbit_common/src/file_handle_cache_stats.erl b/deps/rabbit_common/src/file_handle_cache_stats.erl deleted file mode 100644 index 7b8eb920ad67..000000000000 --- a/deps/rabbit_common/src/file_handle_cache_stats.erl +++ /dev/null @@ -1,63 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. 
If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(file_handle_cache_stats). - -%% stats about read / write operations that go through the fhc. - --export([init/0, update/3, update/2, update/1, inc/2, get/0]). - --define(TABLE, ?MODULE). - --define(COUNT, - [io_reopen, mnesia_ram_tx, mnesia_disk_tx, - msg_store_read, msg_store_write, - queue_index_write, queue_index_read]). --define(COUNT_TIME, [io_sync, io_seek]). --define(COUNT_TIME_BYTES, [io_read, io_write]). - --import(rabbit_misc, [safe_ets_update_counter/3, safe_ets_update_counter/4]). - -init() -> - _ = ets:new(?TABLE, [public, named_table, {write_concurrency,true}]), - [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES, - Counter <- [count, bytes, time]], - [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME, - Counter <- [count, time]], - [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT, - Counter <- [count]]. - -update(Op, Bytes, Thunk) -> - {Time, Res} = timer_tc(Thunk), - _ = safe_ets_update_counter(?TABLE, {Op, count}, 1), - _ = safe_ets_update_counter(?TABLE, {Op, bytes}, Bytes), - _ = safe_ets_update_counter(?TABLE, {Op, time}, Time), - Res. - -update(Op, Thunk) -> - {Time, Res} = timer_tc(Thunk), - _ = safe_ets_update_counter(?TABLE, {Op, count}, 1), - _ = safe_ets_update_counter(?TABLE, {Op, time}, Time), - Res. - -update(Op) -> - _ = safe_ets_update_counter(?TABLE, {Op, count}, 1), - ok. - -inc(Op, Count) -> - _ = safe_ets_update_counter(?TABLE, {Op, count}, Count), - ok. - -get() -> - lists:sort(ets:tab2list(?TABLE)). - -timer_tc(Thunk) -> - T1 = erlang:monotonic_time(), - Res = Thunk(), - T2 = erlang:monotonic_time(), - Diff = erlang:convert_time_unit(T2 - T1, native, micro_seconds), - {Diff, Res}. diff --git a/deps/rabbit_common/src/worker_pool_worker.erl b/deps/rabbit_common/src/worker_pool_worker.erl index 283b3b1f7392..1444063ffb30 100644 --- a/deps/rabbit_common/src/worker_pool_worker.erl +++ b/deps/rabbit_common/src/worker_pool_worker.erl @@ -17,7 +17,6 @@ -export([start_link/1, next_job_from/2, submit/3, submit_async/2, run/1]). --export([set_maximum_since_use/2]). -export([set_timeout/2, set_timeout/3, clear_timeout/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -32,7 +31,6 @@ -spec submit(pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A. -spec submit_async(pid(), fun (() -> any()) | mfargs()) -> 'ok'. -spec run(fun (() -> A)) -> A; (mfargs()) -> any(). --spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. %%---------------------------------------------------------------------------- @@ -53,9 +51,6 @@ submit(Pid, Fun, ProcessModel) -> submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun, self()}). -set_maximum_since_use(Pid, Age) -> - gen_server2:cast(Pid, {set_maximum_since_use, Age}). - run({M, F, A}) -> apply(M, F, A); run(Fun) -> Fun(). @@ -76,15 +71,12 @@ run(Fun, single) -> %%---------------------------------------------------------------------------- init([PoolName]) -> - ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, - [self()]), ok = worker_pool:ready(PoolName, self()), put(worker_pool_worker, true), put(worker_pool_name, PoolName), {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
-prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. @@ -120,10 +112,6 @@ handle_cast({submit_async, Fun, CPid}, {from, CPid, MRef}) -> ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; -handle_cast({set_maximum_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - {noreply, State, hibernate}; - handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex index b69d25d96005..c37c0971ceee 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex @@ -166,8 +166,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.StatusCommand do file_descriptors = [ "\n#{bright("File Descriptors")}\n", - "Total: #{m[:file_descriptors][:total_used]}, limit: #{m[:file_descriptors][:total_limit]}", - "Sockets: #{m[:file_descriptors][:sockets_used]}, limit: #{m[:file_descriptors][:sockets_limit]}" + "Total: #{m[:file_descriptors][:total_used]}, limit: #{m[:file_descriptors][:total_limit]}" ] disk_space_section = [ diff --git a/deps/rabbitmq_management/priv/www/api/index.html b/deps/rabbitmq_management/priv/www/api/index.html index 330da4d6c15a..14bcaeb36a22 100644 --- a/deps/rabbitmq_management/priv/www/api/index.html +++ b/deps/rabbitmq_management/priv/www/api/index.html @@ -2038,18 +2038,6 @@
sockets_total
sockets_used
type
File descriptor count and limit, as reported by the operating \ system. The count includes network sockets and file handles.
\ -To optimize disk access RabbitMQ uses as many free descriptors as are \ - available, so the count may safely approach the limit. \ - However, if most of the file descriptors are used by sockets then \ - persister performance will be negatively impacted.
\ +To optimize disk access RabbitMQ uses as many file descriptors as \ + needed, so the limit must be high enough for safe operation.
\To change the limit on Unix / Linux, use "ulimit -n". To change \ the limit on Windows, set the ERL_MAX_PORTS environment variable
\To report used file handles on Windows, handle.exe from \ sysinternals must be installed in your path. You can download it \ here.
', - 'socket-descriptors': - 'The network sockets count and limit managed by RabbitMQ.The memory \ alarm for this node has gone off. It will block \ diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs index 23ed29e5c598..e1739b9415fb 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs @@ -92,14 +92,6 @@ <% } %>