diff --git a/docs/cn/server.md b/docs/cn/server.md index 9606ab1d40..0e68c51d40 100644 --- a/docs/cn/server.md +++ b/docs/cn/server.md @@ -1057,6 +1057,24 @@ Protobuf arena是一种Protobuf message内存管理机制,有着提高内存 注意:从Protobuf v3.14.0开始,[默认开启arena](https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0)。但是Protobuf v3.14.0之前的版本,用户需要再proto文件中加上选项:`option cc_enable_arenas = true;`,所以为了兼容性,可以统一都加上该选项。 +## server端忽略eovercrowded +### server级别忽略eovercrowded +设置ServerOptions.ignore_eovercrowded,默认值0代表不忽略 + +### method级别忽略eovercrowded +server.IgnoreEovercrowdedOf("...") = ...可设置method级别的ignore_eovercrowded。也可以通过设置ServerOptions.ignore_eovercrowded一次性为所有的method设置忽略eovercrowded。 + +```c++ +ServerOptions.ignore_eovercrowded = true; // Set the default ignore_eovercrowded for all methods +server.IgnoreEovercrowdedOf("example.EchoService.Echo") = true; +``` + +此设置一般**发生在AddService后,server启动前**。当设置失败时(比如对应的method不存在),server会启动失败同时提示用户修正IgnoreEovercrowdedOf设置错误。 + +当ServerOptions.ignore_eovercrowded和server.IgnoreEovercrowdedOf("...")=...同时被设置时,任何一个设置为true,就表示会忽略eovercrowded。 + +注意:没有service级别的ignore_eovercrowded。 + # FAQ ### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF是什么意思 diff --git a/docs/en/server.md b/docs/en/server.md index 1c2923a6b3..e49f8512ae 100644 --- a/docs/en/server.md +++ b/docs/en/server.md @@ -1051,6 +1051,26 @@ Users can set `ServerOptions.rpc_pb_message_factory = brpc::GetArenaRpcPBMessage Note: Since Protocol Buffers v3.14.0, Arenas are now unconditionally enabled. However, for versions prior to Protobuf v3.14.0, users need to add the option `option cc_enable_arenas = true;` to the proto file. so for compatibility, this option can be added uniformly. +## Ignoring eovercrowded on server-side +### Ignore eovercrowded on server-level + +Set ServerOptions.ignore_eovercrowded. Default value is 0 which means not ignored. + +### Ignore eovercrowded on method-level + +server.IgnoreEovercrowdedOf("...") = … sets ignore_eovercrowded of the method. Possible settings: + +```c++ +ServerOptions.ignore_eovercrowded = true; // Set the default ignore_eovercrowded for all methods +server.IgnoreEovercrowdedOf("example.EchoService.Echo") = true; +``` + +The code is generally put **after AddService, before Start() of the server**. When a setting fails(namely the method does not exist), server will fail to start and notify user to fix settings on IgnoreEovercrowdedOf. + +When method-level and server-level ignore_eovercrowded are both set, if any one of them is set to true, eovercrowded will be ignored. + +NOTE: No service-level ignore_eovercrowded. + # FAQ ### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF diff --git a/src/brpc/baidu_master_service.cpp b/src/brpc/baidu_master_service.cpp index 0b98373292..b1b0fa0dbf 100644 --- a/src/brpc/baidu_master_service.cpp +++ b/src/brpc/baidu_master_service.cpp @@ -22,7 +22,7 @@ namespace brpc { BaiduMasterService::BaiduMasterService() - :_status(new(std::nothrow) MethodStatus) { + : _status(new (std::nothrow) MethodStatus), _ignore_eovercrowded(false) { LOG_IF(FATAL, NULL == _status) << "Fail to new MethodStatus"; } diff --git a/src/brpc/baidu_master_service.h b/src/brpc/baidu_master_service.h index 9dc7ebbf90..11846fd9b4 100644 --- a/src/brpc/baidu_master_service.h +++ b/src/brpc/baidu_master_service.h @@ -45,6 +45,14 @@ class BaiduMasterService : public ::google::protobuf::Service return _max_concurrency; } + bool ignore_eovercrowded() { + return _ignore_eovercrowded; + } + + void set_ignore_eovercrowded(bool ignore_eovercrowded) { + _ignore_eovercrowded = ignore_eovercrowded; + } + virtual void ProcessRpcRequest(Controller* cntl, const SerializedRequest* request, SerializedResponse* response, @@ -92,6 +100,7 @@ friend class Server; MethodStatus* _status; AdaptiveMaxConcurrency _max_concurrency; + bool _ignore_eovercrowded; }; } diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 53866262b5..f8ea1f6361 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -491,12 +491,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { cntl->SetFailed(ELOGOFF, "Server is stopping"); break; } - - if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { - cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", - butil::endpoint2str(socket->remote_side()).c_str()); - break; - } if (!server_accessor.AddConcurrency(cntl.get())) { cntl->SetFailed( @@ -524,6 +518,13 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { google::protobuf::Service* svc = NULL; google::protobuf::MethodDescriptor* method = NULL; if (NULL != server->options().baidu_master_service) { + if (socket->is_overcrowded() && + !server->options().ignore_eovercrowded && + !server->options().baidu_master_service->ignore_eovercrowded()) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } svc = server->options().baidu_master_service; auto sampled_request = new (std::nothrow) SampledRequest; if (NULL == sampled_request) { @@ -586,6 +587,14 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL); break; } + if (socket->is_overcrowded() && + !server->options().ignore_eovercrowded && + !mp->ignore_eovercrowded) { + cntl->SetFailed( + EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } // Switch to service-specific error. non_service_error.release(); method_status = mp->status; diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 76f43c0524..2dd18076fb 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -1495,7 +1495,9 @@ void ProcessHttpRequest(InputMessageBase *msg) { // NOTE: accesses to builtin services are not counted as part of // concurrency, therefore are not limited by ServerOptions.max_concurrency. if (!sp->is_builtin_service && !sp->params.is_tabbed) { - if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { + if (socket->is_overcrowded() && + !server->options().ignore_eovercrowded && + !sp->ignore_eovercrowded) { cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", butil::endpoint2str(socket->remote_side()).c_str()); return; diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index 20e9c827d2..bf4bd86f5c 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -422,12 +422,6 @@ void ProcessHuluRequest(InputMessageBase* msg_base) { break; } - if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { - cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", - butil::endpoint2str(socket->remote_side()).c_str()); - break; - } - if (!server_accessor.AddConcurrency(cntl.get())) { cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", server->options().max_concurrency); @@ -454,6 +448,14 @@ void ProcessHuluRequest(InputMessageBase* msg_base) { sp->service->CallMethod(sp->method, cntl.get(), &breq, &bres, NULL); break; } + if (socket->is_overcrowded() && + !server->options().ignore_eovercrowded && + !sp->ignore_eovercrowded) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } + // Switch to service-specific error. non_service_error.release(); method_status = sp->status; diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index ad58022ffd..ae128b4051 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -381,12 +381,6 @@ void ProcessSofaRequest(InputMessageBase* msg_base) { break; } - if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { - cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", - butil::endpoint2str(socket->remote_side()).c_str()); - break; - } - if (!server_accessor.AddConcurrency(cntl.get())) { cntl->SetFailed( ELIMIT, "Reached server's max_concurrency=%d", @@ -406,6 +400,13 @@ void ProcessSofaRequest(InputMessageBase* msg_base) { meta.method().c_str()); break; } + if (socket->is_overcrowded() && + !server->options().ignore_eovercrowded && + !sp->ignore_eovercrowded) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } // Switch to service-specific error. non_service_error.release(); method_status = sp->status; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 0110761af7..e904e6c8af 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -180,7 +180,8 @@ Server::MethodProperty::MethodProperty() , http_url(NULL) , service(NULL) , method(NULL) - , status(NULL) { + , status(NULL) + , ignore_eovercrowded(false) { } static timeval GetUptime(void* arg/*start_time*/) { @@ -412,6 +413,7 @@ Server::Server(ProfilerLinker) , _builtin_service_count(0) , _virtual_service_count(0) , _failed_to_set_max_concurrency_of_method(false) + , _failed_to_set_ignore_eovercrowded(false) , _am(NULL) , _internal_am(NULL) , _first_service(NULL) @@ -795,6 +797,7 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) { #endif static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0); +static bool g_default_ignore_eovercrowded(false); int Server::StartInternal(const butil::EndPoint& endpoint, const PortRange& port_range, @@ -806,6 +809,12 @@ int Server::StartInternal(const butil::EndPoint& endpoint, "fix it before starting server"; return -1; } + if (_failed_to_set_ignore_eovercrowded) { + _failed_to_set_ignore_eovercrowded = false; + LOG(ERROR) << "previous call to IgnoreEovercrowdedOf() was failed, " + "fix it before starting server"; + return -1; + } if (InitializeOnce() != 0) { LOG(ERROR) << "Fail to initialize Server[" << version() << ']'; return -1; @@ -2302,6 +2311,38 @@ int Server::MaxConcurrencyOf(google::protobuf::Service* service, return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name); } +bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) { + MethodProperty* mp = _method_map.seek(full_method_name); + if (mp == NULL) { + LOG(ERROR) << "Fail to find method=" << full_method_name; + _failed_to_set_ignore_eovercrowded = true; + return g_default_ignore_eovercrowded; + } + if (IsRunning()) { + LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started"; + return g_default_ignore_eovercrowded; + } + if (mp->status == NULL) { + LOG(ERROR) << "method=" << mp->method->full_name() + << " does not support ignore_eovercrowded"; + _failed_to_set_ignore_eovercrowded = true; + return g_default_ignore_eovercrowded; + } + return mp->ignore_eovercrowded; +} + +bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const { + MethodProperty* mp = _method_map.seek(full_method_name); + if (IsRunning()) { + LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started"; + return g_default_ignore_eovercrowded; + } + if (mp == NULL || mp->status == NULL) { + return false; + } + return mp->ignore_eovercrowded; +} + bool Server::AcceptRequest(Controller* cntl) const { const Interceptor* interceptor = _options.interceptor; if (!interceptor) { diff --git a/src/brpc/server.h b/src/brpc/server.h index ee5a500d1b..8d1b093cc7 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -417,6 +417,12 @@ class Server { const google::protobuf::MethodDescriptor* method; MethodStatus* status; AdaptiveMaxConcurrency max_concurrency; + // ignore_eovercrowded on method-level, it should be used with carefulness. + // It might introduce inbalance between methods, + // as some methods(ignore_eovercrowded=true) might never return eovercrowded + // while other methods(ignore_eovercrowded=false) keep returning eovercrowded. + // currently only valid for baidu_master_service, baidu_rpc, http_rpc, hulu_pbrpc and sofa_pbrpc protocols + bool ignore_eovercrowded; MethodProperty(); }; @@ -595,6 +601,9 @@ class Server { int MaxConcurrencyOf(google::protobuf::Service* service, const butil::StringPiece& method_name) const; + bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name); + bool IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const; + int Concurrency() const { return butil::subtle::NoBarrier_Load(&_concurrency); }; @@ -731,6 +740,7 @@ friend class Controller; // number of the virtual services for mapping URL to methods. int _virtual_service_count; bool _failed_to_set_max_concurrency_of_method; + bool _failed_to_set_ignore_eovercrowded; Acceptor* _am; Acceptor* _internal_am;