[1.0.2] SHiP: Fix hang on shutdown #816
Changes from all commits: 32c5b87, c09ec2a, 6350739, 80d5aef, 49492dc, 9736f6f, 2641515, 6df1c11
```diff
@@ -100,22 +100,24 @@ struct state_history_plugin_impl {
    template <typename Protocol>
    void create_listener(const std::string& address) {
       const boost::posix_time::milliseconds accept_timeout(200);
-      // connections set must only be modified by main thread; run listener on main thread to avoid needing another post()
-      fc::create_listener<Protocol>(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
-         catch_and_log([this, &socket]() {
-            connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
-                                            trace_log, chain_state_log, finality_data_log,
-                                            [this](const chain::block_num_type block_num) {
-                                               return get_block_id(block_num);
-                                            },
-                                            [this](const chain::block_id_type& block_id) {
-                                               return chain_plug->chain().fetch_block_by_id(block_id);
-                                            },
-                                            [this](session_base* conn) {
-                                               boost::asio::post(app().get_io_service(), [conn, this]() {
-                                                  connections.erase(connections.find(conn));
-                                               });
-                                            }, _log));
-         });
-      });
+      // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread
+      fc::create_listener<Protocol>(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
+         boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable {
+            catch_and_log([this, &socket]() {
+               connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
+                                               trace_log, chain_state_log, finality_data_log,
+                                               [this](const chain::block_num_type block_num) {
+                                                  return get_block_id(block_num);
+                                               },
+                                               [this](const chain::block_id_type& block_id) {
+                                                  return chain_plug->chain().fetch_block_by_id(block_id);
+                                               },
+                                               [this](session_base* conn) {
+                                                  boost::asio::post(app().get_io_service(), [conn, this]() {
+                                                     connections.erase(connections.find(conn));
+                                                  });
+                                               }, _log));
+            });
+         });
+      });
    }
```

Inline review comment on the new `boost::asio::post(app().get_io_service(), ...)` line: Thinking about this more, I think this creates a (fantastically remote) possibility of the socket being destroyed after its executor is destroyed. This occurs when this callback is …
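To make the change easier to follow: the listener now runs on the SHiP thread pool's executor, so each accepted socket's default executor belongs to the SHiP thread, while the shared `connections` set is still only touched on the main application thread via `boost::asio::post`. Below is a minimal, self-contained sketch of that hand-off pattern; it is not the plugin code, and `main_ctx`, `ship_ctx`, and the local `connections` set are hypothetical stand-ins for `app().get_io_service()`, the plugin's `thread_pool`, and its session set.

```cpp
// Minimal sketch of the accept-on-one-thread / mutate-on-main-thread pattern.
// NOT the plugin code; names are illustrative stand-ins.
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <set>
#include <thread>

namespace asio = boost::asio;
using asio::ip::tcp;

int main() {
   asio::io_context main_ctx; // "main thread": the only place `connections` is touched
   asio::io_context ship_ctx; // "SHiP thread": sockets are accepted here

   std::set<std::shared_ptr<tcp::socket>> connections;

   tcp::acceptor acceptor(ship_ctx, tcp::endpoint(tcp::v4(), 0)); // ephemeral port for the demo
   acceptor.async_accept([&](boost::system::error_code ec, tcp::socket socket) {
      if (ec)
         return;
      // The accepted socket's default executor is ship_ctx. Hand ownership to the
      // main context so the shared container is only modified on that thread.
      asio::post(main_ctx, [&connections, s = std::make_shared<tcp::socket>(std::move(socket))]() mutable {
         connections.insert(std::move(s));
      });
   });

   // Keep main_ctx alive even while it has no queued work yet.
   auto guard = asio::make_work_guard(main_ctx);

   std::thread ship_thread([&] { ship_ctx.run(); });
   main_ctx.run_for(std::chrono::seconds(1)); // run briefly for the demo
   ship_ctx.stop();
   ship_thread.join();
   std::cout << "connections: " << connections.size() << "\n";
}
```

The inline comment above points at the remote hazard in this hand-off: a posted lambda that is destroyed without ever running (presumably during shutdown) still owns the moved-in socket, so the socket could end up being destroyed after the executor it was created on has already been torn down.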
New test added by this PR, ship_kill_client_test (@@ -0,0 +1,131 @@):

```python
#!/usr/bin/env python3

import time
import json
import os
import shutil
import signal
import sys

from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr
from TestHarness.TestHelper import AppArgs

###############################################################
# ship_kill_client_test
#
# Setup a nodeos with SHiP (state_history_plugin).
# Connect a number of clients and then kill the clients and shut down nodeos.
# nodeos should exit cleanly and not hang or SEGFAULT.
#
###############################################################

Print=Utils.Print

appArgs = AppArgs()
extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1)
args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs)

Utils.Debug=args.v
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
dumpErrorDetails=args.dump_error_details
walletPort=TestHelper.DEFAULT_WALLET_PORT

# simpler to have two producer nodes, then set up different accounts for the trx generator
totalProducerNodes=2
totalNonProducerNodes=1
totalNodes=totalProducerNodes+totalNonProducerNodes

walletMgr=WalletMgr(True, port=walletPort)
testSuccessful=False

WalletdName=Utils.EosWalletName
shipTempDir=None

try:
    TestHelper.printSystemInfo("BEGIN")

    cluster.setWalletMgr(walletMgr)
    Print("Stand up cluster")

    shipNodeNum = 2
    specificExtraNodeosArgs={}
    specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "

    if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False,
                      totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=True, biosFinalizer=False,
                      specificExtraNodeosArgs=specificExtraNodeosArgs) is False:
        Utils.cmdError("launcher")
        Utils.errorExit("Failed to stand up cluster.")

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=5)
    Print("Cluster in Sync")

    prodNode0 = cluster.getNode(0)
    prodNode1 = cluster.getNode(1)
    shipNode = cluster.getNode(shipNodeNum)

    # cluster.waitOnClusterSync(blockAdvancing=3)
    start_block_num = shipNode.getBlockNum()

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=3)
    Print("Shutdown unneeded bios node")
    cluster.biosNode.kill(signal.SIGTERM)

    Print("Configure and launch txn generators")
    targetTpsPerGenerator = 10
    testTrxGenDurationSec=60*60
    numTrxGenerators=2
    cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name],
                                acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=prodNode1.nodeId,
                                tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
                                waitToComplete=False)

    status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators)
    assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

    prodNode1.waitForProducer("defproducera")

    block_range = 100000 # we are going to kill the client, so just make this a huge number
    end_block_num = start_block_num + block_range

    shipClient = "tests/ship_streamer"
    cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data"
    if Utils.Debug: Utils.Print(f"cmd: {cmd}")
    clients = []
    files = []
    shipTempDir = os.path.join(Utils.DataDir, "ship")
    os.makedirs(shipTempDir, exist_ok = True)
    shipClientFilePrefix = os.path.join(shipTempDir, "client")

    for i in range(0, args.num_clients):
        outFile = open(f"{shipClientFilePrefix}{i}.out", "w")
        errFile = open(f"{shipClientFilePrefix}{i}.err", "w")
        Print(f"Start client {i}")
        popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
        clients.append((popen, cmd))
        files.append((outFile, errFile))
        Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

    # allow time for all clients to connect
    shipNode.waitForHeadToAdvance(5)
    shipNode.waitForLibToAdvance()

Print(f"Kill all {args.num_clients} clients and ship node") | ||
for index, (popen, _) in zip(range(len(clients)), clients): | ||
popen.kill() | ||
if index == len(clients)/2: | ||
shipNode.kill(signal.SIGTERM) | ||
assert not shipNode.verifyAlive(), "ship node did not shutdown" | ||
|
||
    testSuccessful = True
finally:
    TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
    if shipTempDir is not None:
        if testSuccessful and not args.keep_logs:
            shutil.rmtree(shipTempDir, ignore_errors=True)

errorCode = 0 if testSuccessful else 1
exit(errorCode)
```
Review comments:

There will need to be additional changes below here, otherwise `connections` will be modified in the ship thread, which is not allowed. But that's really still not enough to get it 100% correct if the intent is to make the default executor of the stream be what we want -- the stream should be using a per-stream strand instead of the thread's executor. The problem is that `fc::create_listener` doesn't give access to the variant of `async_accept()` that allows creating the new socket with a different default executor than the listening socket. This is a real shortcoming, and without refactoring `fc::create_listener` I'm not sure offhand how to fix that.

In this case there is only one SHiP thread and its implicit strand. I agree that it would be better if `fc::create_listener` took a strand.

Yeah, for 1.0.x that's fine, but we should really get it right on main -- the code is currently structured with the assumption that increasing ship threads 'just works'.

Seems passing a strand works.
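As context for the strand discussion: Boost.Asio does provide an `async_accept` overload that constructs the accepted socket on a caller-supplied executor, which is what a per-stream strand would need. The sketch below is purely illustrative and assumes nothing about how `fc::create_listener` would actually be refactored; `do_accept`, `ship_pool`, and `listen_ctx` are made-up names for the example.

```cpp
// Illustrative only: the async_accept overload that builds the peer socket on a
// caller-supplied executor (here, a fresh strand per accepted stream).
// This is not fc::create_listener.
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

namespace asio = boost::asio;
using asio::ip::tcp;

template <typename PoolExecutor>
void do_accept(tcp::acceptor& acceptor, PoolExecutor pool_ex) {
   // A new strand per connection becomes the accepted socket's default executor.
   auto stream_strand = asio::make_strand(pool_ex);
   acceptor.async_accept(stream_strand,
      [&acceptor, pool_ex](boost::system::error_code ec, auto peer) {
         if (ec)
            return;
         // peer.get_executor() is the per-stream strand created above.
         std::cout << "accepted " << peer.remote_endpoint() << " on its own strand\n";
         do_accept(acceptor, pool_ex); // keep accepting
      });
}

int main() {
   asio::thread_pool ship_pool(2); // stands in for the plugin's SHiP thread pool
   asio::io_context  listen_ctx;   // whatever context runs the listener

   tcp::acceptor acceptor(listen_ctx, tcp::endpoint(tcp::v4(), 0)); // ephemeral port for the demo
   do_accept(acceptor, ship_pool.get_executor());

   listen_ctx.run_for(std::chrono::seconds(1)); // demo only
   ship_pool.join();
}
```

A refactored `fc::create_listener` could take such an executor (or a factory producing a fresh strand per connection) and forward it to this overload, rather than letting accepted sockets inherit the listening socket's executor.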