Skip to content

Commit

Permalink
set tags workers unlimitedly (#2801)
Browse files Browse the repository at this point in the history
* set tags workers unlimitedly

* fix set concurrency test

---------

Co-authored-by: jiazheng.jia <[email protected]>
  • Loading branch information
zhengJade and jiazheng.jia authored Oct 31, 2024
1 parent 282776a commit e1bf467
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 21 deletions.
6 changes: 3 additions & 3 deletions docs/cn/bthread_tagged_task_group.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@

```c++
服务端启动
./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 12 -event_dispatcher_num 1
./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 8 -event_dispatcher_num 1

客户端启动
./echo_client -dummy_port 8888 -server "0.0.0.0:8002" -use_bthread true
./echo_client -dummy_port 8889 -server "0.0.0.0:8003" -use_bthread true
```

FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
FLAGS_bthread_concurrency为所有线程的数,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。

Q:如何动态改变分组线程的数量?

A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数
A:你可以根据你的服务更自由的设计你的每个分组的线程数,启动的时候会根据你设置的 bthread_concurrency 来初始化线程池,如果你设置了 bthread_min_concurrency,那么会根据 bthread_min_concurrency 来设置线程池,对于 server 来说,num_threads 就是该 tag 对应的 worker 数量。可以通过设置 FLAGS_bthread_current_tag 和 FLAGS_bthread_concurrency_by_tag 来改变某个分组的线程数。如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_threads的含义是所有分组的 worker 总数

Q:不同分组之间有什么关系吗?

Expand Down
4 changes: 2 additions & 2 deletions example/bthread_tag_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server");
DEFINE_int32(tag1, 0, "Server1 tag");
DEFINE_int32(tag2, 1, "Server2 tag");
DEFINE_int32(tag3, 2, "Background task tag");
DEFINE_int32(num_threads1, 4, "Thread number of server1");
DEFINE_int32(num_threads2, 4, "Thread number of server2");
DEFINE_int32(num_threads1, 6, "Thread number of server1");
DEFINE_int32(num_threads2, 16, "Thread number of server2");
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
Expand Down
6 changes: 1 addition & 5 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,11 +1044,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
bthread_setconcurrency(_options.num_threads);
} else {
bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}
bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}

for (MethodMap::iterator it = _method_map.begin();
Expand Down
20 changes: 11 additions & 9 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,20 +397,22 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
}
auto c = bthread::get_or_new_task_control();
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
auto ngroup = c->concurrency();
auto tag_ngroup = c->concurrency(tag);
auto add = num - tag_ngroup;
if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
LOG(ERROR) << "Fail to set concurrency by tag " << tag
<< ", Total concurrency larger than bthread_concurrency";
return EPERM;
}
auto added = 0;

if (add > 0) {
added = c->add_workers(add, tag);
auto added = c->add_workers(add, tag);
bthread::FLAGS_bthread_concurrency += added;
return (add == added ? 0 : EPERM);

} else if (add < 0){
LOG(WARNING) << "Fail to set concurrency by tag: " << tag
<< ", tag concurrency must larger than old oncurrency. old concurrency: "
<< tag_ngroup << ", new concurrency: " << num;
return EPERM;
} else {
return 0;
}
return (num == tag_ngroup ? 0 : EPERM);
}

int bthread_about_to_quit() {
Expand Down
6 changes: 4 additions & 2 deletions test/bthread_setconcurrency_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,11 @@ int concurrency_by_tag(int num) {

TEST(BthreadTest, concurrency_by_tag) {
ASSERT_EQ(concurrency_by_tag(1), false);
auto con = bthread_getconcurrency_by_tag(0);
auto tag_con = bthread_getconcurrency_by_tag(0);
auto con = bthread_getconcurrency();
ASSERT_EQ(concurrency_by_tag(con), true);
ASSERT_EQ(concurrency_by_tag(con + 1), false);
ASSERT_EQ(concurrency_by_tag(con + 1), true);
ASSERT_EQ(bthread_getconcurrency(), con+1);
bthread_setconcurrency(con + 1);
ASSERT_EQ(concurrency_by_tag(con + 1), true);
}
Expand Down

0 comments on commit e1bf467

Please sign in to comment.