Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Add callback for when async compile completes
  - Used to kill (interrupt) the action if it is still running in the non-oc runtime
- Kick off another queued compile when compile finishes
- Add executing_action_id and queued_time to oc compile ipc_protocol
- Pause billing timer when looking up and processing oc code
- Move wasm_interface apply function to wasm_interface_impl
  • Loading branch information
heifner committed Nov 11, 2024
1 parent 29ac711 commit 9a6c881
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 102 deletions.
2 changes: 1 addition & 1 deletion libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1288,7 +1288,7 @@ struct controller_impl {
thread_pool(),
my_finalizers(cfg.finalizers_dir / config::safety_filename),
main_thread_timer(timer), // assumes constructor is called from main thread
wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() )
wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, main_thread_timer, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() )
{
assert(cfg.chain_thread_pool_size > 0);
thread_pool.start( cfg.chain_thread_pool_size, [this]( const fc::exception& e ) {
Expand Down
10 changes: 7 additions & 3 deletions libraries/chain/include/eosio/chain/wasm_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

namespace eosio { namespace chain {

class apply_context;
struct platform_timer;

class apply_context;
class wasm_runtime_interface;
class controller;
namespace eosvmoc { struct config; }
Expand Down Expand Up @@ -48,7 +50,7 @@ namespace eosio { namespace chain {

inline static bool test_disable_tierup = false; // set by unittests to test tierup failing

wasm_interface(vm_type vm, vm_oc_enable eosvmoc_tierup, const chainbase::database& d, const std::filesystem::path data_dir, const eosvmoc::config& eosvmoc_config, bool profile);
wasm_interface(vm_type vm, vm_oc_enable eosvmoc_tierup, const chainbase::database& d, platform_timer& main_thread_timer, const std::filesystem::path data_dir, const eosvmoc::config& eosvmoc_config, bool profile);
~wasm_interface();

#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
Expand All @@ -57,6 +59,9 @@ namespace eosio { namespace chain {

// returns true if EOS VM OC is enabled
bool is_eos_vm_oc_enabled() const;

// return internal executing action id, used for testing
uint64_t get_executing_action_id() const;
#endif

//call before dtor to skip what can be minutes of dtor overhead with some runtimes; can cause leaks
Expand All @@ -82,7 +87,6 @@ namespace eosio { namespace chain {
std::function<bool(const digest_type& code_hash, uint8_t vm_type, uint8_t vm_version, apply_context& context)> substitute_apply;

private:
vm_oc_enable eosvmoc_tierup;
unique_ptr<struct wasm_interface_impl> my;
};

Expand Down
96 changes: 92 additions & 4 deletions libraries/chain/include/eosio/chain/wasm_interface_private.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ namespace eosio { namespace chain {
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
struct eosvmoc_tier {
// Called from main thread
eosvmoc_tier(const std::filesystem::path& d, const eosvmoc::config& c, const chainbase::database& db)
: cc(d, c, db) {
eosvmoc_tier(const std::filesystem::path& d, const eosvmoc::config& c, const chainbase::database& db,
eosvmoc::code_cache_async::compile_complete_callback cb)
: cc(d, c, db, std::move(cb)) {
// Construct exec and mem for the main thread
exec = std::make_unique<eosvmoc::executor>(cc);
mem = std::make_unique<eosvmoc::memory>(wasm_constraints::maximum_linear_memory/wasm_constraints::wasm_page_size);
Expand All @@ -70,9 +71,12 @@ struct eosvmoc_tier {
#endif

wasm_interface_impl(wasm_interface::vm_type vm, wasm_interface::vm_oc_enable eosvmoc_tierup, const chainbase::database& d,
const std::filesystem::path data_dir, const eosvmoc::config& eosvmoc_config, bool profile)
platform_timer& main_thread_timer, const std::filesystem::path data_dir,
const eosvmoc::config& eosvmoc_config, bool profile)
: db(d)
, main_thread_timer(main_thread_timer)
, wasm_runtime_time(vm)
, eosvmoc_tierup(eosvmoc_tierup)
{
#ifdef EOSIO_EOS_VM_RUNTIME_ENABLED
if(vm == wasm_interface::vm_type::eos_vm)
Expand All @@ -96,13 +100,93 @@ struct eosvmoc_tier {
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
if(eosvmoc_tierup != wasm_interface::vm_oc_enable::oc_none) {
EOS_ASSERT(vm != wasm_interface::vm_type::eos_vm_oc, wasm_exception, "You can't use EOS VM OC as the base runtime when tier up is activated");
eosvmoc = std::make_unique<eosvmoc_tier>(data_dir, eosvmoc_config, d);
eosvmoc = std::make_unique<eosvmoc_tier>(data_dir, eosvmoc_config, d, [this](uint64_t executing_action_id, fc::time_point queued_time) {
async_compile_complete(executing_action_id, queued_time);
});
}
#endif
}

~wasm_interface_impl() = default;

#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
// Called from the async thread once a queued EOS VM OC compile has finished.
// exec_action_id is the id that was passed along when the compile was requested;
// queued_time is when that compile was queued and is only used for the log message.
void async_compile_complete(uint64_t exec_action_id, fc::time_point queued_time) {
   // A different id means the requesting action is no longer the one executing: nothing to interrupt.
   if (exec_action_id != executing_action_id)
      return;

   ilog("EOS VM OC tier up compile complete ${t}ms, interrupting non-oc execution id: ${id}",
        ("t", (fc::time_point::now() - queued_time).count()/1000)("id", exec_action_id));
   // Set the flag before expiring the timer so apply() can tell this interrupt apart from any other.
   eos_vm_oc_compile_interrupt = true;
   main_thread_timer.expire_now();
}
#endif

void apply( const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version, apply_context& context ) {
while (true) {
bool attempt_tierup = false;
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
attempt_tierup = eosvmoc && (eosvmoc_tierup == wasm_interface::vm_oc_enable::oc_all || context.should_use_eos_vm_oc());
++executing_action_id;
auto ex = fc::make_scoped_exit([&]() {
eos_vm_oc_compile_interrupt = false;
++executing_action_id; // indicate no longer executing
});
if (attempt_tierup) {
const chain::eosvmoc::code_descriptor* cd = nullptr;
chain::eosvmoc::code_cache_base::get_cd_failure failure = chain::eosvmoc::code_cache_base::get_cd_failure::temporary;
try {
// Ideally all validator nodes would switch to using oc before block producer nodes so that validators
// are never overwhelmed. Compile whitelisted account contracts first on non-produced blocks. This makes
// it more likely that validators will switch to the oc compiled contract before the block producer runs
// an action for the contract with oc.
chain::eosvmoc::code_cache_async::mode m;
m.whitelisted = context.is_eos_vm_oc_whitelisted();
m.high_priority = m.whitelisted && context.is_applying_block();
m.write_window = context.control.is_write_window();
auto timer_pause = fc::make_scoped_exit([&](){
context.trx_context.resume_billing_timer();
});
context.trx_context.pause_billing_timer();
cd = eosvmoc->cc.get_descriptor_for_code(m, executing_action_id, code_hash, vm_version, failure);
if (wasm_interface::test_disable_tierup)
cd = nullptr;
} catch (...) {
// swallow errors here, if EOS VM OC has gone in to the weeds we shouldn't bail: continue to try and run baseline
// In the future, consider moving bits of EOS VM that can fire exceptions and such out of this call path
static bool once_is_enough;
if (!once_is_enough)
elog("EOS VM OC has encountered an unexpected failure");
once_is_enough = true;
}
if (cd) {
if (!context.is_applying_block()) // read_only_trx_test.py looks for this log statement
tlog("${a} speculatively executing ${i} ${h} with eos vm oc", ("a", context.get_receiver())("h", code_hash));
eosvmoc->exec->execute(*cd, *eosvmoc->mem, context);
break;
}
}
#endif
auto start = fc::time_point::now();
try {
get_instantiated_module(code_hash, vm_type, vm_version, context.trx_context)->apply(context);
} catch (const interrupt_exception& e) {
if (attempt_tierup && context.is_applying_block() && eos_vm_oc_compile_interrupt) {
wlog("EOS VM OC compile complete id: ${id}, interrupt of ${r} <= ${a}::${act} after ${t}ms code ${h}",
("id", executing_action_id.load())("r", context.get_receiver())("a", context.get_action().account)
("act", context.get_action().name)("t", (fc::time_point::now()-start).count()/1000)("h", code_hash));
// currently no time limit on apply block, however timer does need to be reset
context.trx_context.resume_billing_timer(start);
continue; // attempt tierup again now that compile is complete
}
throw;
}
break;
}
}

// Test hook: reports the current value of the executing-action counter.
uint64_t get_executing_action_id() const {
   return executing_action_id.load();
}

bool is_code_cached(const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) const {
// This method is only called from tests; performance is not critical.
// No need for an additional check if we should lock or not.
Expand Down Expand Up @@ -212,7 +296,11 @@ struct eosvmoc_tier {
wasm_cache_index wasm_instantiation_cache;

const chainbase::database& db;
platform_timer& main_thread_timer;
const wasm_interface::vm_type wasm_runtime_time;
const wasm_interface::vm_oc_enable eosvmoc_tierup;
std::atomic<uint32_t> executing_action_id{0}; // monotonic increasing for each action apply, doesn't matter if it wraps
std::atomic<bool> eos_vm_oc_compile_interrupt{false};

#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
std::unique_ptr<struct eosvmoc_tier> eosvmoc{nullptr}; // used by all threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include <boost/interprocess/mem_algo/rbtree_best_fit.hpp>
#include <boost/asio/local/datagram_protocol.hpp>


#include <thread>

namespace std {
Expand Down Expand Up @@ -47,6 +46,9 @@ using allocator_t = bip::rbtree_best_fit<bip::null_mutex_family, bip::offset_ptr

struct config;

// Free hash_value() overload for code_tuple; simply forwards to boost::hash<code_tuple>.
inline size_t hash_value(const eosio::chain::eosvmoc::code_tuple& ct) {
   const boost::hash<eosio::chain::eosvmoc::code_tuple> hasher{};
   return hasher(ct);
}

class code_cache_base {
public:
Expand Down Expand Up @@ -92,22 +94,30 @@ class code_cache_base {

std::filesystem::path _cache_file_path;
int _cache_fd;
std::atomic<uint64_t> _executing_id{0}; // id of executing action

io_context _ctx;
local::datagram_protocol::socket _compile_monitor_write_socket{_ctx};
local::datagram_protocol::socket _compile_monitor_write_socket{_ctx}; // protected by _mtx for async
local::datagram_protocol::socket _compile_monitor_read_socket{_ctx};

// One pending compile: the request message plus the file descriptors to send along with it.
struct queued_compile_entry {
   compile_wasm_message    msg;
   std::vector<wrapped_fd> fds_to_pass;

   // Accessor for the code identity carried by msg.
   const code_tuple& code() const {
      return msg.code;
   }
};
//these are really only useful to the async code cache, but keep them here so free_code can be shared
using queued_compilies_t = boost::multi_index_container<
compile_wasm_message,
queued_compile_entry,
indexed_by<
sequenced<>,
hashed_unique<tag<by_hash>,
member<compile_wasm_message, code_tuple, &compile_wasm_message::code>>
const_mem_fun<queued_compile_entry, const code_tuple&, &queued_compile_entry::code>>
>
>;
queued_compilies_t _queued_compiles;
std::unordered_map<code_tuple, bool> _outstanding_compiles_and_poison;
std::mutex _mtx;
queued_compilies_t _queued_compiles; // protected by _mtx
std::unordered_map<code_tuple, bool> _outstanding_compiles_and_poison; // protected by _mtx

size_t _free_bytes_eviction_threshold;
void check_eviction_threshold(size_t free_bytes);
Expand All @@ -121,21 +131,30 @@ class code_cache_base {

class code_cache_async : public code_cache_base {
public:
code_cache_async(const std::filesystem::path& data_dir, const eosvmoc::config& eosvmoc_config, const chainbase::database& db);
// called from async thread, provides executing_action_id of any compiles spawned by get_descriptor_for_code
using compile_complete_callback = std::function<void(uint64_t, fc::time_point)>;

code_cache_async(const std::filesystem::path& data_dir, const eosvmoc::config& eosvmoc_config,
const chainbase::database& db, compile_complete_callback cb);
~code_cache_async();

//If code is in cache: returns pointer & bumps to front of MRU list
//If code is not in cache, and not blacklisted, and not currently compiling: return nullptr and kick off compile
//otherwise: return nullptr
const code_descriptor* const get_descriptor_for_code(mode m, const digest_type& code_id, const uint8_t& vm_version, get_cd_failure& failure);
const code_descriptor* const
get_descriptor_for_code(mode m, uint64_t executing_action_id,
const digest_type& code_id, const uint8_t& vm_version, get_cd_failure& failure);

private:
compile_complete_callback _compile_complete_func; // called from async thread, provides executing_action_id
std::thread _monitor_reply_thread;
boost::lockfree::spsc_queue<wasm_compilation_result_message> _result_queue;
void wait_on_compile_monitor_message();
std::tuple<size_t, size_t> consume_compile_thread_queue();
std::unordered_set<code_tuple> _blacklist;
size_t _threads;

void wait_on_compile_monitor_message();
std::tuple<size_t, size_t> consume_compile_thread_queue();
void process_queued_compiles();
};

class code_cache_sync : public code_cache_base {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ struct code_tuple {

// Message requesting compilation of a wasm, identified by `code`.
// executing_action_id and queued_time are carried so they can be copied into the
// corresponding result message.
struct compile_wasm_message {
// identity of the code to compile
code_tuple code;
uint64_t executing_action_id{0}; // action id that initiated the compilation
fc::time_point queued_time; // when compilation was queued to begin
// NOTE(review): presumably subjective limits applied to this compile when set; confirm against the compile side
std::optional<eosvmoc::subjective_compile_limits> limits;
//Two sent fd: 1) communication socket for result, 2) the wasm to compile
};
Expand All @@ -35,6 +37,8 @@ struct code_compilation_result_message {
unsigned apply_offset;
int starting_memory_pages;
unsigned initdata_prologue_size;
uint64_t executing_action_id{0}; // action id that initiated the compilation
fc::time_point queued_time; // when compilation was queued to begin
//Two sent fds: 1) wasm code, 2) initial memory snapshot
};

Expand All @@ -50,6 +54,8 @@ struct wasm_compilation_result_message {
code_tuple code;
wasm_compilation_result result;
size_t cache_free_bytes;
uint64_t executing_action_id{0}; // action id that initiated the compilation, copied from compile_wasm_message
fc::time_point queued_time; // when compilation was queued to begin, copied from compile_wasm_message
};

using eosvmoc_message = std::variant<initialize_message,
Expand All @@ -63,9 +69,9 @@ using eosvmoc_message = std::variant<initialize_message,
FC_REFLECT(eosio::chain::eosvmoc::initialize_message, )
FC_REFLECT(eosio::chain::eosvmoc::initalize_response_message, (error_message))
FC_REFLECT(eosio::chain::eosvmoc::code_tuple, (code_id)(vm_version))
FC_REFLECT(eosio::chain::eosvmoc::compile_wasm_message, (code)(limits))
FC_REFLECT(eosio::chain::eosvmoc::compile_wasm_message, (code)(executing_action_id)(queued_time)(limits))
FC_REFLECT(eosio::chain::eosvmoc::evict_wasms_message, (codes))
FC_REFLECT(eosio::chain::eosvmoc::code_compilation_result_message, (start)(apply_offset)(starting_memory_pages)(initdata_prologue_size))
FC_REFLECT(eosio::chain::eosvmoc::code_compilation_result_message, (start)(apply_offset)(starting_memory_pages)(initdata_prologue_size)(executing_action_id)(queued_time))
FC_REFLECT(eosio::chain::eosvmoc::compilation_result_unknownfailure, )
FC_REFLECT(eosio::chain::eosvmoc::compilation_result_toofull, )
FC_REFLECT(eosio::chain::eosvmoc::wasm_compilation_result_message, (code)(result)(cache_free_bytes))
FC_REFLECT(eosio::chain::eosvmoc::wasm_compilation_result_message, (code)(result)(cache_free_bytes)(executing_action_id)(queued_time))
44 changes: 8 additions & 36 deletions libraries/chain/wasm_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@

namespace eosio { namespace chain {

wasm_interface::wasm_interface(vm_type vm, vm_oc_enable eosvmoc_tierup, const chainbase::database& d, const std::filesystem::path data_dir, const eosvmoc::config& eosvmoc_config, bool profile)
: eosvmoc_tierup(eosvmoc_tierup), my( new wasm_interface_impl(vm, eosvmoc_tierup, d, data_dir, eosvmoc_config, profile) ) {}
wasm_interface::wasm_interface(vm_type vm, vm_oc_enable eosvmoc_tierup, const chainbase::database& d,
platform_timer& main_thread_timer, const std::filesystem::path data_dir, const eosvmoc::config& eosvmoc_config, bool profile)
: my( new wasm_interface_impl(vm, eosvmoc_tierup, d, main_thread_timer, data_dir, eosvmoc_config, profile) ) {}

wasm_interface::~wasm_interface() {}

Expand Down Expand Up @@ -87,40 +88,7 @@ namespace eosio { namespace chain {
void wasm_interface::apply( const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version, apply_context& context ) {
if (substitute_apply && substitute_apply(code_hash, vm_type, vm_version, context))
return;
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
if (my->eosvmoc && (eosvmoc_tierup == wasm_interface::vm_oc_enable::oc_all || context.should_use_eos_vm_oc())) {
const chain::eosvmoc::code_descriptor* cd = nullptr;
chain::eosvmoc::code_cache_base::get_cd_failure failure = chain::eosvmoc::code_cache_base::get_cd_failure::temporary;
try {
// Ideally all validator nodes would switch to using oc before block producer nodes so that validators
// are never overwhelmed. Compile whitelisted account contracts first on non-produced blocks. This makes
// it more likely that validators will switch to the oc compiled contract before the block producer runs
// an action for the contract with oc.
chain::eosvmoc::code_cache_async::mode m;
m.whitelisted = context.is_eos_vm_oc_whitelisted();
m.high_priority = m.whitelisted && context.is_applying_block();
m.write_window = context.control.is_write_window();
cd = my->eosvmoc->cc.get_descriptor_for_code(m, code_hash, vm_version, failure);
if (test_disable_tierup)
cd = nullptr;
} catch (...) {
// swallow errors here, if EOS VM OC has gone in to the weeds we shouldn't bail: continue to try and run baseline
// In the future, consider moving bits of EOS VM that can fire exceptions and such out of this call path
static bool once_is_enough;
if (!once_is_enough)
elog("EOS VM OC has encountered an unexpected failure");
once_is_enough = true;
}
if (cd) {
if (!context.is_applying_block()) // read_only_trx_test.py looks for this log statement
tlog("${a} speculatively executing ${h} with eos vm oc", ("a", context.get_receiver())("h", code_hash));
my->eosvmoc->exec->execute(*cd, *my->eosvmoc->mem, context);
return;
}
}
#endif

my->get_instantiated_module(code_hash, vm_type, vm_version, context.trx_context)->apply(context);
my->apply( code_hash, vm_type, vm_version, context );
}

bool wasm_interface::is_code_cached(const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) const {
Expand All @@ -131,6 +99,10 @@ namespace eosio { namespace chain {
// Forwards to the implementation object's is_eos_vm_oc_enabled().
bool wasm_interface::is_eos_vm_oc_enabled() const {
   const bool enabled = my->is_eos_vm_oc_enabled();
   return enabled;
}

// Forwards to the implementation object's get_executing_action_id().
uint64_t wasm_interface::get_executing_action_id() const {
   const uint64_t id = my->get_executing_action_id();
   return id;
}
#endif

wasm_instantiated_module_interface::~wasm_instantiated_module_interface() = default;
Expand Down
Loading

0 comments on commit 9a6c881

Please sign in to comment.