[1.0.2 -> main] SHiP: Fix hang on shutdown #820

Merged · 10 commits · Sep 24, 2024
@@ -327,7 +327,7 @@ class session final : public session_base {
    std::optional<log_catalog>& chain_state_log;
    std::optional<log_catalog>& finality_data_log;

-   GetBlockID get_block_id;
+   GetBlockID get_block_id; // call from main app thread
    GetBlock get_block;

    ///these items might be used on either the strand or main thread
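The added comment records a threading contract (get_block_id must be invoked from the main application thread) without enforcing it. A minimal debug-build sketch of how such a contract could be made checkable; the helper below is hypothetical and not part of this PR:

#include <cassert>
#include <thread>

// Hypothetical helper, not part of this PR: static initialization runs on the
// main thread, so capture its id once and assert before invoking
// main-thread-only callbacks such as get_block_id.
static const std::thread::id main_thread_id = std::this_thread::get_id();

inline void assert_main_thread() {
   assert(std::this_thread::get_id() == main_thread_id);
}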
34 changes: 18 additions & 16 deletions plugins/state_history_plugin/state_history_plugin.cpp
@@ -100,22 +100,24 @@ struct state_history_plugin_impl {
    template <typename Protocol>
    void create_listener(const std::string& address) {
       const boost::posix_time::milliseconds accept_timeout(200);
-      // connections set must only be modified by main thread; run listener on main thread to avoid needing another post()
-      fc::create_listener<Protocol>(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
-         catch_and_log([this, &socket]() {
-            connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
-                                            trace_log, chain_state_log, finality_data_log,
-                                            [this](const chain::block_num_type block_num) {
-                                               return get_block_id(block_num);
-                                            },
-                                            [this](const chain::block_id_type& block_id) {
-                                               return chain_plug->chain().fetch_block_by_id(block_id);
-                                            },
-                                            [this](session_base* conn) {
-                                               boost::asio::post(app().get_io_service(), [conn, this]() {
-                                                  connections.erase(connections.find(conn));
-                                               });
-                                            }, _log));
+      // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread
+      fc::create_listener<Protocol>(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
+         boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable {
+            catch_and_log([this, &socket]() {
+               connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
+                                               trace_log, chain_state_log, finality_data_log,
+                                               [this](const chain::block_num_type block_num) {
+                                                  return get_block_id(block_num);
+                                               },
+                                               [this](const chain::block_id_type& block_id) {
+                                                  return chain_plug->chain().fetch_block_by_id(block_id);
+                                               },
+                                               [this](session_base* conn) {
+                                                  boost::asio::post(app().get_io_service(), [conn, this]() {
+                                                     connections.erase(connections.find(conn));
+                                                  });
+                                               }, _log));
+            });
          });
       });
    }
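The heart of the change: sockets are now accepted on the SHiP thread pool's executor, so their default executor is the ship thread, and ownership is handed to the main thread with boost::asio::post before the main-thread-only connections set is touched. A minimal, self-contained sketch of that handoff pattern, with illustrative names (main_ctx, ship_ctx, port 9999) standing in for app().get_io_service() and the plugin's thread pool:

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <set>
#include <thread>

namespace asio = boost::asio;
using asio::ip::tcp;

int main() {
   asio::io_context main_ctx;  // stands in for app().get_io_service()
   asio::io_context ship_ctx;  // stands in for the SHiP thread pool

   // main-thread-only state, like state_history_plugin_impl::connections
   std::set<std::shared_ptr<tcp::socket>> connections;

   tcp::acceptor acceptor(ship_ctx, tcp::endpoint(tcp::v4(), 9999));
   acceptor.async_accept([&](boost::system::error_code ec, tcp::socket socket) {
      if (ec)
         return;
      // the accepted socket's default executor is ship_ctx; hand it to the
      // main thread before registering it, as the post() in the diff does
      auto s = std::make_shared<tcp::socket>(std::move(socket));
      asio::post(main_ctx, [&connections, s]() {
         connections.insert(s);  // safe: runs only where main_ctx runs
         std::cout << "session registered on main thread\n";
      });
   });

   std::thread ship_thread([&] { ship_ctx.run(); });
   auto guard = asio::make_work_guard(main_ctx);  // keep main_ctx alive
   main_ctx.run_for(std::chrono::seconds(2));     // appbase main loop stand-in
   ship_ctx.stop();
   ship_thread.join();
}

The trade-off, as the old and new comments note, is one extra post() per accepted connection in exchange for the socket living entirely on the ship thread's executor.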
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -55,6 +55,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DIR}/cli_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_reqs_across_svnn_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_reqs_across_svnn_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_kill_client_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_kill_client_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
@@ -187,6 +188,8 @@ add_test(NAME ship_streamer_if_test COMMAND tests/ship_streamer_test.py -v --num
 set_property(TEST ship_streamer_if_test PROPERTY LABELS long_running_tests)
 add_test(NAME ship_streamer_if_fetch_finality_data_test COMMAND tests/ship_streamer_test.py -v --num-clients 10 --activate-if --finality-data-history ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
 set_property(TEST ship_streamer_if_fetch_finality_data_test PROPERTY LABELS long_running_tests)
+add_test(NAME ship_kill_client_test COMMAND tests/ship_kill_client_test.py -v --num-clients 20 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
+set_property(TEST ship_kill_client_test PROPERTY LABELS nonparallelizable_tests)

 add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
 set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests)
131 changes: 131 additions & 0 deletions tests/ship_kill_client_test.py
@@ -0,0 +1,131 @@
#!/usr/bin/env python3

import time
import json
import os
import shutil
import signal
import sys

from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr
from TestHarness.TestHelper import AppArgs

###############################################################
# ship_kill_client_test
#
# Set up a nodeos with SHiP (state_history_plugin).
# Connect a number of clients, then kill the clients and shut down nodeos.
# nodeos should exit cleanly and not hang or SEGFAULT.
#
###############################################################

Print=Utils.Print

appArgs = AppArgs()
extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1)
args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs)

Utils.Debug=args.v
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
dumpErrorDetails=args.dump_error_details
walletPort=TestHelper.DEFAULT_WALLET_PORT

# simpler to have two producer nodes than to set up different accounts for the trx generator
totalProducerNodes=2
totalNonProducerNodes=1
totalNodes=totalProducerNodes+totalNonProducerNodes

walletMgr=WalletMgr(True, port=walletPort)
testSuccessful=False

WalletdName=Utils.EosWalletName
shipTempDir=None

try:
    TestHelper.printSystemInfo("BEGIN")

    cluster.setWalletMgr(walletMgr)
    Print("Stand up cluster")

    shipNodeNum = 2
    specificExtraNodeosArgs={}
    specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "

    if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False,
                      totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=True, biosFinalizer=False,
                      specificExtraNodeosArgs=specificExtraNodeosArgs) is False:
        Utils.cmdError("launcher")
        Utils.errorExit("Failed to stand up cluster.")

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=5)
    Print("Cluster in Sync")

    prodNode0 = cluster.getNode(0)
    prodNode1 = cluster.getNode(1)
    shipNode = cluster.getNode(shipNodeNum)

    start_block_num = shipNode.getBlockNum()

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=3)
    Print("Shutdown unneeded bios node")
    cluster.biosNode.kill(signal.SIGTERM)

Print("Configure and launch txn generators")
targetTpsPerGenerator = 10
testTrxGenDurationSec=60*60
numTrxGenerators=2
cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name],
acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=prodNode1.nodeId,
tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
waitToComplete=False)

status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators)
assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

prodNode1.waitForProducer("defproducera")

    block_range = 100000 # we are going to kill the client, so just make this a huge number
    end_block_num = start_block_num + block_range

    shipClient = "tests/ship_streamer"
    cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data"
    if Utils.Debug: Utils.Print(f"cmd: {cmd}")
    clients = []
    files = []
    shipTempDir = os.path.join(Utils.DataDir, "ship")
    os.makedirs(shipTempDir, exist_ok=True)
    shipClientFilePrefix = os.path.join(shipTempDir, "client")

    for i in range(0, args.num_clients):
        outFile = open(f"{shipClientFilePrefix}{i}.out", "w")
        errFile = open(f"{shipClientFilePrefix}{i}.err", "w")
        Print(f"Start client {i}")
        popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
        clients.append((popen, cmd))
        files.append((outFile, errFile))
        Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")


    # allow time for all clients to connect
    shipNode.waitForHeadToAdvance(5)
    shipNode.waitForLibToAdvance()

Print(f"Kill all {args.num_clients} clients and ship node")
for index, (popen, _) in zip(range(len(clients)), clients):
popen.kill()
if index == len(clients)/2:
shipNode.kill(signal.SIGTERM)
assert not shipNode.verifyAlive(), "ship node did not shutdown"

    testSuccessful = True
finally:
    TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
    if shipTempDir is not None:
        if testSuccessful and not args.keep_logs:
            shutil.rmtree(shipTempDir, ignore_errors=True)

errorCode = 0 if testSuccessful else 1
exit(errorCode)