Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

method level option to ignore server eovercrowded #2820

Merged
merged 9 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/brpc/baidu_master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class BaiduMasterService : public ::google::protobuf::Service
return _max_concurrency;
}

bool& IgnoreEOverCrowded() {
chenBright marked this conversation as resolved.
Show resolved Hide resolved
return ignore_eovercrowded;
}

virtual void ProcessRpcRequest(Controller* cntl,
const SerializedRequest* request,
SerializedResponse* response,
Expand Down Expand Up @@ -92,6 +96,7 @@ friend class Server;

MethodStatus* _status;
AdaptiveMaxConcurrency _max_concurrency;
bool ignore_eovercrowded = false;
chenBright marked this conversation as resolved.
Show resolved Hide resolved
};

}
Expand Down
21 changes: 15 additions & 6 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,12 +491,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}

if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}

if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
Expand Down Expand Up @@ -524,6 +518,13 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
google::protobuf::Service* svc = NULL;
google::protobuf::MethodDescriptor* method = NULL;
if (NULL != server->options().baidu_master_service) {
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!server->options().baidu_master_service->IgnoreEOverCrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
svc = server->options().baidu_master_service;
auto sampled_request = new (std::nothrow) SampledRequest;
if (NULL == sampled_request) {
Expand Down Expand Up @@ -586,6 +587,14 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
break;
}
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!mp->ignore_eovercrowded) {
cntl->SetFailed(
EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
// Switch to service-specific error.
non_service_error.release();
method_status = mp->status;
Expand Down
4 changes: 3 additions & 1 deletion src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,9 @@ void ProcessHttpRequest(InputMessageBase *msg) {
// NOTE: accesses to builtin services are not counted as part of
// concurrency, therefore are not limited by ServerOptions.max_concurrency.
if (!sp->is_builtin_service && !sp->params.is_tabbed) {
if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) {
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!sp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
return;
Expand Down
14 changes: 8 additions & 6 deletions src/brpc/policy/hulu_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,6 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
break;
}

if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}

if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
Expand All @@ -454,6 +448,14 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
sp->service->CallMethod(sp->method, cntl.get(), &breq, &bres, NULL);
break;
}
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!sp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}

// Switch to service-specific error.
non_service_error.release();
method_status = sp->status;
Expand Down
13 changes: 7 additions & 6 deletions src/brpc/policy/sofa_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,6 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
break;
}

if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}

if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
Expand All @@ -406,6 +400,13 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
meta.method().c_str());
break;
}
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!sp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
// Switch to service-specific error.
non_service_error.release();
method_status = sp->status;
Expand Down
76 changes: 76 additions & 0 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ Server::Server(ProfilerLinker)
, _builtin_service_count(0)
, _virtual_service_count(0)
, _failed_to_set_max_concurrency_of_method(false)
, _failed_to_set_ignore_eovercrowded(false)
, _am(NULL)
, _internal_am(NULL)
, _first_service(NULL)
Expand Down Expand Up @@ -795,6 +796,7 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
#endif

static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0);
static bool g_default_ignore_eovercrowded(false);

int Server::StartInternal(const butil::EndPoint& endpoint,
const PortRange& port_range,
Expand All @@ -806,6 +808,12 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
"fix it before starting server";
return -1;
}
if (_failed_to_set_ignore_eovercrowded) {
_failed_to_set_ignore_eovercrowded = false;
LOG(ERROR) << "previous call to IgnoreEovercrowdedOf() was failed, "
"fix it before starting server";
return -1;
}
if (InitializeOnce() != 0) {
LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
return -1;
Expand Down Expand Up @@ -2302,6 +2310,74 @@ int Server::MaxConcurrencyOf(google::protobuf::Service* service,
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
}

bool& Server::IgnoreEovercrowdedOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support ignore_eovercrowded";
_failed_to_set_ignore_eovercrowded = true;
return g_default_ignore_eovercrowded;
}
return mp->ignore_eovercrowded;
}

bool Server::IgnoreEovercrowdedOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp == NULL || mp->status == NULL) {
return false;
chenBright marked this conversation as resolved.
Show resolved Hide resolved
}
return mp->ignore_eovercrowded;
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_ignore_eovercrowded = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const {
return IgnoreEovercrowdedOf(_method_map.seek(full_method_name));
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) {
MethodProperty* mp = const_cast<MethodProperty*>(
FindMethodPropertyByFullName(full_service_name, method_name));
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_service_name
<< '/' << method_name;
_failed_to_set_ignore_eovercrowded = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(FindMethodPropertyByFullName(
full_service_name, method_name));
}

bool& Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::AcceptRequest(Controller* cntl) const {
const Interceptor* interceptor = _options.interceptor;
if (!interceptor) {
Expand Down
22 changes: 22 additions & 0 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ class Server {
const google::protobuf::MethodDescriptor* method;
MethodStatus* status;
AdaptiveMaxConcurrency max_concurrency;
// ignore_eovercrowded on method-level, it should be used with carefulness.
// It might introduce inbalance between methods,
// as some methods(ignore_eovercrowded=1) might never return eovercrowded
chenBright marked this conversation as resolved.
Show resolved Hide resolved
// while other methods(ignore_eovercrowded=0) keep returning eovercrowded.
// currently only valid for baidu_rpc, http_rpc, hulu_pbrpc and sofa_pbrpc protocols
bool ignore_eovercrowded = false;
chenBright marked this conversation as resolved.
Show resolved Hide resolved

MethodProperty();
};
Expand Down Expand Up @@ -595,6 +601,19 @@ class Server {
int MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name);
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(google::protobuf::Service* service,
chenBright marked this conversation as resolved.
Show resolved Hide resolved
const butil::StringPiece& method_name) const;

int Concurrency() const {
return butil::subtle::NoBarrier_Load(&_concurrency);
};
Expand Down Expand Up @@ -699,6 +718,8 @@ friend class Controller;

AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;
bool& IgnoreEovercrowdedOf(MethodProperty*);
bool IgnoreEovercrowdedOf(const MethodProperty*) const;

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out);
Expand Down Expand Up @@ -731,6 +752,7 @@ friend class Controller;
// number of the virtual services for mapping URL to methods.
int _virtual_service_count;
bool _failed_to_set_max_concurrency_of_method;
bool _failed_to_set_ignore_eovercrowded;
Acceptor* _am;
Acceptor* _internal_am;

Expand Down
Loading