Skip to content

Commit

Permalink
Merge pull request #993 from AntelopeIO/GH-985-interrupt-trx
Browse files Browse the repository at this point in the history
Support interrupt transaction
  • Loading branch information
heifner authored Nov 6, 2024
2 parents 728b641 + b519e9c commit cf7c6e8
Show file tree
Hide file tree
Showing 21 changed files with 416 additions and 60 deletions.
2 changes: 1 addition & 1 deletion libraries/appbase
47 changes: 42 additions & 5 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,8 @@ struct controller_impl {
async_t async_aggregation = async_t::yes; // by default we process incoming votes asynchronously
my_finalizers_t my_finalizers;
std::atomic<bool> writing_snapshot = false;
std::atomic<bool> applying_block = false;
platform_timer& main_thread_timer;

thread_local static platform_timer timer; // a copy for main thread and each read-only thread
#if defined(EOSIO_EOS_VM_RUNTIME_ENABLED) || defined(EOSIO_EOS_VM_JIT_RUNTIME_ENABLED)
Expand Down Expand Up @@ -1285,6 +1287,7 @@ struct controller_impl {
read_mode( cfg.read_mode ),
thread_pool(),
my_finalizers(cfg.finalizers_dir / config::safety_filename),
main_thread_timer(timer), // assumes constructor is called from main thread
wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() )
{
assert(cfg.chain_thread_pool_size > 0);
Expand Down Expand Up @@ -3784,6 +3787,9 @@ struct controller_impl {
}
}

applying_block = true;
auto apply = fc::make_scoped_exit([&](){ applying_block = false; });

transaction_trace_ptr trace;

size_t packed_idx = 0;
Expand All @@ -3810,7 +3816,11 @@ struct controller_impl {
std::holds_alternative<transaction_id_type>(receipt.trx);

if( transaction_failed && !transaction_can_fail) {
edump((*trace));
if (trace->except->code() == interrupt_exception::code_value) {
ilog("Interrupt of trx id: ${id}", ("id", trace->id));
} else {
edump((*trace));
}
throw *trace->except;
}

Expand Down Expand Up @@ -3879,7 +3889,8 @@ struct controller_impl {
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch ( const fc::exception& e ) {
edump((e.to_detail_string()));
if (e.code() != interrupt_exception::code_value)
edump((e.to_detail_string()));
abort_block();
throw;
} catch ( const std::exception& e ) {
Expand Down Expand Up @@ -4369,7 +4380,15 @@ struct controller_impl {
log_irreversible();
transition_to_savanna_if_needed();
return controller::apply_blocks_result::complete;
} FC_LOG_AND_RETHROW( )
} catch (fc::exception& e) {
if (e.code() != interrupt_exception::code_value) {
wlog("${d}", ("d",e.to_detail_string()));
FC_RETHROW_EXCEPTION(e, warn, "rethrow");
}
throw;
} catch (...) {
try { throw; } FC_LOG_AND_RETHROW()
}
}

controller::apply_blocks_result maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
Expand Down Expand Up @@ -4441,8 +4460,12 @@ struct controller_impl {
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch (const fc::exception& e) {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
if (e.code() == interrupt_exception::code_value) {
ilog("interrupt while applying block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
} else {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
}
except = std::current_exception();
} catch (const std::exception& e) {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
Expand Down Expand Up @@ -4505,6 +4528,16 @@ struct controller_impl {
return applied_trxs;
}

void interrupt_transaction() {
// Only interrupt transaction if applying a block. Speculative trxs already have a deadline set so they
// have limited run time already. This is to allow killing a long-running transaction in a block being
// validated.
if (applying_block) {
ilog("Interrupting apply block");
main_thread_timer.expire_now();
}
}

// @param if_active true if instant finality is active
static checksum256_type calc_merkle( deque<digest_type>&& digests, bool if_active ) {
if (if_active) {
Expand Down Expand Up @@ -5265,6 +5298,10 @@ deque<transaction_metadata_ptr> controller::abort_block() {
return my->abort_block();
}

void controller::interrupt_transaction() {
my->interrupt_transaction();
}

boost::asio::io_context& controller::get_thread_pool() {
return my->thread_pool.get_executor();
}
Expand Down
3 changes: 3 additions & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ namespace eosio::chain {
*/
deque<transaction_metadata_ptr> abort_block();

/// Expected to be called from signal handler
void interrupt_transaction();

/**
*
*/
Expand Down
8 changes: 5 additions & 3 deletions libraries/chain/include/eosio/chain/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ namespace eosio { namespace chain {
3080005, "Transaction CPU usage is too much for the remaining allowable usage of the current block" )
FC_DECLARE_DERIVED_EXCEPTION( deadline_exception, resource_exhausted_exception,
3080006, "Transaction took too long" )
FC_DECLARE_DERIVED_EXCEPTION( leeway_deadline_exception, deadline_exception,
3081001, "Transaction reached the deadline set due to leeway on account CPU limits" )

FC_DECLARE_DERIVED_EXCEPTION( greylist_net_usage_exceeded, resource_exhausted_exception,
3080007, "Transaction exceeded the current greylisted account network usage limit" )
FC_DECLARE_DERIVED_EXCEPTION( greylist_cpu_usage_exceeded, resource_exhausted_exception,
Expand All @@ -389,9 +392,8 @@ namespace eosio { namespace chain {
3080009, "Read-only transaction eos-vm-oc compile temporary failure" )
FC_DECLARE_DERIVED_EXCEPTION( ro_trx_vm_oc_compile_permanent_failure, resource_exhausted_exception,
3080010, "Read-only transaction eos-vm-oc compile permanent failure" )

FC_DECLARE_DERIVED_EXCEPTION( leeway_deadline_exception, deadline_exception,
3081001, "Transaction reached the deadline set due to leeway on account CPU limits" )
FC_DECLARE_DERIVED_EXCEPTION( interrupt_exception, resource_exhausted_exception,
3080011, "Transaction interrupted by signal" )

FC_DECLARE_DERIVED_EXCEPTION( authorization_exception, chain_exception,
3090000, "Authorization exception" )
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/platform_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct platform_timer {

void start(fc::time_point tp);
void stop();
void expire_now();

/* Sets a callback for when timer expires. Be aware this could might fire from a signal handling context and/or
on any particular thread. Only a single callback can be registered at once; trying to register more will
Expand Down
27 changes: 12 additions & 15 deletions libraries/chain/platform_timer_asio_fallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,39 +57,36 @@ platform_timer::~platform_timer() {

void platform_timer::start(fc::time_point tp) {
if(tp == fc::time_point::maximum()) {
expired = 0;
expired = false;
return;
}
fc::microseconds x = tp.time_since_epoch() - fc::time_point::now().time_since_epoch();
if(x.count() <= 0)
expired = 1;
expired = true;
else {
#if 0
std::promise<void> p;
auto f = p.get_future();
checktime_ios->post([&p,this]() {
expired = 0;
p.set_value();
});
f.get();
#endif
expired = 0;
expired = false;
my->timer->expires_after(std::chrono::microseconds(x.count()));
my->timer->async_wait([this](const boost::system::error_code& ec) {
if(ec)
return;
expired = 1;
call_expiration_callback();
expire_now();
});
}
}

void platform_timer::expire_now() {
bool expected = false;
if (expired.compare_exchange_strong(expected, true)) {
call_expiration_callback();
}
}

void platform_timer::stop() {
if(expired)
return;

my->timer->cancel();
expired = 1;
expired = true;
}

}}
20 changes: 13 additions & 7 deletions libraries/chain/platform_timer_kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ platform_timer::platform_timer() {

if(c == 1 && anEvent.filter == EVFILT_TIMER) {
platform_timer* self = (platform_timer*)anEvent.udata;
self->expired = 1;
self->call_expiration_callback();
self->expire_now();
}
else if(c == 1 && anEvent.filter == EVFILT_USER)
return;
Expand Down Expand Up @@ -90,19 +89,26 @@ platform_timer::~platform_timer() {

void platform_timer::start(fc::time_point tp) {
if(tp == fc::time_point::maximum()) {
expired = 0;
expired = false;
return;
}
fc::microseconds x = tp.time_since_epoch() - fc::time_point::now().time_since_epoch();
if(x.count() <= 0)
expired = 1;
expired = true;
else {
struct kevent64_s aTimerEvent;
EV_SET64(&aTimerEvent, my->timerid, EVFILT_TIMER, EV_ADD|EV_ENABLE|EV_ONESHOT, NOTE_USECONDS|NOTE_CRITICAL, x.count(), (uint64_t)this, 0, 0);

expired = 0;
expired = false;
if(kevent64(kqueue_fd, &aTimerEvent, 1, NULL, 0, KEVENT_FLAG_IMMEDIATE, NULL) != 0)
expired = 1;
expired = true;
}
}

void platform_timer::expire_now() {
bool expected = false;
if (expired.compare_exchange_strong(expected, true)) {
call_expiration_callback();
}
}

Expand All @@ -113,7 +119,7 @@ void platform_timer::stop() {
struct kevent64_s stop_timer_event;
EV_SET64(&stop_timer_event, my->timerid, EVFILT_TIMER, EV_DELETE, 0, 0, 0, 0, 0);
kevent64(kqueue_fd, &stop_timer_event, 1, NULL, 0, KEVENT_FLAG_IMMEDIATE, NULL);
expired = 1;
expired = true;
}

}}
30 changes: 19 additions & 11 deletions libraries/chain/platform_timer_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include <fc/fwd_impl.hpp>
#include <fc/exception/exception.hpp>

#include <atomic>
#include <mutex>

#include <signal.h>
#include <time.h>
#include <sys/types.h>

namespace eosio { namespace chain {
namespace eosio::chain {

static_assert(std::atomic_bool::is_always_lock_free, "Only lock-free atomics AS-safe.");

Expand All @@ -19,18 +21,17 @@ struct platform_timer::impl {

static void sig_handler(int, siginfo_t* si, void*) {
platform_timer* self = (platform_timer*)si->si_value.sival_ptr;
self->expired = 1;
self->call_expiration_callback();
self->expire_now();
}
};

platform_timer::platform_timer() {
static_assert(sizeof(impl) <= fwd_size);

static bool initialized;
static std::mutex initalized_mutex;
static std::mutex initialized_mutex;

if(std::lock_guard guard(initalized_mutex); !initialized) {
if(std::lock_guard guard(initialized_mutex); !initialized) {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_sigaction = impl::sig_handler;
Expand All @@ -55,19 +56,26 @@ platform_timer::~platform_timer() {

void platform_timer::start(fc::time_point tp) {
if(tp == fc::time_point::maximum()) {
expired = 0;
expired = false;
return;
}
fc::microseconds x = tp.time_since_epoch() - fc::time_point::now().time_since_epoch();
if(x.count() <= 0)
expired = 1;
expired = true;
else {
time_t secs = x.count() / 1000000;
long nsec = (x.count() - (secs*1000000)) * 1000;
struct itimerspec enable = {{0, 0}, {secs, nsec}};
expired = 0;
expired = false;
if(timer_settime(my->timerid, 0, &enable, NULL) != 0)
expired = 1;
expired = true;
}
}

void platform_timer::expire_now() {
bool expected = false;
if (expired.compare_exchange_strong(expected, true)) {
call_expiration_callback();
}
}

Expand All @@ -76,7 +84,7 @@ void platform_timer::stop() {
return;
struct itimerspec disable = {{0, 0}, {0, 0}};
timer_settime(my->timerid, 0, &disable, NULL);
expired = 1;
expired = true;
}

}}
}
5 changes: 4 additions & 1 deletion libraries/chain/transaction_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,10 @@ namespace eosio::chain {
return;

auto now = fc::time_point::now();
if( explicit_billed_cpu_time || deadline_exception_code == deadline_exception::code_value ) {
if (explicit_billed_cpu_time && block_deadline > now) {
EOS_THROW( interrupt_exception, "interrupt signaled, ran ${bt}us, start ${s}",
("bt", now - pseudo_start)("s", start) );
} else if( explicit_billed_cpu_time || deadline_exception_code == deadline_exception::code_value ) {
EOS_THROW( deadline_exception, "deadline exceeded ${billing_timer}us",
("billing_timer", now - pseudo_start)("now", now)("deadline", _deadline)("start", start) );
} else if( deadline_exception_code == block_cpu_usage_exceeded::code_value ) {
Expand Down
4 changes: 4 additions & 0 deletions plugins/test_control_api_plugin/test_control_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ void test_control_api_plugin::plugin_startup() {
TEST_CONTROL_RW_CALL(throw_on, 202, http_params_types::params_required)
}, appbase::exec_queue::read_write);

app().get_plugin<http_plugin>().add_api({
TEST_CONTROL_RW_CALL(swap_action, 202, http_params_types::params_required)
}, appbase::exec_queue::read_write);

}

void test_control_api_plugin::plugin_shutdown() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ class read_write {
};
empty throw_on(const throw_on_params& params) const;

private:
// produce a next block with `from` action replaced with `to` action
// requires Savanna to be active, this assumes blocks are is_proper_svnn_block
struct swap_action_params {
chain::name from; // replace from action in block to `to` action
chain::name to;
fc::crypto::private_key trx_priv_key;
fc::crypto::private_key blk_priv_key;
};
empty swap_action(const swap_action_params& params) const;

private:
test_control_ptr my;
};

Expand Down Expand Up @@ -68,3 +78,4 @@ class test_control_plugin : public plugin<test_control_plugin> {
FC_REFLECT(eosio::test_control_apis::empty, )
FC_REFLECT(eosio::test_control_apis::read_write::kill_node_on_producer_params, (producer)(where_in_sequence)(based_on_lib) )
FC_REFLECT(eosio::test_control_apis::read_write::throw_on_params, (signal)(exception) )
FC_REFLECT(eosio::test_control_apis::read_write::swap_action_params, (from)(to)(trx_priv_key)(blk_priv_key) )
Loading

0 comments on commit cf7c6e8

Please sign in to comment.