Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce the number of thread hops for read-only trxs #1702

Merged
merged 6 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,31 @@ class priority_queue_executor {
static constexpr uint16_t minimum_runtime_ms = 3;

// inform how many read_threads will be calling read_only/read_exclusive queues
// Currently only used to assert if exec_queue::read_exclusive is used without any read threads
// expected to only be called at program startup, not thread safe, not safe to call after startup
void init_read_threads(size_t num_read_threads) {
pri_queue_.init_read_threads(num_read_threads);
}

// not thread safe, see init_read_threads comment
size_t get_read_threads() const {
return pri_queue_.get_read_threads();
}

// assume application is started on the main thread
std::thread::id get_main_thread_id() const {
return main_thread_id_;
}

template <typename Func>
auto post( int priority, exec_queue q, Func&& func ) {
return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward<Func>(func)));
void post( int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
// no reason to post to io_service which then places this in the read_exclusive_handlers queue.
// read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
pri_queue_.add(priority, q, --order_, std::forward<Func>(func));
} else {
// post to io_service as the main thread may be blocked on io_service.run_one() in application::exec()
boost::asio::post(io_serv_, pri_queue_.wrap(priority, q, --order_, std::forward<Func>(func)));
}
}

// Legacy and deprecated. To be removed after cleaning up its uses in base appbase
Expand Down Expand Up @@ -125,6 +142,7 @@ class priority_queue_executor {

// members are ordered taking into account that the last one is destructed first
private:
std::thread::id main_thread_id_{ std::this_thread::get_id() };
boost::asio::io_service io_serv_;
appbase::exec_pri_queue pri_queue_;
std::atomic<std::size_t> order_{ std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in all queues within priority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ class exec_pri_queue : public boost::asio::execution_context
public:

// inform how many read_threads will be calling read_only/read_exclusive queues
// expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_
void init_read_threads(size_t num_read_threads) {
assert(!lock_enabled_);
num_read_threads_ = num_read_threads;
}

// not strictly thread safe, see init_read_threads comment
size_t get_read_threads() const {
return num_read_threads_;
}

void stop() {
std::lock_guard g( mtx_ );
exiting_blocking_ = true;
Expand All @@ -60,7 +67,7 @@ class exec_pri_queue : public boost::asio::execution_context
assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
prio_queue& que = priority_que(q);
std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(priority, order, std::move(function)));
if (lock_enabled_) {
if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive
std::lock_guard g( mtx_ );
que.push( std::move( handler ) );
if (num_waiting_)
Expand Down
114 changes: 73 additions & 41 deletions libraries/custom_appbase/tests/custom_appbase_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,61 @@ using namespace appbase;

BOOST_AUTO_TEST_SUITE(custom_appbase_tests)

std::thread start_app_thread(appbase::scoped_app& app) {
const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() };
BOOST_CHECK(app->initialize(sizeof(argv) / sizeof(char*), const_cast<char**>(argv)));
app->startup();
std::thread app_thread( [&]() {
app->exec();
} );
return app_thread;
}
class scoped_app_thread {
public:
explicit scoped_app_thread(bool delay_exec = false) {
app_thread_ = start_app_thread();
if (!delay_exec) {
start_exec();
}
}
~scoped_app_thread() { // destroy app instance so next instance gets a clean one
application::reset_app_singleton();
}

scoped_app_thread(const scoped_app_thread&) = delete;
scoped_app_thread& operator=(const scoped_app_thread&) = delete;

appbase::application* operator->() {
return app_;
}
const appbase::application* operator->() const {
return app_;
}

void start_exec() {
start_exec_.set_value();
}

void join() {
app_thread_.join();
}

private:
std::thread start_app_thread() {
std::promise<void> start_complete;
std::thread app_thread( [&]() {
assert(application::null_app_singleton());
app_ = &appbase::app();
const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() };
BOOST_CHECK(app_->initialize(sizeof(argv) / sizeof(char*), const_cast<char**>(argv)));
app_->startup();
start_complete.set_value();
start_exec_.get_future().get();
app_->exec();
} );
start_complete.get_future().get();
return app_thread;
}

private:
std::thread app_thread_;
std::promise<void> start_exec_;
appbase::application* app_;
};


std::thread start_read_thread(appbase::scoped_app& app) {
std::thread start_read_thread(scoped_app_thread& app) {
static int num = 0;
std::thread read_thread( [&]() {
std::string name ="read-" + std::to_string(num++);
Expand All @@ -37,9 +81,8 @@ std::thread start_read_thread(appbase::scoped_app& app) {

// verify functions from both queues (read_only,read_write) are executed when execution window is not explicitly set
BOOST_AUTO_TEST_CASE( default_exec_window ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// post functions
std::map<int, int> rslts {};
int seq_num = 0;
Expand All @@ -51,19 +94,17 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) {
app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } );
app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } );
app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[9]=seq_num; ++seq_num; } );

// Stop app. Use the lowest priority to make sure this function to execute the last
app->executor().post( priority::lowest, exec_queue::read_only, [&]() {
// read_only_queue should only contain the current lambda function,
// and read_write_queue should have executed all its functions
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u );
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u );
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -86,9 +127,8 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) {

// verify functions only from read_only queue are processed during read window on the main thread
BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// set to run functions from read_only queue only
app->executor().init_read_threads(1);
app->executor().set_to_read_window([](){return false;});
Expand All @@ -115,7 +155,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 4u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -134,9 +174,8 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {

// verify no functions are executed during read window if read_only & read_exclusive queue is empty
BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// set to run functions from read_only & read_exclusive queues only
app->executor().init_read_threads(1);
app->executor().set_to_read_window([](){return false;});
Expand All @@ -163,7 +202,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 10u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -176,8 +215,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {

// verify functions from both queues (read_only, read_write) are processed in write window, but not read_exclusive
BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);
scoped_app_thread app;

// set to run functions from both queues
app->executor().is_write_window();
Expand All @@ -197,20 +235,17 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } );
app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } );
app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts[12]=seq_num; ++seq_num; } );
app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts[13]=seq_num; ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts[14]=seq_num; ++seq_num; } );

