Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support success limit of ParallelChannel #2842

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/cn/combo_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel

示例代码见[example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/)。

任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。

用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。

用户可以设置ParallelChannelOptions.success_limit来控制访问的最大成功次数,当成功的访问达到这个数目时,RPC会立刻结束。ParallelChannelOptions.fail_limit的优先级高于ParallelChannelOptions.success_limit,只有未设置fail_limit时,success_limit才会生效。

一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。

Expand Down
6 changes: 5 additions & 1 deletion docs/en/combo_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ We need a better abstraction. If several channels are combined into a larger one

Check [example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/) for an example.

Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, including `ParallelChannel` and other combo channels. Set `ParallelChannelOptions.fail_limit` to control maximum number of failures. When number of failed responses reaches the limit, the RPC is ended immediately rather than waiting for timeout.
Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, including `ParallelChannel` and other combo channels.

Set `ParallelChannelOptions.fail_limit` to control maximum number of failures. When number of failed responses reaches the limit, the RPC is ended immediately rather than waiting for timeout.

Set `ParallelChannelOptions.success_limit` to control the maximum number of successful responses. When the number of successful responses reaches the limit, the RPC is ended immediately. `ParallelChannelOptions.fail_limit` has a higher priority than `ParallelChannelOptions.success_limit`: `success_limit` takes effect only when `fail_limit` is not set.

A sub channel can be added to the same `ParallelChannel` more than once, which is useful when you need to initiate multiple asynchronous RPC to the same service and wait for their completions.

Expand Down
68 changes: 45 additions & 23 deletions src/brpc/parallel_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,8 @@
#include "brpc/details/controller_private_accessor.h"
#include "brpc/parallel_channel.h"


