[1.0.2] SHiP: Fix hang on shutdown #816

Merged (8 commits) on Sep 24, 2024

2 changes: 1 addition & 1 deletion plugins/state_history_plugin/state_history_plugin.cpp
@@ -101,7 +101,7 @@ struct state_history_plugin_impl {
   void create_listener(const std::string& address) {
      const boost::posix_time::milliseconds accept_timeout(200);
      // connections set must only be modified by main thread; run listener on main thread to avoid needing another post()
-     fc::create_listener<Protocol>(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
+     fc::create_listener<Protocol>(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
Member:

There will need to be additional changes below here; otherwise connections will be modified in the ship thread, which is not allowed.

But that's really still not enough to get it 100% correct if the intent is to make the default executor of the stream what we want -- the stream should be using a per-stream strand instead of the thread's executor. The problem is that fc::create_listener doesn't give access to the variant of async_accept() that allows creating the new socket with a different default executor than the listening socket. This is a real shortcoming, and without refactoring fc::create_listener I'm not sure offhand how to fix it.
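
For reference, a minimal sketch of the overload in question, assuming a plain TCP acceptor (this is not how fc::create_listener is structured today): Asio's two-argument async_accept() constructs the accepted socket with a caller-supplied default executor.

// Sketch only: the listener keeps its own executor, while each accepted
// socket is created with a per-stream strand as its default executor.
boost::asio::ip::tcp::acceptor acceptor{app().get_io_service()};
// ... open/bind/listen ...
acceptor.async_accept(
   boost::asio::make_strand(thread_pool.get_executor()),   // executor for the NEW socket
   [](boost::system::error_code ec, auto socket) {
      // socket.get_executor() is the per-stream strand; a stream built from
      // this socket is serialized on it by default.
   });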

Member (Author):

In this case there is only one SHiP thread and its implicit strand. I agree that it would be better if fc::create_listener took a strand.
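
A quick illustration of the implicit-strand point (a sketch, not plugin code):

boost::asio::thread_pool ship_pool{1};   // one SHiP thread => implicit strand:
                                         // completion handlers never overlap
// with ship_pool{2} or more, that guarantee is gone, and shared state such as
// the stream or the connections set needs an explicit strand:
auto per_stream = boost::asio::make_strand(ship_pool.get_executor());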

Member:

Yeah, for 1.0.x that's fine, but we should really get it right on main -- the code is currently structured with the assumption that increasing ship threads 'just works'.

Member (Author):

Seems passing a strand works.

         catch_and_log([this, &socket]() {
            connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
                                            trace_log, chain_state_log, finality_data_log,
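Bridging the diff above and the review discussion, a small self-contained demo (illustrative only, not the plugin's session code) of what the per-stream strand buys: handlers bound to one strand never run concurrently, so per-session state stays safe even on a multi-threaded pool.

#include <boost/asio.hpp>
#include <iostream>

int main() {
   boost::asio::thread_pool pool{4};                            // multi-threaded SHiP-like pool
   auto strand = boost::asio::make_strand(pool.get_executor()); // one strand per session

   int counter = 0;                                             // unsynchronized per-session state
   for (int i = 0; i < 1000; ++i)
      boost::asio::post(strand, [&counter] { ++counter; });     // serialized by the strand

   pool.join();                                                 // wait for all handlers
   std::cout << counter << '\n';                                // always prints 1000
}
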
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -55,6 +55,7 @@
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_reqs_across_svnn_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_reqs_across_svnn_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_kill_client_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_kill_client_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
@@ -187,6 +188,8 @@
set_property(TEST ship_streamer_if_test PROPERTY LABELS long_running_tests)
add_test(NAME ship_streamer_if_fetch_finality_data_test COMMAND tests/ship_streamer_test.py -v --num-clients 10 --activate-if --finality-data-history ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_streamer_if_fetch_finality_data_test PROPERTY LABELS long_running_tests)
+add_test(NAME ship_kill_client_test COMMAND tests/ship_kill_client_test.py -v --activate-if --num-clients 20 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
+set_property(TEST ship_kill_client_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests)
132 changes: 132 additions & 0 deletions tests/ship_kill_client_test.py
@@ -0,0 +1,132 @@
#!/usr/bin/env python3

import time
import json
import os
import shutil
import signal
import sys

from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr
from TestHarness.TestHelper import AppArgs

###############################################################
# ship_kill_client_test
#
# Set up a nodeos with SHiP (state_history_plugin).
# Connect a number of clients, then kill the clients and shut down nodeos.
# nodeos should exit cleanly and not hang or SEGFAULT.
#
###############################################################

Print=Utils.Print

appArgs = AppArgs()
extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1)
args = TestHelper.parse_args({"--activate-if","--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs)

Utils.Debug=args.v
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
activateIF=args.activate_if
dumpErrorDetails=args.dump_error_details
walletPort=TestHelper.DEFAULT_WALLET_PORT

totalProducerNodes=2
totalNonProducerNodes=1
totalNodes=totalProducerNodes+totalNonProducerNodes

walletMgr=WalletMgr(True, port=walletPort)
testSuccessful=False

WalletdName=Utils.EosWalletName
shipTempDir=None

try:
    TestHelper.printSystemInfo("BEGIN")

    cluster.setWalletMgr(walletMgr)
    Print("Stand up cluster")

    # node 2 is the SHiP node: enable state_history_plugin with trace,
    # chain-state, and finality-data history
    shipNodeNum = 2
    specificExtraNodeosArgs={}
    specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "

    if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False,
                      totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=activateIF, biosFinalizer=False,
                      specificExtraNodeosArgs=specificExtraNodeosArgs) is False:
        Utils.cmdError("launcher")
        Utils.errorExit("Failed to stand up cluster.")

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=5)
    Print("Cluster in Sync")

    prodNode0 = cluster.getNode(0)
    nonprodNode = cluster.getNode(1)
    shipNode = cluster.getNode(shipNodeNum)

    start_block_num = shipNode.getBlockNum()

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=3)
    Print("Shutdown unneeded bios node")
    cluster.biosNode.kill(signal.SIGTERM)

Print("Configure and launch txn generators")
targetTpsPerGenerator = 10
testTrxGenDurationSec=60*60
numTrxGenerators=2
cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name],
acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=nonprodNode.nodeId,
tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
waitToComplete=False)

status = cluster.waitForTrxGeneratorsSpinup(nodeId=nonprodNode.nodeId, numGenerators=numTrxGenerators)
assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

nonprodNode.waitForProducer("defproducera")

block_range = 100000 # we are going to kill the client, so just make this a huge number
end_block_num = start_block_num + block_range

shipClient = "tests/ship_streamer"
cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data"
if Utils.Debug: Utils.Print(f"cmd: {cmd}")
clients = []
files = []
shipTempDir = os.path.join(Utils.DataDir, "ship")
os.makedirs(shipTempDir, exist_ok = True)
shipClientFilePrefix = os.path.join(shipTempDir, "client")

for i in range(0, args.num_clients):
start = time.perf_counter()
        outFile = open(f"{shipClientFilePrefix}{i}.out", "w")
        errFile = open(f"{shipClientFilePrefix}{i}.err", "w")
        Print(f"Start client {i}")
        popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
        clients.append((popen, cmd))
        files.append((outFile, errFile))
        Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

    # allow time for all clients to connect
    shipNode.waitForHeadToAdvance(5)
    shipNode.waitForLibToAdvance()

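    # kill the SHiP node part-way through killing the clients, so shutdown
    # happens while some client connections are still open; nodeos must exit
    # cleanly rather than hang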
Print(f"Kill all {args.num_clients} clients and ship node")
for index, (popen, _) in zip(range(len(clients)), clients):
popen.kill()
if index == len(clients)/2:
shipNode.kill(signal.SIGTERM)
assert not shipNode.verifyAlive(), "ship node did not shutdown"

testSuccessful = True
finally:
    TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
    if shipTempDir is not None:
        if testSuccessful and not args.keep_logs:
            shutil.rmtree(shipTempDir, ignore_errors=True)

errorCode = 0 if testSuccessful else 1
exit(errorCode)