Commit

Merge pull request redpanda-data#24494 from redpanda-data/stephan/smaller-chunk-cache

Half chunk cache memory reservations
StephanDollberg authored Dec 12, 2024
2 parents f4c472f + 032d8bb commit ad3206b
Showing 7 changed files with 79 additions and 23 deletions.
5 changes: 5 additions & 0 deletions src/v/redpanda/application.cc
@@ -2456,6 +2456,11 @@ void application::wire_up_bootstrap_services() {
ss::smp::invoke_on_all([] {
return storage::internal::chunks().start();
}).get();
_deferred.emplace_back([] {
ss::smp::invoke_on_all([] {
return storage::internal::chunks().stop();
}).get();
});
construct_service(stress_fiber_manager).get();
syschecks::systemd_message("Constructing storage services").get();
construct_single_service_sharded(
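A note on this hunk: the new _deferred entry (which, judging by the surrounding bootstrap code, runs its callbacks during application shutdown) pairs the existing per-shard chunks().start() with the chunk_cache::stop() introduced later in this diff. stop() unregisters the cache's metrics (see chunk_cache.cc below), so the gauge callbacks cannot outlive the cache at shutdown.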
14 changes: 7 additions & 7 deletions src/v/resource_mgmt/memory_groups.cc
@@ -31,13 +31,13 @@ bool datalake_enabled() {
}

struct memory_shares {
constexpr static size_t chunk_cache = 3;
constexpr static size_t kafka = 3;
constexpr static size_t rpc = 2;
constexpr static size_t recovery = 1;
constexpr static size_t tiered_storage = 1;
constexpr static size_t data_transforms = 1;
constexpr static size_t datalake = 1;
constexpr static size_t chunk_cache = 15;
constexpr static size_t kafka = 30;
constexpr static size_t rpc = 20;
constexpr static size_t recovery = 10;
constexpr static size_t tiered_storage = 10;
constexpr static size_t data_transforms = 10;
constexpr static size_t datalake = 10;

static size_t total_shares(bool with_wasm, bool with_datalake) {
size_t total = chunk_cache + kafka + rpc + recovery + tiered_storage;
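For a rough sense of the rebalancing, here is the share arithmetic implied by the table above (assuming total_shares simply sums the entries, with the optional wasm and datalake shares disabled, as the updated tests below suggest):

    Before: total = 3 + 3 + 2 + 1 + 1 = 10 shares
            chunk cache max = 3/10         = 30.0% of available memory
            chunk cache min = 1/10         = 10.0%
    After:  total = 15 + 30 + 20 + 10 + 10 = 85 shares
            chunk cache max = 15/85        ≈ 17.6%
            chunk cache min = 5/85         ≈  5.9%

Every other share is scaled by 10 while the chunk cache's is scaled by only 5, so its reservation roughly halves relative to the rest, matching the commit title.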
22 changes: 11 additions & 11 deletions src/v/resource_mgmt/tests/memory_groups_test.cc
@@ -16,9 +16,9 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

static constexpr size_t total_shares_without_optionals = 10;
static constexpr size_t total_wasm_shares = 1;
static constexpr size_t total_datalake_shares = 1;
static constexpr size_t total_shares_without_optionals = 85;
static constexpr size_t total_wasm_shares = 10;
static constexpr size_t total_datalake_shares = 10;

// It's not really useful to know the exact byte values for each of these
// numbers so we just make sure we're within a MB
@@ -68,33 +68,33 @@ TEST_P(MemoryGroupSharesTest, DividesSharesCorrectly) {
}
EXPECT_THAT(
groups.chunk_cache_min_memory(),
IsApprox(total_available_memory * 1.0 / total_shares));
IsApprox(total_available_memory * 5.0 / total_shares));
EXPECT_THAT(
groups.chunk_cache_max_memory(),
IsApprox(total_available_memory * 3.0 / total_shares));
IsApprox(total_available_memory * 15.0 / total_shares));
EXPECT_THAT(
groups.tiered_storage_max_memory(),
IsApprox(total_available_memory * 1.0 / total_shares));
IsApprox(total_available_memory * 10.0 / total_shares));
EXPECT_THAT(
groups.recovery_max_memory(),
IsApprox(total_available_memory * 1.0 / total_shares));
IsApprox(total_available_memory * 10.0 / total_shares));
EXPECT_THAT(
groups.kafka_total_memory(),
IsApprox(total_available_memory * 3.0 / total_shares));
IsApprox(total_available_memory * 30.0 / total_shares));
EXPECT_THAT(
groups.rpc_total_memory(),
IsApprox(total_available_memory * 2.0 / total_shares));
IsApprox(total_available_memory * 20.0 / total_shares));
if (wasm_enabled()) {
EXPECT_THAT(
groups.data_transforms_max_memory(),
IsApprox(total_available_memory * 1.0 / total_shares));
IsApprox(total_available_memory * 10.0 / total_shares));
} else {
EXPECT_THAT(groups.data_transforms_max_memory(), 0);
}
if (datalake_enabled()) {
EXPECT_THAT(
groups.datalake_max_memory(),
IsApprox(total_available_memory * 1.0 / total_shares));
IsApprox(total_available_memory * 10.0 / total_shares));
} else {
EXPECT_THAT(groups.datalake_max_memory(), 0);
}
1 change: 1 addition & 0 deletions src/v/storage/BUILD
@@ -187,6 +187,7 @@ redpanda_cc_library(
deps = [
"//src/v/base",
"//src/v/container:intrusive",
"//src/v/metrics",
"//src/v/ssx:semaphore",
"//src/v/utils:named_type",
"@seastar",
51 changes: 47 additions & 4 deletions src/v/storage/chunk_cache.cc
@@ -11,6 +11,7 @@
#include "storage/chunk_cache.h"

#include "config/configuration.h"
#include "metrics/prometheus_sanitize.h"
#include "resource_mgmt/memory_groups.h"

#include <seastar/core/loop.hh>
@@ -25,6 +26,7 @@ chunk_cache::chunk_cache() noexcept
, _chunk_size(config::shard_local_cfg().append_chunk_size()) {}

ss::future<> chunk_cache::start() {
setup_metrics();
const auto num_chunks = memory_groups().chunk_cache_min_memory()
/ _chunk_size;
return ss::do_for_each(
@@ -37,6 +39,38 @@ ss::future<> chunk_cache::start() {
});
}

ss::future<> chunk_cache::stop() {
_metrics.clear();
return ss::now();
}

void chunk_cache::setup_metrics() {
if (config::shard_local_cfg().disable_metrics()) {
return;
}

namespace sm = ss::metrics;
_metrics.add_group(
prometheus_sanitize::metrics_name("chunk_cache"),
{
sm::make_gauge(
"total_size_bytes",
[this] { return _size_total; },
sm::description("Total size of all segment appender chunks in any "
"state, in bytes.")),
sm::make_gauge(
"available_size_bytes",
[this] { return _size_available; },
sm::description("Total size of all free segment appender chunks in "
"the cache, in bytes.")),
sm::make_counter(
"wait_count",
[this] { return _wait_for_chunk_count; },
sm::description("Count of how many times we had to wait for a chunk "
"to become available")),
});
}

void chunk_cache::add(const chunk_ptr& chunk) {
if (_size_available >= _size_target) {
_size_total -= _chunk_size;
@@ -54,16 +88,25 @@ ss::future<chunk_cache::chunk_ptr> chunk_cache::get() {
if (!_sem.waiters()) {
return do_get();
}
return ss::get_units(_sem, 1).then(
[this](ssx::semaphore_units) { return do_get(); });

return wait_and_get();
}

ss::future<chunk_cache::chunk_ptr> chunk_cache::do_get() {
if (auto c = pop_or_allocate(); c) {
return ss::make_ready_future<chunk_ptr>(c);
}
return ss::get_units(_sem, 1).then(
[this](ssx::semaphore_units) { return do_get(); });

return wait_and_get();
}

ss::future<chunk_cache::chunk_ptr> chunk_cache::wait_and_get() {
auto fut = ss::get_units(_sem, 1);
if (_sem.waiters()) {
_wait_for_chunk_count++;
}

return fut.then([this](ssx::semaphore_units) { return do_get(); });
}

chunk_cache::chunk_ptr chunk_cache::pop_or_allocate() {
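Two notes on the hunk above. First, the wait accounting in wait_and_get leans on Seastar semaphore semantics: ss::get_units returns an already-ready future when a unit is free and only enqueues a waiter otherwise, so a non-zero _sem.waiters() immediately after issuing the request indicates the caller genuinely had to wait. Second, the new group should surface roughly the following series on the internal metrics endpoint (names are illustrative; they assume the usual vectorized_ prefix applied to redpanda internal metrics):

    vectorized_chunk_cache_total_size_bytes{shard="0"}      # gauge
    vectorized_chunk_cache_available_size_bytes{shard="0"}  # gauge
    vectorized_chunk_cache_wait_count{shard="0"}            # counter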
7 changes: 7 additions & 0 deletions src/v/storage/chunk_cache.h
@@ -11,6 +11,7 @@
#pragma once

#include "base/seastarx.h"
#include "metrics/metrics.h"
#include "ssx/semaphore.h"
#include "storage/segment_appender_chunk.h"

@@ -42,6 +43,7 @@ class chunk_cache {
~chunk_cache() noexcept = default;

ss::future<> start();
ss::future<> stop();

void add(const chunk_ptr& chunk);

@@ -51,6 +53,8 @@

private:
ss::future<chunk_ptr> do_get();
void setup_metrics();
ss::future<chunk_cache::chunk_ptr> wait_and_get();

chunk_ptr pop_or_allocate();

@@ -62,6 +66,9 @@
const size_t _size_limit;

const size_t _chunk_size{0};

size_t _wait_for_chunk_count{0};
metrics::internal_metric_groups _metrics;
};

chunk_cache& chunks();
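Taken together, the public surface now covers the full lifecycle. A minimal usage sketch (hypothetical call sites running inside a Seastar thread context; only start, stop, get, add, and the chunks() accessor come from the header above):

    // per shard, at startup: pre-allocate the reserved chunks
    storage::internal::chunks().start().get();

    // obtain a chunk, possibly waiting until one is freed
    auto chunk = storage::internal::chunks().get().get();

    // ... use the chunk in a segment appender ...

    // hand the chunk back to the cache (it is dropped instead if the
    // cache is already at its size target)
    storage::internal::chunks().add(chunk);

    // per shard, at shutdown: unregisters the metrics
    storage::internal::chunks().stop().get();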
2 changes: 1 addition & 1 deletion src/v/storage/tests/log_segment_appender_test.cc
@@ -123,7 +123,7 @@ iobuf make_iobuf_with_char(size_t len, unsigned char c) {
return ret;
}

size_t default_chunk_size() { return internal::chunks().chunk_size(); }
size_t default_chunk_size() { return storage::internal::chunks().chunk_size(); }

} // namespace

