From e100b878a1d5abf085b989d33fae5797f49dbc2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Fri, 29 Nov 2024 13:19:32 +0000 Subject: [PATCH 1/4] cluster: consider shard0 reserve in check_cluster_limits Improve the user error feedback when the `topic_partitions_reserve_shard0` cluster config is used and a user tried to allocate a topic that is above the partition limits. Previously this check was only considered as part of the `max_final_capacity` hard constraint, which meant that the kafka error message was more vague (No nodes are available to perform allocation after hard constraints were solved) and there were no clear broker logs to indicate this. Now this is also considered inside `check_cluster_limits` which leads to more specific error messages on both the kafka api (unable to create topic with 20 partitions due to hardware constraints) and in broker logs: ``` WARN 2024-11-29 13:18:13,907 [shard 0:main] cluster - partition_allocator.cc:183 - Refusing to create 20 partitions as total partition count 20 would exceed the core-based limit 18 (per-shard limit: 20, shard0 reservation: 2) ``` (cherry picked from commit b632190e2596c3bd651d3e0e21818e2a5f3b2c10) --- src/v/cluster/scheduling/partition_allocator.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc index 3d4945d05507..8dad82089115 100644 --- a/src/v/cluster/scheduling/partition_allocator.cc +++ b/src/v/cluster/scheduling/partition_allocator.cc @@ -169,15 +169,19 @@ std::error_code partition_allocator::check_cluster_limits( // Refuse to create a partition count that would violate the per-core // limit. - const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard()); + const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard()) + - (broker_count * _partitions_reserve_shard0()); if (proposed_total_partitions > core_limit) { vlog( clusterlog.warn, "Refusing to create {} partitions as total partition count {} would " - "exceed core limit {}", + "exceed the core-based limit {} (per-shard limit: {}, shard0 " + "reservation: {})", new_partitions_replicas_requested, proposed_total_partitions, - effective_cpu_count * _partitions_per_shard()); + core_limit, + _partitions_per_shard(), + _partitions_reserve_shard0()); return errc::topic_invalid_partitions_core_limit; } From bbe2cb94061f4c460740fc3c50a73f0fe4cfbd13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Mon, 2 Dec 2024 20:00:26 +0000 Subject: [PATCH 2/4] cluster/test: configure reserve_shard0 in fixtures (cherry picked from commit cccb53dffd49fe594f8efcb78998d1365c17fe76) --- src/v/cluster/tests/partition_allocator_fixture.h | 6 +++--- src/v/cluster/tests/partition_allocator_tests.cc | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/tests/partition_allocator_fixture.h b/src/v/cluster/tests/partition_allocator_fixture.h index e56d7807fefe..97726b9b34d3 100644 --- a/src/v/cluster/tests/partition_allocator_fixture.h +++ b/src/v/cluster/tests/partition_allocator_fixture.h @@ -32,7 +32,6 @@ struct partition_allocator_fixture { static constexpr uint32_t partitions_per_shard = 1000; - static constexpr uint32_t partitions_reserve_shard0 = 2; partition_allocator_fixture() : partition_allocator_fixture(std::nullopt, std::nullopt) {} @@ -68,7 +67,7 @@ struct partition_allocator_fixture { broker.id(), broker.properties().cores, config::mock_binding(uint32_t{partitions_per_shard}), - config::mock_binding(uint32_t{partitions_reserve_shard0}), + partitions_reserve_shard0.bind(), kafka_internal_topics.bind())); } @@ -138,6 +137,7 @@ struct partition_allocator_fixture { cluster::partition_allocator& allocator() { return _allocator.local(); } config::mock_property> kafka_internal_topics{{}}; + config::mock_property partitions_reserve_shard0{2}; model::topic_namespace tn{model::kafka_namespace, model::topic{"test"}}; ss::sharded members; ss::sharded features; @@ -158,7 +158,7 @@ struct partition_allocator_fixture { config::mock_binding>(memory_per_partition), config::mock_binding>(fds_per_partition), config::mock_binding(uint32_t{partitions_per_shard}), - config::mock_binding(uint32_t{partitions_reserve_shard0}), + partitions_reserve_shard0.bind(), kafka_internal_topics.bind(), config::mock_binding(true)) .get(); diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc index 13992e981a04..525b5da00fd3 100644 --- a/src/v/cluster/tests/partition_allocator_tests.cc +++ b/src/v/cluster/tests/partition_allocator_tests.cc @@ -530,7 +530,7 @@ FIXTURE_TEST(updating_nodes_properties, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( it->second->max_capacity(), 10 * partition_allocator_fixture::partitions_per_shard - - partition_allocator_fixture::partitions_reserve_shard0); + - partitions_reserve_shard0()); } FIXTURE_TEST(change_replication_factor, partition_allocator_fixture) { From bc61df1862959f5642f4e13da9a519e1cd0156f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Wed, 4 Dec 2024 17:07:21 +0000 Subject: [PATCH 3/4] cluster: extract `allocation_node::is_internal_topic` Pure refactor. Extract for reuse in the next commit. (cherry picked from commit 4b4f6a2ac8e28e007983de7803e99dc6eefa3bd1) --- src/v/cluster/scheduling/allocation_node.cc | 30 +++++++++++---------- src/v/cluster/scheduling/allocation_node.h | 7 +++++ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/v/cluster/scheduling/allocation_node.cc b/src/v/cluster/scheduling/allocation_node.cc index 4f3436fc0427..e299b3292057 100644 --- a/src/v/cluster/scheduling/allocation_node.cc +++ b/src/v/cluster/scheduling/allocation_node.cc @@ -50,24 +50,26 @@ allocation_node::allocation_node( }); } -bool allocation_node::is_full( - const model::ntp& ntp, bool will_add_allocation) const { - // Internal topics are excluded from checks to prevent allocation failures - // when creating them. This is okay because they are fairly small in number - // compared to kafka user topic partitions. +bool allocation_node::is_internal_topic( + const config::binding>& internal_kafka_topics, + model::topic_namespace_view ntp) { auto is_internal_ns = ntp.ns == model::redpanda_ns || ntp.ns == model::kafka_internal_namespace; if (is_internal_ns) { - return false; + return true; } - const auto& internal_topics = _internal_kafka_topics(); - auto is_internal_topic = ntp.ns == model::kafka_namespace - && std::any_of( - internal_topics.cbegin(), - internal_topics.cend(), - [&ntp](const ss::sstring& topic) { - return topic == ntp.tp.topic(); - }); + const auto& internal_topics = internal_kafka_topics(); + return ntp.ns == model::kafka_namespace + && std::any_of( + internal_topics.cbegin(), + internal_topics.cend(), + [&ntp](const ss::sstring& topic) { return topic == ntp.tp; }); +} + +bool allocation_node::is_full( + const model::ntp& ntp, bool will_add_allocation) const { + auto is_internal_topic = allocation_node::is_internal_topic( + _internal_kafka_topics, model::topic_namespace_view{ntp}); auto count = _allocated_partitions; if (will_add_allocation) { diff --git a/src/v/cluster/scheduling/allocation_node.h b/src/v/cluster/scheduling/allocation_node.h index afe792f253ba..37cb38d49782 100644 --- a/src/v/cluster/scheduling/allocation_node.h +++ b/src/v/cluster/scheduling/allocation_node.h @@ -126,6 +126,13 @@ class allocation_node { } bool is_full(const model::ntp&, bool will_add_allocation) const; + // Internal topics are excluded from checks to prevent allocation failures + // when creating them. This is okay because they are fairly small in number + // compared to kafka user topic partitions. + static bool is_internal_topic( + const config::binding>& internal_kafka_topics, + model::topic_namespace_view ntp); + private: friend allocation_state; From 5fe9620149b34c58fc366cab96028ce5612dfc40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Mon, 2 Dec 2024 19:46:32 +0000 Subject: [PATCH 4/4] cluster: exclude internal topics from `check_cluster_limits` Internal topics are excluded from checks to prevent allocation failures when creating them. This is to ensure that lazy-allocated internal topics (eg. the transactions topic) can always be created. This excludes them from the global `check_cluster_limits`. There has already been a fixture test to effectively test that internal topics are excluded from the limit checks, however, it erroniously relied on the fact that the shard0 reservations were not considered in `check_cluster_limits` to allow the test to pass. (See `allocation_over_capacity` and the previous commit.) This adds a new test to validate that internal topics can be created even with partitions that are above the global shard0 reservation. (cherry picked from commit 19bc4f200c296576b5eebd6e3f0ab2bab02ead67) --- .../cluster/scheduling/partition_allocator.cc | 11 +++++++--- .../cluster/scheduling/partition_allocator.h | 4 +++- .../tests/partition_allocator_tests.cc | 22 +++++++++++++++++++ 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc index 8dad82089115..aee6e10f6b0f 100644 --- a/src/v/cluster/scheduling/partition_allocator.cc +++ b/src/v/cluster/scheduling/partition_allocator.cc @@ -109,11 +109,16 @@ allocation_constraints partition_allocator::default_constraints() { * with partitions that cannot be re-accommodated on smaller peers). */ std::error_code partition_allocator::check_cluster_limits( - const uint64_t new_partitions_replicas_requested) const { + const uint64_t new_partitions_replicas_requested, + const model::topic_namespace& topic) const { if (_members.local().nodes().empty()) { // Empty members table, we're probably running in a unit test return errc::success; } + if (allocation_node::is_internal_topic(_internal_kafka_topics, topic)) { + return errc::success; + } + // Calculate how many partition-replicas already exist, so that we can // check if the new topic would take us past any limits. uint64_t existent_partitions{0}; @@ -247,7 +252,7 @@ partition_allocator::allocate(simple_allocation_request simple_req) { const uint64_t create_count = static_cast(simple_req.additional_partitions) * static_cast(simple_req.replication_factor); - auto cluster_errc = check_cluster_limits(create_count); + auto cluster_errc = check_cluster_limits(create_count, simple_req.tp_ns); if (cluster_errc) { co_return cluster_errc; } @@ -280,7 +285,7 @@ partition_allocator::allocate(allocation_request request) { } } - auto cluster_errc = check_cluster_limits(create_count); + auto cluster_errc = check_cluster_limits(create_count, request._nt); if (cluster_errc) { co_return cluster_errc; } diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h index 921b7e2a6b23..187646750b05 100644 --- a/src/v/cluster/scheduling/partition_allocator.h +++ b/src/v/cluster/scheduling/partition_allocator.h @@ -18,6 +18,7 @@ #include "cluster/scheduling/types.h" #include "config/property.h" #include "features/fwd.h" +#include "model/metadata.h" namespace cluster { @@ -152,7 +153,8 @@ class partition_allocator { // new_partitions_replicas_requested represents the total number of // partitions requested by a request. i.e. partitions * replicas requested. std::error_code check_cluster_limits( - const uint64_t new_partitions_replicas_requested) const; + const uint64_t new_partitions_replicas_requested, + const model::topic_namespace& topic) const; ss::future> do_allocate(allocation_request); diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc index 525b5da00fd3..25b6e7e1cfda 100644 --- a/src/v/cluster/tests/partition_allocator_tests.cc +++ b/src/v/cluster/tests/partition_allocator_tests.cc @@ -194,6 +194,28 @@ FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) { allocator().allocate(make_allocation_request(int_2, 1, 1)).get()); } +FIXTURE_TEST( + allocation_over_capacity_without_shard0, partition_allocator_fixture) { + // Disable shard0 reservations + partitions_reserve_shard0.update(0); + + register_node(0, 6); + register_node(1, 6); + register_node(2, 6); + + saturate_all_machines(); + auto gr = allocator().state().last_group_id(); + BOOST_REQUIRE( + allocator().allocate(make_allocation_request(1, 1)).get().has_error()); + // group id hasn't changed + BOOST_REQUIRE_EQUAL(allocator().state().last_group_id(), gr); + + // Make the topic internal and retry, should work. + kafka_internal_topics.update({tn.tp()}); + BOOST_REQUIRE(allocator().allocate(make_allocation_request(1, 1)).get()); + BOOST_REQUIRE_GT(allocator().state().last_group_id(), gr); +} + FIXTURE_TEST(max_allocation, partition_allocator_fixture) { register_node(0, 2); register_node(1, 2);