Skip to content

Commit

Permalink
PS-9148 feature: Refactored dictionary flusher thread startup / termi…
Browse files Browse the repository at this point in the history
…nation

https://perconadev.atlassian.net/browse/PS-9148

'command_service_tuple' extended with one more member 'thread' which is used
to initialize / deinitialize threads intended to be used as MySQL threads.

'sql_context' class constructor now takes one extra parameter that allows to
specify whether the user wants to associate a new session (including an instance
of the 'THD' class) with the calling thread. Internally it is done by setting the
'MYSQL_COMMAND_LOCAL_THD_HANDLE' option to nullptr.
Also 'sql_context' now tries to open connections on behalf of the internal
predefined 'mysql.session' MySQL user (instead of 'root').

Reworked 'static_sql_context_builder' class - it now creates a shared  "static"
instance of he 'sql_context'  class inside the class constructor and passes true
as its 'initialize_thread' parameter meaning an intent to associate the calling
thread with this connection. Before this change, the construction was done inside
the very first call to 'do_build()'.

The regular ("new instance per request" ) implementation of the
'basic_sql_context_builder', 'default_sql_context_builder', now passes false as
the 'initialize_thread' parameter (meaning no association with the thread needed).

Significantly reworked 'dictionary_flusher_thread':
- instead of composed 'query_cache' object it now expects its component
  'query_cache_core' and 'query_builder' as constructor arguments. This allows
  to create an instance of the 'static_sql_context_builder' and 'query_cache'
  directly inside the thread function.
- Instead of 'stopped_' boolean flag, we now have a state enumeration ('initial'
  'initialization_failure', 'operational', 'stopped')
- the implementation no longer uses 'std::conditional_variable' for awaiting timer
  events / termination requests. Instead, it just wakes up periodically (once per
  second) and checks it it needs to reload the cache. This is necessary to be able
  to respond to graceful termination requests like 'KILL CONNECTION' or shutdown
  closing sessions at shutdown.
- added 'request_termination()' method used inside component 'deinit()' handler.
- 'do_periodic_reload()' function now looks more lake a state machine performing
  different actions and making transitions to different states.
- added new logic to wait for Sessin Server availability inside the
  'do_periodic_reload()' function.

Reworked 'thread_handler_context' class - it now uses 'mysql_command_thread'
service to initialize / deinitialize the thread for MySQL.

Various MTR test case that use dictionary functions updated with explicit
granting necessary privileges on the dictionary table to the
'mysql.session'@'localhost' internal MySQL user.

Added new 'component_masking_functions.flusher_thread_connection_reuse'
MTR test case that checks that the same MySQL internal connection (created
via 'mysql_command_xxx' services) can be used several times (without closing
and re-opening) by the background flusher thread.

Added new 'component_masking_functions.flusher_thread_immediate_restart'
MTR test case that check for proper behavior during server shutdown
immediately after installing the component.

Added new 'wait_for_component_uninstall.inc' MTR include file which can be used
to perform several attempts to 'UNINSTALL COMPONENT' until it succeeds or
reaches the max number of attempts.
  • Loading branch information
percona-ysorokin committed Dec 7, 2024
1 parent e04feb2 commit db12fad
Show file tree
Hide file tree
Showing 33 changed files with 603 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ namespace masking_functions {
// mysql_service_mysql_command_field_info,
// mysql_service_mysql_command_options,
// mysql_service_mysql_command_factory,
// mysql_service_mysql_command_error_info
// mysql_service_mysql_command_error_info,
// mysql_service_mysql_command_thread
// };
// ...
// sql_context ctx{primitive_singleton<mysql_command_query>::instance()};
Expand All @@ -49,6 +50,7 @@ struct command_service_tuple {
SERVICE_TYPE(mysql_command_options) * options;
SERVICE_TYPE(mysql_command_factory) * factory;
SERVICE_TYPE(mysql_command_error_info) * error_info;
SERVICE_TYPE(mysql_command_thread) * thread;
};

} // namespace masking_functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,70 @@

#include "masking_functions/dictionary_flusher_thread_fwd.hpp"

#include <condition_variable>
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>

#include "masking_functions/query_cache_fwd.hpp"
#include "masking_functions/query_builder_fwd.hpp"
#include "masking_functions/query_cache_core_fwd.hpp"

