diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp index 572c2d3ec6..324c5db818 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp @@ -327,7 +327,7 @@ class session final : public session_base { std::optional& chain_state_log; std::optional& finality_data_log; - GetBlockID get_block_id; + GetBlockID get_block_id; // call from main app thread GetBlock get_block; ///these items might be used on either the strand or main thread diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 89eb78b9d6..df6b839a31 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -100,22 +100,24 @@ struct state_history_plugin_impl { template void create_listener(const std::string& address) { const boost::posix_time::milliseconds accept_timeout(200); - // connections set must only be modified by main thread; run listener on main thread to avoid needing another post() - fc::create_listener(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) { - catch_and_log([this, &socket]() { - connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), - trace_log, chain_state_log, finality_data_log, - [this](const chain::block_num_type block_num) { - return get_block_id(block_num); - }, - [this](const chain::block_id_type& block_id) { - return chain_plug->chain().fetch_block_by_id(block_id); - }, - [this](session_base* conn) { - boost::asio::post(app().get_io_service(), [conn, this]() { - connections.erase(connections.find(conn)); - }); - }, _log)); + // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread + fc::create_listener(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) { + boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { + catch_and_log([this, &socket]() { + connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), + trace_log, chain_state_log, finality_data_log, + [this](const chain::block_num_type block_num) { + return get_block_id(block_num); + }, + [this](const chain::block_id_type& block_id) { + return chain_plug->chain().fetch_block_by_id(block_id); + }, + [this](session_base* conn) { + boost::asio::post(app().get_io_service(), [conn, this]() { + connections.erase(connections.find(conn)); + }); + }, _log)); + }); }); }); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index be8d73d774..a99d2b7d96 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -55,6 +55,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DI configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_reqs_across_svnn_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_reqs_across_svnn_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_kill_client_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_kill_client_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY) @@ -187,6 +188,8 @@ add_test(NAME ship_streamer_if_test COMMAND tests/ship_streamer_test.py -v --num set_property(TEST ship_streamer_if_test PROPERTY LABELS long_running_tests) add_test(NAME ship_streamer_if_fetch_finality_data_test COMMAND tests/ship_streamer_test.py -v --num-clients 10 --activate-if --finality-data-history ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST ship_streamer_if_fetch_finality_data_test PROPERTY LABELS long_running_tests) +add_test(NAME ship_kill_client_test COMMAND tests/ship_kill_client_test.py -v --num-clients 20 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_property(TEST ship_kill_client_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests) diff --git a/tests/ship_kill_client_test.py b/tests/ship_kill_client_test.py new file mode 100755 index 0000000000..a050271063 --- /dev/null +++ b/tests/ship_kill_client_test.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 + +import time +import json +import os +import shutil +import signal +import sys + +from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr +from TestHarness.TestHelper import AppArgs + +############################################################### +# ship_kill_client_test +# +# Setup a nodeos with SHiP (state_history_plugin). +# Connect a number of clients and then kill the clients and shutdown nodoes. +# nodeos should exit cleanly and not hang or SEGfAULT. +# +############################################################### + +Print=Utils.Print + +appArgs = AppArgs() +extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1) +args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs) + +Utils.Debug=args.v +cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs) +dumpErrorDetails=args.dump_error_details +walletPort=TestHelper.DEFAULT_WALLET_PORT + +# simpler to have two producer nodes then setup different accounts for trx generator +totalProducerNodes=2 +totalNonProducerNodes=1 +totalNodes=totalProducerNodes+totalNonProducerNodes + +walletMgr=WalletMgr(True, port=walletPort) +testSuccessful=False + +WalletdName=Utils.EosWalletName +shipTempDir=None + +try: + TestHelper.printSystemInfo("BEGIN") + + cluster.setWalletMgr(walletMgr) + Print("Stand up cluster") + + shipNodeNum = 2 + specificExtraNodeosArgs={} + specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin " + + if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False, + totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=True, biosFinalizer=False, + specificExtraNodeosArgs=specificExtraNodeosArgs) is False: + Utils.cmdError("launcher") + Utils.errorExit("Failed to stand up cluster.") + + # verify nodes are in sync and advancing + cluster.waitOnClusterSync(blockAdvancing=5) + Print("Cluster in Sync") + + prodNode0 = cluster.getNode(0) + prodNode1 = cluster.getNode(1) + shipNode = cluster.getNode(shipNodeNum) + + # cluster.waitOnClusterSync(blockAdvancing=3) + start_block_num = shipNode.getBlockNum() + + #verify nodes are in sync and advancing + cluster.waitOnClusterSync(blockAdvancing=3) + Print("Shutdown unneeded bios node") + cluster.biosNode.kill(signal.SIGTERM) + + Print("Configure and launch txn generators") + targetTpsPerGenerator = 10 + testTrxGenDurationSec=60*60 + numTrxGenerators=2 + cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name], + acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=prodNode1.nodeId, + tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec, + waitToComplete=False) + + status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators) + assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators" + + prodNode1.waitForProducer("defproducera") + + block_range = 100000 # we are going to kill the client, so just make this a huge number + end_block_num = start_block_num + block_range + + shipClient = "tests/ship_streamer" + cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data" + if Utils.Debug: Utils.Print(f"cmd: {cmd}") + clients = [] + files = [] + shipTempDir = os.path.join(Utils.DataDir, "ship") + os.makedirs(shipTempDir, exist_ok = True) + shipClientFilePrefix = os.path.join(shipTempDir, "client") + + for i in range(0, args.num_clients): + outFile = open(f"{shipClientFilePrefix}{i}.out", "w") + errFile = open(f"{shipClientFilePrefix}{i}.err", "w") + Print(f"Start client {i}") + popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile) + clients.append((popen, cmd)) + files.append((outFile, errFile)) + Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}") + + + # allow time for all clients to connect + shipNode.waitForHeadToAdvance(5) + shipNode.waitForLibToAdvance() + + Print(f"Kill all {args.num_clients} clients and ship node") + for index, (popen, _) in zip(range(len(clients)), clients): + popen.kill() + if index == len(clients)/2: + shipNode.kill(signal.SIGTERM) + assert not shipNode.verifyAlive(), "ship node did not shutdown" + + testSuccessful = True +finally: + TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails) + if shipTempDir is not None: + if testSuccessful and not args.keep_logs: + shutil.rmtree(shipTempDir, ignore_errors=True) + +errorCode = 0 if testSuccessful else 1 +exit(errorCode)