diff --git a/docs/cn/bthread_tagged_task_group.md b/docs/cn/bthread_tagged_task_group.md new file mode 100644 index 0000000000..ecef2d7deb --- /dev/null +++ b/docs/cn/bthread_tagged_task_group.md @@ -0,0 +1,34 @@ + +# Bthread tagged task group + +在很多应用开发过程中都会有线程资源隔离的需求,比如服务分为控制层和数据层,数据层的请求压力大,不需要控制层受到影响;再比如,服务有多个磁盘,希望磁盘之间没有什么影响资源上的影响等。bthread的任务组打标签就是实现bthread的worker线程池按照tag分组,让不同分组之间达到没有互相影响的目的。客户端需要配合服务端使用,客户端配置channel的connection_group选项将请求分类,这些分类请求将自然的分配到服务端不同的线程池tag分组里面去。 + +# 使用方式 + +在example/bthread_tag_echo_c++里面有一个实例代码,分别启动服务端和客户端,服务端将worker划分成3个tag(分组),客户端可以设置不同的connection_group来访问不同的分组,例子里面设置为Test1、Test2、Test3。 + +```c++ +服务端启动 +./echo_server -task_group_ntags 3 -bthread_concurrency 20 -bthread_min_concurrency 12 -event_dispatcher_num 2 + +客户端启动 +./echo_client -dummy_port 8888 -use_bthread true -connection_group="Test1" +./echo_client -dummy_port 8888 -use_bthread true -connection_group="Test2" +./echo_client -dummy_port 8888 -use_bthread true -connection_group="Test3" +``` + +一般情况应用创建的bthread并需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行,如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值。 + +# 监控 + +目前监控上按照tag划分的指标有,线程的数量、线程的使用量、bthread_count、连接信息 + +线程使用量:![img](../images/bthread_tag_worker_usage.png) + +worker线程动态调整,使用该功能需要将bthread_min_concurrency配置成非0。![img](../images/bthread_tag_add_worker.png) + +connections:![img](../images/bthread_tag_connections.png) + +连接详情,客户端:![img](../images/bthread_tag_connection_send.png) + +连接详情,服务端:![img](../images/bthread_tag_connection_recv.png) diff --git a/docs/images/bthread_tag_add_worker.png b/docs/images/bthread_tag_add_worker.png new file mode 100644 index 0000000000..f13eb029a6 Binary files /dev/null and b/docs/images/bthread_tag_add_worker.png differ diff --git a/docs/images/bthread_tag_connection_recv.png b/docs/images/bthread_tag_connection_recv.png new file mode 100644 index 0000000000..391f6e5b18 Binary files /dev/null and b/docs/images/bthread_tag_connection_recv.png differ diff --git a/docs/images/bthread_tag_connection_send.png b/docs/images/bthread_tag_connection_send.png new file mode 100644 index 0000000000..d3133a020b Binary files /dev/null and b/docs/images/bthread_tag_connection_send.png differ diff --git a/docs/images/bthread_tag_connections.png b/docs/images/bthread_tag_connections.png new file mode 100644 index 0000000000..8b4618177b Binary files /dev/null and b/docs/images/bthread_tag_connections.png differ diff --git a/docs/images/bthread_tag_worker_usage.png b/docs/images/bthread_tag_worker_usage.png new file mode 100644 index 0000000000..9bfb189f30 Binary files /dev/null and b/docs/images/bthread_tag_worker_usage.png differ diff --git a/example/bthread_tag_echo_c++/CMakeLists.txt b/example/bthread_tag_echo_c++/CMakeLists.txt new file mode 100644 index 0000000000..a3c47a35ff --- /dev/null +++ b/example/bthread_tag_echo_c++/CMakeLists.txt @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(multi_threaded_echo_c++ C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +# Search for libthrift* by best effort. If it is not found and brpc is +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + set(THRIFT_LIB "") +endif() + +find_path(GPERFTOOLS_INCLUDE_DIR NAMES gperftools/heap-profiler.h) +find_library(GPERFTOOLS_LIBRARIES NAMES tcmalloc_and_profiler) +include_directories(${GPERFTOOLS_INCLUDE_DIR}) + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +execute_process( + COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS +) +if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") + execute_process( + COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS + ) +endif() +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}") +set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DBRPC_ENABLE_CPU_PROFILER") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + ${THRIFT_LIB} + dl + ) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop") +endif() + +add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) + +target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) +target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) + +file(COPY ${PROJECT_SOURCE_DIR}/key.pem + DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) +file(COPY ${PROJECT_SOURCE_DIR}/cert.pem + DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/example/bthread_tag_echo_c++/cert.pem b/example/bthread_tag_echo_c++/cert.pem new file mode 100644 index 0000000000..28bcc21e4b --- /dev/null +++ b/example/bthread_tag_echo_c++/cert.pem @@ -0,0 +1,26 @@ +-----BEGIN CERTIFICATE----- +MIIEUTCCAzmgAwIBAgIBADANBgkqhkiG9w0BAQQFADB9MQswCQYDVQQGEwJDTjER +MA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5naGFpMQ4wDAYDVQQKEwVC +YWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQxHDAaBgkqhkiG9w0BCQEW +DXNhdEBiYWlkdS5jb20wHhcNMTUwNzE2MDMxOTUxWhcNMTgwNTA1MDMxOTUxWjB9 +MQswCQYDVQQGEwJDTjERMA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5n +aGFpMQ4wDAYDVQQKEwVCYWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQx +HDAaBgkqhkiG9w0BCQEWDXNhdEBiYWlkdS5jb20wggEiMA0GCSqGSIb3DQEBAQUA +A4IBDwAwggEKAoIBAQCqdyAeHY39tqY1RYVbfpqZjZlJDtZb04znxjgQrX+mKmLb +mwvXgJojlfn2Qcgp4NKYFqDFb9tU/Gbb436dRvkHyWOz0RPMspR0TTRU1NIY8wRy +0A1LOCgLHsbRJHqktGjylejALdgsspFWyDY9bEfb4oWsnKGzJqcvIDXrPmMOOY4o +pbA9SufSzwRZN7Yzc5jAedpaF9SK78RQXtvV0+JfCUwBsBWPKevRFFUrN7rQBYjP +cgV/HgDuquPrqnESVSYyfEBKZba6cmNb+xzO3cB1brPTtobSXh+0o/0CtRA+2m63 +ODexxCLntgkPm42IYCJLM15xTatcfVX/3LHQ31DrAgMBAAGjgdswgdgwHQYDVR0O +BBYEFGcd7lA//bSAoSC/NbWRx/H+O1zpMIGoBgNVHSMEgaAwgZ2AFGcd7lA//bSA +oSC/NbWRx/H+O1zpoYGBpH8wfTELMAkGA1UEBhMCQ04xETAPBgNVBAgTCFNoYW5n +aGFpMREwDwYDVQQHEwhTaGFuZ2hhaTEOMAwGA1UEChMFQmFpZHUxDDAKBgNVBAsT +A0lORjEMMAoGA1UEAxMDU0FUMRwwGgYJKoZIhvcNAQkBFg1zYXRAYmFpZHUuY29t +ggEAMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEEBQADggEBAKfoCn8SpLk3uQyT +X+oygcRWfTeJtN3D5J69NCMJ7wB+QPfpEBPwiqMgdbp4bRJ98H7x5UQsHT+EDOT/ +9OmipomHInFY4W1ew11zNKwuENeRrnZwTcCiVLZsxZsAU41ZeI5Yq+2WdtxnePCR +VL1/NjKOq+WoRdb2nLSNDWgYMkLRVlt32hyzryyrBbmaxUl8BxnPqUiWduMwsZUz +HNpXkoa1xTSd+En1SHYWfMg8BOVuV0I0/fjUUG9AXVqYpuogfbjAvibVNWAmxOfo +fOjCPCGoJC1ET3AxYkgXGwioobz0pK/13k2pV+wu7W4g+6iTfz+hwZbPsUk2a/5I +f6vXFB0= +-----END CERTIFICATE----- diff --git a/example/bthread_tag_echo_c++/client.cpp b/example/bthread_tag_echo_c++/client.cpp new file mode 100644 index 0000000000..d70f73ccac --- /dev/null +++ b/example/bthread_tag_echo_c++/client.cpp @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server by multiple threads. + +#include +#include +#include +#include +#include +#include "echo.pb.h" +#include + +DEFINE_int32(thread_num, 50, "Number of threads to send requests"); +DEFINE_bool(use_bthread, false, "Use bthread to send requests"); +DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests"); +DEFINE_int32(request_size, 16, "Bytes of each request"); +DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server, "0.0.0.0:8002", "IP Address of server"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_bool(dont_fail, false, "Print fatal when some call failed"); +DEFINE_bool(enable_ssl, false, "Use SSL connection"); +DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); +DEFINE_string(connection_group, "", "Connection group for channel"); +DEFINE_int32(bthread_tag, BTHREAD_TAG_DEFAULT, "bthread used tag"); + +std::string g_request; +std::string g_attachment; + +bvar::LatencyRecorder g_latency_recorder("client"); +bvar::Adder g_error_count("client_error_count"); + +static void* sender(void* arg) { + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(static_cast(arg)); + + int log_id = 0; + while (!brpc::IsAskedToQuit()) { + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + + request.set_message(g_request); + cntl.set_log_id(log_id++); // set by user + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl.request_attachment().append(g_attachment); + + // Because `done'(last parameter) is NULL, this function waits until + // the response comes back or error occurs(including timedout). + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + g_latency_recorder << cntl.latency_us(); + } else { + g_error_count << 1; + CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail) + << "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us(); + // We can't connect to the server, sleep a while. Notice that this + // is a specific sleeping to prevent this thread from spinning too + // fast. You should continue the business logic in a production + // server rather than sleeping. + bthread_usleep(50000); + } + } + return NULL; +} + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + if (FLAGS_enable_ssl) { + options.mutable_ssl_options(); + } + options.protocol = FLAGS_protocol; + options.connection_type = FLAGS_connection_type; + options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100); + options.timeout_ms = FLAGS_timeout_ms; + options.max_retry = FLAGS_max_retry; + options.connection_group = FLAGS_connection_group; + if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + if (FLAGS_attachment_size > 0) { + g_attachment.resize(FLAGS_attachment_size, 'a'); + } + if (FLAGS_request_size <= 0) { + LOG(ERROR) << "Bad request_size=" << FLAGS_request_size; + return -1; + } + g_request.resize(FLAGS_request_size, 'r'); + + if (FLAGS_dummy_port >= 0) { + brpc::StartDummyServerAt(FLAGS_dummy_port); + } + + std::vector bids; + std::vector pids; + if (!FLAGS_use_bthread) { + pids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (pthread_create(&pids[i], NULL, sender, &channel) != 0) { + LOG(ERROR) << "Fail to create pthread"; + return -1; + } + } + } else { + bids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.tag = FLAGS_bthread_tag; + if (bthread_start_background( + &bids[i], &attr, sender, &channel) != 0) { + LOG(ERROR) << "Fail to create bthread"; + return -1; + } + } + } + + while (!brpc::IsAskedToQuit()) { + sleep(1); + LOG(INFO) << "Sending EchoRequest at qps=" << g_latency_recorder.qps(1) + << " latency=" << g_latency_recorder.latency(1); + } + + LOG(INFO) << "EchoClient is going to quit"; + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (!FLAGS_use_bthread) { + pthread_join(pids[i], NULL); + } else { + bthread_join(bids[i], NULL); + } + } + + return 0; +} diff --git a/example/bthread_tag_echo_c++/echo.proto b/example/bthread_tag_echo_c++/echo.proto new file mode 100644 index 0000000000..e963faf577 --- /dev/null +++ b/example/bthread_tag_echo_c++/echo.proto @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto2"; +option cc_generic_services = true; + +package example; + +message EchoRequest { + required string message = 1; +}; + +message EchoResponse { + required string message = 1; +}; + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +}; diff --git a/example/bthread_tag_echo_c++/key.pem b/example/bthread_tag_echo_c++/key.pem new file mode 100644 index 0000000000..e3f64d1e17 --- /dev/null +++ b/example/bthread_tag_echo_c++/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAqncgHh2N/bamNUWFW36amY2ZSQ7WW9OM58Y4EK1/pipi25sL +14CaI5X59kHIKeDSmBagxW/bVPxm2+N+nUb5B8ljs9ETzLKUdE00VNTSGPMEctAN +SzgoCx7G0SR6pLRo8pXowC3YLLKRVsg2PWxH2+KFrJyhsyanLyA16z5jDjmOKKWw +PUrn0s8EWTe2M3OYwHnaWhfUiu/EUF7b1dPiXwlMAbAVjynr0RRVKze60AWIz3IF +fx4A7qrj66pxElUmMnxASmW2unJjW/sczt3AdW6z07aG0l4ftKP9ArUQPtputzg3 +scQi57YJD5uNiGAiSzNecU2rXH1V/9yx0N9Q6wIDAQABAoIBADN3khflnnhKzDXr +To9IU08nRG+dbjT9U16rJ0RJze+SfpSFZHblWiSCZJzoUZHrUkofEt1pn1QyfK/J +KPI9enTSZirlZk/4XwAaS0GNm/1yahZsIIdkZhqtaSO+GtVdrw4HGuXjMZCVPXJx +MocrCSsnYmqyQ9P+SJ3e4Mis5mVllwDiUVlnTIamSSt16qkPdamLSJrxvI4LirQK +9MZWNLoDFpRU1MJxQ/QzrEC3ONTq4j++AfbGzYTmDDtLeM8OSH5o72YXZ2JkaA4c +xCzHFT+NaJYxF7esn/ctzGg50LYl8IF2UQtzOkX2l3l/OktIB1w+jGV6ONb1EWx5 +4zkkzNkCgYEA2EXj7GMsyNE3OYdMw8zrqQKUMON2CNnD+mBseGlr22/bhXtzpqK8 +uNel8WF1ezOnVvNsU8pml/W/mKUu6KQt5JfaDzen3OKjzTABVlbJxwFhPvwAeaIA +q/tmSKyqiCgOMbR7Cq4UEwGf2A9/RII4JEC0/aipRU5srF65OYPUOJcCgYEAycco +DFVG6jUw9w68t/X4f7NT4IYP96hSAqLUPuVz2fWwXKLWEX8JiMI+Ue3PbMz6mPcs +4vMu364u4R3IuzrrI+PRK9iTa/pahBP6eF6ZpbY1ObI8CVLTrqUS9p22rr9lBm8V +EZA9hwcHLYt+PWzaKcsFpbP4+AeY7nBBbL9CAM0CgYAzuJsmeB1ItUgIuQOxu7sM +AzLfcjZTLYkBwreOIGAL7XdJN9nTmw2ZAvGLhWwsF5FIaRSaAUiBxOKaJb7PIhxb +k7kxdHTvjT/xHS7ksAK3VewkvO18KTMR7iBq9ugdgb7LQkc+qZzhYr0QVbxw7Ndy +TAs8sm4wxe2VV13ilFVXZwKBgDfU6ZnwBr1Llo7l/wYQA4CiSDU6IzTt2DNuhrgY +mWPX/cLEM+OHeUXkKYZV/S0n0rd8vWjWzUOLWOFlcmOMPAAkS36MYM5h6aXeOVIR +KwaVUkjyrnYN+xC6EHM41JGp1/RdzECd3sh8A1pw3K92bS9fQ+LD18IZqBFh8lh6 +23KJAoGAe48SwAsaGvqRO61Taww/Wf+YpGc9lnVbCvNFGScYaycPMqaRBUBmz/U3 +QQgpQY8T7JIECbA8sf78SlAZ9x93r0UQ70RekV3WzKAQHfHK8nqTjd3T0+i4aySO +yQpYYCgE24zYO6rQgwrhzI0S4rWe7izDDlg0RmLtQh7Xw+rlkAQ= +-----END RSA PRIVATE KEY----- diff --git a/example/bthread_tag_echo_c++/server.cpp b/example/bthread_tag_echo_c++/server.cpp new file mode 100644 index 0000000000..b747487f2a --- /dev/null +++ b/example/bthread_tag_echo_c++/server.cpp @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A server to receive EchoRequest and send back EchoResponse. + +#include +#include +#include +#include "echo.pb.h" + +DEFINE_bool(echo_attachment, true, "Echo attachment as well"); +DEFINE_int32(port, 8002, "TCP Port of this server"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); +DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); +DEFINE_int32(internal_port, -1, "Only allow builtin services at this port"); + +namespace example { +// Your implementation of EchoService +class EchoServiceImpl : public EchoService { +public: + EchoServiceImpl() {} + ~EchoServiceImpl() {} + void Echo(google::protobuf::RpcController* cntl_base, + const EchoRequest* request, + EchoResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = + static_cast(cntl_base); + + // Echo request and its attachment + response->set_message(request->message()); + if (FLAGS_echo_attachment) { + cntl->response_attachment().append(cntl->request_attachment()); + } + } +}; +} // namespace example + +DEFINE_bool(h, false, "print help information"); + +int main(int argc, char* argv[]) { + std::string help_str = "dummy help infomation"; + GFLAGS_NS::SetUsageMessage(help_str); + + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + if (FLAGS_h) { + fprintf(stderr, "%s\n%s\n%s", help_str.c_str(), help_str.c_str(), help_str.c_str()); + return 0; + } + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + example::EchoServiceImpl echo_service_impl; + + // Add the service into server. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + if (server.AddService(&echo_service_impl, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + // Start the server. + brpc::ServerOptions options; + options.mutable_ssl_options()->default_cert.certificate = "cert.pem"; + options.mutable_ssl_options()->default_cert.private_key = "key.pem"; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + options.max_concurrency = FLAGS_max_concurrency; + options.internal_port = FLAGS_internal_port; + if (server.Start(FLAGS_port, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp index f2d1c0871c..b1ad710f16 100644 --- a/src/brpc/acceptor.cpp +++ b/src/brpc/acceptor.cpp @@ -40,7 +40,8 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool) , _empty_cond(&_map_mutex) , _force_ssl(false) , _ssl_ctx(NULL) - , _use_rdma(false) { + , _use_rdma(false) + , _tag_state(SocketTagOptions::TAG_RECV) { } Acceptor::~Acceptor() { @@ -295,6 +296,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) { options.on_edge_triggered_events = InputMessenger::OnNewMessages; } options.use_rdma = am->_use_rdma; + options.tag.state = am->_tag_state; if (Socket::Create(options, &socket_id) != 0) { LOG(ERROR) << "Fail to create Socket"; continue; diff --git a/src/brpc/acceptor.h b/src/brpc/acceptor.h index c82cdcc19a..dbdaa3e2cc 100644 --- a/src/brpc/acceptor.h +++ b/src/brpc/acceptor.h @@ -112,6 +112,8 @@ friend class Server; // Whether to use rdma or not bool _use_rdma; + + SocketTagOptions::State _tag_state; }; } // namespace brpc diff --git a/src/brpc/details/usercode_backup_pool.cpp b/src/brpc/details/usercode_backup_pool.cpp index 489def454c..386f826326 100644 --- a/src/brpc/details/usercode_backup_pool.cpp +++ b/src/brpc/details/usercode_backup_pool.cpp @@ -28,7 +28,7 @@ namespace bthread { // Defined in bthread/task_control.cpp -void run_worker_startfn(); +void run_worker_startfn(bthread_tag_t); } @@ -112,7 +112,7 @@ int UserCodeBackupPool::Init() { // Entry of backup thread for running user code. void UserCodeBackupPool::UserCodeRunningLoop() { - bthread::run_worker_startfn(); + bthread::run_worker_startfn(BTHREAD_TAG_DEFAULT); #ifdef BAIDU_INTERNAL logging::ComlogInitializer comlog_initializer; #endif diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index e62092860c..8271d3a2a8 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -25,6 +25,8 @@ #include "brpc/event_dispatcher.h" #include "brpc/reloadable_flags.h" +DECLARE_int32(task_group_ntags); + namespace brpc { DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); @@ -36,16 +38,17 @@ static EventDispatcher* g_edisp = NULL; static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT; static void StopAndJoinGlobalDispatchers() { - for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) { + for (int i = 0; i < FLAGS_task_group_ntags * FLAGS_event_dispatcher_num; ++i) { g_edisp[i].Stop(); g_edisp[i].Join(); } } void InitializeGlobalDispatchers() { - g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num]; - for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) { - const bthread_attr_t attr = FLAGS_usercode_in_pthread ? - BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; + g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num]; + for (int i = 0; i < FLAGS_task_group_ntags * FLAGS_event_dispatcher_num; ++i) { + bthread_attr_t attr = + FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; + attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags; CHECK_EQ(0, g_edisp[i].Start(&attr)); } // This atexit is will be run before g_task_control.stop() because above @@ -53,13 +56,13 @@ void InitializeGlobalDispatchers() { CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers)); } -EventDispatcher& GetGlobalEventDispatcher(int fd) { +EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) { pthread_once(&g_edisp_once, InitializeGlobalDispatchers); - if (FLAGS_event_dispatcher_num == 1) { + if (FLAGS_task_group_ntags == 1 && FLAGS_event_dispatcher_num == 1) { return g_edisp[0]; } int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num; - return g_edisp[index]; + return g_edisp[tag + FLAGS_task_group_ntags * index]; } } // namespace brpc diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index 1f165cfc70..8b5abbd7f6 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -99,7 +99,7 @@ friend class rdma::RdmaEndpoint; int _wakeup_fds[2]; }; -EventDispatcher& GetGlobalEventDispatcher(int fd); +EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag = BTHREAD_TAG_DEFAULT); } // namespace brpc diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index a634dd376c..3e28992efe 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -39,6 +39,7 @@ #include "brpc/errno.pb.h" #include "brpc/event_dispatcher.h" // RemoveConsumer #include "brpc/socket.h" +#include "brpc/socket_tag_map.h" #include "brpc/describable.h" // Describable #include "brpc/circuit_breaker.h" // CircuitBreaker #include "brpc/input_messenger.h" @@ -50,6 +51,7 @@ #include "brpc/details/health_check.h" #include "brpc/rdma/rdma_endpoint.h" #include "brpc/rdma/rdma_helper.h" +#include "bthread/task_group.h" #if defined(OS_MACOSX) #include #endif @@ -100,6 +102,12 @@ DEFINE_int32(connect_timeout_as_unreachable, 3, DECLARE_int32(health_check_timeout_ms); +DEFINE_int32(socket_negotiate_tag_timeout_ms, 100, "Socket negotiate tag timeout millisecond"); +BRPC_VALIDATE_GFLAG(socket_negotiate_tag_timeout_ms, PassValidate); + +DEFINE_int32(socket_negotiate_tag_nretry, 3, "Socket negotiate tag retry times"); +BRPC_VALIDATE_GFLAG(socket_negotiate_tag_nretry, PassValidate); + static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) { return v >= 2 && v < 1000/*large enough*/; } @@ -108,6 +116,8 @@ BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable, const int WAIT_EPOLLOUT_TIMEOUT_MS = 50; +const char* TAG_PROTO = "PTAG"; + class BAIDU_CACHELINE_ALIGNMENT SocketPool { friend class Socket; public: @@ -579,8 +589,12 @@ int Socket::ResetFileDescriptor(int fd) { EnableKeepaliveIfNeeded(fd); + if (!NegotiateSocketTagMessage(fd)) { + return -1; + } + if (_on_edge_triggered_events) { - if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) { + if (GetGlobalEventDispatcher(fd, _tag.tag).AddConsumer(id(), fd) != 0) { PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher"; _fd.store(-1, butil::memory_order_release); @@ -653,6 +667,67 @@ void Socket::EnableKeepaliveIfNeeded(int fd) { #endif } +bool Socket::NegotiateSocketTagMessage(int fd) { + const size_t header_size = strlen(TAG_PROTO); + const size_t payload_size = sizeof(_tag.key.channel_signature.data); + ssize_t remain_size = header_size + payload_size; + if (_tag.state == SocketTagOptions::TAG_RECV) { + butil::IOPortal buf; + for (int i = 0; i < FLAGS_socket_negotiate_tag_nretry; ++i) { + auto res = buf.append_from_file_descriptor(fd, remain_size); + if (res == -1) { + RPC_VLOG << "Fail to recv socket tag message"; + } else { + remain_size -= res; + } + if (buf.size() >= header_size) { + char header_buf[header_size]; + buf.copy_to(header_buf, header_size); + void* dummy = header_buf; + if (*(const uint32_t*)dummy != *(const uint32_t*)TAG_PROTO) { + _read_buf.append(buf); + return true; + } + } + if (remain_size == 0) { + break; + } + bthread_usleep(FLAGS_socket_negotiate_tag_timeout_ms * 1000); + } + if (remain_size != 0) { + return false; + } + buf.pop_front(header_size); + ChannelSignature sig; + buf.cutn(&sig.data, payload_size); + if (sig.data[0] == 0 && sig.data[1] == 0) { // sig is 0:0 means connection_group == "" + return true; + } + _tag.key = {ServerNode(local_side()), sig}; + _tag.tag = SockeTagMapGetOrNew(_tag.key); + return true; + } else if (_tag.state == SocketTagOptions::TAG_SEND) { + butil::IOBuf buf; + buf.append(TAG_PROTO); + buf.append(&_tag.key.channel_signature.data, payload_size); + for (int i = 0; i < FLAGS_socket_negotiate_tag_nretry; ++i) { + auto res = buf.cut_into_file_descriptor(fd); + if (res < 0) { + RPC_VLOG << "Fail to send socket tag message"; + bthread_usleep(FLAGS_socket_negotiate_tag_timeout_ms * 1000); + } else { + remain_size -= res; + } + if (remain_size == 0) { + return true; + } + } + } else { + return true; + } + return false; +} + // SocketId = 32-bit version + 32-bit slot. // version: from version part of _versioned_nref, must be an EVEN number. // slot: designated by ResourcePool. @@ -741,6 +816,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); m->_unwritten_bytes.store(0, butil::memory_order_relaxed); m->_keepalive_options = options.keepalive_options; + m->_tag = options.tag; CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed)); // Must be last one! Internal fields of this Socket may be access // just after calling ResetFileDescriptor. @@ -793,7 +869,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); if (ValidFileDescriptor(prev_fd)) { if (_on_edge_triggered_events != NULL) { - GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd); + GetGlobalEventDispatcher(prev_fd, _tag.tag).RemoveConsumer(prev_fd); } close(prev_fd); if (CreatedByConnect()) { @@ -927,7 +1003,6 @@ int Socket::isolated_times() const { int Socket::SetFailed(int error_code, const char* error_fmt, ...) { if (error_code == 0) { - CHECK(false) << "error_code is 0"; error_code = EFAILEDSOCKET; } const uint32_t id_ver = VersionOfSocketId(_this_id); @@ -1101,7 +1176,7 @@ void Socket::OnRecycle() { const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); if (ValidFileDescriptor(prev_fd)) { if (_on_edge_triggered_events != NULL) { - GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd); + GetGlobalEventDispatcher(prev_fd, _tag.tag).RemoveConsumer(prev_fd); } close(prev_fd); if (create_by_connect) { @@ -1229,7 +1304,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) { // Do not need to check addressable since it will be called by // health checker which called `SetFailed' before const int expected_val = _epollout_butex->load(butil::memory_order_relaxed); - EventDispatcher& edisp = GetGlobalEventDispatcher(fd); + EventDispatcher& edisp = GetGlobalEventDispatcher(fd, _tag.tag); if (edisp.AddEpollOut(id(), fd, pollin) != 0) { return -1; } @@ -1290,6 +1365,7 @@ int Socket::Connect(const timespec* abstime, // be added into epoll device soon SocketId connect_id; SocketOptions options; + options.tag = _tag; options.user = req; if (Socket::Create(options, &connect_id) != 0) { LOG(FATAL) << "Fail to create Socket"; @@ -1304,7 +1380,7 @@ int Socket::Connect(const timespec* abstime, // Add `sockfd' into epoll so that `HandleEpollOutRequest' will // be called with `req' when epoll event reaches - if (GetGlobalEventDispatcher(sockfd). + if (GetGlobalEventDispatcher(sockfd, _tag.tag). AddEpollOut(connect_id, sockfd, false) != 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll"; @@ -1375,7 +1451,8 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) { if (_fd.load(butil::memory_order_consume) >= 0) { return 0; } - + // Set tag for client side socket + _tag.tag = bthread_self_tag(); // Have to hold a reference for `req' SocketUniquePtr s; ReAddress(&s); @@ -1444,7 +1521,7 @@ int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) { } // We've got the right to call user callback // The timer will be removed inside destructor of EpollOutRequest - GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false); + GetGlobalEventDispatcher(req->fd, _tag.tag).RemoveEpollOut(id(), req->fd, false); return req->on_epollout_event(req->fd, error_code, req->data); } @@ -2458,6 +2535,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { ptr->_rdma_ep->DebugInfo(os); } #endif + { os << "\ntag=" << ptr->_tag.DebugString(); } } int Socket::CheckHealth() { diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 74f7360ea2..40fca5ffd4 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -37,6 +37,7 @@ #include "brpc/options.pb.h" // ConnectionType #include "brpc/socket_id.h" // SocketId #include "brpc/socket_message.h" // SocketMessagePtr +#include "brpc/socket_map_key.h" // SocketMapKey #include "bvar/bvar.h" namespace brpc { @@ -187,6 +188,44 @@ struct SocketKeepaliveOptions { int keepalive_count; }; +struct SocketTagOptions { + enum State { + TAG_DISABLE, + TAG_RECV, + TAG_SEND, + }; + + SocketTagOptions() : key({}), state(TAG_DISABLE), tag(BTHREAD_TAG_DEFAULT) {} + SocketTagOptions(const SocketMapKey& _key, State _state) + : key(_key), state(_state), tag(BTHREAD_TAG_DEFAULT) {} + std::string DebugString() const { + std::stringstream ss; + ss << "{"; + ss << "\n key={" + << "\n endpoint=" << key.peer; + ss << "\n signature=" << std::hex << key.channel_signature.data[0]; + ss << std::hex << key.channel_signature.data[1] << "\n }"; + ss << "\n state=" << StateToString(state); + ss << "\n tag=" << tag; + ss << "\n}"; + return ss.str(); + } + const char* StateToString(State s) const { + switch (s) { + case TAG_DISABLE: + return "DISABLE"; + case TAG_RECV: + return "RECV"; + case TAG_SEND: + return "SEND"; + } + return "Bad State"; + } + SocketMapKey key; // socket tag hash + State state; // socket tag state + bthread_tag_t tag; // socket tag +}; + // TODO: Comment fields struct SocketOptions { SocketOptions(); @@ -218,6 +257,8 @@ struct SocketOptions { // Socket keepalive related options. // Refer to `SocketKeepaliveOptions' for details. std::shared_ptr keepalive_options; + // Tag of this socket + SocketTagOptions tag; }; // Abstractions on reading from and writing into file descriptors. @@ -644,6 +685,8 @@ friend void DereferenceSocket(Socket*); void EnableKeepaliveIfNeeded(int fd); + bool NegotiateSocketTagMessage(int fd); + // Wait until nref hits `expected_nref' and reset some internal resources. int WaitAndReset(int32_t expected_nref); @@ -750,6 +793,7 @@ friend void DereferenceSocket(Socket*); // [ Set in ResetFileDescriptor ] butil::atomic _fd; // -1 when not connected. + SocketTagOptions _tag; // tag of this socket int _tos; // Type of service which is actually only 8bits. int64_t _reset_fd_real_us; // When _fd was reset, in microseconds. diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index d169afd8a9..f039f1cf60 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -237,6 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, opt.remote_side = key.peer.addr; opt.initial_ssl_ctx = ssl_ctx; opt.use_rdma = use_rdma; + opt.tag = {key, SocketTagOptions::TAG_SEND}; if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) { PLOG(FATAL) << "Fail to create socket to " << key.peer; return -1; diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index 893239461d..6286e88965 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -25,56 +25,10 @@ #include "brpc/socket_id.h" // SockdetId #include "brpc/options.pb.h" // ProtocolType #include "brpc/input_messenger.h" // InputMessageHandler -#include "brpc/server_node.h" // ServerNode +#include "brpc/socket_map_key.h" // SocketMapKey namespace brpc { -// Different signature means that the Channel needs separate sockets. -struct ChannelSignature { - uint64_t data[2]; - - ChannelSignature() { Reset(); } - void Reset() { data[0] = data[1] = 0; } -}; - -inline bool operator==(const ChannelSignature& s1, const ChannelSignature& s2) { - return s1.data[0] == s2.data[0] && s1.data[1] == s2.data[1]; -} -inline bool operator!=(const ChannelSignature& s1, const ChannelSignature& s2) { - return !(s1 == s2); -} - -// The following fields uniquely define a Socket. In other word, -// Socket can't be shared between 2 different SocketMapKeys -struct SocketMapKey { - explicit SocketMapKey(const butil::EndPoint& pt) - : peer(pt) - {} - SocketMapKey(const butil::EndPoint& pt, const ChannelSignature& cs) - : peer(pt), channel_signature(cs) - {} - SocketMapKey(const ServerNode& sn, const ChannelSignature& cs) - : peer(sn), channel_signature(cs) - {} - - ServerNode peer; - ChannelSignature channel_signature; -}; - -inline bool operator==(const SocketMapKey& k1, const SocketMapKey& k2) { - return k1.peer == k2.peer && k1.channel_signature == k2.channel_signature; -}; - -struct SocketMapKeyHasher { - size_t operator()(const SocketMapKey& key) const { - size_t h = butil::DefaultHasher()(key.peer.addr); - h = h * 101 + butil::DefaultHasher()(key.peer.tag); - h = h * 101 + key.channel_signature.data[1]; - return h; - } -}; - - // Try to share the Socket to `key'. If the Socket does not exist, create one. // The corresponding SocketId is written to `*id'. If this function returns // successfully, SocketMapRemove() MUST be called when the Socket is not needed. diff --git a/src/brpc/socket_map_key.h b/src/brpc/socket_map_key.h new file mode 100644 index 0000000000..b01e4a0d9f --- /dev/null +++ b/src/brpc/socket_map_key.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_SOCKET_MAP_KEY_H +#define BRPC_SOCKET_MAP_KEY_H + +#include "brpc/server_node.h" // ServerNode + +namespace brpc { + +// Different signature means that the Channel needs separate sockets. +struct ChannelSignature { + uint64_t data[2]; + + ChannelSignature() { Reset(); } + void Reset() { data[0] = data[1] = 0; } +}; + +inline bool operator==(const ChannelSignature& s1, const ChannelSignature& s2) { + return s1.data[0] == s2.data[0] && s1.data[1] == s2.data[1]; +} +inline bool operator!=(const ChannelSignature& s1, const ChannelSignature& s2) { + return !(s1 == s2); +} + +// The following fields uniquely define a Socket. In other word, +// Socket can't be shared between 2 different SocketMapKeys +struct SocketMapKey { + explicit SocketMapKey(const butil::EndPoint& pt) : peer(pt) {} + SocketMapKey(const butil::EndPoint& pt, const ChannelSignature& cs) + : peer(pt), channel_signature(cs) {} + SocketMapKey(const ServerNode& sn, const ChannelSignature& cs) + : peer(sn), channel_signature(cs) {} + + ServerNode peer; + ChannelSignature channel_signature; +}; + +inline bool operator==(const SocketMapKey& k1, const SocketMapKey& k2) { + return k1.peer == k2.peer && k1.channel_signature == k2.channel_signature; +}; + +struct SocketMapKeyHasher { + size_t operator()(const SocketMapKey& key) const { + size_t h = butil::DefaultHasher()(key.peer.addr); + h = h * 101 + butil::DefaultHasher()(key.peer.tag); + h = h * 101 + key.channel_signature.data[1]; + return h; + } +}; + +} // namespace brpc + +#endif // BRPC_SOCKET_MAP_KEY_H diff --git a/src/brpc/socket_tag_map.cpp b/src/brpc/socket_tag_map.cpp new file mode 100644 index 0000000000..70d0da402d --- /dev/null +++ b/src/brpc/socket_tag_map.cpp @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "butil/scoped_lock.h" +#include "brpc/socket_tag_map.h" + +DECLARE_int32(task_group_ntags); + +namespace brpc { + +static pthread_once_t g_socket_tag_map_init = PTHREAD_ONCE_INIT; +static butil::static_atomic g_socket_tag_map = BUTIL_STATIC_ATOMIC_INIT(NULL); + +static void CreateServerSideSocketTagMap() { + auto socket_tag_map = new SocketTagMap; + if (socket_tag_map->Init(FLAGS_task_group_ntags) != 0) { + LOG(FATAL) << "Fail to init SocketTagMap"; + exit(1); + } + g_socket_tag_map.store(socket_tag_map, butil::memory_order_release); +} + +SocketTagMap* get_or_new_server_side_socket_tag_map() { + pthread_once(&g_socket_tag_map_init, CreateServerSideSocketTagMap); + return g_socket_tag_map.load(butil::memory_order_consume); +} + +bthread_tag_t SockeTagMapGetOrNew(const SocketMapKey& key) { + return get_or_new_server_side_socket_tag_map()->GetOrNew(key); +} + +int SocketTagMap::Init(const size_t suggest_map_size) { return _map.init(suggest_map_size); } + +bthread_tag_t SocketTagMap::GetOrNew(const SocketMapKey& key) { + std::unique_lock mu(_mutex); + auto* it = _map.seek(key); + if (it) { + return *it; + } + auto tag = _next_tag.fetch_add(1, butil::memory_order_relaxed) % FLAGS_task_group_ntags; + _map[key] = tag; + return tag; +} + +} // namespace brpc diff --git a/src/brpc/socket_tag_map.h b/src/brpc/socket_tag_map.h new file mode 100644 index 0000000000..5d10f3549c --- /dev/null +++ b/src/brpc/socket_tag_map.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_SOCKET_TAG_MAP_H +#define BRPC_SOCKET_TAG_MAP_H + +#include "butil/containers/flat_map.h" +#include "bthread/types.h" +#include "brpc/socket_map_key.h" + +namespace brpc { + +bthread_tag_t SockeTagMapGetOrNew(const SocketMapKey& key); + +class SocketTagMap { +public: + // Default bthread tag will used for brpc buildin bthreads and requests with connection_group + // which is empty, so we init _next_tag BTHREAD_TAG_DEFAULT + 1. + SocketTagMap() : _next_tag(BTHREAD_TAG_DEFAULT + 1) {} + ~SocketTagMap() {} + int Init(const size_t suggest_map_size); + bthread_tag_t GetOrNew(const SocketMapKey& key); + +private: + typedef butil::FlatMap Map; + butil::Mutex _mutex; + Map _map; + butil::atomic _next_tag; +}; + +} // namespace brpc + +#endif // BRPC_SOCKET_TAG_MAP_H diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 5ac0c3b1de..9a43221137 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -43,6 +43,9 @@ static bool never_set_bthread_concurrency = true; static bool validate_bthread_concurrency(const char*, int32_t val) { // bthread_setconcurrency sets the flag on success path which should // not be strictly in a validator. But it's OK for a int flag. + if (val < FLAGS_task_group_ntags * BTHREAD_MIN_CONCURRENCY) { + return false; + } return bthread_setconcurrency(val) == 0; } const int ALLOW_UNUSED register_FLAGS_bthread_concurrency = @@ -104,6 +107,9 @@ static bool validate_bthread_min_concurrency(const char*, int32_t val) { if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) { return false; } + if (val < FLAGS_task_group_ntags * BTHREAD_MIN_CONCURRENCY) { + return false; + } TaskControl* c = get_task_control(); if (!c) { return true; @@ -123,26 +129,29 @@ __thread TaskGroup* tls_task_group_nosignal = NULL; BUTIL_FORCE_INLINE int start_from_non_worker(bthread_t* __restrict tid, const bthread_attr_t* __restrict attr, - void * (*fn)(void*), + void* (*fn)(void*), void* __restrict arg) { TaskControl* c = get_or_new_task_control(); if (NULL == c) { return ENOMEM; } + TaskGroup* g = NULL; if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) { // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons: // 1. NOSIGNAL is often for creating many bthreads in batch, // inserting into the same TaskGroup maximizes the batch. // 2. bthread_flush() needs to know which TaskGroup to flush. - TaskGroup* g = tls_task_group_nosignal; + g = tls_task_group_nosignal; if (NULL == g) { - g = c->choose_one_group(); + g = c->choose_one_group_with_tag(attr->tag); tls_task_group_nosignal = g; } return g->start_background(tid, attr, fn, arg); } - return c->choose_one_group()->start_background( - tid, attr, fn, arg); + if (attr != NULL) { + g = c->choose_one_group_with_tag(attr->tag); + } + return g->start_background(tid, attr, fn, arg); } struct TidTraits { @@ -175,8 +184,14 @@ int bthread_start_urgent(bthread_t* __restrict tid, void* __restrict arg) { bthread::TaskGroup* g = bthread::tls_task_group; if (g) { - // start from worker - return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg); + // if attribute is null use thread local task group + if (attr == nullptr) { + return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg); + } + // if attribute tag is null or default use thread local task group + if (attr->tag == BTHREAD_TAG_DEFAULT || attr->tag == g->tag()) { + return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg); + } } return bthread::start_from_non_worker(tid, attr, fn, arg); } @@ -187,8 +202,14 @@ int bthread_start_background(bthread_t* __restrict tid, void* __restrict arg) { bthread::TaskGroup* g = bthread::tls_task_group; if (g) { - // start from worker - return g->start_background(tid, attr, fn, arg); + // if attribute is null use thread local task group + if (attr == nullptr) { + return g->start_background(tid, attr, fn, arg); + } + // if attribute tag is null or default use thread local task group + if (attr->tag == BTHREAD_TAG_DEFAULT || attr->tag == g->tag()) { + return g->start_background(tid, attr, fn, arg); + } } return bthread::start_from_non_worker(tid, attr, fn, arg); } @@ -433,5 +454,10 @@ int bthread_list_join(bthread_list_t* list) { static_cast(list->impl)->apply(bthread::TidJoiner()); return 0; } - + +bthread_tag_t bthread_self_tag(void) { + return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag() + : BTHREAD_TAG_DEFAULT; +} + } // extern "C" diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index 3f55eb6764..ad84d7bc6d 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -327,6 +327,9 @@ extern int bthread_setspecific(bthread_key_t key, void* data); // If the key is invalid or deleted, return NULL. extern void* bthread_getspecific(bthread_key_t key); +// Return current bthread tag +extern bthread_tag_t bthread_self_tag(void); + __END_DECLS #endif // BTHREAD_BTHREAD_H diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp index 19b03725f3..5ac44e1b71 100644 --- a/src/bthread/butex.cpp +++ b/src/bthread/butex.cpp @@ -273,15 +273,16 @@ void butex_destroy(void* butex) { } inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) { - TaskGroup* g; + TaskGroup* g = tls_task_group; if (nosignal) { - g = tls_task_group_nosignal; - if (NULL == g) { - g = c->choose_one_group(); + if (NULL == tls_task_group_nosignal) { + g = g ? g : c->choose_one_group(); tls_task_group_nosignal = g; + } else { + g = tls_task_group_nosignal; } } else { - g = tls_task_group ? tls_task_group : c->choose_one_group(); + g = g ? g : c->choose_one_group(); } return g; } diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 15f1d7b693..7a42780f41 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -39,6 +39,7 @@ DEFINE_int32(task_group_runqueue_capacity, 4096, "capacity of runqueue in each TaskGroup"); DEFINE_int32(task_group_yield_before_idle, 0, "TaskGroup yields so many times before idle"); +DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags"); namespace bthread { @@ -47,37 +48,45 @@ DECLARE_int32(bthread_min_concurrency); extern pthread_mutex_t g_task_control_mutex; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; -void (*g_worker_startfn)() = NULL; +void (*g_worker_startfn)(bthread_tag_t) = NULL; // May be called in other modules to run startfn in non-worker pthreads. -void run_worker_startfn() { +void run_worker_startfn(bthread_tag_t tag) { if (g_worker_startfn) { - g_worker_startfn(); + g_worker_startfn(tag); } } +struct WorkerThreadArgs { + WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {} + TaskControl* c; + bthread_tag_t tag; +}; + void* TaskControl::worker_thread(void* arg) { - run_worker_startfn(); + auto dummy = static_cast(arg); + auto c = dummy->c; + auto tag = dummy->tag; + delete dummy; + run_worker_startfn(tag); #ifdef BAIDU_INTERNAL logging::ComlogInitializer comlog_initializer; #endif - - TaskControl* c = static_cast(arg); - TaskGroup* g = c->create_group(); + + TaskGroup* g = c->create_group(tag); TaskStatistics stat; if (NULL == g) { LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self(); return NULL; } std::string worker_thread_name = butil::string_printf( - "brpc_worker:%d", - c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed)); + "brpc_wkr:%d-%d", g->tag(), c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed)); butil::PlatformThread::SetName(worker_thread_name.c_str()); - BT_VLOG << "Created worker=" << pthread_self() - << " bthread=" << g->main_tid(); - + BT_VLOG << "Created worker=" << pthread_self() << " bthread=" << g->main_tid() + << " tag=" << g->tag(); tls_task_group = g; c->_nworkers << 1; + c->tag_nworkers(g->tag()) << 1; g->run_main_task(); stat = g->main_stat(); @@ -87,10 +96,11 @@ void* TaskControl::worker_thread(void* arg) { tls_task_group = NULL; g->destroy_self(); c->_nworkers << -1; + c->tag_nworkers(g->tag()) << -1; return NULL; } -TaskGroup* TaskControl::create_group() { +TaskGroup* TaskControl::create_group(bthread_tag_t tag) { TaskGroup* g = new (std::nothrow) TaskGroup(this); if (NULL == g) { LOG(FATAL) << "Fail to new TaskGroup"; @@ -101,7 +111,7 @@ TaskGroup* TaskControl::create_group() { delete g; return NULL; } - if (_add_group(g) != 0) { + if (_add_group(g, tag) != 0) { delete g; return NULL; } @@ -117,6 +127,19 @@ static double get_cumulated_worker_time_from_this(void *arg) { return static_cast(arg)->get_cumulated_worker_time(); } +struct CumulatedWithTagArgs { + CumulatedWithTagArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), t(_t) {} + TaskControl* c; + bthread_tag_t t; +}; + +static double get_cumulated_worker_time_from_this_with_tag(void* arg) { + auto a = static_cast(arg); + auto c = a->c; + auto t = a->t; + return c->get_cumulated_worker_time_with_tag(t); +} + static int64_t get_cumulated_switch_count_from_this(void *arg) { return static_cast(arg)->get_cumulated_switch_count(); } @@ -128,7 +151,10 @@ static int64_t get_cumulated_signal_count_from_this(void *arg) { TaskControl::TaskControl() // NOTE: all fileds must be initialized before the vars. : _ngroup(0) - , _groups((TaskGroup**)calloc(BTHREAD_MAX_CONCURRENCY, sizeof(TaskGroup*))) + , _groups((TaskGroup**)calloc(BTHREAD_MAX_CONCURRENCY * FLAGS_task_group_ntags, + sizeof(TaskGroup*))) + , _tagged_ngroup(FLAGS_task_group_ntags) + , _tagged_groups(FLAGS_task_group_ntags) , _stop(false) , _concurrency(0) , _next_worker_id(0) @@ -144,6 +170,8 @@ TaskControl::TaskControl() , _signal_per_second(&_cumulated_signal_count) , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") + , _pl(FLAGS_task_group_ntags) + , _add_workers_with_tag(BTHREAD_TAG_INVALID) { // calloc shall set memory to zero CHECK(_groups) << "Fail to create array of groups"; @@ -160,6 +188,18 @@ int TaskControl::init(int concurrency) { } _concurrency = concurrency; + // task group group by tags + for (int i = 0; i < FLAGS_task_group_ntags; ++i) { + _tagged_ngroup[i].store(0, std::memory_order_relaxed); + auto tag_str = std::to_string(i); + _tagged_nworkers.push_back(new bvar::Adder("bthread_worker_count", tag_str)); + _tagged_cumulated_worker_time.push_back(new bvar::PassiveStatus( + get_cumulated_worker_time_from_this_with_tag, new CumulatedWithTagArgs{this, i})); + _tagged_worker_usage_second.push_back(new bvar::PerSecond>( + "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); + _tagged_nbthreads.push_back(new bvar::Adder("bthread_count", tag_str)); + } + // Make sure TimerThread is ready. if (get_or_create_global_timer_thread() == NULL) { LOG(ERROR) << "Fail to get global_timer_thread"; @@ -168,7 +208,8 @@ int TaskControl::init(int concurrency) { _workers.resize(_concurrency); for (int i = 0; i < _concurrency; ++i) { - const int rc = pthread_create(&_workers[i], NULL, worker_thread, this); + auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags); + const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg); if (rc) { LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc); return -1; @@ -182,8 +223,12 @@ int TaskControl::init(int concurrency) { // Wait for at least one group is added so that choose_one_group() // never returns NULL. // TODO: Handle the case that worker quits before add_group - while (_ngroup == 0) { - usleep(100); // TODO: Elaborate + for (int i = 0; i < FLAGS_task_group_ntags;) { + if (_tagged_ngroup[i].load(std::memory_order_acquire) == 0) { + usleep(100); // TODO: Elaborate + continue; + } + ++i; } return 0; } @@ -202,8 +247,11 @@ int TaskControl::add_workers(int num) { // Worker will add itself to _idle_workers, so we have to add // _concurrency before create a worker. _concurrency.fetch_add(1); + auto tag = (_add_workers_with_tag != BTHREAD_TAG_INVALID) ? _add_workers_with_tag + : i % FLAGS_task_group_ntags; + auto arg = new WorkerThreadArgs(this, tag); const int rc = pthread_create( - &_workers[i + old_concurency], NULL, worker_thread, this); + &_workers[i + old_concurency], NULL, worker_thread, arg); if (rc) { LOG(WARNING) << "Fail to create _workers[" << i + old_concurency << "], " << berror(rc); @@ -216,15 +264,37 @@ int TaskControl::add_workers(int num) { return _concurrency.load(butil::memory_order_relaxed) - old_concurency; } +int TaskControl::add_workers_with_tag(int num, bthread_tag_t tag) { + _add_workers_with_tag = tag; + auto rc = add_workers(num); + _add_workers_with_tag = BTHREAD_TAG_INVALID; + return rc; +} + TaskGroup* TaskControl::choose_one_group() { - const size_t ngroup = _ngroup.load(butil::memory_order_acquire); + DCHECK(tls_task_group == nullptr || tls_task_group->tag() == BTHREAD_TAG_DEFAULT) + << "There will be a performance penalty for choosing other tagged group"; + auto groups = tag_group(BTHREAD_TAG_DEFAULT); + const auto ngroup = tag_ngroup(BTHREAD_TAG_DEFAULT).load(butil::memory_order_acquire); if (ngroup != 0) { - return _groups[butil::fast_rand_less_than(ngroup)]; + return groups[butil::fast_rand_less_than(ngroup)]; } CHECK(false) << "Impossible: ngroup is 0"; return NULL; } +TaskGroup* TaskControl::choose_one_group_with_tag(bthread_tag_t tag) { + DCHECK(tls_task_group == nullptr || tls_task_group->tag() == tag) + << "There will be a performance penalty for choosing other tagged group"; + tag = tag % FLAGS_task_group_ntags; + auto groups = tag_group(tag); + const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire); + if (ngroup != 0) { + return groups[butil::fast_rand_less_than(ngroup)]; + } + return NULL; +} + extern int stop_and_join_epoll_threads(); void TaskControl::stop_and_join() { @@ -238,8 +308,10 @@ void TaskControl::stop_and_join() { _stop = true; _ngroup.exchange(0, butil::memory_order_relaxed); } - for (int i = 0; i < PARKING_LOT_NUM; ++i) { - _pl[i].stop(); + for (int i = 0; i < FLAGS_task_group_ntags; ++i) { + for (auto& pl : _pl[i]) { + pl.stop(); + } } // Interrupt blocking operations. for (size_t i = 0; i < _workers.size(); ++i) { @@ -266,7 +338,7 @@ TaskControl::~TaskControl() { _groups = NULL; } -int TaskControl::_add_group(TaskGroup* g) { +int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) { if (__builtin_expect(NULL == g, 0)) { return -1; } @@ -279,10 +351,17 @@ int TaskControl::_add_group(TaskGroup* g) { _groups[ngroup] = g; _ngroup.store(ngroup + 1, butil::memory_order_release); } + g->set_tag(tag); + g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]); + size_t ntag = _tagged_ngroup[tag].load(butil::memory_order_relaxed); + if (ntag < (size_t)BTHREAD_MAX_CONCURRENCY) { + _tagged_groups[tag][ntag] = g; + _tagged_ngroup[tag].store(ntag + 1, butil::memory_order_release); + } mu.unlock(); // See the comments in _destroy_group // TODO: Not needed anymore since non-worker pthread cannot have TaskGroup - signal_task(65536); + // signal_task(65536, tag); return 0; } @@ -303,11 +382,13 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool erased = false; { BAIDU_SCOPED_LOCK(_modify_group_mutex); - const size_t ngroup = _ngroup.load(butil::memory_order_relaxed); + auto tag = g->tag(); + auto groups = tag_group(tag); + const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed); for (size_t i = 0; i < ngroup; ++i) { - if (_groups[i] == g) { + if (groups[i] == g) { // No need for atomic_thread_fence because lock did it. - _groups[i] = _groups[ngroup - 1]; + groups[i] = groups[ngroup - 1]; // Change _ngroup and keep _groups unchanged at last so that: // - If steal_task sees the newest _ngroup, it would not touch // _groups[ngroup -1] @@ -317,7 +398,7 @@ int TaskControl::_destroy_group(TaskGroup* g) { // overwrite it, since we do signal_task in _add_group(), // we think the pending tasks of _groups[ngroup - 1] would // not miss. - _ngroup.store(ngroup - 1, butil::memory_order_release); + tag_ngroup(tag).store(ngroup - 1, butil::memory_order_release); //_groups[ngroup - 1] = NULL; erased = true; break; @@ -339,9 +420,10 @@ int TaskControl::_destroy_group(TaskGroup* g) { } bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { + auto tag = tls_task_group->tag(); // 1: Acquiring fence is paired with releasing fence in _add_group to // avoid accessing uninitialized slot of _groups. - const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/); + const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/); if (0 == ngroup) { return false; } @@ -349,8 +431,9 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { // NOTE: Don't return inside `for' iteration since we need to update |seed| bool stolen = false; size_t s = *seed; + auto groups = tag_group(tag); for (size_t i = 0; i < ngroup; ++i, s += offset) { - TaskGroup* g = _groups[s % ngroup]; + TaskGroup* g = groups[s % ngroup]; // g is possibly NULL because of concurrent _destroy_group if (g) { if (g->_rq.steal(tid)) { @@ -367,7 +450,7 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { return stolen; } -void TaskControl::signal_task(int num_task) { +void TaskControl::signal_task(int num_task, bthread_tag_t tag) { if (num_task <= 0) { return; } @@ -378,14 +461,15 @@ void TaskControl::signal_task(int num_task) { if (num_task > 2) { num_task = 2; } + auto& pl = tag_pl(tag); int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; - num_task -= _pl[start_index].signal(1); + num_task -= pl[start_index].signal(1); if (num_task > 0) { for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { if (++start_index >= PARKING_LOT_NUM) { start_index = 0; } - num_task -= _pl[start_index].signal(1); + num_task -= pl[start_index].signal(1); } } if (num_task > 0 && @@ -394,7 +478,7 @@ void TaskControl::signal_task(int num_task) { // TODO: Reduce this lock BAIDU_SCOPED_LOCK(g_task_control_mutex); if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) { - add_workers(1); + add_workers_with_tag(1, tag); } } } @@ -427,6 +511,19 @@ double TaskControl::get_cumulated_worker_time() { return cputime_ns / 1000000000.0; } +double TaskControl::get_cumulated_worker_time_with_tag(bthread_tag_t tag) { + int64_t cputime_ns = 0; + BAIDU_SCOPED_LOCK(_modify_group_mutex); + const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed); + for (size_t i = 0; i < ngroup; ++i) { + auto groups = tag_group(tag); + if (groups[i]) { + cputime_ns += groups[i]->_cumulated_cputime_ns; + } + } + return cputime_ns / 1000000000.0; +} + int64_t TaskControl::get_cumulated_switch_count() { int64_t c = 0; BAIDU_SCOPED_LOCK(_modify_group_mutex); diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index e318c26501..75c1c3ed00 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -26,6 +26,9 @@ #include // std::ostream #endif #include // size_t +#include +#include +#include #include "butil/atomicops.h" // butil::atomic #include "bvar/bvar.h" // bvar::PassiveStatus #include "bthread/task_meta.h" // TaskMeta @@ -33,6 +36,7 @@ #include "bthread/work_stealing_queue.h" // WorkStealingQueue #include "bthread/parking_lot.h" +DECLARE_int32(task_group_ntags); namespace bthread { class TaskGroup; @@ -49,13 +53,13 @@ class TaskControl { int init(int nconcurrency); // Create a TaskGroup in this control. - TaskGroup* create_group(); + TaskGroup* create_group(bthread_tag_t tag); // Steal a task from a "random" group. bool steal_task(bthread_t* tid, size_t* seed, size_t offset); // Tell other groups that `n' tasks was just added to caller's runqueue - void signal_task(int num_task); + void signal_task(int num_task, bthread_tag_t tag); // Stop and join worker threads in TaskControl. void stop_and_join(); @@ -67,6 +71,7 @@ class TaskControl { void print_rq_sizes(std::ostream& os); double get_cumulated_worker_time(); + double get_cumulated_worker_time_with_tag(bthread_tag_t tag); int64_t get_cumulated_switch_count(); int64_t get_cumulated_signal_count(); @@ -74,25 +79,47 @@ class TaskControl { // Return the number of workers actually added, which may be less than |num| int add_workers(int num); + int add_workers_with_tag(int num, bthread_tag_t tag); + // Choose one TaskGroup (randomly right now). // If this method is called after init(), it never returns NULL. TaskGroup* choose_one_group(); + // Choose one TaskGroup with tag + // If no tag found or tag index greater or equal ngroup will return NULL. + TaskGroup* choose_one_group_with_tag(bthread_tag_t tag); + private: + typedef std::array TaggedGroups; + static const int PARKING_LOT_NUM = 4; + typedef std::array TaggedParkingLot; // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. - int _add_group(TaskGroup*); + int _add_group(TaskGroup*, bthread_tag_t tag); int _destroy_group(TaskGroup*); + // Tag group + TaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; } + + // Tag ngroup + butil::atomic& tag_ngroup(int tag) { return _tagged_ngroup[tag]; } + + // Tag parking slot + TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; } + static void delete_task_group(void* arg); static void* worker_thread(void* task_control); bvar::LatencyRecorder& exposed_pending_time(); bvar::LatencyRecorder* create_exposed_pending_time(); + bvar::Adder& tag_nworkers(bthread_tag_t tag); + bvar::Adder& tag_nbthreads(bthread_tag_t tag); butil::atomic _ngroup; TaskGroup** _groups; + std::vector> _tagged_ngroup; + std::vector _tagged_groups; butil::Mutex _modify_group_mutex; bool _stop; @@ -112,8 +139,13 @@ class TaskControl { bvar::PassiveStatus _status; bvar::Adder _nbthreads; - static const int PARKING_LOT_NUM = 4; - ParkingLot _pl[PARKING_LOT_NUM]; + std::vector*> _tagged_nworkers; + std::vector*> _tagged_cumulated_worker_time; + std::vector>*> _tagged_worker_usage_second; + std::vector*> _tagged_nbthreads; + + std::vector _pl; + bthread_tag_t _add_workers_with_tag; }; inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { @@ -124,6 +156,14 @@ inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { return *pt; } +inline bvar::Adder& TaskControl::tag_nworkers(bthread_tag_t tag) { + return *_tagged_nworkers[tag]; +} + +inline bvar::Adder& TaskControl::tag_nbthreads(bthread_tag_t tag) { + return *_tagged_nbthreads[tag]; +} + } // namespace bthread #endif // BTHREAD_TASK_CONTROL_H diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index cbae7c5bfa..66a8f2b61a 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -40,7 +40,7 @@ namespace bthread { static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = { - BTHREAD_STACKTYPE_UNKNOWN, 0, NULL }; + BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_DEFAULT }; static bool pass_bool(const char*, bool) { return true; } @@ -192,10 +192,11 @@ TaskGroup::TaskGroup(TaskControl* c) #ifndef NDEBUG , _sched_recursive_guard(0) #endif + , _tag(BTHREAD_TAG_DEFAULT) { _steal_seed = butil::fast_rand(); _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)]; - _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) % TaskControl::PARKING_LOT_NUM]; + // _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) % TaskControl::PARKING_LOT_NUM]; CHECK(c); } @@ -335,6 +336,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) { butex_wake_except(m->version_butex, 0); g->_control->_nbthreads << -1; + g->_control->tag_nbthreads(g->tag()) << -1; g->set_remained(TaskGroup::_release_last_context, m); ending_sched(&g); @@ -392,6 +394,7 @@ int TaskGroup::start_foreground(TaskGroup** pg, TaskGroup* g = *pg; g->_control->_nbthreads << 1; + g->_control->tag_nbthreads(g->tag()) << 1; if (g->is_current_pthread_task()) { // never create foreground task in pthread. g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL)); @@ -448,6 +451,7 @@ int TaskGroup::start_background(bthread_t* __restrict th, LOG(INFO) << "Started bthread " << m->tid; } _control->_nbthreads << 1; + _control->tag_nbthreads(tag()) << 1; if (REMOTE) { ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL)); } else { @@ -658,7 +662,7 @@ void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) { const int additional_signal = _num_nosignal; _num_nosignal = 0; _nsignaled += 1 + additional_signal; - _control->signal_task(1 + additional_signal); + _control->signal_task(1 + additional_signal, _tag); } } @@ -667,7 +671,7 @@ void TaskGroup::flush_nosignal_tasks() { if (val) { _num_nosignal = 0; _nsignaled += val; - _control->signal_task(val); + _control->signal_task(val, _tag); } } @@ -688,7 +692,7 @@ void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) { _remote_num_nosignal = 0; _remote_nsignaled += 1 + additional_signal; _remote_rq._mutex.unlock(); - _control->signal_task(1 + additional_signal); + _control->signal_task(1 + additional_signal, _tag); } } @@ -701,7 +705,7 @@ void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) { _remote_num_nosignal = 0; _remote_nsignaled += val; locked_mutex.unlock(); - _control->signal_task(val); + _control->signal_task(val, _tag); } void TaskGroup::ready_to_run_general(bthread_t tid, bool nosignal) { @@ -738,7 +742,9 @@ struct SleepArgs { static void ready_to_run_from_timer_thread(void* arg) { CHECK(tls_task_group == NULL); const SleepArgs* e = static_cast(arg); - e->group->control()->choose_one_group()->ready_to_run_remote(e->tid); + auto g = e->group; + auto tag = g->tag(); + g->control()->choose_one_group_with_tag(tag)->ready_to_run_remote(e->tid); } void TaskGroup::_add_sleep_event(void* void_args) { diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index 2a1bb2a93d..d859867871 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -182,6 +182,8 @@ class TaskGroup { // process make go on indefinitely. void push_rq(bthread_t tid); + bthread_tag_t tag() const { return _tag; } + private: friend class TaskControl; @@ -221,6 +223,10 @@ friend class TaskControl; return _control->steal_task(tid, &_steal_seed, _steal_offset); } + void set_tag(bthread_tag_t tag) { _tag = tag; } + + void set_pl(ParkingLot* pl) { _pl = pl; } + TaskMeta* _cur_meta; // the control that this group belongs to @@ -249,6 +255,8 @@ friend class TaskControl; int _remote_nsignaled; int _sched_recursive_guard; + // tag of this taskgroup + bthread_tag_t _tag; }; } // namespace bthread diff --git a/src/bthread/timer_thread.cpp b/src/bthread/timer_thread.cpp index 3b2f8a7698..a845ba8ca7 100644 --- a/src/bthread/timer_thread.cpp +++ b/src/bthread/timer_thread.cpp @@ -32,7 +32,7 @@ namespace bthread { // Defined in task_control.cpp -void run_worker_startfn(); +void run_worker_startfn(bthread_tag_t); const TimerThread::TaskId TimerThread::INVALID_TASK_ID = 0; @@ -313,7 +313,7 @@ static T deref_value(void* arg) { } void TimerThread::run() { - run_worker_startfn(); + run_worker_startfn(BTHREAD_TAG_DEFAULT); #ifdef BAIDU_INTERNAL logging::ComlogInitializer comlog_initializer; #endif diff --git a/src/bthread/types.h b/src/bthread/types.h index a84e4793e7..4a92efba20 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -32,6 +32,11 @@ typedef uint64_t bthread_t; // tid returned by bthread_start_* never equals this value. static const bthread_t INVALID_BTHREAD = 0; +// bthread tag default is 0 +typedef int bthread_tag_t; +static const bthread_tag_t BTHREAD_TAG_INVALID = -1; +static const bthread_tag_t BTHREAD_TAG_DEFAULT = 0; + struct sockaddr; typedef unsigned bthread_stacktype_t; @@ -93,12 +98,14 @@ typedef struct bthread_attr_t { bthread_stacktype_t stack_type; bthread_attrflags_t flags; bthread_keytable_pool_t* keytable_pool; + bthread_tag_t tag; #if defined(__cplusplus) void operator=(unsigned stacktype_and_flags) { stack_type = (stacktype_and_flags & 7); flags = (stacktype_and_flags & ~(unsigned)7u); keytable_pool = NULL; + tag = BTHREAD_TAG_DEFAULT; } bthread_attr_t operator|(unsigned other_flags) const { CHECK(!(other_flags & 7)) << "flags=" << other_flags; @@ -116,24 +123,22 @@ typedef struct bthread_attr_t { // obvious drawback is that you need more worker pthreads when you have a lot // of such bthreads. static const bthread_attr_t BTHREAD_ATTR_PTHREAD = -{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL }; +{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL, BTHREAD_TAG_DEFAULT }; // bthreads created with following attributes will have different size of // stacks. Default is BTHREAD_ATTR_NORMAL. -static const bthread_attr_t BTHREAD_ATTR_SMALL = -{ BTHREAD_STACKTYPE_SMALL, 0, NULL }; -static const bthread_attr_t BTHREAD_ATTR_NORMAL = -{ BTHREAD_STACKTYPE_NORMAL, 0, NULL }; -static const bthread_attr_t BTHREAD_ATTR_LARGE = -{ BTHREAD_STACKTYPE_LARGE, 0, NULL }; +static const bthread_attr_t BTHREAD_ATTR_SMALL = {BTHREAD_STACKTYPE_SMALL, 0, NULL, + BTHREAD_TAG_DEFAULT}; +static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0, NULL, + BTHREAD_TAG_DEFAULT}; +static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL, + BTHREAD_TAG_DEFAULT}; // bthreads created with this attribute will print log when it's started, // context-switched, finished. static const bthread_attr_t BTHREAD_ATTR_DEBUG = { - BTHREAD_STACKTYPE_NORMAL, - BTHREAD_LOG_START_AND_FINISH | BTHREAD_LOG_CONTEXT_SWITCH, - NULL -}; + BTHREAD_STACKTYPE_NORMAL, BTHREAD_LOG_START_AND_FINISH | BTHREAD_LOG_CONTEXT_SWITCH, NULL, + BTHREAD_TAG_DEFAULT}; static const size_t BTHREAD_EPOLL_THREAD_NUM = 1; static const bthread_t BTHREAD_ATOMIC_INIT = 0; diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h index 61f4b1ab7e..b200fe7fee 100644 --- a/src/bthread/unstable.h +++ b/src/bthread/unstable.h @@ -82,7 +82,7 @@ extern int bthread_connect(int sockfd, const struct sockaddr* serv_addr, // Add a startup function that each pthread worker will run at the beginning // To run code at the end, use butil::thread_atexit() // Returns 0 on success, error code otherwise. -extern int bthread_set_worker_startfn(void (*start_fn)()); +extern int bthread_set_worker_startfn(void (*start_fn)(bthread_tag_t)); // Stop all bthread and worker pthreads. // You should avoid calling this function which may cause bthread after main()