From bc7c800702c53037dae8d5478b10f524693e6ceb Mon Sep 17 00:00:00 2001 From: Xiaozhong Pan Date: Fri, 28 Oct 2022 17:06:11 -0700 Subject: [PATCH] ensure bulk load data to LMAX using sstPartitioner Summary: - register custom SstPartitionerFactory to rocksdb - on bulk load, register index number used to the SstPartitionerFactory - on compaction, SstPartitionerFactory create SstParitioner that will split sst files with keys overlapping with bulk load indexes - on bulk load completion, trigger a compaction when sst file ingestion fails and retry Reviewed By: hermanlee Differential Revision: D41586846 fbshipit-source-id: fc33994a58636a1d3baf00607c3e7d2d517545a9 --- .../suite/rocksdb/r/add_index_inplace.result | 3 + .../r/bulk_load_use_sst_partitioner.result | 41 ++ mysql-test/suite/rocksdb/r/rocksdb.result | 1 + .../t/bulk_load_use_sst_partitioner.test | 80 ++++ ...bulk_load_use_sst_partitioner_basic.result | 121 ++++++ ...b_bulk_load_use_sst_partitioner_basic.test | 19 + storage/rocksdb/CMakeLists.txt | 1 + storage/rocksdb/ha_rocksdb.cc | 73 +++- storage/rocksdb/rdb_cf_manager.cc | 7 +- storage/rocksdb/rdb_cf_manager.h | 6 +- storage/rocksdb/rdb_cf_options.cc | 35 +- storage/rocksdb/rdb_cf_options.h | 3 +- storage/rocksdb/rdb_global.h | 4 +- storage/rocksdb/rdb_i_s.cc | 4 +- storage/rocksdb/rdb_sst_partitioner_factory.h | 376 ++++++++++++++++++ 15 files changed, 762 insertions(+), 12 deletions(-) create mode 100644 mysql-test/suite/rocksdb/r/bulk_load_use_sst_partitioner.result create mode 100644 mysql-test/suite/rocksdb/t/bulk_load_use_sst_partitioner.test create mode 100644 mysql-test/suite/rocksdb_sys_vars/r/rocksdb_bulk_load_use_sst_partitioner_basic.result create mode 100644 mysql-test/suite/rocksdb_sys_vars/t/rocksdb_bulk_load_use_sst_partitioner_basic.test create mode 100644 storage/rocksdb/rdb_sst_partitioner_factory.h diff --git a/mysql-test/suite/rocksdb/r/add_index_inplace.result b/mysql-test/suite/rocksdb/r/add_index_inplace.result index 3ea020e9a805..c775b4549be6 100644 --- a/mysql-test/suite/rocksdb/r/add_index_inplace.result +++ b/mysql-test/suite/rocksdb/r/add_index_inplace.result @@ -298,6 +298,7 @@ rocksdb_bulk_load_allow_unsorted OFF rocksdb_bulk_load_fail_if_not_bottommost_level OFF rocksdb_bulk_load_partial_index ON rocksdb_bulk_load_size 1000 +rocksdb_bulk_load_use_sst_partitioner OFF show session variables like 'rocksdb_bulk_load%'; Variable_name Value rocksdb_bulk_load ON @@ -306,6 +307,7 @@ rocksdb_bulk_load_allow_unsorted OFF rocksdb_bulk_load_fail_if_not_bottommost_level OFF rocksdb_bulk_load_partial_index ON rocksdb_bulk_load_size 1000 +rocksdb_bulk_load_use_sst_partitioner OFF CREATE TABLE t1 (i INT, j INT, PRIMARY KEY (i)) ENGINE = ROCKSDB; INSERT INTO t1 VALUES (1,1); # Disconnecting on con1 @@ -352,6 +354,7 @@ rocksdb_bulk_load_allow_unsorted OFF rocksdb_bulk_load_fail_if_not_bottommost_level OFF rocksdb_bulk_load_partial_index ON rocksdb_bulk_load_size 1000 +rocksdb_bulk_load_use_sst_partitioner OFF CREATE TABLE t1 (a VARCHAR(30)) ENGINE=RocksDB; INSERT INTO t1 (a) VALUES (REPEAT("a", 30)); INSERT INTO t1 (a) VALUES (REPEAT("a", 30)); diff --git a/mysql-test/suite/rocksdb/r/bulk_load_use_sst_partitioner.result b/mysql-test/suite/rocksdb/r/bulk_load_use_sst_partitioner.result new file mode 100644 index 000000000000..8727f61a2ce2 --- /dev/null +++ b/mysql-test/suite/rocksdb/r/bulk_load_use_sst_partitioner.result @@ -0,0 +1,41 @@ +SET rocksdb_bulk_load_use_sst_partitioner=0; +SET rocksdb_bulk_load_fail_if_not_bottommost_level=1; +SET rocksdb_bulk_load_allow_sk=1; +CREATE TABLE t1(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t2(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t3(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +SET rocksdb_bulk_load=1; +INSERT INTO t1 VALUES (1, 1, 1), (2, 1, 1), (3, 2, 1); +INSERT INTO t3 VALUES (1, 1, 3), (2, 1, 3), (3, 2, 3); +SET rocksdb_bulk_load=0; +set global rocksdb_compact_cf=""; +set global rocksdb_compact_cf="rev:cf1"; +SET rocksdb_bulk_load=1; +INSERT INTO t2 VALUES(1, 1, 2); +INSERT INTO t2 VALUES(2, 1, 2); +INSERT INTO t2 VALUES(3, 2, 2); +SET rocksdb_bulk_load=0; +ERROR HY000: [(null)] bulk load error: Operation failed. Try again.: Files cannot be ingested to Lmax. Please make sure key range of Lmax and ongoing compaction's output to Lmaxdoes not overlap with files to ingest. +select * from t2; +a b c +SET rocksdb_bulk_load_use_sst_partitioner=1; +CREATE TABLE t4(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t5(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t6(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +SET rocksdb_bulk_load=1; +INSERT INTO t4 VALUES (1, 1, 4), (2, 1, 4), (3, 2, 4); +INSERT INTO t6 VALUES (1, 1, 6), (2, 1, 6), (3, 2, 6); +SET rocksdb_bulk_load=0; +set global rocksdb_compact_cf=""; +set global rocksdb_compact_cf="rev:cf1"; +SET rocksdb_bulk_load=1; +INSERT INTO t5 VALUES(1, 1, 5); +INSERT INTO t5 VALUES(2, 1, 5); +INSERT INTO t5 VALUES(3, 2, 5); +SET rocksdb_bulk_load=0; +select * from t5; +a b c +1 1 5 +2 1 5 +3 2 5 +drop table t1, t2, t3, t4, t5, t6; diff --git a/mysql-test/suite/rocksdb/r/rocksdb.result b/mysql-test/suite/rocksdb/r/rocksdb.result index 236bfefbafb1..f78d13c328c9 100644 --- a/mysql-test/suite/rocksdb/r/rocksdb.result +++ b/mysql-test/suite/rocksdb/r/rocksdb.result @@ -912,6 +912,7 @@ rocksdb_bulk_load_allow_unsorted OFF rocksdb_bulk_load_fail_if_not_bottommost_level OFF rocksdb_bulk_load_partial_index ON rocksdb_bulk_load_size 1000 +rocksdb_bulk_load_use_sst_partitioner OFF rocksdb_bytes_per_sync 0 rocksdb_cache_dump ON rocksdb_cache_high_pri_pool_ratio 0.000000 diff --git a/mysql-test/suite/rocksdb/t/bulk_load_use_sst_partitioner.test b/mysql-test/suite/rocksdb/t/bulk_load_use_sst_partitioner.test new file mode 100644 index 000000000000..86e26813cf4f --- /dev/null +++ b/mysql-test/suite/rocksdb/t/bulk_load_use_sst_partitioner.test @@ -0,0 +1,80 @@ +# test cases verify the features that ensure the bulk load sst files +# will be placed in rocksdb bottommost level +--source include/have_rocksdb.inc +--source include/count_sessions.inc + +--disable_query_log +call mtr.add_suppression("failed to bulk load. status code = 13, status = Operation failed. Try again."); +call mtr.add_suppression("Error 504 finalizing last SST file while setting bulk loading variable"); +--enable_query_log + +# test case: +# verify bulk load fail when bottommost lvl has sst file with overlap key range +SET rocksdb_bulk_load_use_sst_partitioner=0; +SET rocksdb_bulk_load_fail_if_not_bottommost_level=1; +SET rocksdb_bulk_load_allow_sk=1; + +# index numbers in t2 will be greater than index number in t1 and less than +# index numbers in t3 +CREATE TABLE t1(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t2(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t3(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; + +SET rocksdb_bulk_load=1; +INSERT INTO t1 VALUES (1, 1, 1), (2, 1, 1), (3, 2, 1); +INSERT INTO t3 VALUES (1, 1, 3), (2, 1, 3), (3, 2, 3); +SET rocksdb_bulk_load=0; + +# compact the LMAX sst files created in the bulk load before +set global rocksdb_compact_cf=""; +set global rocksdb_compact_cf="rev:cf1"; + +SET rocksdb_bulk_load=1; +INSERT INTO t2 VALUES(1, 1, 2); +INSERT INTO t2 VALUES(2, 1, 2); +INSERT INTO t2 VALUES(3, 2, 2); + +# verify we get an error +--error ER_UNKNOWN_ERROR +SET rocksdb_bulk_load=0; + +# verify data not loaded to db +select * from t2; + +# test case: +# same as previous test case, except rocksdb_bulk_load_use_sst_partitioner +# is set +SET rocksdb_bulk_load_use_sst_partitioner=1; + +CREATE TABLE t4(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t5(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; +CREATE TABLE t6(a INT, b INT, c INT, PRIMARY KEY(a), KEY k1(b, c), KEY k2(c) COMMENT 'cfname=rev:cf1') ENGINE=ROCKSDB; + +SET rocksdb_bulk_load=1; +INSERT INTO t4 VALUES (1, 1, 4), (2, 1, 4), (3, 2, 4); +INSERT INTO t6 VALUES (1, 1, 6), (2, 1, 6), (3, 2, 6); +SET rocksdb_bulk_load=0; + +# compact the LMAX sst files created in the bulk load before +set global rocksdb_compact_cf=""; +set global rocksdb_compact_cf="rev:cf1"; + +SET rocksdb_bulk_load=1; +INSERT INTO t5 VALUES(1, 1, 5); +INSERT INTO t5 VALUES(2, 1, 5); +INSERT INTO t5 VALUES(3, 2, 5); + +# now there is no error +SET rocksdb_bulk_load=0; + +# verify data loaded to db +select * from t5; + +# clean up +drop table t1, t2, t3, t4, t5, t6; + +--let SEARCH_FILE=$MYSQLTEST_VARDIR/log/mysqld.1.err +--let SEARCH_PATTERN=MyRocks: failed to bulk load, retry with compaction. +--source include/search_pattern_in_file.inc + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_bulk_load_use_sst_partitioner_basic.result b/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_bulk_load_use_sst_partitioner_basic.result new file mode 100644 index 000000000000..80ceaab32c05 --- /dev/null +++ b/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_bulk_load_use_sst_partitioner_basic.result @@ -0,0 +1,121 @@ +CREATE TABLE valid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO valid_values VALUES(1); +INSERT INTO valid_values VALUES(0); +INSERT INTO valid_values VALUES('on'); +INSERT INTO valid_values VALUES('off'); +CREATE TABLE invalid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO invalid_values VALUES('\'aaa\''); +INSERT INTO invalid_values VALUES('\'bbb\''); +SET @start_global_value = @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +SELECT @start_global_value; +@start_global_value +0 +SET @start_session_value = @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +SELECT @start_session_value; +@start_session_value +0 +'# Setting to valid values in global scope#' +"Trying to set variable @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to 1" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = 1; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +1 +"Setting the global scope variable back to default" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Trying to set variable @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to 0" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = 0; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Setting the global scope variable back to default" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Trying to set variable @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to on" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = on; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +1 +"Setting the global scope variable back to default" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Trying to set variable @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to off" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = off; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Setting the global scope variable back to default" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +'# Setting to valid values in session scope#' +"Trying to set variable @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to 1" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = 1; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +1 +"Setting the session scope variable back to default" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Trying to set variable @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to 0" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = 0; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Setting the session scope variable back to default" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Trying to set variable @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to on" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = on; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +1 +"Setting the session scope variable back to default" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Trying to set variable @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to off" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = off; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Setting the session scope variable back to default" +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = DEFAULT; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +'# Testing with invalid values in global scope #' +"Trying to set variable @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to 'aaa'" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = 'aaa'; +Got one of the listed errors +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +"Trying to set variable @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER to 'bbb'" +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = 'bbb'; +Got one of the listed errors +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +SET @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = @start_global_value; +SELECT @@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@global.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +SET @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER = @start_session_value; +SELECT @@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER; +@@session.ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +0 +DROP TABLE valid_values; +DROP TABLE invalid_values; diff --git a/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_bulk_load_use_sst_partitioner_basic.test b/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_bulk_load_use_sst_partitioner_basic.test new file mode 100644 index 000000000000..6f18d1961462 --- /dev/null +++ b/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_bulk_load_use_sst_partitioner_basic.test @@ -0,0 +1,19 @@ +--source include/have_rocksdb.inc + +CREATE TABLE valid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO valid_values VALUES(1); +INSERT INTO valid_values VALUES(0); +INSERT INTO valid_values VALUES('on'); +INSERT INTO valid_values VALUES('off'); + +CREATE TABLE invalid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO invalid_values VALUES('\'aaa\''); +INSERT INTO invalid_values VALUES('\'bbb\''); + +--let $sys_var=ROCKSDB_BULK_LOAD_USE_SST_PARTITIONER +--let $read_only=0 +--let $session=1 +--source ../include/rocksdb_sys_var.inc + +DROP TABLE valid_values; +DROP TABLE invalid_values; diff --git a/storage/rocksdb/CMakeLists.txt b/storage/rocksdb/CMakeLists.txt index 4ddb3ee677c8..9f14b784ff3b 100644 --- a/storage/rocksdb/CMakeLists.txt +++ b/storage/rocksdb/CMakeLists.txt @@ -290,6 +290,7 @@ SET(ROCKSDB_SOURCES logger.h rdb_datadic.cc rdb_datadic.h rdb_iterator.cc rdb_iterator.h + rdb_sst_partitioner_factory.h rdb_cf_options.cc rdb_cf_options.h rdb_cf_manager.cc rdb_cf_manager.h rdb_converter.cc rdb_converter.h diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 5101218c56d7..27730003e28c 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -92,6 +92,7 @@ #include "./rdb_iterator.h" #include "./rdb_mutex_wrapper.h" #include "./rdb_psi.h" +#include "./rdb_sst_partitioner_factory.h" #include "./rdb_threads.h" #include "./sql_dd.h" @@ -596,6 +597,10 @@ static int rocksdb_check_bulk_load_fail_if_not_bottommost_level( THD *const thd, struct SYS_VAR *var MY_ATTRIBUTE((__unused__)), void *save, struct st_mysql_value *value); +static int rocksdb_check_bulk_load_use_sst_partitioner( + THD *const thd, struct SYS_VAR *var MY_ATTRIBUTE((__unused__)), void *save, + struct st_mysql_value *value); + static void rocksdb_set_max_background_jobs(THD *thd, struct SYS_VAR *const var, void *const var_ptr, const void *const save); @@ -1168,6 +1173,12 @@ static MYSQL_THDVAR_BOOL( "Can be changed only when bulk load is disabled.", rocksdb_check_bulk_load_fail_if_not_bottommost_level, nullptr, false); +static MYSQL_THDVAR_BOOL( + bulk_load_use_sst_partitioner, PLUGIN_VAR_RQCMDARG, + "Use sst partitioner to split sst files to ensure bulk load sst files " + "can be ingested to bottommost level", + rocksdb_check_bulk_load_use_sst_partitioner, nullptr, false); + static MYSQL_SYSVAR_BOOL(enable_bulk_load_api, rocksdb_enable_bulk_load_api, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, "Enables using SstFileWriter for bulk loading", @@ -2501,6 +2512,7 @@ static struct SYS_VAR *rocksdb_system_variables[] = { MYSQL_SYSVAR(bulk_load_allow_sk), MYSQL_SYSVAR(bulk_load_allow_unsorted), MYSQL_SYSVAR(bulk_load_fail_if_not_bottommost_level), + MYSQL_SYSVAR(bulk_load_use_sst_partitioner), MYSQL_SYSVAR(trace_sst_api), MYSQL_SYSVAR(commit_in_the_middle), MYSQL_SYSVAR(blind_delete_primary_key), @@ -3215,6 +3227,10 @@ class Rdb_transaction { /* External merge sorts for bulk load: key ID -> merge sort instance */ std::unordered_map m_key_merge; + // register indexes used in bulk load to Rdb_sst_partitioner_factory, see + // comments in Rdb_sst_partitioner_factory for details + Rdb_bulk_load_index_registry m_bulk_load_index_registry; + /* Used to check for duplicate entries during fast unique secondary index creation. @@ -3248,6 +3264,32 @@ class Rdb_transaction { } }; + rocksdb::Status ingest_bulk_load_files( + std::vector &args) { + rocksdb::Status s = rdb->IngestExternalFiles(args); + if (!s.ok() && + m_bulk_load_index_registry.index_registered_in_sst_partitioner()) { + // NO_LINT_DEBUG + LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: failed to bulk load, retry with compaction. " + "status code = %d, status = %s", + s.code(), s.ToString().c_str()); + s = m_bulk_load_index_registry.compact_index_ranges( + rdb, getCompactRangeOptions()); + if (!s.ok()) { + // NO_LINT_DEBUG + LogPluginErrMsg(WARNING_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: compaction failed in bulk load. " + "status code = %d, status = %s", + s.code(), s.ToString().c_str()); + return s; + } + // try again after compaction + s = rdb->IngestExternalFiles(args); + } + return s; + } + public: int get_key_merge(GL_INDEX_ID kd_gl_id, rocksdb::ColumnFamilyHandle *cf, Rdb_index_merge **key_merge) { @@ -3293,6 +3335,7 @@ class Rdb_transaction { m_curr_bulk_load.clear(); m_curr_bulk_load_tablename.clear(); m_key_merge.clear(); + m_bulk_load_index_registry.clear(); }); int rc = 0; @@ -3651,7 +3694,8 @@ class Rdb_transaction { "SST Tracing: Calling IngestExternalFile with '%zu' files", file_count); } - const rocksdb::Status s = rdb->IngestExternalFiles(args); + + const rocksdb::Status s = ingest_bulk_load_files(args); if (THDVAR(m_thd, trace_sst_api)) { LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "SST Tracing: IngestExternalFile '%zu' files returned %s", @@ -3727,6 +3771,11 @@ class Rdb_transaction { return 0; } + bool add_index_to_sst_partitioner(rocksdb::ColumnFamilyHandle *cf, + const Rdb_key_def &kd) { + return m_bulk_load_index_registry.add_index(rdb, cf, kd.get_index_number()); + } + int start_bulk_load(ha_rocksdb *const bulk_load, std::shared_ptr sst_info) { /* @@ -6474,7 +6523,10 @@ static int rocksdb_init_internal(void *const p) { LogPluginErrMsg(INFORMATION_LEVEL, 0, "Column Families at start:"); for (size_t i = 0; i < cf_names.size(); ++i) { rocksdb::ColumnFamilyOptions opts; - cf_options_map->get_cf_options(cf_names[i], &opts); + bool cf_exists; + if (!cf_options_map->get_cf_options(cf_names[i], &opts, cf_exists)) { + DBUG_RETURN(HA_EXIT_FAILURE); + } LogPluginErrMsg(INFORMATION_LEVEL, 0, " cf=%s", cf_names[i].c_str()); LogPluginErrMsg(INFORMATION_LEVEL, 0, " write_buffer_size=%ld", @@ -10683,6 +10735,16 @@ int ha_rocksdb::bulk_load_key(Rdb_transaction *const tx, const Rdb_key_def &kd, rocksdb::ColumnFamilyHandle *cf = kd.get_cf(); + if (THDVAR(thd, bulk_load_use_sst_partitioner) && + !tx->add_index_to_sst_partitioner(cf, kd)) { + // NO_LINT_DEBUG + LogPluginErrMsg(WARNING_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: failed to bulk load. Index number %d " + "is being used by another bulk load transaction.", + kd.get_index_number()); + DBUG_RETURN(HA_ERR_ROCKSDB_BULK_LOAD); + } + // In the case of unsorted inserts, m_sst_info allocated here is not // used to store the keys. It is still used to indicate when tables // are switched. @@ -16037,6 +16099,13 @@ static int rocksdb_check_bulk_load_fail_if_not_bottommost_level( return rocksdb_check_bulk_load_allow_unsorted(thd, var, save, value); } +static int rocksdb_check_bulk_load_use_sst_partitioner( + THD *const thd, struct SYS_VAR *var MY_ATTRIBUTE((__unused__)), void *save, + struct st_mysql_value *value) { + // reuse the same logic + return rocksdb_check_bulk_load_allow_unsorted(thd, var, save, value); +} + static void rocksdb_set_max_background_jobs(THD *thd, struct SYS_VAR *const var, void *const var_ptr, const void *const save) { diff --git a/storage/rocksdb/rdb_cf_manager.cc b/storage/rocksdb/rdb_cf_manager.cc index cc92064fd518..e6a92f40bb3f 100644 --- a/storage/rocksdb/rdb_cf_manager.cc +++ b/storage/rocksdb/rdb_cf_manager.cc @@ -169,8 +169,11 @@ std::shared_ptr Rdb_cf_manager::get_or_create_cf( } else { /* Create a Column Family. */ rocksdb::ColumnFamilyOptions opts; - - bool cf_name_found = m_cf_options->get_cf_options(cf_name, &opts); + bool cf_name_found; + if (!m_cf_options->get_cf_options(cf_name, &opts, cf_name_found)) { + RDB_MUTEX_UNLOCK_CHECK(m_mutex); + return cf_handle; + } if (create || cf_name_found) { LogPluginErrMsg(INFORMATION_LEVEL, 0, "Creating a column family %s", diff --git a/storage/rocksdb/rdb_cf_manager.h b/storage/rocksdb/rdb_cf_manager.h index 68295dceb499..b71038a3c578 100644 --- a/storage/rocksdb/rdb_cf_manager.h +++ b/storage/rocksdb/rdb_cf_manager.h @@ -116,10 +116,12 @@ class Rdb_cf_manager : public Ensure_initialized { const uint32 &cf_id, const std::string &cf_name, const bool is_per_partition_cf = false); - void get_cf_options(const std::string &cf_name, + /* return true when success */ + bool get_cf_options(const std::string &cf_name, rocksdb::ColumnFamilyOptions *const opts) MY_ATTRIBUTE((__nonnull__)) { - m_cf_options->get_cf_options(cf_name, opts); + bool cf_exists; + return m_cf_options->get_cf_options(cf_name, opts, cf_exists); } void update_options_map(const std::string &cf_name, diff --git a/storage/rocksdb/rdb_cf_options.cc b/storage/rocksdb/rdb_cf_options.cc index 13307ae00a15..68847c2b37ea 100644 --- a/storage/rocksdb/rdb_cf_options.cc +++ b/storage/rocksdb/rdb_cf_options.cc @@ -31,6 +31,7 @@ #include "./ha_rocksdb.h" #include "./rdb_cf_manager.h" #include "./rdb_compact_filter.h" +#include "./rdb_sst_partitioner_factory.h" namespace myrocks { @@ -59,6 +60,14 @@ bool Rdb_cf_options::init( return false; } + if (m_default_cf_opts.sst_partitioner_factory != nullptr) { + // NO_LINT_DEBUG + LogPluginErrMsg( + WARNING_LEVEL, ER_LOG_PRINTF_MSG, + "Invalid cf options, sst_partitioner_factory should not be set"); + return false; + } + return true; } @@ -357,14 +366,34 @@ std::shared_ptr Rdb_cf_options::get_cf_merge_operator( } bool Rdb_cf_options::get_cf_options(const std::string &cf_name, - rocksdb::ColumnFamilyOptions *const opts) { + rocksdb::ColumnFamilyOptions *const opts, + bool &cf_exists) { *opts = m_default_cf_opts; - bool ret = get(cf_name, opts); + cf_exists = get(cf_name, opts); // Set the comparator according to 'rev:' opts->comparator = get_cf_comparator(cf_name); opts->merge_operator = get_cf_merge_operator(cf_name); - return ret; + + // this sst partitioner is used in bulk load scenario, no need to set it for + // non-data cfs. + if (cf_name != DEFAULT_SYSTEM_CF_NAME && cf_name != DEFAULT_TMP_CF_NAME && + cf_name != DEFAULT_TMP_SYSTEM_CF_NAME) { + if (opts->sst_partitioner_factory != nullptr) { + // NO_LINT_DEBUG + LogPluginErrMsg( + WARNING_LEVEL, ER_LOG_PRINTF_MSG, + "Invalid cf options for %s, sst_partitioner_factory should not be " + "set.", + cf_name.c_str()); + return false; + } + opts->sst_partitioner_factory = + std::make_shared( + opts->comparator, opts->num_levels, + Rdb_cf_manager::is_cf_name_reverse(cf_name.c_str())); + } + return true; } } // namespace myrocks diff --git a/storage/rocksdb/rdb_cf_options.h b/storage/rocksdb/rdb_cf_options.h index 45ff3ef2477e..13e1fd933109 100644 --- a/storage/rocksdb/rdb_cf_options.h +++ b/storage/rocksdb/rdb_cf_options.h @@ -71,8 +71,9 @@ class Rdb_cf_options { std::shared_ptr get_cf_merge_operator( const std::string &cf_name); + /* return true when success */ bool get_cf_options(const std::string &cf_name, - rocksdb::ColumnFamilyOptions *const opts) + rocksdb::ColumnFamilyOptions *const opts, bool &cf_exists) MY_ATTRIBUTE((__nonnull__)); static bool parse_cf_options(const std::string &cf_options, diff --git a/storage/rocksdb/rdb_global.h b/storage/rocksdb/rdb_global.h index 12c77202091a..47ec99103d0d 100644 --- a/storage/rocksdb/rdb_global.h +++ b/storage/rocksdb/rdb_global.h @@ -328,9 +328,11 @@ static_assert(HA_ERR_ROCKSDB_FIRST > HA_ERR_LAST, const constexpr char rocksdb_hton_name[] = "ROCKSDB"; +using Index_id = uint32_t; + typedef struct _gl_index_id_s { uint32_t cf_id; - uint32_t index_id; + Index_id index_id; bool operator==(const struct _gl_index_id_s &other) const { return cf_id == other.cf_id && index_id == other.index_id; } diff --git a/storage/rocksdb/rdb_i_s.cc b/storage/rocksdb/rdb_i_s.cc index b3077de3ae35..fdb78ca19b1f 100644 --- a/storage/rocksdb/rdb_i_s.cc +++ b/storage/rocksdb/rdb_i_s.cc @@ -511,7 +511,9 @@ static int rdb_i_s_cfoptions_fill_table( rocksdb::ColumnFamilyOptions opts; assert(!cf_name.empty()); - cf_manager.get_cf_options(cf_name, &opts); + if (!cf_manager.get_cf_options(cf_name, &opts)) { + DBUG_RETURN(HA_EXIT_FAILURE); + } std::vector> cf_option_types = { {"COMPARATOR", opts.comparator == nullptr diff --git a/storage/rocksdb/rdb_sst_partitioner_factory.h b/storage/rocksdb/rdb_sst_partitioner_factory.h new file mode 100644 index 000000000000..eba1975e0e7a --- /dev/null +++ b/storage/rocksdb/rdb_sst_partitioner_factory.h @@ -0,0 +1,376 @@ +/* + Copyright (c) 2022, Facebook, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "rocksdb/sst_partitioner.h" + +#include "./rdb_cf_manager.h" +#include "./rdb_datadic.h" + +namespace myrocks { +/** + * get string with data written with index bytes + */ +inline std::string get_index_key(const Index_id index_id) { + std::string buf; + buf.resize(Rdb_key_def::INDEX_NUMBER_SIZE); + rdb_netbuf_store_index(reinterpret_cast(buf.data()), index_id); + return buf; +}; + +/** + * SstPartitioner is used in compaction to split output sst files. + * The compaction iterates keys in order, calls SstPartitioner with + * previous key and current key in the iteration, SstPartitioner can + * return PartitionerResult::kRequired to indicate a split is needed + * to create a new SST file starting with current key. + * + * This class splits output sst files when input keys crossing index boundaries. + * For example, given index id 8, this class will partition data to 3 parts, + * with index [0 - 7], [8], [9 - infinity). + * + * During bulk load, the compaction will create instances of this class with + * indexes used in bulk load. This will split SST files with keys overlap with + * bulk load data. Essentially, this creates "gaps" among SST files in LMAX to + * place the bulk load SST files. + */ +class Rdb_index_boundary_sst_partitioner : public rocksdb::SstPartitioner { + private: + const rocksdb::Comparator *m_comparator; + const bool m_is_reverse_cf; + // min and max values of indexes + std::vector> m_index_key_ranges; + std::string m_min_index_key; + std::string m_max_index_key; + + /** + * if a partition is required for given index + */ + bool should_partition(const std::string &index_key, + const rocksdb::Slice *previous_key, + const rocksdb::Slice *current_key) { + // for reverse cf, indexKey is upper limit of index data, + // for normal cf, indexKey is lower limit of index data. + // we want indexKey itself get partitioned with other keys in the index. + if (m_is_reverse_cf) { + return m_comparator->Compare(*previous_key, index_key) <= 0 && + m_comparator->Compare(*current_key, index_key) > 0; + } + return m_comparator->Compare(*previous_key, index_key) < 0 && + m_comparator->Compare(*current_key, index_key) >= 0; + } + + public: + explicit Rdb_index_boundary_sst_partitioner( + const std::set &index_ids, + const rocksdb::Comparator *comparator, const bool is_reverse_cf) + : m_comparator(comparator), m_is_reverse_cf(is_reverse_cf) { + assert(!index_ids.empty()); + for (auto index_id : index_ids) { + auto start_index_id = index_id; + auto end_index_id = index_id + 1; + if (is_reverse_cf) { + std::swap(start_index_id, end_index_id); + } + m_index_key_ranges.push_back(std::make_pair(get_index_key(start_index_id), + get_index_key(end_index_id))); + } + + // even index_ids is somehow empty, ShouldPartition will return kNotRequired + // regardless of the value of min/max index + Index_id min_index = 0; + Index_id max_index = 0; + if (!index_ids.empty()) { + min_index = *index_ids.cbegin(); + max_index = (*index_ids.rbegin()) + 1; + if (is_reverse_cf) { + std::swap(min_index, max_index); + } + } + m_min_index_key = get_index_key(min_index); + m_max_index_key = get_index_key(max_index); + } + + virtual ~Rdb_index_boundary_sst_partitioner() override {} + + const char *Name() const override { + return "Rdb_index_boundary_sst_partitioner"; + } + + rocksdb::PartitionerResult ShouldPartition( + const rocksdb::PartitionerRequest &request) override { + assert(m_comparator->Compare(*request.current_user_key, + *request.prev_user_key) > 0); + + if (m_comparator->Compare(*request.prev_user_key, m_max_index_key) > 0 || + m_comparator->Compare(*request.current_user_key, m_min_index_key) < 0) { + return rocksdb::PartitionerResult::kNotRequired; + } + for (auto &index_key_range : m_index_key_ranges) { + // partition sst file when the request keys cross index boundary + if (should_partition(index_key_range.first, request.prev_user_key, + request.current_user_key) || + should_partition(index_key_range.second, request.prev_user_key, + request.current_user_key)) { + return rocksdb::PartitionerResult::kRequired; + } + } + return rocksdb::PartitionerResult::kNotRequired; + }; + + bool CanDoTrivialMove(const rocksdb::Slice &smallest_user_key, + const rocksdb::Slice &largest_user_key) override { + return ShouldPartition(rocksdb::PartitionerRequest(smallest_user_key, + largest_user_key, 0)) == + rocksdb::PartitionerResult::kNotRequired; + }; +}; + +/** + * creates sst partitioner used in compaction. + * see comments in Rdb_index_boundary_sst_partitioner. + */ +class Rdb_sst_partitioner_factory : public rocksdb::SstPartitionerFactory { + const rocksdb::Comparator *m_comparator; + const int m_num_levels; + const bool m_is_reverse_cf; + std::mutex m_index_ids_mutex; + std::set m_index_ids; + + std::set get_index_ids() const { + // this method is marked const so we can call it + // in const CreatePartitioner method. + // and it is a real const method as it does not change + // the object's state. the const_cast here is needed + // to get a non-const reference to mutex, which is needed + // for lock_guard + const std::lock_guard lock( + const_cast(this)->m_index_ids_mutex); + std::set result(m_index_ids); + return result; + }; + + public: + explicit Rdb_sst_partitioner_factory(const rocksdb::Comparator *comparator, + int num_levels, int is_reverse_cf) + : m_comparator(comparator), + m_num_levels(num_levels), + m_is_reverse_cf(is_reverse_cf){}; + + ~Rdb_sst_partitioner_factory() override {} + + static const char *kClassName() { return "Rdb_sst_partitioner_factory"; } + const char *Name() const override { return kClassName(); } + + std::unique_ptr CreatePartitioner( + const rocksdb::SstPartitioner::Context &context) const override { + // we need special partitioner for Lmax. + // when rocksdb checks if SstPartitioner is used in a manual compaction, + // it passes -1 as output_level to indicate output_level is not yet known. + if (context.output_level == m_num_levels - 1 || + context.output_level == -1) { + auto index_ids = get_index_ids(); + if (!index_ids.empty()) { + // NO_LINT_DEBUG + LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: Rdb_sst_partitioner_factory creating " + "partitioner with %lu " + "indexes.", + index_ids.size()); + return std::unique_ptr( + new Rdb_index_boundary_sst_partitioner(index_ids, m_comparator, + m_is_reverse_cf)); + } + } + return {}; + }; + + /** + * add index to be used in sst partitioner later. + * returns true when success + */ + bool add_index(Index_id index_id) { + // NO_LINT_DEBUG + LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: Rdb_sst_partitioner_factory adding index %d.", + index_id); + const std::lock_guard lock(m_index_ids_mutex); + return m_index_ids.insert(index_id).second; + }; + + /** + * remove index, it won't be used in sst partitioner created + * afterwards. + * return true when success + */ + bool remove_index(Index_id index_id) { + // NO_LINT_DEBUG + LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: Rdb_sst_partitioner_factory removing index %d.", + index_id); + const std::lock_guard lock(m_index_ids_mutex); + return m_index_ids.erase(index_id) == 1; + } +}; + +/** + * stores index information used in bulk load. + * during bulk load + * 1. register indexes used in bulk load with sst partitioner factory, + * rocksdb will partition the sst files such that the sst files + * does not overlap with index data + * 2. ingest bulk load sst files to rocksdb, this could fail when + * there is sst files overlap with index data created before step 1 + * 3. if ingestion fails, trigger a compaction, this will guarantee + * no sst files overlaps with index data. After compaction, ingest + * bulk load sst files again + * 4. remove indexes from sst partitioner factory. + */ +class Rdb_bulk_load_index_registry { + // index -> sst partitioner factory + std::map m_partitioner_factories; + // cf -> indexes + std::map> m_cf_indexes; + + void add_cf_index_map(rocksdb::ColumnFamilyHandle *cf, Index_id index_id) { + auto find_result = m_cf_indexes.find(cf); + if (find_result == m_cf_indexes.end()) { + m_cf_indexes.emplace(cf, std::set{index_id}); + } else { + find_result->second.insert(index_id); + } + } + + public: + ~Rdb_bulk_load_index_registry() { clear(); } + + /** + * register the index with sst partitioner if it is + * not already registered. + * returns true when success. + */ + bool add_index(rocksdb::TransactionDB *rdb, rocksdb::ColumnFamilyHandle *cf, + Index_id index_id) { + if (m_partitioner_factories.count(index_id) != 0) { + // already processed this index, return + return true; + } + + auto sst_partitioner_factory = rdb->GetOptions(cf).sst_partitioner_factory; + auto rdb_sst_partitioner_factory = + dynamic_cast( + sst_partitioner_factory.get()); + if (rdb_sst_partitioner_factory == nullptr) { + // should never happen + // NO_LINT_DEBUG + LogPluginErrMsg( + WARNING_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: Rdb_sst_partitioner_factory not registered for cf %s ", + cf->GetName().c_str()); + return false; + } + + bool index_added = rdb_sst_partitioner_factory->add_index(index_id); + if (!index_added) { + return false; + } else { + m_partitioner_factories.emplace(index_id, rdb_sst_partitioner_factory); + add_cf_index_map(cf, index_id); + } + + return true; + } + + /** + * remove registered indexes from sst partitioner factory and clear the + * index stored in this object. + * returns true when all indexes are removed. + */ + bool clear() { + bool success = true; + for (auto entry : m_partitioner_factories) { + bool removed = entry.second->remove_index(entry.first); + if (!removed) { + // unlikely + success = false; + } + } + m_partitioner_factories.clear(); + m_cf_indexes.clear(); + return success; + } + + /** + * returns true when we have index registered in + * sst partitioner factory + */ + bool index_registered_in_sst_partitioner() { + return !m_partitioner_factories.empty(); + } + + /** + * trigger compaction that covers all indexes registered in + * this object + */ + rocksdb::Status compact_index_ranges( + rocksdb::TransactionDB *rdb, + const rocksdb::CompactRangeOptions compact_range_options) { + rocksdb::Status status; + for (auto entry : m_cf_indexes) { + auto cf = entry.first; + const auto is_reverse_cf = + Rdb_cf_manager::is_cf_name_reverse(cf->GetName().c_str()); + const std::set index_ids = entry.second; + assert(!index_ids.empty()); + Index_id begin_index = *index_ids.cbegin(); + // use maxIndex + 1 to include the data from maxIndex itself in + // the compaction range, this works for both reverse and normal + // cfs. + Index_id end_index = (*index_ids.rbegin()) + 1; + if (is_reverse_cf) { + std::swap(begin_index, end_index); + } + // need to keep the string object alive for the rocksdb::Slice + const std::string begin_index_key = get_index_key(begin_index); + const std::string end_index_Key = get_index_key(end_index); + const rocksdb::Slice compact_begin_key = begin_index_key; + const rocksdb::Slice compact_end_key = end_index_Key; + // NO_LINT_DEBUG + LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, + "MyRocks: CompactRange on cf %s. key range ['%s', '%s'].", + cf->GetName().c_str(), + compact_begin_key.ToString(/*hex*/ true).c_str(), + compact_end_key.ToString(/*hex*/ true).c_str()); + + status = rdb->CompactRange(compact_range_options, cf, &compact_begin_key, + &compact_end_key); + if (!status.ok()) { + break; + } + } + return status; + } +}; + +} // namespace myrocks