Skip to content

Commit

Permalink
Load dumped cache to block cache
Browse files Browse the repository at this point in the history
  • Loading branch information
lhsoft committed Apr 22, 2024
1 parent 6fbd02f commit 1d84f71
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 34 deletions.
8 changes: 4 additions & 4 deletions include/rocksdb/utilities/cache_dump_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ class CacheDumper {
class CacheDumpedLoader {
public:
virtual ~CacheDumpedLoader() = default;
virtual IOStatus RestoreCacheEntriesToSecondaryCache() {
virtual IOStatus RestoreCacheEntriesToCache() {
return IOStatus::NotSupported(
"RestoreCacheEntriesToSecondaryCache is not supported");
"RestoreCacheEntriesToCache is not supported");
}
};

Expand All @@ -134,10 +134,10 @@ Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options,
std::unique_ptr<CacheDumper>* cache_dumper);

// Get the default cache dump loader
template <typename T>
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
const BlockBasedTableOptions& toptions, const std::shared_ptr<T>& cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Load dumped cache to block cache
18 changes: 15 additions & 3 deletions utilities/cache_dump_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,26 @@ Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options,
return Status::OK();
}

template <>
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
const std::shared_ptr<SecondaryCache>& cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader) {
cache_dump_loader->reset(new CacheDumpedLoaderImpl(
dump_options, toptions, secondary_cache, std::move(reader)));
cache_dump_loader->reset(new CacheDumpedLoaderSecondaryCacheImpl(
dump_options, toptions, cache, std::move(reader)));
return Status::OK();
}

template <>
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions, const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader) {
cache_dump_loader->reset(new CacheDumpedLoaderBlockCacheImpl(
dump_options, toptions, cache, std::move(reader)));
return Status::OK();
}

