-
Notifications
You must be signed in to change notification settings - Fork 4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Yang Liming
committed
Sep 26, 2023
1 parent
7d1df9f
commit eaf8033
Showing
34 changed files
with
1,121 additions
and
143 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
|
||
# Bthread tagged task group | ||
|
||
在很多应用开发过程中都会有线程资源隔离的需求,比如服务分为控制层和数据层,数据层的请求压力大,不需要控制层受到影响;再比如,服务有多个磁盘,希望磁盘之间没有什么影响资源上的影响等。bthread的任务组打标签就是实现bthread的worker线程池按照tag分组,让不同分组之间达到没有互相影响的目的。客户端需要配合服务端使用,客户端配置channel的connection_group选项将请求分类,这些分类请求将自然的分配到服务端不同的线程池tag分组里面去。 | ||
|
||
# 使用方式 | ||
|
||
在example/bthread_tag_echo_c++里面有一个实例代码,分别启动服务端和客户端,服务端将worker划分成3个tag(分组),客户端可以设置不同的connection_group来访问不同的分组,例子里面设置为Test1、Test2、Test3。 | ||
|
||
```c++ | ||
服务端启动 | ||
./echo_server -task_group_ntags 3 -bthread_concurrency 20 -bthread_min_concurrency 12 -event_dispatcher_num 2 | ||
|
||
客户端启动 | ||
./echo_client -dummy_port 8888 -use_bthread true -connection_group="Test1" | ||
./echo_client -dummy_port 8888 -use_bthread true -connection_group="Test2" | ||
./echo_client -dummy_port 8888 -use_bthread true -connection_group="Test3" | ||
``` | ||
|
||
一般情况应用创建的bthread并需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行,如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值。 | ||
|
||
# 监控 | ||
|
||
目前监控上按照tag划分的指标有,线程的数量、线程的使用量、bthread_count、连接信息 | ||
|
||
线程使用量:![img](../images/bthread_tag_worker_usage.png) | ||
|
||
worker线程动态调整,使用该功能需要将bthread_min_concurrency配置成非0。![img](../images/bthread_tag_add_worker.png) | ||
|
||
connections:![img](../images/bthread_tag_connections.png) | ||
|
||
连接详情,客户端:![img](../images/bthread_tag_connection_send.png) | ||
|
||
连接详情,服务端:![img](../images/bthread_tag_connection_recv.png) |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
cmake_minimum_required(VERSION 2.8.10) | ||
project(multi_threaded_echo_c++ C CXX) | ||
|
||
option(LINK_SO "Whether examples are linked dynamically" OFF) | ||
|
||
execute_process( | ||
COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" | ||
OUTPUT_VARIABLE OUTPUT_PATH | ||
) | ||
|
||
set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) | ||
|
||
include(FindThreads) | ||
include(FindProtobuf) | ||
protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto) | ||
# include PROTO_HEADER | ||
include_directories(${CMAKE_CURRENT_BINARY_DIR}) | ||
|
||
# Search for libthrift* by best effort. If it is not found and brpc is | ||
# compiled with thrift protocol enabled, a link error would be reported. | ||
find_library(THRIFT_LIB NAMES thrift) | ||
if (NOT THRIFT_LIB) | ||
set(THRIFT_LIB "") | ||
endif() | ||
|
||
find_path(GPERFTOOLS_INCLUDE_DIR NAMES gperftools/heap-profiler.h) | ||
find_library(GPERFTOOLS_LIBRARIES NAMES tcmalloc_and_profiler) | ||
include_directories(${GPERFTOOLS_INCLUDE_DIR}) | ||
|
||
find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) | ||
if(LINK_SO) | ||
find_library(BRPC_LIB NAMES brpc) | ||
else() | ||
find_library(BRPC_LIB NAMES libbrpc.a brpc) | ||
endif() | ||
if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) | ||
message(FATAL_ERROR "Fail to find brpc") | ||
endif() | ||
include_directories(${BRPC_INCLUDE_PATH}) | ||
|
||
find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) | ||
find_library(GFLAGS_LIBRARY NAMES gflags libgflags) | ||
if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) | ||
message(FATAL_ERROR "Fail to find gflags") | ||
endif() | ||
include_directories(${GFLAGS_INCLUDE_PATH}) | ||
|
||
execute_process( | ||
COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" | ||
OUTPUT_VARIABLE GFLAGS_NS | ||
) | ||
if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") | ||
execute_process( | ||
COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" | ||
OUTPUT_VARIABLE GFLAGS_NS | ||
) | ||
endif() | ||
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") | ||
include(CheckFunctionExists) | ||
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) | ||
if(NOT HAVE_CLOCK_GETTIME) | ||
set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") | ||
endif() | ||
endif() | ||
|
||
set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}") | ||
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") | ||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DBRPC_ENABLE_CPU_PROFILER") | ||
|
||
if(CMAKE_VERSION VERSION_LESS "3.1.3") | ||
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") | ||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") | ||
endif() | ||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") | ||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") | ||
endif() | ||
else() | ||
set(CMAKE_CXX_STANDARD 11) | ||
set(CMAKE_CXX_STANDARD_REQUIRED ON) | ||
endif() | ||
|
||
find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) | ||
find_library(LEVELDB_LIB NAMES leveldb) | ||
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) | ||
message(FATAL_ERROR "Fail to find leveldb") | ||
endif() | ||
include_directories(${LEVELDB_INCLUDE_PATH}) | ||
|
||
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") | ||
set(OPENSSL_ROOT_DIR | ||
"/usr/local/opt/openssl" # Homebrew installed OpenSSL | ||
) | ||
endif() | ||
|
||
find_package(OpenSSL) | ||
include_directories(${OPENSSL_INCLUDE_DIR}) | ||
|
||
|
||
set(DYNAMIC_LIB | ||
${CMAKE_THREAD_LIBS_INIT} | ||
${GFLAGS_LIBRARY} | ||
${PROTOBUF_LIBRARIES} | ||
${LEVELDB_LIB} | ||
${OPENSSL_CRYPTO_LIBRARY} | ||
${OPENSSL_SSL_LIBRARY} | ||
${THRIFT_LIB} | ||
dl | ||
) | ||
|
||
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") | ||
set(DYNAMIC_LIB ${DYNAMIC_LIB} | ||
pthread | ||
"-framework CoreFoundation" | ||
"-framework CoreGraphics" | ||
"-framework CoreData" | ||
"-framework CoreText" | ||
"-framework Security" | ||
"-framework Foundation" | ||
"-Wl,-U,_MallocExtension_ReleaseFreeMemory" | ||
"-Wl,-U,_ProfilerStart" | ||
"-Wl,-U,_ProfilerStop") | ||
endif() | ||
|
||
add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) | ||
add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) | ||
|
||
target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) | ||
target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) | ||
|
||
file(COPY ${PROJECT_SOURCE_DIR}/key.pem | ||
DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) | ||
file(COPY ${PROJECT_SOURCE_DIR}/cert.pem | ||
DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
-----BEGIN CERTIFICATE----- | ||
MIIEUTCCAzmgAwIBAgIBADANBgkqhkiG9w0BAQQFADB9MQswCQYDVQQGEwJDTjER | ||
MA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5naGFpMQ4wDAYDVQQKEwVC | ||
YWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQxHDAaBgkqhkiG9w0BCQEW | ||
DXNhdEBiYWlkdS5jb20wHhcNMTUwNzE2MDMxOTUxWhcNMTgwNTA1MDMxOTUxWjB9 | ||
MQswCQYDVQQGEwJDTjERMA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5n | ||
aGFpMQ4wDAYDVQQKEwVCYWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQx | ||
HDAaBgkqhkiG9w0BCQEWDXNhdEBiYWlkdS5jb20wggEiMA0GCSqGSIb3DQEBAQUA | ||
A4IBDwAwggEKAoIBAQCqdyAeHY39tqY1RYVbfpqZjZlJDtZb04znxjgQrX+mKmLb | ||
mwvXgJojlfn2Qcgp4NKYFqDFb9tU/Gbb436dRvkHyWOz0RPMspR0TTRU1NIY8wRy | ||
0A1LOCgLHsbRJHqktGjylejALdgsspFWyDY9bEfb4oWsnKGzJqcvIDXrPmMOOY4o | ||
pbA9SufSzwRZN7Yzc5jAedpaF9SK78RQXtvV0+JfCUwBsBWPKevRFFUrN7rQBYjP | ||
cgV/HgDuquPrqnESVSYyfEBKZba6cmNb+xzO3cB1brPTtobSXh+0o/0CtRA+2m63 | ||
ODexxCLntgkPm42IYCJLM15xTatcfVX/3LHQ31DrAgMBAAGjgdswgdgwHQYDVR0O | ||
BBYEFGcd7lA//bSAoSC/NbWRx/H+O1zpMIGoBgNVHSMEgaAwgZ2AFGcd7lA//bSA | ||
oSC/NbWRx/H+O1zpoYGBpH8wfTELMAkGA1UEBhMCQ04xETAPBgNVBAgTCFNoYW5n | ||
aGFpMREwDwYDVQQHEwhTaGFuZ2hhaTEOMAwGA1UEChMFQmFpZHUxDDAKBgNVBAsT | ||
A0lORjEMMAoGA1UEAxMDU0FUMRwwGgYJKoZIhvcNAQkBFg1zYXRAYmFpZHUuY29t | ||
ggEAMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEEBQADggEBAKfoCn8SpLk3uQyT | ||
X+oygcRWfTeJtN3D5J69NCMJ7wB+QPfpEBPwiqMgdbp4bRJ98H7x5UQsHT+EDOT/ | ||
9OmipomHInFY4W1ew11zNKwuENeRrnZwTcCiVLZsxZsAU41ZeI5Yq+2WdtxnePCR | ||
VL1/NjKOq+WoRdb2nLSNDWgYMkLRVlt32hyzryyrBbmaxUl8BxnPqUiWduMwsZUz | ||
HNpXkoa1xTSd+En1SHYWfMg8BOVuV0I0/fjUUG9AXVqYpuogfbjAvibVNWAmxOfo | ||
fOjCPCGoJC1ET3AxYkgXGwioobz0pK/13k2pV+wu7W4g+6iTfz+hwZbPsUk2a/5I | ||
f6vXFB0= | ||
-----END CERTIFICATE----- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
// A client sending requests to server by multiple threads. | ||
|
||
#include <gflags/gflags.h> | ||
#include <bthread/bthread.h> | ||
#include <butil/logging.h> | ||
#include <brpc/server.h> | ||
#include <brpc/channel.h> | ||
#include "echo.pb.h" | ||
#include <bvar/bvar.h> | ||
|
||
DEFINE_int32(thread_num, 50, "Number of threads to send requests"); | ||
DEFINE_bool(use_bthread, false, "Use bthread to send requests"); | ||
DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests"); | ||
DEFINE_int32(request_size, 16, "Bytes of each request"); | ||
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto"); | ||
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); | ||
DEFINE_string(server, "0.0.0.0:8002", "IP Address of server"); | ||
DEFINE_string(load_balancer, "", "The algorithm for load balancing"); | ||
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); | ||
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); | ||
DEFINE_bool(dont_fail, false, "Print fatal when some call failed"); | ||
DEFINE_bool(enable_ssl, false, "Use SSL connection"); | ||
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); | ||
DEFINE_string(connection_group, "", "Connection group for channel"); | ||
DEFINE_int32(bthread_tag, BTHREAD_TAG_DEFAULT, "bthread used tag"); | ||
|
||
std::string g_request; | ||
std::string g_attachment; | ||
|
||
bvar::LatencyRecorder g_latency_recorder("client"); | ||
bvar::Adder<int> g_error_count("client_error_count"); | ||
|
||
static void* sender(void* arg) { | ||
// Normally, you should not call a Channel directly, but instead construct | ||
// a stub Service wrapping it. stub can be shared by all threads as well. | ||
example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg)); | ||
|
||
int log_id = 0; | ||
while (!brpc::IsAskedToQuit()) { | ||
// We will receive response synchronously, safe to put variables | ||
// on stack. | ||
example::EchoRequest request; | ||
example::EchoResponse response; | ||
brpc::Controller cntl; | ||
|
||
request.set_message(g_request); | ||
cntl.set_log_id(log_id++); // set by user | ||
// Set attachment which is wired to network directly instead of | ||
// being serialized into protobuf messages. | ||
cntl.request_attachment().append(g_attachment); | ||
|
||
// Because `done'(last parameter) is NULL, this function waits until | ||
// the response comes back or error occurs(including timedout). | ||
stub.Echo(&cntl, &request, &response, NULL); | ||
if (!cntl.Failed()) { | ||
g_latency_recorder << cntl.latency_us(); | ||
} else { | ||
g_error_count << 1; | ||
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail) | ||
<< "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us(); | ||
// We can't connect to the server, sleep a while. Notice that this | ||
// is a specific sleeping to prevent this thread from spinning too | ||
// fast. You should continue the business logic in a production | ||
// server rather than sleeping. | ||
bthread_usleep(50000); | ||
} | ||
} | ||
return NULL; | ||
} | ||
|
||
int main(int argc, char* argv[]) { | ||
// Parse gflags. We recommend you to use gflags as well. | ||
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); | ||
|
||
// A Channel represents a communication line to a Server. Notice that | ||
// Channel is thread-safe and can be shared by all threads in your program. | ||
brpc::Channel channel; | ||
|
||
// Initialize the channel, NULL means using default options. | ||
brpc::ChannelOptions options; | ||
if (FLAGS_enable_ssl) { | ||
options.mutable_ssl_options(); | ||
} | ||
options.protocol = FLAGS_protocol; | ||
options.connection_type = FLAGS_connection_type; | ||
options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100); | ||
options.timeout_ms = FLAGS_timeout_ms; | ||
options.max_retry = FLAGS_max_retry; | ||
options.connection_group = FLAGS_connection_group; | ||
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { | ||
LOG(ERROR) << "Fail to initialize channel"; | ||
return -1; | ||
} | ||
|
||
if (FLAGS_attachment_size > 0) { | ||
g_attachment.resize(FLAGS_attachment_size, 'a'); | ||
} | ||
if (FLAGS_request_size <= 0) { | ||
LOG(ERROR) << "Bad request_size=" << FLAGS_request_size; | ||
return -1; | ||
} | ||
g_request.resize(FLAGS_request_size, 'r'); | ||
|
||
if (FLAGS_dummy_port >= 0) { | ||
brpc::StartDummyServerAt(FLAGS_dummy_port); | ||
} | ||
|
||
std::vector<bthread_t> bids; | ||
std::vector<pthread_t> pids; | ||
if (!FLAGS_use_bthread) { | ||
pids.resize(FLAGS_thread_num); | ||
for (int i = 0; i < FLAGS_thread_num; ++i) { | ||
if (pthread_create(&pids[i], NULL, sender, &channel) != 0) { | ||
LOG(ERROR) << "Fail to create pthread"; | ||
return -1; | ||
} | ||
} | ||
} else { | ||
bids.resize(FLAGS_thread_num); | ||
for (int i = 0; i < FLAGS_thread_num; ++i) { | ||
bthread_attr_t attr = BTHREAD_ATTR_NORMAL; | ||
attr.tag = FLAGS_bthread_tag; | ||
if (bthread_start_background( | ||
&bids[i], &attr, sender, &channel) != 0) { | ||
LOG(ERROR) << "Fail to create bthread"; | ||
return -1; | ||
} | ||
} | ||
} | ||
|
||
while (!brpc::IsAskedToQuit()) { | ||
sleep(1); | ||
LOG(INFO) << "Sending EchoRequest at qps=" << g_latency_recorder.qps(1) | ||
<< " latency=" << g_latency_recorder.latency(1); | ||
} | ||
|
||
LOG(INFO) << "EchoClient is going to quit"; | ||
for (int i = 0; i < FLAGS_thread_num; ++i) { | ||
if (!FLAGS_use_bthread) { | ||
pthread_join(pids[i], NULL); | ||
} else { | ||
bthread_join(bids[i], NULL); | ||
} | ||
} | ||
|
||
return 0; | ||
} |
Oops, something went wrong.