Skip to content

Commit

Permalink
fix(duplication): add warning message while trying to add a duplicati…
Browse files Browse the repository at this point in the history
…on that has been existing for the same table with the same remote cluster (#2038)

#2039
  • Loading branch information
empiredan authored Jun 4, 2024
1 parent dbe66a2 commit 6c96b5a
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 41 deletions.
26 changes: 12 additions & 14 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ void meta_duplication_service::add_duplication(duplication_add_rpc rpc)

std::shared_ptr<app_state> app;
duplication_info_s_ptr dup;
error_code resp_err = ERR_OK;
{
zauto_read_lock l(app_lock());

Expand Down Expand Up @@ -273,13 +274,13 @@ void meta_duplication_service::add_duplication(duplication_add_rpc rpc)

if (dup) {
// The duplication for the same app to the same remote cluster has existed.
remote_app_name = dup->remote_app_name;
remote_replica_count = dup->remote_replica_count;
LOG_INFO("no need to add duplication, since it has existed: app_name={}, "
resp_err = ERR_DUP_EXIST;
LOG_INFO("[{}] duplication has been existing: app_name={}, "
"remote_cluster_name={}, remote_app_name={}",
dup->log_prefix(),
request.app_name,
request.remote_cluster_name,
remote_app_name);
dup->remote_app_name);
} else {
// Check if other apps of this cluster are duplicated to the same remote app.
for (const auto & [ app_name, cur_app_state ] : _state->_exist_apps) {
Expand Down Expand Up @@ -313,15 +314,14 @@ void meta_duplication_service::add_duplication(duplication_add_rpc rpc)
app);
}

do_add_duplication(app, dup, rpc, remote_app_name, remote_replica_count);
do_add_duplication(app, dup, rpc, resp_err);
}

// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc,
const std::string &remote_app_name,
const int32_t remote_replica_count)
const error_code &resp_err)
{
const auto &ec = dup->start(rpc.request().is_duplicating_checkpoint);
LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(ec == ERR_OK,
Expand All @@ -335,10 +335,8 @@ void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
auto value = dup->to_json_blob();
std::queue<std::string> nodes({get_duplication_path(*app), std::to_string(dup->id)});
_meta_svc->get_meta_storage()->create_node_recursively(
std::move(nodes),
std::move(value),
[app, this, dup, rpc, remote_app_name, remote_replica_count]() mutable {
LOG_INFO("[{}] add duplication successfully [app_name: {}, follower: {}]",
std::move(nodes), std::move(value), [app, this, dup, rpc, resp_err]() mutable {
LOG_INFO("[{}] add duplication successfully [app_name: {}, remote_cluster_name: {}]",
dup->log_prefix(),
app->app_name,
dup->remote_cluster_name);
Expand All @@ -347,11 +345,11 @@ void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
dup->persist_status();

auto &resp = rpc.response();
resp.err = ERR_OK;
resp.err = resp_err;
resp.appid = app->app_id;
resp.dupid = dup->id;
resp.__set_remote_app_name(remote_app_name);
resp.__set_remote_replica_count(remote_replica_count);
resp.__set_remote_app_name(dup->remote_app_name);
resp.__set_remote_replica_count(dup->remote_replica_count);

zauto_write_lock l(app_lock());
refresh_duplicating_no_lock(app);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/duplication/meta_duplication_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "utils/fmt_logging.h"

namespace dsn {
class error_code;
class host_port;
class zrwlock_nr;

Expand Down Expand Up @@ -81,8 +82,7 @@ class meta_duplication_service
void do_add_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc,
const std::string &remote_app_name,
const int32_t remote_replica_count);
const error_code &resp_err);

void do_modify_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
Expand Down
31 changes: 18 additions & 13 deletions src/meta/test/meta_duplication_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "meta/server_state.h"
#include "meta/test/misc/misc.h"
#include "meta_test_base.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/blob.h"
Expand Down Expand Up @@ -399,7 +400,7 @@ class meta_duplication_service_test : public meta_test_base
struct TestData
{
std::string app_name;
std::string remote;
std::string remote_cluster_name;

bool specified;
std::string remote_app_name;
Expand All @@ -414,13 +415,14 @@ class meta_duplication_service_test : public meta_test_base
kTestRemoteAppName,
kTestRemoteReplicaCount,
ERR_OK},
// A duplication that has been added would be found with its original remote_app_name.
// Add a duplication that has been existing for the same table with the same remote
// cluster.
{kTestAppName,
kTestRemoteClusterName,
true,
false,
kTestRemoteAppName,
kTestRemoteReplicaCount,
ERR_OK},
ERR_DUP_EXIST},
// The general case that duplicating to remote cluster with same remote_app_name.
{kTestSameAppName,
kTestRemoteClusterName,
Expand Down Expand Up @@ -477,10 +479,12 @@ class meta_duplication_service_test : public meta_test_base
for (auto test : tests) {
duplication_add_response resp;
if (test.specified) {
resp = create_dup(
test.app_name, test.remote, test.remote_app_name, test.remote_replica_count);
resp = create_dup(test.app_name,
test.remote_cluster_name,
test.remote_app_name,
test.remote_replica_count);
} else {
resp = create_dup_unspecified(test.app_name, test.remote);
resp = create_dup_unspecified(test.app_name, test.remote_cluster_name);
}

ASSERT_EQ(test.wec, resp.err);
Expand All @@ -494,7 +498,7 @@ class meta_duplication_service_test : public meta_test_base
ASSERT_TRUE(dup != nullptr);
ASSERT_EQ(app->app_id, dup->app_id);
ASSERT_EQ(duplication_status::DS_PREPARE, dup->_status);
ASSERT_EQ(test.remote, dup->remote_cluster_name);
ASSERT_EQ(test.remote_cluster_name, dup->remote_cluster_name);
ASSERT_EQ(test.remote_app_name, resp.remote_app_name);
ASSERT_EQ(test.remote_app_name, dup->remote_app_name);
ASSERT_EQ(test.remote_replica_count, resp.remote_replica_count);
Expand Down Expand Up @@ -524,23 +528,24 @@ TEST_F(meta_duplication_service_test, dup_op_upon_unavail_app)
create_app(test_app_unavail);
find_app(test_app_unavail)->status = app_status::AS_DROPPED;

dupid_t test_dup = create_dup(kTestAppName).dupid;

struct TestData
{
std::string app;

error_code wec;
} tests[] = {
{test_app_not_exist, ERR_APP_NOT_EXIST},
{test_app_unavail, ERR_APP_NOT_EXIST},

{kTestAppName, ERR_OK},
};

for (auto test : tests) {
const auto &resp = create_dup(test.app);
ASSERT_EQ(test.wec, resp.err);

ASSERT_EQ(test.wec, query_dup_info(test.app).err);
ASSERT_EQ(test.wec, create_dup(test.app).err);

// For the response with some error, `dupid` doesn't matter.
dupid_t test_dup = test.wec == ERR_OK ? resp.dupid : static_cast<dupid_t>(dsn_now_s());
ASSERT_EQ(test.wec,
change_dup_status(test.app, test_dup, duplication_status::DS_REMOVED).err);
}
Expand Down
11 changes: 8 additions & 3 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,21 @@
"ERROR: {}\n", \
fmt::format(msg, ##__VA_ARGS__))
#define SHELL_PRINTLN_WARNING(msg, ...) \
#define SHELL_PRINT_WARNING_BASE(msg, ...) \
fmt::print(stdout, \
fmt::emphasis::bold | fmt::fg(fmt::color::yellow), \
"WARNING: {}\n", \
"WARNING: {}", \
fmt::format(msg, ##__VA_ARGS__))
#define SHELL_PRINT_WARNING(msg, ...) SHELL_PRINT_WARNING_BASE(msg, ##__VA_ARGS__)
#define SHELL_PRINTLN_WARNING(msg, ...) \
SHELL_PRINT_WARNING_BASE("{}\n", fmt::format(msg, ##__VA_ARGS__))
#define SHELL_PRINT_OK_BASE(msg, ...) \
fmt::print(stdout, fmt::emphasis::bold | fmt::fg(fmt::color::green), msg, ##__VA_ARGS__)
#define SHELL_PRINT_OK(msg, ...) SHELL_PRINT_OK_BASE("{}", fmt::format(msg, ##__VA_ARGS__))
#define SHELL_PRINT_OK(msg, ...) SHELL_PRINT_OK_BASE(msg, ##__VA_ARGS__)
#define SHELL_PRINTLN_OK(msg, ...) SHELL_PRINT_OK_BASE("{}\n", fmt::format(msg, ##__VA_ARGS__))
Expand Down
25 changes: 16 additions & 9 deletions src/shell/commands/duplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ bool add_dup(command_executor *e, shell_context *sc, arguments args)
hint = err_resp.get_value().hint;
}

if (!err) {
if (!err && err.code() != dsn::ERR_DUP_EXIST) {
SHELL_PRINTLN_ERROR(
"adding duplication failed [app_name: {}, remote_cluster_name: {}, "
"is_duplicating_checkpoint: {}, remote_app_name: {}, remote_replica_count: {}, "
Expand All @@ -121,15 +121,22 @@ bool add_dup(command_executor *e, shell_context *sc, arguments args)
return true;
}

if (err.code() == dsn::ERR_DUP_EXIST) {
SHELL_PRINT_WARNING("duplication has been existing");
} else {
SHELL_PRINT_OK("adding duplication succeed");
}

const auto &resp = err_resp.get_value();
SHELL_PRINT_OK(
"adding duplication succeed [app_name: {}, remote_cluster_name: {}, appid: {}, dupid: "
"{}, is_duplicating_checkpoint: {}",
app_name,
remote_cluster_name,
resp.appid,
resp.dupid,
is_duplicating_checkpoint);
SHELL_PRINT_OK(" [app_name: {}, remote_cluster_name: {}, appid: {}, dupid: {}",
app_name,
remote_cluster_name,
resp.appid,
resp.dupid);

if (err) {
SHELL_PRINT_OK(", is_duplicating_checkpoint: {}", is_duplicating_checkpoint);
}

if (resp.__isset.remote_app_name) {
SHELL_PRINT_OK(", remote_app_name: {}", resp.remote_app_name);
Expand Down
3 changes: 3 additions & 0 deletions src/utils/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ DEFINE_ERR_CODE(ERR_RDB_CORRUPTION)
DEFINE_ERR_CODE(ERR_DISK_IO_ERROR)

DEFINE_ERR_CODE(ERR_CURL_FAILED)

DEFINE_ERR_CODE(ERR_DUP_EXIST)

} // namespace dsn

USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_code);

0 comments on commit 6c96b5a

Please sign in to comment.