Expand Down
150 changes: 133 additions & 17 deletions utilities/cache_dump_load_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,25 @@ IOStatus CacheDumperImpl::WriteFooter() {
// This is the main function to restore the cache entries to secondary cache.
// First, we check if all the arguments are valid. Then, we read the block
// sequentially from the reader and insert them to the secondary cache.
IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
IOStatus CacheDumpedLoaderImplBase::RestoreCacheEntriesToCache() {
// TODO: remove this line when options are used in the loader
(void)options_;
// Step 1: we check if all the arguments are valid
if (secondary_cache_ == nullptr) {
return IOStatus::InvalidArgument("Secondary Cache is null");
Status s = Check();
if (!s.ok()) {
return status_to_io_status(std::move(s));
}
if (reader_ == nullptr) {
return IOStatus::InvalidArgument("CacheDumpReader is null");
}
// Set the system clock
if (options_.clock == nullptr) {
return IOStatus::InvalidArgument("System clock is null");
}

clock_ = options_.clock;

deadline_ = options_.deadline;

// Step 2: read the header
// TODO: we need to check the cache dump format version and RocksDB version
Expand All @@ -280,6 +289,20 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
while (io_s.ok()) {
dump_unit.reset();
data.clear();

if (options_.max_size_bytes > 0 &&
loaded_size_bytes_ > options_.max_size_bytes) {
return IOStatus::OK();
}

uint64_t timestamp = clock_->NowMicros();
if (deadline_.count()) {
std::chrono::microseconds now = std::chrono::microseconds(timestamp);
if (now >= deadline_) {
return IOStatus::OK();
}
}

// read the content and store in the dump_unit
io_s = ReadCacheBlock(&data, &dump_unit);
if (!io_s.ok()) {
Expand All @@ -288,14 +311,13 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
if (dump_unit.type == CacheDumpUnitType::kFooter) {
break;
}
loaded_size_bytes_ += dump_unit.value_len;
// Create the uncompressed_block based on the information in the dump_unit
// (There is no block trailer here compatible with block-based SST file.)
Slice content =
Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len);
Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
if (!s.ok()) {
io_s = status_to_io_status(std::move(s));
}
io_s = InsertDumpUnitToCache(dump_unit);
// Slice content =
// Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len);
// Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
}
if (dump_unit.type == CacheDumpUnitType::kFooter) {
return IOStatus::OK();
Expand All @@ -306,8 +328,8 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {

// Read and copy the dump unit metadata to std::string data, decode and create
// the unit metadata based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
DumpUnitMeta* unit_meta) {
IOStatus CacheDumpedLoaderImplBase::ReadDumpUnitMeta(std::string* data,
DumpUnitMeta* unit_meta) {
assert(reader_ != nullptr);
assert(data != nullptr);
assert(unit_meta != nullptr);
Expand All @@ -321,8 +343,8 @@ IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,

// Read and copy the dump unit to std::string data, decode and create the unit
// based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
DumpUnit* unit) {
IOStatus CacheDumpedLoaderImplBase::ReadDumpUnit(size_t len, std::string* data,
DumpUnit* unit) {
assert(reader_ != nullptr);
assert(data != nullptr);
assert(unit != nullptr);
Expand All @@ -339,8 +361,8 @@ IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
}

// Read the header
IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
DumpUnit* dump_unit) {
IOStatus CacheDumpedLoaderImplBase::ReadHeader(std::string* data,
DumpUnit* dump_unit) {
DumpUnitMeta header_meta;
header_meta.reset();
std::string meta_string;
Expand All @@ -361,8 +383,8 @@ IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
}

// Read the blocks after header is read out
IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
DumpUnit* dump_unit) {
IOStatus CacheDumpedLoaderImplBase::ReadCacheBlock(std::string* data,
DumpUnit* dump_unit) {
// According to the write process, we read the dump_unit_metadata first
DumpUnitMeta unit_meta;
unit_meta.reset();
Expand All @@ -386,4 +408,98 @@ IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
return io_s;
}

IOStatus CacheDumpedLoaderSecondaryCacheImpl::InsertDumpUnitToCache(
const DumpUnit& dump_unit) {
Slice content =
Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len);
Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
return status_to_io_status(std::move(s));
}

Status CacheDumpedLoaderSecondaryCacheImpl::Check() {
if (secondary_cache_ == nullptr) {
return Status::InvalidArgument("Secondary Cache is null");
}
return Status::OK();
}

IOStatus CacheDumpedLoaderBlockCacheImpl::InsertDumpUnitToCache(
const DumpUnit& dump_unit) {
Statistics* statistics = nullptr;
Slice data(static_cast<char*>(dump_unit.value), dump_unit.value_len);
BlockContents block =
BlockContents(AllocateAndCopyBlock(data, nullptr), data.size());
Status s = Status::OK();

switch (dump_unit.type) {
case CacheDumpUnitType::kData: {
const Cache::CacheItemHelper* helper =
GetCacheItemHelper(BlockType::kData, CacheTier::kVolatileTier);
std::unique_ptr<Block_kData> block_holder;
block_holder.reset(new Block_kData(
std::move(block), toptions_.read_amp_bytes_per_bit, statistics));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}
case CacheDumpUnitType::kIndex: {
const Cache::CacheItemHelper* helper =
GetCacheItemHelper(BlockType::kIndex, CacheTier::kVolatileTier);
std::unique_ptr<Block_kIndex> block_holder;
block_holder.reset(new Block_kIndex(
std::move(block), toptions_.read_amp_bytes_per_bit, statistics));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}
case CacheDumpUnitType::kFilter: {
const Cache::CacheItemHelper* helper =
GetCacheItemHelper(BlockType::kFilter, CacheTier::kVolatileTier);
std::unique_ptr<ParsedFullFilterBlock> block_holder;
block_holder.reset(new ParsedFullFilterBlock(
toptions_.filter_policy.get(), std::move(block)));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}
case CacheDumpUnitType::kFilterMetaBlock: {
const Cache::CacheItemHelper* helper = GetCacheItemHelper(
BlockType::kFilterPartitionIndex, CacheTier::kVolatileTier);
std::unique_ptr<Block_kFilterPartitionIndex> block_holder;
block_holder.reset(new Block_kFilterPartitionIndex(
std::move(block), toptions_.read_amp_bytes_per_bit, statistics));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}

default:
break;
}
return status_to_io_status(std::move(s));
}

Status CacheDumpedLoaderBlockCacheImpl::Check() {
if (block_cache_ == nullptr) {
return Status::InvalidArgument("Block Cache is null");
}
return Status::OK();
}

} // namespace ROCKSDB_NAMESPACE
62 changes: 52 additions & 10 deletions utilities/cache_dump_load_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "file/writable_file_writer.h"
#include "rocksdb/utilities/cache_dump_load.h"
#include "table/block_based/block.h"
#include "table/block_based/block_cache.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/parsed_full_filter_block.h"
Expand Down Expand Up @@ -131,17 +132,21 @@ class CacheDumperImpl : public CacheDumper {
};

// The default implementation of CacheDumpedLoader
class CacheDumpedLoaderImpl : public CacheDumpedLoader {
class CacheDumpedLoaderImplBase : public CacheDumpedLoader {
public:
CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& /*toptions*/,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader)
: options_(dump_options),
secondary_cache_(secondary_cache),
CacheDumpedLoaderImplBase(const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
std::unique_ptr<CacheDumpReader>&& reader)
: toptions_(toptions),
options_(dump_options),
reader_(std::move(reader)) {}
~CacheDumpedLoaderImpl() {}
IOStatus RestoreCacheEntriesToSecondaryCache() override;
~CacheDumpedLoaderImplBase() {}
IOStatus RestoreCacheEntriesToCache() override;
virtual Status Check() = 0;
virtual IOStatus InsertDumpUnitToCache(const DumpUnit& dump_unit) = 0;

protected:
BlockBasedTableOptions toptions_;

private:
IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta);
Expand All @@ -150,8 +155,45 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader {
IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit);

CacheDumpOptions options_;
std::shared_ptr<SecondaryCache> secondary_cache_;
std::unique_ptr<CacheDumpReader> reader_;
SystemClock* clock_;
// Deadline for loader in microseconds.
std::chrono::microseconds deadline_;
uint64_t loaded_size_bytes_ = 0;
};

class CacheDumpedLoaderSecondaryCacheImpl : public CacheDumpedLoaderImplBase {
public:
CacheDumpedLoaderSecondaryCacheImpl(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader)
: CacheDumpedLoaderImplBase(dump_options, toptions, std::move(reader)),
secondary_cache_(secondary_cache) {}
~CacheDumpedLoaderSecondaryCacheImpl() {}
Status Check() override;
IOStatus InsertDumpUnitToCache(const DumpUnit& dump_unit) override;

private:
std::shared_ptr<SecondaryCache> secondary_cache_;
};

class CacheDumpedLoaderBlockCacheImpl : public CacheDumpedLoaderImplBase {
public:
CacheDumpedLoaderBlockCacheImpl(const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpReader>&& reader)
: CacheDumpedLoaderImplBase(dump_options, toptions, std::move(reader)),
block_cache_(cache) {}
~CacheDumpedLoaderBlockCacheImpl() {}
Status Check() override;
IOStatus InsertDumpUnitToCache(const DumpUnit& dump_unit) override;

private:
std::shared_ptr<Cache> block_cache_;
std::unique_ptr<BlockCreateContext> block_create_context_;
};

// The default implementation of CacheDumpWriter. We write the blocks to a file
Expand Down

0 comments on commit 1d84f71

Please sign in to comment.