namespace masking_functions {

// Facts that impacted 'dictionary_flusher_thread' design.
// - 'mysql_command_xxx' services cannot be used inside component 'init()' /
// 'deinit()' handlers as that may mess up with the 'THD' object of the
// connection that executes 'INSTALL COMPONENT' / 'UNINSTALL COMPONENT'.
// Therefore, 'INSTALL COMPONENT' cannot immediately reload dictionary
// cache. It can only spawn a background thread that will do this later.
//
// - MySQL internal connection (an instance of the 'sql_context' class,
// which in turn uses 'mysql_command_xxx' services) must not be created
// in the component 'init()' handler but inside the background thread.
// The main reason for this is that it needs to have its own THD
// object associated with it. Internally it is done by seting
// 'MYSQL_COMMAND_LOCAL_THD_HANDLE' option for the internal connection.
//
// - MySQL Service registry is locked inside component 'init()' / 'deinit()'
// handlers. In other words, we cannot instruct component 'init()' handler
// to wait for background thread to initiate the connection as this will
// result in a deadlock.
//
// - Similarly, we cannot instruct component 'deinit()' handler to wait for
// internal connection to be closed using regular means. However, if we
// set the 'MYSQL_NO_LOCK_REGISTRY' for this internal connection, it will
// be closed without trying to lock MySQL Service registry (which is
// already locked by the 'UNINSTALL COMPONENT' logic).
//
// - During startup when server installs components that are marked for
// loading in the Data Dictionary, Session Server is not yet available
// ('srv_session_server_is_available()' returns false) and
// 'mysql_command_xxx' cannot be used immediately. Therefore, the
// first step the background thread needs to do is to wait until the
// Session Server becomes available (until
// 'srv_session_server_is_available()' returns true) and only then
// initiate an internal connection.
//
// - During shutdown MySQL server before uninstalling components tries
// to gracefully close all remaining sessions (those registered in the
// session manager). Our background thread is also in this list as it
// sets 'MYSQL_COMMAND_LOCAL_THD_HANDLE' option for the internal
// connection. Therefore, our background thread needs to respond to
// 'KILL CONNECTION' statement (to setting 'thd->is_killed') because
// otherwise the thread will be killed by force (via 'ptheread_cancel()'
// or similar) which most probably will result in server crash.
//
// - In rare cases when 'UNINSTALL COMPONENT' is called immediately after
// 'INSTALL COMPONENT' (and when background thread has been spawned but
// not yet initialized the connection), the only safe way to avoid
// deadlocks is to let 'UNINSTALL COMPONENT' ('deinit()' handler) to fail
// earlier (without waiting for the background thread to join) by just
// requesting it to stop later (by setting the state to 'stopped').
// Performing several attempts to 'UNINSTALL COMPONENT' should eventually
// succeed.
class dictionary_flusher_thread {
public:
dictionary_flusher_thread(const query_cache_ptr &cache,
dictionary_flusher_thread(const query_cache_core_ptr &cache_core,
const query_builder_ptr &sql_query_builder,
std::uint64_t flush_interval_seconds);
dictionary_flusher_thread(const dictionary_flusher_thread &other) = delete;
dictionary_flusher_thread(dictionary_flusher_thread &&other) = delete;
Expand All @@ -39,13 +91,20 @@ class dictionary_flusher_thread {
delete;
~dictionary_flusher_thread();

// sets state to stopped and returns a flag indicating whether it is safe to
// wait for this thread to join inside the component 'deinit()' handler
// (if before this call the thread was in any state but initial - when the
// internal connection had already been initialized, in other words)
bool request_termination();

private:
query_cache_ptr cache_;
query_cache_core_ptr cache_core_;
query_builder_ptr sql_query_builder_;
std::uint64_t flush_interval_seconds_;

bool stopped_;
std::mutex flusher_mutex_;
std::condition_variable flusher_condition_var_;
enum class state_type;
using atomic_state_type = std::atomic<state_type>;
atomic_state_type state_;

struct jthread_deleter {
void operator()(void *ptr) const noexcept;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class sql_context {
std::function<void(const field_value_container<NumberOfFields> &)>;

sql_context(const command_service_tuple &services,
sql_context_registry_access registry_locking_mode);
sql_context_registry_access registry_locking_mode,
bool initialize_thread);

sql_context(sql_context const &) = delete;
sql_context(sql_context &&) noexcept = default;
Expand All @@ -53,6 +54,8 @@ class sql_context {

~sql_context() = default;

void reset();

const command_service_tuple &get_services() const noexcept {
return *impl_.get_deleter().services;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ namespace masking_functions {
class static_sql_context_builder : public basic_sql_context_builder {
public:
static_sql_context_builder(const command_service_tuple &services,
sql_context_registry_access registry_locking_mode)
: basic_sql_context_builder{services},
registry_locking_mode_{registry_locking_mode},
static_instance_{} {}
sql_context_registry_access registry_locking_mode);
~static_sql_context_builder() override;

private:
sql_context_registry_access registry_locking_mode_;
mutable sql_context_ptr static_instance_;
sql_context_ptr static_instance_;

sql_context_ptr do_build() const override;
};
Expand Down
48 changes: 28 additions & 20 deletions components/masking_functions/src/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
along with this program; if not, write to the Free Software Foundation,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */

#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include <boost/preprocessor/stringize.hpp>

#include <my_dbug.h>

#include <mysql/components/component_implementation.h>

#include <mysql/components/services/component_sys_var_service.h>
Expand All @@ -32,11 +39,9 @@

#include <mysqlpp/udf_error_reporter.hpp>

#include <my_dbug.h>
#include <mysqld_error.h>

#include <sql/current_thd.h>
#include <sql/debug_sync.h>
#include "sql/debug_sync.h"

#include "masking_functions/command_service_tuple.hpp"
#include "masking_functions/component_sys_variable_service_tuple.hpp"
Expand Down Expand Up @@ -72,6 +77,7 @@ REQUIRES_SERVICE_PLACEHOLDER(mysql_command_field_info);
REQUIRES_SERVICE_PLACEHOLDER(mysql_command_options);
REQUIRES_SERVICE_PLACEHOLDER(mysql_command_factory);
REQUIRES_SERVICE_PLACEHOLDER(mysql_command_error_info);
REQUIRES_SERVICE_PLACEHOLDER(mysql_command_thread);

REQUIRES_PSI_THREAD_SERVICE_PLACEHOLDER;

Expand Down Expand Up @@ -133,7 +139,8 @@ static mysql_service_status_t component_init() {
mysql_service_mysql_command_field_info,
mysql_service_mysql_command_options,
mysql_service_mysql_command_factory,
mysql_service_mysql_command_error_info};
mysql_service_mysql_command_error_info,
mysql_service_mysql_command_thread};
masking_functions::primitive_singleton<
masking_functions::component_sys_variable_service_tuple>::instance() =
masking_functions::component_sys_variable_service_tuple{
Expand Down Expand Up @@ -186,26 +193,19 @@ static mysql_service_status_t component_init() {
const auto flush_interval_seconds{
masking_functions::get_flush_interval_seconds()};
if (flush_interval_seconds > 0U) {
auto static_sql_context_builder{
std::make_shared<masking_functions::static_sql_context_builder>(
command_services,
masking_functions::sql_context_registry_access::non_locking)};
auto flusher_cache{std::make_shared<masking_functions::query_cache>(
cache_core, static_sql_context_builder, sql_query_builder)};

auto flusher{
std::make_unique<masking_functions::dictionary_flusher_thread>(
flusher_cache, flush_interval_seconds)};
cache_core, sql_query_builder, flush_interval_seconds)};

masking_functions::primitive_singleton<
masking_functions::dictionary_flusher_thread_ptr>::instance() =
std::move(flusher);

DBUG_EXECUTE_IF("masking_functions_flusher_create", {
DBUG_SET("-d,masking_functions_flusher_create");
const char act[] =
"now WAIT_FOR masking_functions_flusher_create_after_signal";
assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
DBUG_EXECUTE_IF("enable_masking_functions_flusher_create_sync", {
MYSQL_THD extracted_thd{nullptr};
mysql_service_mysql_current_thread_reader->get(&extracted_thd);
assert(extracted_thd != nullptr);
DEBUG_SYNC(extracted_thd, "masking_functions_after_flusher_create");
});
}

Expand All @@ -221,10 +221,17 @@ static mysql_service_status_t component_init() {

static mysql_service_status_t component_deinit() {
int result = 0;
auto &flusher{masking_functions::primitive_singleton<
masking_functions::dictionary_flusher_thread_ptr>::instance()};
if (flusher && !flusher->request_termination()) {
// early return if flusher thread cannot be safely stopped
// (if it hasn't tried to initialize the internal MySQL command service
// connection yet)
result = 1;
return result;
}

masking_functions::primitive_singleton<
masking_functions::dictionary_flusher_thread_ptr>::instance()
.reset();
flusher.reset();

masking_functions::primitive_singleton<
masking_functions::query_cache_ptr>::instance()
Expand Down Expand Up @@ -280,6 +287,7 @@ BEGIN_COMPONENT_REQUIRES(CURRENT_COMPONENT_NAME)
REQUIRES_SERVICE(mysql_command_options),
REQUIRES_SERVICE(mysql_command_factory),
REQUIRES_SERVICE(mysql_command_error_info),
REQUIRES_SERVICE(mysql_command_thread),

REQUIRES_SERVICE(udf_registration),
REQUIRES_SERVICE(dynamic_privilege_register),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
namespace masking_functions {

sql_context_ptr default_sql_context_builder::do_build() const {
return std::make_shared<sql_context>(get_services(), registry_locking_mode_);
return std::make_shared<sql_context>(get_services(), registry_locking_mode_,
false);
}

} // namespace masking_functions
Loading

0 comments on commit db12fad

Please sign in to comment.