// stop application. Use lowest at the end to make sure this executes the last
app->executor().post( priority::lowest, exec_queue::read_only, [&]() {
// read_queue should have current function and write_queue's functions are all executed
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 3u);
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u);
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
app->quit();
} );

app_thread.join();
app.join();

// queues are emptied after exec
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand Down Expand Up @@ -245,7 +280,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {

// verify tasks from both queues (read_only, read_exclusive) are processed in read window
BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;
scoped_app_thread app(true);

app->executor().init_read_threads(3);
// set to run functions from read_only & read_exclusive queues only
Expand Down Expand Up @@ -289,7 +324,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

auto app_thread = start_app_thread(app);
app.start_exec();
constexpr int num_expected = 13; // 16 - 3 read_write

auto read_thread1 = start_read_thread(app);
Expand All @@ -307,7 +342,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
};
work.reset();
app->quit();
app_thread.join();
app.join();

// queues are emptied after exec
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand Down Expand Up @@ -340,10 +375,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {

// verify tasks from both queues (read_only, read_exclusive) are processed in read window
BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;

auto app_thread = start_app_thread(app);
std::thread::id app_thread_id = app_thread.get_id();
scoped_app_thread app;

// set to run functions from read_only & read_exclusive queues only
app->executor().init_read_threads(3);
Expand Down Expand Up @@ -400,15 +432,15 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
};

app->quit();
app_thread.join();
app.join();

// exactly number of posts processed
BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){ return v != std::thread::id(); }), num_expected );

const auto run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; });
const auto run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; });
const auto run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; });
const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; });
const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app->executor().get_main_thread_id(); });

BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected);
BOOST_CHECK(run_on_1 > 0);
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_api_plugin/chain_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ void chain_api_plugin::plugin_startup() {
CHAIN_RO_CALL(get_required_keys, 200, http_params_types::params_required),
CHAIN_RO_CALL(get_transaction_id, 200, http_params_types::params_required),
// transaction related APIs will be posted to read_write queue after keys are recovered, they are safe to run in parallel until they post to the read_write queue
CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required),
CHAIN_RO_CALL_ASYNC(compute_transaction, chain_apis::read_only::compute_transaction_results, 200, http_params_types::params_required),
CHAIN_RW_CALL_ASYNC(push_transaction, chain_apis::read_write::push_transaction_results, 202, http_params_types::params_required),
CHAIN_RW_CALL_ASYNC(push_transactions, chain_apis::read_write::push_transactions_results, 202, http_params_types::params_required),
Expand All @@ -170,6 +169,8 @@ void chain_api_plugin::plugin_startup() {
}

