From ac0678e56686944f3f3ad2b484036437aa811dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 12 Jun 2024 12:26:01 +0200 Subject: [PATCH] Remove most of the fd related FHC code Stats were not removed, including management UI stats relating to FDs. Web-MQTT and Web-STOMP configuration relating to FHC were not removed. The file_handle_cache itself must be kept until we remove CQv1. --- .../src/amqp_network_connection.erl | 8 --- deps/rabbit/src/rabbit.erl | 7 +- deps/rabbit/src/rabbit_amqqueue.erl | 21 ------ deps/rabbit/src/rabbit_amqqueue_process.erl | 7 -- .../src/rabbit_classic_queue_index_v2.erl | 25 +------ .../src/rabbit_classic_queue_store_v2.erl | 5 -- deps/rabbit/src/rabbit_fifo.erl | 11 +-- deps/rabbit/src/rabbit_fifo_v0.erl | 10 +-- deps/rabbit/src/rabbit_fifo_v1.erl | 10 +-- deps/rabbit/src/rabbit_file.erl | 72 ++++++++----------- deps/rabbit/src/rabbit_networking.erl | 8 +-- deps/rabbit/src/rabbit_quorum_queue.erl | 21 ------ deps/rabbit/src/rabbit_reader.erl | 6 +- deps/rabbit/src/rabbit_vhost.erl | 2 - deps/rabbit/test/backing_queue_SUITE.erl | 14 +--- deps/rabbit/test/quorum_queue_SUITE.erl | 57 --------------- deps/rabbit/test/rabbit_fifo_SUITE.erl | 23 ------ deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 2 - deps/rabbit/test/rabbit_fifo_v0_SUITE.erl | 23 ------ deps/rabbit/test/unit_gen_server2_SUITE.erl | 4 +- deps/rabbit_common/src/worker_pool_worker.erl | 12 ---- .../src/rabbit_web_mqtt_handler.erl | 34 ++------- .../src/rabbit_web_stomp_handler.erl | 29 ++------ 23 files changed, 55 insertions(+), 356 deletions(-) diff --git a/deps/amqp_client/src/amqp_network_connection.erl b/deps/amqp_client/src/amqp_network_connection.erl index 3ce107545e8f..a5ef739ea0f3 100644 --- a/deps/amqp_client/src/amqp_network_connection.erl +++ b/deps/amqp_client/src/amqp_network_connection.erl @@ -118,7 +118,6 @@ do_connect({Addr, Family}, connection_timeout = Timeout, socket_options = ExtraOpts}, SIF, State) -> - ok = obtain(), case gen_tcp:connect(Addr, Port, [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts, Timeout) of @@ -134,7 +133,6 @@ do_connect({Addr, Family}, SIF, State) -> {ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options), app_utils:start_applications([asn1, crypto, public_key, ssl]), - ok = obtain(), case gen_tcp:connect(Addr, Port, [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts, Timeout) of @@ -379,11 +377,5 @@ handshake_recv(Expecting) -> end end. -obtain() -> - case code:is_loaded(file_handle_cache) of - false -> ok; - _ -> file_handle_cache:obtain() - end. - get_reason(#'connection.close'{reply_code = ErrCode}) -> ?PROTOCOL:amqp_exception(ErrCode). diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 4b5e8d37920b..996e63006935 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -741,9 +741,6 @@ status() -> get_disk_free_limit, []}}, {disk_free, {rabbit_disk_monitor, get_disk_free, []}}]), - S3 = rabbit_misc:with_exit_handler( - fun () -> [] end, - fun () -> [{file_descriptors, file_handle_cache:info()}] end), S4 = [{processes, [{limit, erlang:system_info(process_limit)}, {used, erlang:system_info(process_count)}]}, {run_queue, erlang:statistics(run_queue)}, @@ -778,7 +775,7 @@ status() -> (_) -> false end, maps:to_list(product_info())), - S1 ++ S2 ++ S3 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8. + S1 ++ S2 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8. alarms() -> Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]), @@ -1653,7 +1650,7 @@ config_files() -> start_fhc() -> ok = rabbit_sup:start_restartable_child( file_handle_cache, - [fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]), + [fun(_) -> ok end, fun(_) -> ok end]), ensure_working_fhc(). ensure_working_fhc() -> diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index c8db0ee67e5a..038dd9b76f35 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -7,7 +7,6 @@ -module(rabbit_amqqueue). --export([warn_file_limit/0]). -export([recover/1, stop/1, start/1, declare/6, declare/7, delete_immediately/1, delete_exclusive/2, delete/4, purge/1, forget_all_durable/1]). @@ -119,21 +118,6 @@ active, activity_status, arguments]). -define(KILL_QUEUE_DELAY_INTERVAL, 100). -warn_file_limit() -> - DurableQueues = find_recoverable_queues(), - L = length(DurableQueues), - - %% if there are not enough file handles, the server might hang - %% when trying to recover queues, warn the user: - case file_handle_cache:get_limit() < L of - true -> - rabbit_log:warning( - "Recovering ~tp queues, available file handles: ~tp. Please increase max open file handles limit to at least ~tp!", - [L, file_handle_cache:get_limit(), L]); - false -> - ok - end. - -spec recover(rabbit_types:vhost()) -> {Recovered :: [amqqueue:amqqueue()], Failed :: [amqqueue:amqqueue()]}. @@ -183,11 +167,6 @@ find_local_durable_queues(VHostName) -> rabbit_queue_type:is_recoverable(Q) end). -find_recoverable_queues() -> - rabbit_db_queue:filter_all_durable(fun(Q) -> - rabbit_queue_type:is_recoverable(Q) - end). - -spec declare(name(), boolean(), boolean(), diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index a72e657471a4..5f565e198622 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -210,8 +210,6 @@ init_it2(Recover, From, State = #q{q = Q, (Res == created orelse Res == existing) -> case matches(Recover, Q, Q1) of true -> - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), @@ -1194,7 +1192,6 @@ prioritise_cast(Msg, _Len, State) -> delete_immediately -> 8; {delete_exclusive, _Pid} -> 8; {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; {ack, _AckTags, _ChPid} -> 4; %% [1] {resume, _ChPid} -> 3; @@ -1510,10 +1507,6 @@ handle_cast({set_ram_duration_target, Duration}, BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State#q{backing_queue_state = BQS1}); -handle_cast({set_maximum_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - handle_cast({credit, SessionPid, CTag, Credit, Drain}, #q{q = Q, backing_queue = BQ, diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index ffe1dbf5752c..cf1e3ec23e64 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -41,15 +41,6 @@ -define(HEADER_SIZE, 64). %% bytes -define(ENTRY_SIZE, 32). %% bytes -%% The file_handle_cache module tracks reservations at -%% the level of the process. This means we cannot -%% handle them independently in the store and index. -%% Because the index may reserve more FDs than the -%% store the index becomes responsible for this and -%% will always reserve at least 2 FDs, and release -%% everything when terminating. --define(STORE_FD_RESERVATIONS, 2). - -include_lib("rabbit_common/include/rabbit.hrl"). %% Set to true to get an awful lot of debug logs. -if(false). @@ -538,7 +529,6 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir, ok = file:sync(Fd), ok = file:close(Fd) end, OpenFds), - file_handle_cache:release_reservation(), %% Write recovery terms for faster recovery. _ = rabbit_recovery_terms:store(VHost, filename:basename(rabbit_file:binary_to_filename(Dir)), @@ -555,7 +545,6 @@ delete_and_terminate(State = #qi { dir = Dir, _ = maps:map(fun(_, Fd) -> ok = file:close(Fd) end, OpenFds), - file_handle_cache:release_reservation(), %% Erase the data on disk. ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)), State#qi{ segments = #{}, @@ -626,18 +615,9 @@ new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) %% using too many FDs when the consumer lags a lot. We %% limit at 4 because we try to keep up to 2 for reading %% and 2 for writing. -reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds }) +reduce_fd_usage(_SegmentToOpen, State = #qi{ fds = OpenFds }) when map_size(OpenFds) < 4 -> - %% The only case where we need to update reservations is - %% when we are opening a segment that wasn't already open, - %% and we are not closing another segment at the same time. - case OpenFds of - #{SegmentToOpen := _} -> - State; - _ -> - file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds) + 1), - State - end; + State; reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) -> case OpenFds0 of #{SegmentToOpen := _} -> @@ -868,7 +848,6 @@ delete_segment(Segment, State0 = #qi{ fds = OpenFds0 }) -> State = case maps:take(Segment, OpenFds0) of {Fd, OpenFds} -> ok = file:close(Fd), - file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds)), State0#qi{ fds = OpenFds }; error -> State0 diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 90fe79d29a95..a98e666f853c 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -41,11 +41,6 @@ %% need to look into the store to discard them. Messages on disk %% will be dropped at the same time as the index deletes the %% corresponding segment file. -%% -%% The file_handle_cache reservations are done by the v2 index -%% because they are handled at a pid level. Since we are using -%% up to 2 FDs in this module we make the index reserve 2 extra -%% FDs. -module(rabbit_classic_queue_store_v2). diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index ebe29cd6116b..e843595586c2 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -894,10 +894,8 @@ state_enter0(leader, #?MODULE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - FHReservation = [{mod_call, rabbit_quorum_queue, - file_handle_leader_reservation, [Resource]}], NotifyDecs = notify_decorators_startup(Resource), - Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs], + Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs], case BLH of undefined -> Effects; @@ -914,12 +912,7 @@ state_enter0(eol, #?MODULE{enqueuers = Enqs, AllConsumers = maps:merge(Custs, WaitingConsumers1), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ - [{aux, eol}, - {mod_call, rabbit_quorum_queue, file_handle_release_reservation, []} | Effects]; -state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}, Effects) - when State =/= leader -> - FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, - [FHReservation | Effects]; + [{aux, eol} | Effects]; state_enter0(_, _, Effects) -> %% catch all as not handling all states Effects. diff --git a/deps/rabbit/src/rabbit_fifo_v0.erl b/deps/rabbit/src/rabbit_fifo_v0.erl index 2161f9e4b011..3ada7f56b23f 100644 --- a/deps/rabbit/src/rabbit_fifo_v0.erl +++ b/deps/rabbit/src/rabbit_fifo_v0.erl @@ -548,7 +548,6 @@ state_enter(leader, #?STATE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, - resource = Resource, become_leader_handler = BLH}, prefix_msgs = {0, [], 0, []} }) -> @@ -559,8 +558,7 @@ state_enter(leader, #?STATE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}], - Effects = Mons ++ Nots ++ NodeMons ++ FHReservation, + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; @@ -575,11 +573,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs, #{}, WaitingConsumers0), AllConsumers = maps:merge(Custs, WaitingConsumers1), [{send_msg, P, eol, ra_event} - || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ - [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; -state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> - FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, - [FHReservation]; + || P <- maps:keys(maps:merge(Enqs, AllConsumers))]; state_enter(_, _) -> %% catch all as not handling all states []. diff --git a/deps/rabbit/src/rabbit_fifo_v1.erl b/deps/rabbit/src/rabbit_fifo_v1.erl index a6f4c7868d17..98b762b08520 100644 --- a/deps/rabbit/src/rabbit_fifo_v1.erl +++ b/deps/rabbit/src/rabbit_fifo_v1.erl @@ -676,7 +676,6 @@ state_enter(leader, #?STATE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, - resource = Resource, become_leader_handler = BLH}, prefix_msgs = {0, [], 0, []} }) -> @@ -687,8 +686,7 @@ state_enter(leader, #?STATE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}], - Effects = Mons ++ Nots ++ NodeMons ++ FHReservation, + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; @@ -704,11 +702,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs, AllConsumers = maps:merge(Custs, WaitingConsumers1), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ - [{aux, eol}, - {mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; -state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> - FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, - [FHReservation]; + [{aux, eol}]; state_enter(_, _) -> %% catch all as not handling all states []. diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl index a21fea3a7f75..1f7182611992 100644 --- a/deps/rabbit/src/rabbit_file.erl +++ b/deps/rabbit/src/rabbit_file.erl @@ -18,8 +18,6 @@ -export([filename_as_a_directory/1]). -export([filename_to_binary/1, binary_to_filename/1]). --import(file_handle_cache, [with_handle/1, with_handle/2]). - -define(TMP_EXT, ".tmp"). %%---------------------------------------------------------------------------- @@ -56,7 +54,7 @@ file_size(File) -> -spec ensure_dir((file:filename())) -> ok_or_error(). -ensure_dir(File) -> with_handle(fun () -> ensure_dir_internal(File) end). +ensure_dir(File) -> ensure_dir_internal(File). ensure_dir_internal("/") -> ok; @@ -81,16 +79,17 @@ wildcard(Pattern, Dir) -> -spec list_dir(file:filename()) -> rabbit_types:ok_or_error2([file:filename()], any()). -list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end). +list_dir(Dir) -> prim_file:list_dir(Dir). read_file_info(File) -> - with_handle(fun () -> file:read_file_info(File, [raw]) end). + file:read_file_info(File, [raw]). -spec read_term_file (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()). read_term_file(File) -> try + %% @todo OTP-27+ has file:read_file(File, [raw]). F = fun() -> {ok, FInfo} = file:read_file_info(File, [raw]), {ok, Fd} = file:open(File, [read, raw, binary]), @@ -100,7 +99,7 @@ read_term_file(File) -> file:close(Fd) end end, - {ok, Data} = with_handle(F), + {ok, Data} = F(), {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), TokenGroups = group_tokens(Tokens), {ok, [begin @@ -166,22 +165,19 @@ with_synced_copy(Path, Modes, Fun) -> true -> {error, append_not_supported, Path}; false -> - with_handle( - fun () -> - Bak = Path ++ ?TMP_EXT, - case prim_file:open(Bak, Modes) of - {ok, Hdl} -> - try - Result = Fun(Hdl), - ok = prim_file:sync(Hdl), - ok = prim_file:rename(Bak, Path), - Result - after - prim_file:close(Hdl) - end; - {error, _} = E -> E - end - end) + Bak = Path ++ ?TMP_EXT, + case prim_file:open(Bak, Modes) of + {ok, Hdl} -> + try + Result = Fun(Hdl), + ok = prim_file:sync(Hdl), + ok = prim_file:rename(Bak, Path), + Result + after + prim_file:close(Hdl) + end; + {error, _} = E -> E + end end. %% TODO the semantics of this function are rather odd. But see bug 25021. @@ -198,16 +194,12 @@ append_file(File, Suffix) -> append_file(_, _, "") -> ok; append_file(File, 0, Suffix) -> - with_handle(fun () -> - case prim_file:open([File, Suffix], [append]) of - {ok, Fd} -> prim_file:close(Fd); - Error -> Error - end - end); + case prim_file:open([File, Suffix], [append]) of + {ok, Fd} -> prim_file:close(Fd); + Error -> Error + end; append_file(File, _, Suffix) -> - case with_handle(2, fun () -> - file:copy(File, {[File, Suffix], [append]}) - end) of + case file:copy(File, {[File, Suffix], [append]}) of {ok, _BytesCopied} -> ok; Error -> Error end. @@ -223,21 +215,19 @@ ensure_parent_dirs_exist(Filename) -> -spec rename(file:filename(), file:filename()) -> ok_or_error(). -rename(Old, New) -> with_handle(fun () -> prim_file:rename(Old, New) end). +rename(Old, New) -> prim_file:rename(Old, New). -spec delete([file:filename()]) -> ok_or_error(). -delete(File) -> with_handle(fun () -> prim_file:delete(File) end). +delete(File) -> prim_file:delete(File). -spec recursive_delete([file:filename()]) -> rabbit_types:ok_or_error({file:filename(), any()}). recursive_delete(Files) -> - with_handle( - fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path); - (_Path, {error, _Err} = Error) -> Error - end, ok, Files) - end). + lists:foldl(fun (Path, ok) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). recursive_delete1(Path) -> case is_dir_no_handle(Path) and not(is_symlink_no_handle(Path)) of @@ -315,10 +305,8 @@ recursive_copy(Src, Dest) -> lock_file(Path) -> case is_file(Path) of true -> {error, eexist}; - false -> with_handle( - fun () -> {ok, Lock} = prim_file:open(Path, [write]), - ok = prim_file:close(Lock) - end) + false -> {ok, Lock} = prim_file:open(Path, [write]), + ok = prim_file:close(Lock) end. -spec filename_as_a_directory(file:filename()) -> file:filename(). diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index da242d0f6d87..508e0a0e2b9f 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -576,7 +576,7 @@ handshake(Ref, ProxyProtocolEnabled) -> {'EXIT', normal} -> {error, handshake_failed}; {ok, Sock} -> - setup_socket(Sock), + ok = tune_buffer_size(Sock), {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} end end; @@ -585,15 +585,11 @@ handshake(Ref, ProxyProtocolEnabled) -> {'EXIT', normal} -> {error, handshake_failed}; {ok, Sock} -> - setup_socket(Sock), + ok = tune_buffer_size(Sock), {ok, Sock} end end. -setup_socket(Sock) -> - ok = tune_buffer_size(Sock), - ok = file_handle_cache:obtain(). - tune_buffer_size(Sock) -> case tune_buffer_size1(Sock) of ok -> ok; diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 58dae5c6562f..8062cdac0ece 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -51,9 +51,6 @@ grow/4, grow/5]). -export([transfer_leadership/2, get_replicas/1, queue_length/1]). --export([file_handle_leader_reservation/1, - file_handle_other_reservation/0]). --export([file_handle_release_reservation/0]). -export([list_with_minimum_quorum/0, list_with_local_promotable/0, list_with_local_promotable_for_cli/0, @@ -1414,24 +1411,6 @@ matches_strategy(even, Members) -> is_match(Subj, E) -> nomatch /= re:run(Subj, E). -file_handle_leader_reservation(QName) -> - try - {ok, Q} = rabbit_amqqueue:lookup(QName), - ClusterSize = length(get_nodes(Q)), - file_handle_cache:set_reservation(2 + ClusterSize) - catch Class:Err -> - rabbit_log:warning("~s:~s/~b failed with ~w ~w", - [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, - Class, Err]) - end. - - -file_handle_other_reservation() -> - file_handle_cache:set_reservation(2). - -file_handle_release_reservation() -> - file_handle_cache:release_reservation(). - -spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}. reclaim_memory(Vhost, QueueName) -> QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index f85e92b42e4b..9b805502741d 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -374,11 +374,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> after %% We don't call gen_tcp:close/1 here since it waits for %% pending output to be sent, which results in unnecessary - %% delays. We could just terminate - the reader is the - %% controlling process and hence its termination will close - %% the socket. However, to keep the file_handle_cache - %% accounting as accurate as possible we ought to close the - %% socket w/o delay before termination. + %% delays. rabbit_net:fast_close(RealSocket), rabbit_networking:unregister_connection(self()), rabbit_core_metrics:connection_closed(self()), diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 80882bf524e5..db56c722c5d8 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -36,8 +36,6 @@ recover() -> %% faster than other nodes handled DOWN messages from us. rabbit_amqqueue:on_node_down(node()), - rabbit_amqqueue:warn_file_limit(), - %% Prepare rabbit_semi_durable_route table {Time, _} = timer:tc(fun() -> rabbit_binding:recover() diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index f242cec4379a..ce0a676c9a81 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -99,8 +99,7 @@ init_per_group(Group, Config) -> rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps() ++ [ - fun(C) -> init_per_group1(Group, C) end, - fun setup_file_handle_cache/1 + fun(C) -> init_per_group1(Group, C) end ]); false -> rabbit_ct_helpers:run_steps(Config, [ @@ -139,17 +138,6 @@ init_per_group1(from_cluster_node2, Config) -> init_per_group1(_, Config) -> Config. -setup_file_handle_cache(Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, setup_file_handle_cache1, []), - Config. - -setup_file_handle_cache1() -> - %% FIXME: Why are we doing this? - application:set_env(rabbit, file_handles_high_watermark, 100), - ok = file_handle_cache:set_limit(100), - ok. - end_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 789ca6d5a6b6..0081c7069fb6 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -80,8 +80,6 @@ groups() -> reject_after_leader_transfer, shrink_all, rebalance, - file_handle_reservations, - file_handle_reservations_above_limit, node_removal_is_not_quorum_critical, leader_locator_client_local, leader_locator_balanced, @@ -2052,61 +2050,6 @@ node_removal_is_not_quorum_critical(Config) -> ?assertEqual([], Qs). -file_handle_reservations(Config) -> - case rabbit_ct_helpers:is_mixed_versions() of - true -> - {skip, "file_handle_reservations tests isn't mixed version compatible"}; - false -> - file_handle_reservations0(Config) - end. - -file_handle_reservations0(Config) -> - Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - {ok, _, {_, Leader}} = ra:members({RaName, Server1}), - [Follower1, Follower2] = Servers -- [Leader], - ?assertEqual([{files_reserved, 5}], - rpc:call(Leader, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])), - force_leader_change(Servers, QQ), - {ok, _, {_, Leader0}} = ra:members({RaName, Server1}), - [Follower01, Follower02] = Servers -- [Leader0], - ?assertEqual([{files_reserved, 5}], - rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])), - ?assertEqual([{files_reserved, 2}], - rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])). - -file_handle_reservations_above_limit(Config) -> - [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, S1), - QQ = ?config(queue_name, Config), - QQ2 = ?config(alt_queue_name, Config), - - Limit = rpc:call(S1, file_handle_cache, get_limit, []), - - ok = rpc:call(S1, file_handle_cache, set_limit, [3]), - ok = rpc:call(S2, file_handle_cache, set_limit, [3]), - ok = rpc:call(S3, file_handle_cache, set_limit, [3]), - - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, - declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]), - ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]), - ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]). - cleanup_data_dir(Config) -> %% With Khepri this test needs to run in a 3-node cluster, otherwise the queue can't %% be deleted in minority diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 7eeb31daaa6a..a1680c654306 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -725,29 +725,6 @@ duplicate_delivery_test(C) -> ?assertEqual(1, lqueue:len(Messages)), ok. -state_enter_file_handle_leader_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), - become_leader_handler => {m, f, [a]}}), - - Resource = {resource, <<"/">>, queue, <<"test">>}, - Effects = rabbit_fifo:state_enter(leader, S0), - ?assertMatch([{mod_call, m, f, [a, the_name]}, - _Timer, - {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} - | _], Effects), - ok. - -state_enter_file_handle_other_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), - Effects = rabbit_fifo:state_enter(other, S0), - ?assertEqual([ - {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} - ], - Effects), - ok. - state_enter_monitors_and_notifications_test(C) -> Oth = spawn(fun () -> ok end), {State0, _} = enq(C, 1, 1, first, test_init(test)), diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index cc5844076982..787b60a30d00 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -63,8 +63,6 @@ end_per_group(_, Config) -> init_per_testcase(TestCase, Config) -> meck:new(rabbit_quorum_queue, [passthrough]), meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end), - meck:expect(rabbit_quorum_queue, file_handle_leader_reservation, fun (_) -> ok end), - meck:expect(rabbit_quorum_queue, file_handle_other_reservation, fun () -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), ra_server_sup_sup:remove_all(?RA_SYSTEM), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), diff --git a/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl b/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl index ff0778235c12..988ab8b764b2 100644 --- a/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl @@ -541,29 +541,6 @@ duplicate_delivery_test(_) -> ?assertEqual(1, maps:size(Messages)), ok. -state_enter_file_handle_leader_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), - become_leader_handler => {m, f, [a]}}), - - Resource = {resource, <<"/">>, queue, <<"test">>}, - Effects = rabbit_fifo_v0:state_enter(leader, S0), - ?assertEqual([ - {mod_call, m, f, [a, the_name]}, - {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} - ], Effects), - ok. - -state_enter_file_handle_other_reservation_test(_) -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), - Effects = rabbit_fifo_v0:state_enter(other, S0), - ?assertEqual([ - {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} - ], - Effects), - ok. - state_enter_monitors_and_notifications_test(_) -> Oth = spawn(fun () -> ok end), {State0, _} = enq(1, 1, first, test_init(test)), diff --git a/deps/rabbit/test/unit_gen_server2_SUITE.erl b/deps/rabbit/test/unit_gen_server2_SUITE.erl index efb6f3b65e36..08d2ed905e23 100644 --- a/deps/rabbit/test/unit_gen_server2_SUITE.erl +++ b/deps/rabbit/test/unit_gen_server2_SUITE.erl @@ -65,8 +65,8 @@ gen_server2_with_state(Config) -> ?MODULE, gen_server2_with_state1, [Config]). gen_server2_with_state1(_Config) -> - fhc_state = gen_server2:with_state(file_handle_cache, - fun (S) -> element(1, S) end), + state = gen_server2:with_state(background_gc, + fun (S) -> element(1, S) end), passed. diff --git a/deps/rabbit_common/src/worker_pool_worker.erl b/deps/rabbit_common/src/worker_pool_worker.erl index 283b3b1f7392..1444063ffb30 100644 --- a/deps/rabbit_common/src/worker_pool_worker.erl +++ b/deps/rabbit_common/src/worker_pool_worker.erl @@ -17,7 +17,6 @@ -export([start_link/1, next_job_from/2, submit/3, submit_async/2, run/1]). --export([set_maximum_since_use/2]). -export([set_timeout/2, set_timeout/3, clear_timeout/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -32,7 +31,6 @@ -spec submit(pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A. -spec submit_async(pid(), fun (() -> any()) | mfargs()) -> 'ok'. -spec run(fun (() -> A)) -> A; (mfargs()) -> any(). --spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. %%---------------------------------------------------------------------------- @@ -53,9 +51,6 @@ submit(Pid, Fun, ProcessModel) -> submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun, self()}). -set_maximum_since_use(Pid, Age) -> - gen_server2:cast(Pid, {set_maximum_since_use, Age}). - run({M, F, A}) -> apply(M, F, A); run(Fun) -> Fun(). @@ -76,15 +71,12 @@ run(Fun, single) -> %%---------------------------------------------------------------------------- init([PoolName]) -> - ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, - [self()]), ok = worker_pool:ready(PoolName, self()), put(worker_pool_worker, true), put(worker_pool_name, PoolName), {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. @@ -120,10 +112,6 @@ handle_cast({submit_async, Fun, CPid}, {from, CPid, MRef}) -> ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; -handle_cast({set_maximum_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - {noreply, State, hibernate}; - handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index d336fb6a3e29..48d1fae89feb 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -41,8 +41,7 @@ conserve = false :: boolean(), stats_timer :: option(rabbit_event:state()), keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(), - conn_name :: option(binary()), - should_use_fhc :: rabbit_types:option(boolean()) + conn_name :: option(binary()) }). -type state() :: #state{}. @@ -82,15 +81,10 @@ init(Req, Opts) -> true -> WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), WsOpts = maps:merge(#{compress => true}, WsOpts0), - ShouldUseFHC = application:get_env(?APP, use_file_handle_cache, true), - case ShouldUseFHC of - true -> ?LOG_INFO("Web MQTT: file handle cache use is enabled"); - false -> ?LOG_INFO("Web MQTT: file handle cache use is disabled") - end, {?MODULE, cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req), - #state{socket = maps:get(proxy_header, Req, undefined), should_use_fhc = ShouldUseFHC}, + #state{socket = maps:get(proxy_header, Req, undefined)}, WsOpts} end end. @@ -111,15 +105,8 @@ info(Pid, Items) -> -spec websocket_init(state()) -> {cowboy_websocket:commands(), state()} | {cowboy_websocket:commands(), state(), hibernate}. -websocket_init(State0 = #state{socket = Sock, should_use_fhc = ShouldUseFHC}) -> +websocket_init(State0 = #state{socket = Sock}) -> logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [web_mqtt]}), - case ShouldUseFHC of - true -> - ok = file_handle_cache:obtain(); - false -> ok; - undefined -> - ok = file_handle_cache:obtain() - end, case rabbit_net:connection_string(Sock, inbound) of {ok, ConnStr} -> ConnName = rabbit_data_coercion:to_binary(ConnStr), @@ -271,18 +258,10 @@ terminate(Reason, Request, #state{} = State) -> terminate(_Reason, _Request, {SendWill, #state{conn_name = ConnName, proc_state = PState, - keepalive = KState, - should_use_fhc = ShouldUseFHC} = State}) -> + keepalive = KState} = State}) -> ?LOG_INFO("Web MQTT closing connection ~ts", [ConnName]), maybe_emit_stats(State), _ = rabbit_mqtt_keepalive:cancel_timer(KState), - case ShouldUseFHC of - true -> - ok = file_handle_cache:release(); - false -> ok; - undefined -> - ok = file_handle_cache:release() - end, case PState of connect_packet_unprocessed -> ok; @@ -296,12 +275,9 @@ terminate(_Reason, _Request, no_supported_sub_protocol(Protocol, Req) -> %% The client MUST include “mqtt” in the list of WebSocket Sub Protocols it offers [MQTT-6.0.0-3]. ?LOG_ERROR("Web MQTT: 'mqtt' not included in client offered subprotocols: ~tp", [Protocol]), - %% Set should_use_fhc to false, because at this early stage of init no fhc - %% obtain was called, so terminate/3 should not call fhc release - %% (even if use_file_handle_cache is true) {ok, cowboy_req:reply(400, #{<<"connection">> => <<"close">>}, Req), - #state{should_use_fhc = false}}. + #state{}}. handle_data(Data, State0 = #state{}) -> case handle_data1(Data, State0) of diff --git a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl index 07dd03737b2f..c727ec3de505 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl @@ -42,8 +42,7 @@ peername, auth_hd, stats_timer, - connection, - should_use_fhc :: rabbit_types:option(boolean()) + connection }). -define(APP, rabbitmq_web_stomp). @@ -84,11 +83,6 @@ init(Req0, Opts) -> end, WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), WsOpts = maps:merge(#{compress => true}, WsOpts0), - ShouldUseFHC = application:get_env(?APP, use_file_handle_cache, true), - case ShouldUseFHC of - true -> ?LOG_INFO("Web STOMP: file handle cache use is enabled"); - false -> ?LOG_INFO("Web STOMP: file handle cache use is disabled") - end, {?MODULE, Req, #state{ frame_type = proplists:get_value(type, Opts, text), heartbeat_sup = KeepaliveSup, @@ -98,18 +92,10 @@ init(Req0, Opts) -> conserve_resources = false, socket = SockInfo, peername = PeerAddr, - auth_hd = cowboy_req:header(<<"authorization">>, Req), - should_use_fhc = ShouldUseFHC + auth_hd = cowboy_req:header(<<"authorization">>, Req) }, WsOpts}. -websocket_init(State = #state{should_use_fhc = ShouldUseFHC}) -> - case ShouldUseFHC of - true -> - ok = file_handle_cache:obtain(); - false -> ok; - undefined -> - ok = file_handle_cache:obtain() - end, +websocket_init(State) -> process_flag(trap_exit, true), {ok, ProcessorState} = init_processor_state(State), {ok, rabbit_event:init_stats_timer( @@ -330,15 +316,8 @@ maybe_block(State, _) -> stop(State) -> stop(State, 1000, "STOMP died"). -stop(State = #state{proc_state = ProcState, should_use_fhc = ShouldUseFHC}, CloseCode, Error0) -> +stop(State = #state{proc_state = ProcState}, CloseCode, Error0) -> maybe_emit_stats(State), - case ShouldUseFHC of - true -> - ok = file_handle_cache:release(); - false -> ok; - undefined -> - ok = file_handle_cache:release() - end, _ = rabbit_stomp_processor:flush_and_die(ProcState), Error1 = rabbit_data_coercion:to_binary(Error0), {[{close, CloseCode, Error1}], State}.