diff --git a/src/common/json_helper.h b/src/common/json_helper.h index 4329bd2a8a..cbd94af945 100644 --- a/src/common/json_helper.h +++ b/src/common/json_helper.h @@ -237,6 +237,12 @@ JSON_DECODE_ENTRIES(input, t, __VA_ARGS__); \ } +#define JSON_ENCODE_OBJ(writer, name, ...) \ + do { \ + writer.Key(#name); \ + dsn::json::json_encode(writer, __VA_ARGS__); \ + } while (0) + namespace dsn { namespace json { diff --git a/src/replica/duplication/replica_duplicator.h b/src/replica/duplication/replica_duplicator.h index ebf4473b99..e9df7d7cb6 100644 --- a/src/replica/duplication/replica_duplicator.h +++ b/src/replica/duplication/replica_duplicator.h @@ -23,6 +23,7 @@ #include #include "common//duplication_common.h" +#include "common/json_helper.h" #include "common/replication_other_types.h" #include "duplication_types.h" #include "replica/replica_base.h" @@ -143,6 +144,25 @@ class replica_duplicator : public replica_base, public pipeline::base void set_duplication_plog_checking(bool checking); + // Encode current progress of this duplication into json. 
+ template + void encode_progress(TWriter &writer) const + { + writer.StartObject(); + + JSON_ENCODE_OBJ(writer, dupid, _id); + JSON_ENCODE_OBJ(writer, remote_cluster_name, _remote_cluster_name); + JSON_ENCODE_OBJ(writer, remote_app_name, _remote_app_name); + + { + zauto_read_lock l(_lock); + JSON_ENCODE_OBJ(writer, confirmed_decree, _progress.last_decree); + JSON_ENCODE_OBJ(writer, persisted_decree, _progress.confirmed_decree); + } + + writer.EndObject(); + } + private: friend class duplication_test_base; friend class replica_duplicator_test; diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index d60bf57b20..9d2153559a 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -22,7 +22,11 @@ #include "common//duplication_common.h" #include "common/gpid.h" +#include "common/replication_enums.h" +#include "metadata_types.h" #include "replica/duplication/replica_duplicator.h" +#include "replica/duplication/replica_duplicator_manager.h" +#include "replica/replica.h" #include "replica_duplicator_manager.h" #include "utils/autoref_ptr.h" #include "utils/errors.h" @@ -41,29 +45,56 @@ replica_duplicator_manager::replica_duplicator_manager(replica *r) { } +void replica_duplicator_manager::update_duplication_map( + const std::map &new_dup_map) +{ + if (new_dup_map.empty() || _replica->status() != partition_status::PS_PRIMARY) { + remove_all_duplications(); + return; + } + + remove_non_existed_duplications(new_dup_map); + + for (const auto &kv2 : new_dup_map) { + sync_duplication(kv2.second); + } +} + std::vector replica_duplicator_manager::get_duplication_confirms_to_update() const { zauto_lock l(_lock); std::vector updates; - for (const auto &kv : _duplications) { - replica_duplicator *duplicator = kv.second.get(); - duplication_progress p = duplicator->progress(); - if (p.last_decree != p.confirmed_decree || - 
(kv.second->status() == duplication_status::DS_PREPARE && p.checkpoint_has_prepared)) { - if (p.last_decree < p.confirmed_decree) { - LOG_ERROR_PREFIX("invalid decree state: p.last_decree({}) < p.confirmed_decree({})", - p.last_decree, - p.confirmed_decree); - continue; - } - duplication_confirm_entry entry; - entry.dupid = duplicator->id(); - entry.confirmed_decree = p.last_decree; - entry.__set_checkpoint_prepared(p.checkpoint_has_prepared); - updates.emplace_back(entry); + for (const auto & [ _, dup ] : _duplications) { + // There are two conditions when we should send confirmed decrees to meta server to update + // the progress: + // + // 1. the acknowledged decree from remote cluster has changed, making it different from + // the one that is persisted in zk by meta server; otherwise, + // + // 2. the duplication has been in the stage of synchronizing checkpoint to the remote + // cluster, and the synchronized checkpoint has been ready. + const auto &progress = dup->progress(); + if (progress.last_decree == progress.confirmed_decree && + (dup->status() != duplication_status::DS_PREPARE || + !progress.checkpoint_has_prepared)) { + continue; } + + if (progress.last_decree < progress.confirmed_decree) { + LOG_ERROR_PREFIX( + "invalid decree state: progress.last_decree({}) < progress.confirmed_decree({})", + progress.last_decree, + progress.confirmed_decree); + continue; + } + + duplication_confirm_entry entry; + entry.dupid = dup->id(); + entry.confirmed_decree = progress.last_decree; + entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared); + updates.emplace_back(entry); } return updates; } @@ -191,5 +222,17 @@ replica_duplicator_manager::get_dup_states() const return ret; } +void replica_duplicator_manager::remove_all_duplications() +{ + // fast path + if (_duplications.empty()) { + return; + } + + LOG_WARNING_PREFIX("remove all duplication, replica status = {}", + enum_to_string(_replica->status())); + _duplications.clear(); +} + } // namespace 
replication } // namespace dsn diff --git a/src/replica/duplication/replica_duplicator_manager.h b/src/replica/duplication/replica_duplicator_manager.h index 51bcbd1e1d..413176a16f 100644 --- a/src/replica/duplication/replica_duplicator_manager.h +++ b/src/replica/duplication/replica_duplicator_manager.h @@ -24,19 +24,16 @@ #include #include "common//duplication_common.h" -#include "common/replication_enums.h" #include "common/replication_other_types.h" #include "duplication_types.h" -#include "metadata_types.h" -#include "replica/replica.h" #include "replica/replica_base.h" #include "replica_duplicator.h" -#include "utils/fmt_logging.h" #include "utils/metrics.h" #include "utils/zlocks.h" namespace dsn { namespace replication { +class replica; /// replica_duplicator_manager manages the set of duplications on this replica. /// \see duplication_sync_timer @@ -51,19 +48,7 @@ class replica_duplicator_manager : public replica_base // - replica is not primary on replica-server perspective (status != PRIMARY) // - replica is not primary on meta-server perspective (progress.find(partition_id) == end()) // - the app is not assigned with duplication (dup_map.empty()) - void update_duplication_map(const std::map &new_dup_map) - { - if (new_dup_map.empty() || _replica->status() != partition_status::PS_PRIMARY) { - remove_all_duplications(); - return; - } - - remove_non_existed_duplications(new_dup_map); - - for (const auto &kv2 : new_dup_map) { - sync_duplication(kv2.second); - } - } + void update_duplication_map(const std::map &new_dup_map); /// collect updated duplication confirm points from this replica. std::vector get_duplication_confirms_to_update() const; @@ -93,21 +78,30 @@ class replica_duplicator_manager : public replica_base }; std::vector get_dup_states() const; + // Encode current progress of all duplication into json. 
+ template + void encode_progress(TWriter &writer) const + { + zauto_lock l(_lock); + + if (_duplications.empty()) { + return; + } + + writer.Key("duplications"); + writer.StartArray(); + for (const auto & [ _, dup ] : _duplications) { + dup->encode_progress(writer); + } + writer.EndArray(); + } + private: void sync_duplication(const duplication_entry &ent); void remove_non_existed_duplications(const std::map &); - void remove_all_duplications() - { - // fast path - if (_duplications.empty()) - return; - - LOG_WARNING_PREFIX("remove all duplication, replica status = {}", - enum_to_string(_replica->status())); - _duplications.clear(); - } + void remove_all_duplications(); private: friend class duplication_sync_timer_test; diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 5bb1f17b83..be31df14fd 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -41,10 +41,10 @@ #include "common/replication_common.h" #include "common/replication_enums.h" #include "consensus_types.h" -#include "duplication/replica_duplicator_manager.h" #include "duplication/replica_follower.h" #include "mutation.h" #include "mutation_log.h" +#include "replica/duplication/replica_duplicator_manager.h" #include "replica/prepare_list.h" #include "replica/replica_context.h" #include "replica/replication_app_base.h" @@ -578,6 +578,10 @@ mutation_ptr replica::new_mutation(decree decree) return mu; } +decree replica::last_applied_decree() const { return _app->last_committed_decree(); } + +decree replica::last_flushed_decree() const { return _app->last_flushed_decree(); } + decree replica::last_durable_decree() const { return _app->last_durable_decree(); } decree replica::last_prepared_decree() const diff --git a/src/replica/replica.h b/src/replica/replica.h index 3b90641cdd..ae0118dc05 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -35,8 +35,10 @@ #include #include +#include "common/json_helper.h" #include "common/replication_other_types.h" #include 
"dsn.layer2_types.h" +#include "duplication/replica_duplicator_manager.h" // IWYU pragma: keep #include "meta_admin_types.h" #include "metadata_types.h" #include "mutation.h" @@ -96,7 +98,6 @@ class replica; class replica_backup_manager; class replica_bulk_loader; class replica_disk_migrator; -class replica_duplicator_manager; class replica_follower; class replica_split_manager; class replica_stub; @@ -223,8 +224,37 @@ class replica : public serverlet, public ref_counter, public replica_ba const app_info *get_app_info() const { return &_app_info; } decree max_prepared_decree() const { return _prepare_list->max_decree(); } decree last_committed_decree() const { return _prepare_list->last_committed_decree(); } + + // The last decree that has been applied into rocksdb memtable. + decree last_applied_decree() const; + + // The last decree that has been flushed into rocksdb sst. + decree last_flushed_decree() const; + decree last_prepared_decree() const; decree last_durable_decree() const; + + // Encode current progress of decrees into json, including both local writes and duplications + // of this replica. 
+ template + void encode_progress(TWriter &writer) const + { + writer.StartObject(); + + JSON_ENCODE_OBJ(writer, max_prepared_decree, max_prepared_decree()); + JSON_ENCODE_OBJ(writer, max_plog_decree, _private_log->max_decree(get_gpid())); + JSON_ENCODE_OBJ(writer, max_plog_commit_on_disk, _private_log->max_commit_on_disk()); + JSON_ENCODE_OBJ(writer, last_committed_decree, last_committed_decree()); + JSON_ENCODE_OBJ(writer, last_applied_decree, last_applied_decree()); + JSON_ENCODE_OBJ(writer, last_flushed_decree, last_flushed_decree()); + JSON_ENCODE_OBJ(writer, last_durable_decree, last_durable_decree()); + JSON_ENCODE_OBJ(writer, max_gc_decree, _private_log->max_gced_decree(get_gpid())); + + _duplication_mgr->encode_progress(writer); + + writer.EndObject(); + } + const std::string &dir() const { return _dir; } uint64_t create_time_milliseconds() const { return _create_time_ms; } const char *name() const { return replica_name(); } @@ -429,13 +459,6 @@ class replica : public serverlet, public ref_counter, public replica_ba error_code background_sync_checkpoint(); void catch_up_with_private_logs(partition_status::type s); void on_checkpoint_completed(error_code err); - void on_copy_checkpoint_ack(error_code err, - const std::shared_ptr &req, - const std::shared_ptr &resp); - void on_copy_checkpoint_file_completed(error_code err, - size_t sz, - std::shared_ptr resp, - const std::string &chk_dir); // Enable/Disable plog garbage collection to be executed. For example, to duplicate data // to target cluster, we could firstly disable plog garbage collection, then do copy_data. 
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 3558fb7a45..a12c22efb6 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -29,6 +29,7 @@ // IWYU pragma: no_include #include #include +#include #include #include #include @@ -37,8 +38,8 @@ #include #include #include -#include #include +#include #include #include @@ -47,6 +48,7 @@ #include "bulk_load/replica_bulk_loader.h" #include "common/backup_common.h" #include "common/duplication_common.h" +#include "common/json_helper.h" #include "common/replication.codes.h" #include "common/replication_enums.h" #include "disk_cleaner.h" @@ -2335,6 +2337,22 @@ void replica_stub::register_ctrl_command() }); })); + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.query-progress", + "Query the progress of decrees, including both local writes and duplications for " + "replicas specified by comma-separated list of 'app_id' or 'app_id.partition_id', " + "or all replicas for empty", + "[id1,id2,...]", + [this](const std::vector &args) { + return exec_command_on_replica(args, true, [](const replica_ptr &rep) { + std::ostringstream out; + rapidjson::OStreamWrapper wrapper(out); + dsn::json::PrettyJsonWriter writer(wrapper); + rep->encode_progress(writer); + return out.str(); + }); + })); + #ifdef DSN_ENABLE_GPERF _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( _release_tcmalloc_memory, diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h index c3559c095d..2a88618f64 100644 --- a/src/replica/replication_app_base.h +++ b/src/replica/replication_app_base.h @@ -238,7 +238,13 @@ class replication_app_base : public replica_base // // Query methods. // + + // Get the decree of the last flushed mutation. -1 means failed to get. + virtual replication::decree last_flushed_decree() const = 0; + + // Get the decree of the last created checkpoint. 
virtual replication::decree last_durable_decree() const = 0; + // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK. virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0; diff --git a/src/replica/storage/simple_kv/simple_kv.server.impl.h b/src/replica/storage/simple_kv/simple_kv.server.impl.h index 240ed87899..b296c1f83d 100644 --- a/src/replica/storage/simple_kv/simple_kv.server.impl.h +++ b/src/replica/storage/simple_kv/simple_kv.server.impl.h @@ -70,7 +70,9 @@ class simple_kv_service_impl : public simple_kv_service virtual ::dsn::error_code stop(bool cleanup = false) override; - virtual int64_t last_durable_decree() const override { return _last_durable_decree; } + int64_t last_flushed_decree() const override { return _last_durable_decree; } + + int64_t last_durable_decree() const override { return _last_durable_decree; } virtual ::dsn::error_code sync_checkpoint() override; diff --git a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h index 1235cdbc68..8b80396a02 100644 --- a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h +++ b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h @@ -82,7 +82,9 @@ class simple_kv_service_impl : public application::simple_kv_service virtual ::dsn::error_code stop(bool cleanup = false) override; - virtual int64_t last_durable_decree() const override { return _last_durable_decree; } + int64_t last_flushed_decree() const override { return _last_durable_decree; } + + int64_t last_durable_decree() const override { return _last_durable_decree; } virtual ::dsn::error_code sync_checkpoint() override; diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h index 6d7725b787..cc631143b0 100644 --- a/src/replica/test/mock_utils.h +++ b/src/replica/test/mock_utils.h @@ -83,6 +83,8 @@ class mock_replication_app_base : public replication_app_base // we mock the followings void 
update_app_envs(const std::map &envs) override { _envs = envs; } void query_app_envs(std::map &out) override { out = _envs; } + + decree last_flushed_decree() const override { return _last_durable_decree; } decree last_durable_decree() const override { return _last_durable_decree; } // TODO(heyuchen): implement this function in further pull request diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 35419d8efb..75c95a8128 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -2168,47 +2168,49 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char } LOG_INFO_PREFIX("copy checkpoint to dir({}) succeed", checkpoint_dir); - if (checkpoint_decree != nullptr) { - rocksdb::DB *snapshot_db = nullptr; - std::vector handles_opened; - auto cleanup = [&](bool remove_checkpoint) { - if (remove_checkpoint && !::dsn::utils::filesystem::remove_path(checkpoint_dir)) { - LOG_ERROR_PREFIX("remove checkpoint directory {} failed", checkpoint_dir); - } - if (snapshot_db) { - for (auto handle : handles_opened) { - if (handle) { - snapshot_db->DestroyColumnFamilyHandle(handle); - handle = nullptr; - } + if (checkpoint_decree == nullptr) { + return ::dsn::ERR_OK; + } + + rocksdb::DB *snapshot_db = nullptr; + std::vector handles_opened; + auto cleanup = [&](bool remove_checkpoint) { + if (remove_checkpoint && !::dsn::utils::filesystem::remove_path(checkpoint_dir)) { + LOG_ERROR_PREFIX("remove checkpoint directory {} failed", checkpoint_dir); + } + if (snapshot_db) { + for (auto handle : handles_opened) { + if (handle) { + snapshot_db->DestroyColumnFamilyHandle(handle); + handle = nullptr; } - delete snapshot_db; - snapshot_db = nullptr; } - }; - - // Because of RocksDB's restriction, we have to to open default column family even though - // not use it - std::vector column_families( - {{meta_store::DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}, - 
{meta_store::META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}}); - status = rocksdb::DB::OpenForReadOnly( - _db_opts, checkpoint_dir, column_families, &handles_opened, &snapshot_db); - if (!status.ok()) { - LOG_ERROR_PREFIX( - "OpenForReadOnly from {} failed, error = {}", checkpoint_dir, status.ToString()); + delete snapshot_db; snapshot_db = nullptr; - cleanup(true); - return ::dsn::ERR_LOCAL_APP_FAILURE; } - CHECK_EQ_PREFIX(handles_opened.size(), 2); - CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME); - uint64_t last_flushed_decree = - _meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]); - *checkpoint_decree = last_flushed_decree; + }; - cleanup(false); + // Because of RocksDB's restriction, we have to to open default column family even though + // not use it + std::vector column_families( + {{meta_store::DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}, + {meta_store::META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}}); + status = rocksdb::DB::OpenForReadOnly( + _db_opts, checkpoint_dir, column_families, &handles_opened, &snapshot_db); + if (!status.ok()) { + LOG_ERROR_PREFIX( + "OpenForReadOnly from {} failed, error = {}", checkpoint_dir, status.ToString()); + snapshot_db = nullptr; + cleanup(true); + return ::dsn::ERR_LOCAL_APP_FAILURE; } + CHECK_EQ_PREFIX(handles_opened.size(), 2); + CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME); + uint64_t last_flushed_decree = + _meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]); + *checkpoint_decree = last_flushed_decree; + + cleanup(false); return ::dsn::ERR_OK; } @@ -2318,6 +2320,17 @@ pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode, return ::dsn::ERR_OK; } +int64_t pegasus_server_impl::last_flushed_decree() const +{ + uint64_t decree = 0; + const auto &err = _meta_store->get_last_flushed_decree(&decree); + if (dsn_unlikely(err != dsn::ERR_OK)) { + return -1; + } + + 
return static_cast<int64_t>(decree); +} + bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_type, const ::dsn::blob &filter_pattern, const ::dsn::blob &value) diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 361d9cbbae..d902e647dc 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -223,6 +223,8 @@ class pegasus_server_impl : public pegasus_read_service ::dsn::error_code storage_apply_checkpoint(chkpt_apply_mode mode, const dsn::replication::learn_state &state) override; + int64_t last_flushed_decree() const override; + int64_t last_durable_decree() const override { return _last_durable_decree.load(); } void update_app_envs(const std::map<std::string, std::string> &envs) override;