_http_plugin.add_async_api({
// chain_plugin send_read_only_transaction will post to read_exclusive queue
CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required),
CHAIN_RO_CALL_WITH_400(get_raw_block, 200, http_params_types::params_required),
CHAIN_RO_CALL_WITH_400(get_block_header, 200, http_params_types::params_required)
});
Expand Down
10 changes: 9 additions & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,7 @@ void read_write::push_transactions(const read_write::push_transactions_params& p
} CATCH_AND_CALL(next);
}

// called from read-exclusive thread for read-only
template<class API, class Result>
void api_base::send_transaction_gen(API &api, send_transaction_params_t params, next_function<Result> next) {
try {
Expand Down Expand Up @@ -2567,12 +2568,19 @@ void read_only::compute_transaction(compute_transaction_params params, next_func
}

void read_only::send_read_only_transaction(send_read_only_transaction_params params, next_function<send_read_only_transaction_results> next) {
static bool read_only_enabled = app().executor().get_read_threads() > 0;
EOS_ASSERT( read_only_enabled, unsupported_feature,
"read-only transactions execution not enabled on API node. Set read-only-threads > 0" );

send_transaction_params_t gen_params { .return_failure_trace = false,
.retry_trx = false,
.retry_trx_num_blocks = std::nullopt,
.trx_type = transaction_metadata::trx_type::read_only,
.transaction = std::move(params.transaction) };
return send_transaction_gen(*this, std::move(gen_params), std::move(next));
// run read-only trx exclusively on read-only threads
app().executor().post(priority::low, exec_queue::read_exclusive, [this, gen_params{std::move(gen_params)}, next{std::move(next)}]() mutable {
send_transaction_gen(*this, std::move(gen_params), std::move(next));
});
}

read_only::get_transaction_id_result read_only::get_transaction_id( const read_only::get_transaction_id_params& params, const fc::time_point& ) const {
Expand Down
16 changes: 8 additions & 8 deletions plugins/http_plugin/include/eosio/http_plugin/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,18 @@ struct http_plugin_state {
*/
inline auto make_http_response_handler(http_plugin_state& plugin_state, detail::abstract_conn_ptr session_ptr, http_content_type content_type) {
return [&plugin_state,
session_ptr{std::move(session_ptr)}, content_type](int code, std::optional<fc::variant> response) {
session_ptr{std::move(session_ptr)}, content_type](int code, std::optional<fc::variant> response) mutable {
auto payload_size = detail::in_flight_sizeof(response);
if(auto error_str = session_ptr->verify_max_bytes_in_flight(payload_size); !error_str.empty()) {
session_ptr->send_busy_response(std::move(error_str));
return;
}

plugin_state.bytes_in_flight += payload_size;

// post back to an HTTP thread to allow the response handler to be called from any thread
boost::asio::post(plugin_state.thread_pool.get_executor(),
[&plugin_state, session_ptr, code, payload_size, response = std::move(response), content_type]() {
boost::asio::dispatch(plugin_state.thread_pool.get_executor(),
Copy link
Contributor

@greg7mdp greg7mdp Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a hard time figuring out why dispatch is appropriate here? Why can't we just execute the lambda here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dispatch is the one that allows execution of the lambda here. post is the one that make sure it always executes outside this thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to call dispatch instead of just executing the lambda's code here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be called from the application thread, in that case we want to do the json parsing on the http thread.

[&plugin_state, session_ptr{std::move(session_ptr)}, code, payload_size, response = std::move(response), content_type]() {
if(auto error_str = session_ptr->verify_max_bytes_in_flight(0); !error_str.empty()) {
session_ptr->send_busy_response(std::move(error_str));
return;
}

try {
plugin_state.bytes_in_flight -= payload_size;
if (response.has_value()) {
Expand Down
Loading