[1.0.2] SHiP: Fix hang on shutdown #816

Merged
merged 8 commits on Sep 24, 2024
@@ -327,7 +327,7 @@ class session final : public session_base {
std::optional<log_catalog>& chain_state_log;
std::optional<log_catalog>& finality_data_log;

GetBlockID get_block_id;
GetBlockID get_block_id; // call from main app thread
GetBlock get_block;

///these items might be used on either the strand or main thread
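The new comment on get_block_id ("call from main app thread") documents a threading rule rather than changing behavior: the lookup must not run on the session's strand. Below is a minimal, self-contained sketch of that discipline, assuming the same Boost.Asio primitive the plugin diff uses further down (boost::asio::post into the application's io_context). The names main_ioc, session_pool, and get_block_id_fn, and the string block id, are illustrative, not the plugin's types.

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <optional>
#include <string>

int main() {
   boost::asio::io_context main_ioc;              // stands in for app().get_io_service()
   auto work = boost::asio::make_work_guard(main_ioc);
   boost::asio::thread_pool session_pool{2};      // stands in for the SHiP thread pool

   // Assumed shape of the GetBlockID callback: block num -> optional block id, main thread only.
   std::function<std::optional<std::string>(uint32_t)> get_block_id_fn =
      [](uint32_t num) { return std::optional<std::string>{"id-" + std::to_string(num)}; };

   // A handler running on a session strand must not call get_block_id_fn directly;
   // it posts the lookup to the main io_context instead.
   auto strand = boost::asio::make_strand(session_pool.get_executor());
   boost::asio::post(strand, [&] {
      boost::asio::post(main_ioc, [&] {
         if (auto id = get_block_id_fn(42))
            std::cout << "block id: " << *id << "\n";
         work.reset();                             // let main_ioc.run() return once the lookup is done
      });
   });

   main_ioc.run();                                 // main thread services the posted lookup
   session_pool.join();
}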
33 changes: 18 additions & 15 deletions plugins/state_history_plugin/state_history_plugin.cpp
@@ -101,21 +101,24 @@ struct state_history_plugin_impl {
void create_listener(const std::string& address) {
const boost::posix_time::milliseconds accept_timeout(200);
// connections set must only be modified by main thread; run listener on main thread to avoid needing another post()
fc::create_listener<Protocol>(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
catch_and_log([this, &socket]() {
connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
trace_log, chain_state_log, finality_data_log,
[this](const chain::block_num_type block_num) {
return get_block_id(block_num);
},
[this](const chain::block_id_type& block_id) {
return chain_plug->chain().fetch_block_by_id(block_id);
},
[this](session_base* conn) {
boost::asio::post(app().get_io_service(), [conn, this]() {
connections.erase(connections.find(conn));
});
}, _log));
auto strand = boost::asio::make_strand(thread_pool.get_executor());
fc::create_listener<Protocol>(strand, _log, accept_timeout, address, "", [this, strand](Protocol::socket&& socket) {
boost::asio::post(app().get_io_service(), [this, strand, socket{std::move(socket)}]() mutable {
catch_and_log([this, &socket, &strand]() {
connections.emplace(new session(std::move(socket), std::move(strand), chain_plug->chain(),
Review comment (Member):

The changes here seem to make only a single strand for all connections. And then this single strand is moved here? Doesn't look right. I think what you had before was better even though it still didn't make the websocket's default executor be a strand.

trace_log, chain_state_log, finality_data_log,
[this](const chain::block_num_type block_num) {
return get_block_id(block_num);
},
[this](const chain::block_id_type& block_id) {
return chain_plug->chain().fetch_block_by_id(block_id);
},
[this](session_base* conn) {
boost::asio::post(app().get_io_service(), [conn, this]() {
connections.erase(connections.find(conn));
});
}, _log));
});
});
});
}
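For context on the review comment above: the per-connection alternative it alludes to creates the strand inside the accept handler, so each session serializes its handlers on its own strand instead of every connection sharing (and the first one consuming, via std::move) a single strand created before create_listener. The sketch below illustrates that shape with plain Boost.Asio; it is not the plugin's implementation, and identifiers such as accept_loop and conn_strand are illustrative. Creating the strand per accepted socket keeps connections independent while still sharing one thread pool.

#include <boost/asio.hpp>
#include <chrono>
#include <memory>

using boost::asio::ip::tcp;

// Accept loop: the strand is created *inside* the accept handler, so each
// accepted connection gets its own strand rather than all connections sharing one.
void accept_loop(tcp::acceptor& acceptor, boost::asio::thread_pool& pool) {
   acceptor.async_accept([&acceptor, &pool](boost::system::error_code ec, tcp::socket socket) {
      if (!ec) {
         auto conn_strand = boost::asio::make_strand(pool.get_executor());  // per-connection strand
         auto conn_socket = std::make_shared<tcp::socket>(std::move(socket));
         boost::asio::post(conn_strand, [conn_socket] {
            // ... run this connection's protocol; all of its handlers go through conn_strand ...
         });
      }
      accept_loop(acceptor, pool);   // keep accepting further connections
   });
}

int main() {
   boost::asio::io_context ioc;          // stands in for the main application io_context
   boost::asio::thread_pool pool{4};     // stands in for the SHiP thread pool
   tcp::acceptor acceptor{ioc, tcp::endpoint{tcp::v4(), 0}};   // any free port; illustrative only
   accept_loop(acceptor, pool);
   ioc.run_for(std::chrono::milliseconds{100});   // briefly service accepts, then exit
   pool.join();
}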
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -55,6 +55,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DI
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_reqs_across_svnn_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_reqs_across_svnn_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_kill_client_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_kill_client_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
@@ -187,6 +188,8 @@ add_test(NAME ship_streamer_if_test COMMAND tests/ship_streamer_test.py -v --num
set_property(TEST ship_streamer_if_test PROPERTY LABELS long_running_tests)
add_test(NAME ship_streamer_if_fetch_finality_data_test COMMAND tests/ship_streamer_test.py -v --num-clients 10 --activate-if --finality-data-history ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_streamer_if_fetch_finality_data_test PROPERTY LABELS long_running_tests)
add_test(NAME ship_kill_client_test COMMAND tests/ship_kill_client_test.py -v --activate-if --num-clients 20 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_kill_client_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests)
132 changes: 132 additions & 0 deletions tests/ship_kill_client_test.py
@@ -0,0 +1,132 @@
#!/usr/bin/env python3

import time
import json
import os
import shutil
import signal
import sys

from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr
from TestHarness.TestHelper import AppArgs

###############################################################
# ship_kill_client_test
#
# Set up a nodeos with SHiP (state_history_plugin) enabled.
# Connect a number of clients, then kill the clients and shut down nodeos.
# nodeos should exit cleanly and not hang or SEGFAULT.
#
###############################################################

Print=Utils.Print

appArgs = AppArgs()
extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1)
args = TestHelper.parse_args({"--activate-if","--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs)

Utils.Debug=args.v
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
activateIF=args.activate_if
dumpErrorDetails=args.dump_error_details
walletPort=TestHelper.DEFAULT_WALLET_PORT

totalProducerNodes=2
totalNonProducerNodes=1
totalNodes=totalProducerNodes+totalNonProducerNodes

walletMgr=WalletMgr(True, port=walletPort)
testSuccessful=False

WalletdName=Utils.EosWalletName
shipTempDir=None

try:
TestHelper.printSystemInfo("BEGIN")

cluster.setWalletMgr(walletMgr)
Print("Stand up cluster")

shipNodeNum = 2
specificExtraNodeosArgs={}
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "

if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False,
totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=activateIF, biosFinalizer=False,
specificExtraNodeosArgs=specificExtraNodeosArgs) is False:
Utils.cmdError("launcher")
Utils.errorExit("Failed to stand up cluster.")

# verify nodes are in sync and advancing
cluster.waitOnClusterSync(blockAdvancing=5)
Print("Cluster in Sync")

prodNode0 = cluster.getNode(0)
nonprodNode = cluster.getNode(1)
shipNode = cluster.getNode(shipNodeNum)

# cluster.waitOnClusterSync(blockAdvancing=3)
start_block_num = shipNode.getBlockNum()

#verify nodes are in sync and advancing
cluster.waitOnClusterSync(blockAdvancing=3)
Print("Shutdown unneeded bios node")
cluster.biosNode.kill(signal.SIGTERM)

Print("Configure and launch txn generators")
targetTpsPerGenerator = 10
testTrxGenDurationSec=60*60
numTrxGenerators=2
cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name],
acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=nonprodNode.nodeId,
tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
waitToComplete=False)

status = cluster.waitForTrxGeneratorsSpinup(nodeId=nonprodNode.nodeId, numGenerators=numTrxGenerators)
assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

nonprodNode.waitForProducer("defproducera")

block_range = 100000 # we are going to kill the client, so just make this a huge number
end_block_num = start_block_num + block_range

shipClient = "tests/ship_streamer"
cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data"
if Utils.Debug: Utils.Print(f"cmd: {cmd}")
clients = []
files = []
shipTempDir = os.path.join(Utils.DataDir, "ship")
os.makedirs(shipTempDir, exist_ok = True)
shipClientFilePrefix = os.path.join(shipTempDir, "client")

for i in range(0, args.num_clients):
start = time.perf_counter()
outFile = open(f"{shipClientFilePrefix}{i}.out", "w")
errFile = open(f"{shipClientFilePrefix}{i}.err", "w")
Print(f"Start client {i}")
popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
clients.append((popen, cmd))
files.append((outFile, errFile))
Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")


# allow time for all clients to connect
shipNode.waitForHeadToAdvance(5)
shipNode.waitForLibToAdvance()

Print(f"Kill all {args.num_clients} clients and ship node")
for index, (popen, _) in zip(range(len(clients)), clients):
popen.kill()
if index == len(clients)/2:
shipNode.kill(signal.SIGTERM)
assert not shipNode.verifyAlive(), "ship node did not shutdown"

testSuccessful = True
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
if shipTempDir is not None:
if testSuccessful and not args.keep_logs:
shutil.rmtree(shipTempDir, ignore_errors=True)

errorCode = 0 if testSuccessful else 1
exit(errorCode)