diff --git a/src/base/meta_store.cpp b/src/base/meta_store.cpp index 94aaf7db7c..2b5e4721ca 100644 --- a/src/base/meta_store.cpp +++ b/src/base/meta_store.cpp @@ -34,6 +34,9 @@ const std::string meta_store::DATA_VERSION = "pegasus_data_version"; const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree"; const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME = "pegasus_last_manual_compact_finish_time"; +const std::string meta_store::LAST_MANUAL_COMPACT_USED_TIME = + "pegasus_last_manual_compact_used_time"; + const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal"; const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = "prefer_write"; const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = "bulk_load"; @@ -47,6 +50,21 @@ meta_store::meta_store(const char *log_prefix, _wt_opts.disableWAL = true; } +dsn::error_code meta_store::get_last_manual_compact_used_time(uint64_t *ts) const +{ + LOG_AND_RETURN_NOT_OK(ERROR_PREFIX, + get_value_from_meta_cf(false, LAST_MANUAL_COMPACT_USED_TIME, ts), + "get_value_from_meta_cf failed"); + return dsn::ERR_OK; +} + +void meta_store::set_last_manual_compact_used_time(uint64_t last_manual_compact_used_time) const +{ + CHECK_EQ_PREFIX( + ::dsn::ERR_OK, + set_value_to_meta_cf(LAST_MANUAL_COMPACT_USED_TIME, last_manual_compact_used_time)); +} + dsn::error_code meta_store::get_last_flushed_decree(uint64_t *decree) const { LOG_AND_RETURN_NOT_OK(ERROR_PREFIX, diff --git a/src/base/meta_store.h b/src/base/meta_store.h index 060692d5c4..c92256cb4a 100644 --- a/src/base/meta_store.h +++ b/src/base/meta_store.h @@ -52,6 +52,7 @@ class meta_store rocksdb::ColumnFamilyHandle *meta_cf) const; dsn::error_code get_data_version(uint32_t *version) const; dsn::error_code get_last_manual_compact_finish_time(uint64_t *ts) const; + dsn::error_code get_last_manual_compact_used_time(uint64_t *ts) const; std::string get_usage_scenario() const; void set_last_flushed_decree(uint64_t decree) const; @@ -59,6 +60,8 @@ class meta_store void set_last_manual_compact_finish_time(uint64_t last_manual_compact_finish_time) const; void set_usage_scenario(const std::string &usage_scenario) const; + void set_last_manual_compact_used_time(uint64_t last_manual_compact_used_time) const; + private: ::dsn::error_code get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const; @@ -92,6 +95,8 @@ class meta_store static const std::string DATA_VERSION; static const std::string LAST_FLUSHED_DECREE; static const std::string LAST_MANUAL_COMPACT_FINISH_TIME; + static const std::string LAST_MANUAL_COMPACT_USED_TIME; + static const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL; static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE; static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD; diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp index a2cdf4812e..efaaf564a5 100644 --- a/src/server/pegasus_manual_compact_service.cpp +++ b/src/server/pegasus_manual_compact_service.cpp @@ -84,6 +84,11 @@ void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_fini _manual_compact_last_finish_time_ms.store(last_finish_time_ms); } +void pegasus_manual_compact_service::init_last_used_time_ms(uint64_t last_used_time_ms) +{ + _manual_compact_last_time_used_ms.store(last_used_time_ms); +} + void pegasus_manual_compact_service::start_manual_compact_if_needed( const std::map &envs) { @@ -312,7 +317,7 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO } uint64_t start = begin_manual_compact(); - uint64_t finish = _app->do_manual_compact(options); + uint64_t finish = _app->do_manual_compact(options, start); end_manual_compact(start, finish); } diff --git a/src/server/pegasus_manual_compact_service.h b/src/server/pegasus_manual_compact_service.h index 329b10b7bf..d51e7cf957 100644 --- a/src/server/pegasus_manual_compact_service.h +++ b/src/server/pegasus_manual_compact_service.h @@ -45,6 +45,8 @@ class pegasus_manual_compact_service : public dsn::replication::replica_base void init_last_finish_time_ms(uint64_t last_finish_time_ms); + void init_last_used_time_ms(uint64_t last_used_time_ms); + void start_manual_compact_if_needed(const std::map &envs); // Called by pegasus_manual_compaction.sh diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 11adaa6f0f..97e2e9aee9 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1758,14 +1758,22 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv) dsn::ERR_LOCAL_APP_FAILURE, "open app failed, unsupported data version {}", _pegasus_data_version); - // update last manual compact finish timestamp + uint64_t last_manual_compact_used_time = 0; + LOG_AND_RETURN_NOT_OK( + ERROR_PREFIX, + _meta_store->get_last_manual_compact_used_time(&last_manual_compact_used_time), + "get_last_manual_compact_used_time failed"); + + // update last manual compact finish & used timestamp _manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time); + _manual_compact_svc.init_last_used_time_ms(last_manual_compact_used_time); cleanup.cancel(); } else { // Write initial meta data to meta CF and flush when create new DB. _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX); _meta_store->set_last_flushed_decree(0); _meta_store->set_last_manual_compact_finish_time(0); + _meta_store->set_last_manual_compact_used_time(0); flush_all_family_columns(true); } @@ -3334,7 +3342,8 @@ ::dsn::error_code pegasus_server_impl::check_column_families(const std::string & return ::dsn::ERR_OK; } -uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptions &options) +uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptions &options, + uint64_t start) { // wait flush before compact to make all data compacted. uint64_t start_time = dsn_now_ms(); @@ -3355,6 +3364,11 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio status.ToString(), end_time - start_time); _meta_store->set_last_manual_compact_finish_time(end_time); + uint64_t last_manual_compact_finish_time = 0; + CHECK_OK_PREFIX( + _meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time)); + after_manual_compact(start_time, last_manual_compact_finish_time); + // generate new checkpoint and remove old checkpoints, in order to release storage asap if (!release_storage_after_manual_compact()) { // it is possible that the new checkpoint is not generated, if there was no data @@ -3375,12 +3389,17 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio // update rocksdb statistics immediately update_replica_rocksdb_statistics(); - uint64_t last_manual_compact_finish_time = 0; - CHECK_OK_PREFIX( - _meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time)); return last_manual_compact_finish_time; } +void pegasus_server_impl::after_manual_compact(std::uint64_t starttime, uint64_t endtime) +{ + // store last manual compact used time to meta store for learn situation + uint64_t used_time = endtime - starttime; + _meta_store->set_last_manual_compact_used_time(used_time); + flush_all_family_columns(true); +} + bool pegasus_server_impl::release_storage_after_manual_compact() { int64_t old_last_durable = last_durable_decree(); diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 69d02d0614..e2b5360ec9 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -350,7 +350,9 @@ class pegasus_server_impl : public pegasus_read_service std::string compression_type_to_str(rocksdb::CompressionType type); // return finish time recorded in rocksdb - uint64_t do_manual_compact(const rocksdb::CompactRangeOptions &options); + uint64_t do_manual_compact(const rocksdb::CompactRangeOptions &options, uint64_t start); + + void after_manual_compact(uint64_t starttime, uint64_t endtime); // generate new checkpoint and remove old checkpoints, in order to release storage asap // return true if release succeed (new checkpointed generated).