namespace brpc {

ParallelChannelOptions::ParallelChannelOptions()
: timeout_ms(500)
, fail_limit(-1) {
}

DECLARE_bool(usercode_in_pthread);

// Not see difference when memory is cached.
Expand All @@ -45,12 +39,15 @@ static __thread Memory tls_cached_pchan_mem = { 0, NULL };

class ParallelChannelDone : public google::protobuf::Closure {
private:
ParallelChannelDone(int fail_limit, int ndone, int nchan, int memsize,
ParallelChannelDone(int fail_limit, int success_limit,
int ndone, int nchan, int memsize,
Controller* cntl, google::protobuf::Closure* user_done)
: _fail_limit(fail_limit)
, _success_limit(success_limit)
, _ndone(ndone)
, _nchan(nchan)
, _memsize(memsize)
, _current_success(0)
, _current_fail(0)
, _current_done(0)
, _cntl(cntl)
Expand All @@ -59,15 +56,13 @@ class ParallelChannelDone : public google::protobuf::Closure {
, _callmethod_pthread(0) {
}

~ParallelChannelDone() { }

public:
class SubDone : public google::protobuf::Closure {
public:
SubDone() : shared_data(NULL) {
}

~SubDone() {
~SubDone() override {
// Can't delete request/response in ~SubCall because the
// object is copyable.
if (ap.flags & DELETE_REQUEST) {
Expand All @@ -78,7 +73,7 @@ class ParallelChannelDone : public google::protobuf::Closure {
}
}

void Run() {
void Run() override {
shared_data->OnSubDoneRun(this);
}

Expand All @@ -89,7 +84,8 @@ class ParallelChannelDone : public google::protobuf::Closure {
};

static ParallelChannelDone* Create(
int fail_limit, int ndone, const SubCall* aps, int nchan,
int fail_limit, int success_limit,
int ndone, const SubCall* aps, int nchan,
Controller* cntl, google::protobuf::Closure* user_done) {
// We need to create the object in this way because _sub_done is
// dynamically allocated.
Expand Down Expand Up @@ -130,8 +126,8 @@ class ParallelChannelDone : public google::protobuf::Closure {
return NULL;
}
#endif
ParallelChannelDone* d = new (mem) ParallelChannelDone(
fail_limit, ndone, nchan, memsize, cntl, user_done);
auto d = new (mem) ParallelChannelDone(
fail_limit, success_limit, ndone, nchan, memsize, cntl, user_done);

// Apply client settings of _cntl to controllers of sub calls, except
// timeout. If we let sub channel do their timeout separately, when
Expand Down Expand Up @@ -183,7 +179,7 @@ class ParallelChannelDone : public google::protobuf::Closure {
}
}

void Run() {
void Run() override {
const int ec = _cntl->ErrorCode();
if (ec == EPCHANFINISH) {
// all sub calls finished. Clear the error and we'll set
Expand Down Expand Up @@ -220,14 +216,25 @@ class ParallelChannelDone : public google::protobuf::Closure {
if (fin != NULL) {
// [ called from SubDone::Run() ]

// Count failed sub calls, if fail_limit is reached, cancel others.
if (fin->cntl.FailedInline() &&
_current_fail.fetch_add(1, butil::memory_order_relaxed) + 1
== _fail_limit) {
int error_code = fin->cntl.ErrorCode();
// EPCHANFINISH is not an error of sub calls.
bool fail = 0 != error_code && EPCHANFINISH != error_code;
bool cancel =
// Count failed sub calls, if `fail_limit' is reached, cancel others.
(fail && _current_fail.fetch_add(1, butil::memory_order_relaxed) + 1
== _fail_limit) ||
// Count successful sub calls, if `success_limit' is reached, cancel others.
(0 == error_code &&
_current_success.fetch_add(1, butil::memory_order_relaxed) + 1
== _success_limit);

if (cancel) {
// Only cancel once by `fail_limit' or `success_limit'.
for (int i = 0; i < _ndone; ++i) {
SubDone* sd = sub_done(i);
if (fin != sd) {
bthread_id_error(sd->cntl.call_id(), ECANCELED);
bthread_id_error(
sd->cntl.call_id(), fail ? ECANCELED : EPCHANFINISH);
}
}
}
Expand Down Expand Up @@ -423,13 +430,15 @@ class ParallelChannelDone : public google::protobuf::Closure {

private:
int _fail_limit;
int _success_limit;
int _ndone;
int _nchan;
#if defined(__clang__)
int ALLOW_UNUSED _memsize;
#else
int _memsize;
#endif
butil::atomic<int> _current_success;
butil::atomic<int> _current_fail;
butil::atomic<uint32_t> _current_done;
Controller* _cntl;
Expand Down Expand Up @@ -602,6 +611,7 @@ void ParallelChannel::CallMethod(
ParallelChannelDone* d = NULL;
int ndone = nchan;
int fail_limit = 1;
int success_limit = 1;
DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);

if (cntl->FailedInline()) {
Expand Down Expand Up @@ -655,9 +665,21 @@ void ParallelChannel::CallMethod(
fail_limit = ndone;
}
}

d = ParallelChannelDone::Create(fail_limit, ndone, aps, nchan,
cntl, done);

// `success_limit' is only valid when `fail_limit' is not set.
if (_options.fail_limit >= 0 || _options.success_limit < 0) {
success_limit = ndone;
} else {
success_limit = _options.success_limit;
if (success_limit < 1) {
success_limit = 1;
} else if (success_limit > ndone) {
success_limit = ndone;
}
}

d = ParallelChannelDone::Create(
fail_limit, success_limit, ndone, aps, nchan, cntl, done);
if (NULL == d) {
cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone");
goto FAIL;
Expand Down
28 changes: 15 additions & 13 deletions src/brpc/parallel_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class CallMapper : public SharedObject {
}

// Only callable by subclasses and butil::intrusive_ptr
virtual ~CallMapper() {}
~CallMapper() override = default;
};

// Clone req_base typed `Req'.
Expand Down Expand Up @@ -140,12 +140,11 @@ class ResponseMerger : public SharedObject {
FAIL_ALL
};

ResponseMerger() { }
virtual Result Merge(google::protobuf::Message* response,
const google::protobuf::Message* sub_response) = 0;
protected:
// Only callable by subclasses and butil::intrusive_ptr
virtual ~ResponseMerger() { }
~ResponseMerger() override = default;
};

struct ParallelChannelOptions {
Expand All @@ -156,7 +155,7 @@ struct ParallelChannelOptions {
// Overridable by Controller.set_timeout_ms().
// Default: 500 (milliseconds)
// Maximum: 0x7fffffff (roughly 30 days)
int32_t timeout_ms;
int32_t timeout_ms{500};

// The RPC is considered to be successful if number of failed sub RPC
// does not reach this limit. Even if the RPC is timedout or canceled,
Expand All @@ -165,10 +164,14 @@ struct ParallelChannelOptions {
// the timeout) when the limit is reached.
// Default: number of sub channels, meaning that the RPC to ParallelChannel
// does not fail unless all sub RPC failed.
int fail_limit;
int fail_limit{-1};

// Construct with default options.
ParallelChannelOptions();
// The RPC is considered to be successful when the number of successful
// sub RPC reaches this limit.
// Default: number of sub channels, meaning that the RPC to ParallelChannel
// does not return unless all sub RPC succeed.
// Note: `success_limit' is only valid when `fail_limit' is not set.
int success_limit{ -1};
};

// ParallelChannel(aka "pchan") accesses all sub channels simultaneously with
Expand All @@ -185,8 +188,7 @@ struct ParallelChannelOptions {
class ParallelChannel : public ChannelBase {
friend class Controller;
public:
ParallelChannel() { }
~ParallelChannel();
~ParallelChannel() override;

// Initialize ParallelChannel with `options'.
// NOTE: Currently this function always returns 0.
Expand Down Expand Up @@ -234,7 +236,7 @@ friend class Controller;
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done);
google::protobuf::Closure* done) override;

// Number of sub channels.
size_t channel_count() const { return _chans.size(); }
Expand All @@ -245,10 +247,10 @@ friend class Controller;

// Minimum weight of sub channels.
// FIXME(gejun): be minimum of top(nchan-fail_limit)
int Weight();
int Weight() override;

// Put description into `os'.
void Describe(std::ostream& os, const DescribeOptions&) const;
void Describe(std::ostream& os, const DescribeOptions&) const override;

public:
struct SubChan {
Expand All @@ -263,7 +265,7 @@ friend class Controller;

protected:
static void* RunDoneAndDestroy(void* arg);
int CheckHealth();
int CheckHealth() override;

ParallelChannelOptions _options;
ChannelList _chans;
Expand Down
85 changes: 83 additions & 2 deletions test/brpc_channel_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,24 @@ class ChannelTest : public ::testing::Test{
}
};

// CallMapper used by the success_limit tests.
// Every sub request is a clone of the original EchoRequest whose `code' is
// set to `channel_index + 1' (always non-zero). The first mapped call is
// left as is; each later call additionally asks for a 5ms sleep via
// `sleep_us'.
class SuccessLimitCallMapper : public brpc::CallMapper {
public:
    brpc::SubCall Map(int channel_index,
                      const google::protobuf::MethodDescriptor* method,
                      const google::protobuf::Message* req_base,
                      google::protobuf::Message* response) override {
        auto sub_request = brpc::Clone<test::EchoRequest>(req_base);
        sub_request->set_code(channel_index + 1/*non-zero*/);

        // Remember whether this is the very first mapping, then count it.
        const bool first_mapping = (_index == 0);
        ++_index;
        if (!first_mapping) {
            sub_request->set_sleep_us(5 * 1000);
        }

        // Both the cloned request and the fresh response are owned by the
        // returned SubCall (DELETE_REQUEST | DELETE_RESPONSE).
        google::protobuf::Message* sub_response = response->New();
        return brpc::SubCall(method, sub_request, sub_response,
                             brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
    }

private:
    // Number of times Map() has been invoked on this mapper.
    size_t _index{0};
};

class MergeNothing : public brpc::ResponseMerger {
Result Merge(google::protobuf::Message* /*response*/,
const google::protobuf::Message* /*sub_response*/) {
Expand Down Expand Up @@ -826,7 +844,60 @@ class ChannelTest : public ::testing::Test{
}
StopAndJoin();
}


// Exercises a ParallelChannel configured with `success_limit = 1' over
// NCHANS sub channels that all share one SuccessLimitCallMapper, then checks
// that the overall RPC succeeds, that sub call 0 is the only one that did
// not fail, and that every other sub call ended with EPCHANFINISH.
//   single_server / short_connection: forwarded to SetUpChannel() for each
//       sub channel.
//   async: forwarded to the fixture's CallMethod() helper.
void TestSuccessLimitParallel(bool single_server, bool async, bool short_connection) {
// Print the flag combination so a failure can be attributed to it.
std::cout << " *** single=" << single_server
<< " async=" << async
<< " short=" << short_connection << std::endl;

ASSERT_EQ(0, StartAccept(_ep));
const size_t NCHANS = 8;
brpc::Channel subchans[NCHANS];
brpc::ParallelChannel channel;
brpc::ParallelChannelOptions options;
// Only care about the first successful response.
options.success_limit = 1;
channel.Init(&options);
// A single mapper instance is shared by all sub channels; it requests a 5ms
// sleep for every sub call after the first one it maps.
// NOTE(review): presumably the test server honors `sleep_us', so the first
// sub call finishes before the others -- confirm against the server impl.
butil::intrusive_ptr<brpc::CallMapper> fast_call_mapper(new SuccessLimitCallMapper);
for (size_t i = 0; i < NCHANS; ++i) {
SetUpChannel(&subchans[i], single_server, short_connection);
ASSERT_EQ(0, channel.AddChannel(
&subchans[i], brpc::DOESNT_OWN_CHANNEL, fast_call_mapper, NULL));
}
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
// The mapper overwrites `code' on each cloned sub request with
// `channel_index + 1'.
req.set_code(23);
CallMethod(&channel, &cntl, &req, &res, async);

// The overall RPC must succeed, and every sub channel must have a sub
// controller.
EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
for (int i = 0; i < cntl.sub_count(); ++i) {
EXPECT_TRUE(cntl.sub(i)) << "i=" << i;
if (0 == i) {
// Sub call 0 is the one expected to succeed.
EXPECT_TRUE(!cntl.sub(i)->Failed()) << "i=" << i;
} else {
// All remaining sub calls are expected to be ended with EPCHANFINISH.
EXPECT_TRUE(cntl.sub(i)->Failed()) << "i=" << i;
EXPECT_EQ(brpc::EPCHANFINISH, cntl.sub(i)->ErrorCode()) << "i=" << i;
}
}
EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
// Exactly one code is expected in the response, and it must be 1, i.e. the
// value the mapper assigns for channel_index 0.
ASSERT_EQ(1, res.code_list_size());
ASSERT_EQ((int)1, res.code_list(0));
if (short_connection) {
// Sleep to let `_messenger' detect `Socket' being `SetFailed'
// Poll every 1ms; each iteration expects that no more than 100ms have
// passed since polling started.
const int64_t start_time = butil::gettimeofday_us();
while (_messenger.ConnectionCount() != 0) {
EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
bthread_usleep(1000);
}
} else {
// Otherwise at most one connection is expected to remain.
EXPECT_GE(1ul, _messenger.ConnectionCount());
}
StopAndJoin();
}

struct CancelerArg {
int64_t sleep_before_cancel_us;
brpc::CallId cid;
Expand Down Expand Up @@ -2382,7 +2453,7 @@ TEST_F(ChannelTest, success_parallel) {
}

TEST_F(ChannelTest, success_duplicated_parallel) {
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
for (int k = 0; k <=1; ++k) { // Flag ShortConnection
TestSuccessDuplicatedParallel(i, j, k);
Expand Down Expand Up @@ -2421,6 +2492,16 @@ TEST_F(ChannelTest, success_parallel2) {
}
}

// Runs TestSuccessLimitParallel() once for each of the 8 combinations of the
// (SingleServer, Asynchronous, ShortConnection) flags. ShortConnection
// varies fastest and SingleServer slowest.
TEST_F(ChannelTest, success_limit_parallel) {
    for (int combo = 0; combo < 8; ++combo) {
        const bool single_server = (combo & 4) != 0;     // Flag SingleServer
        const bool asynchronous = (combo & 2) != 0;      // Flag Asynchronous
        const bool short_connection = (combo & 1) != 0;  // Flag ShortConnection
        TestSuccessLimitParallel(single_server, asynchronous, short_connection);
    }
}

TEST_F(ChannelTest, cancel_before_callmethod) {
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
Expand Down
Loading