diff --git a/src/v/cluster/scheduling/allocation_node.cc b/src/v/cluster/scheduling/allocation_node.cc
index 4f3436fc0427d..e299b32920579 100644
--- a/src/v/cluster/scheduling/allocation_node.cc
+++ b/src/v/cluster/scheduling/allocation_node.cc
@@ -50,24 +50,26 @@ allocation_node::allocation_node(
       });
 }
 
-bool allocation_node::is_full(
-  const model::ntp& ntp, bool will_add_allocation) const {
-    // Internal topics are excluded from checks to prevent allocation failures
-    // when creating them. This is okay because they are fairly small in number
-    // compared to kafka user topic partitions.
+bool allocation_node::is_internal_topic(
+  const config::binding<std::vector<ss::sstring>>& internal_kafka_topics,
+  model::topic_namespace_view ntp) {
     auto is_internal_ns = ntp.ns == model::redpanda_ns
                           || ntp.ns == model::kafka_internal_namespace;
     if (is_internal_ns) {
-        return false;
+        return true;
     }
-    const auto& internal_topics = _internal_kafka_topics();
-    auto is_internal_topic = ntp.ns == model::kafka_namespace
-                             && std::any_of(
-                               internal_topics.cbegin(),
-                               internal_topics.cend(),
-                               [&ntp](const ss::sstring& topic) {
-                                   return topic == ntp.tp.topic();
-                               });
+    const auto& internal_topics = internal_kafka_topics();
+    return ntp.ns == model::kafka_namespace
+           && std::any_of(
+             internal_topics.cbegin(),
+             internal_topics.cend(),
+             [&ntp](const ss::sstring& topic) { return topic == ntp.tp; });
+}
+
+bool allocation_node::is_full(
+  const model::ntp& ntp, bool will_add_allocation) const {
+    auto is_internal_topic = allocation_node::is_internal_topic(
+      _internal_kafka_topics, model::topic_namespace_view{ntp});
 
     auto count = _allocated_partitions;
     if (will_add_allocation) {
diff --git a/src/v/cluster/scheduling/allocation_node.h b/src/v/cluster/scheduling/allocation_node.h
index afe792f253ba3..37cb38d49782d 100644
--- a/src/v/cluster/scheduling/allocation_node.h
+++ b/src/v/cluster/scheduling/allocation_node.h
@@ -126,6 +126,13 @@ class allocation_node {
     }
 
     bool is_full(const model::ntp&, bool will_add_allocation) const;
+    // Internal topics are excluded from checks to prevent allocation failures
+    // when creating them. This is okay because they are fairly small in number
+    // compared to kafka user topic partitions.
+    static bool is_internal_topic(
+      const config::binding<std::vector<ss::sstring>>& internal_kafka_topics,
+      model::topic_namespace_view ntp);
+
 private:
     friend allocation_state;
 
diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc
index 3d4945d05507a..aee6e10f6b0f5 100644
--- a/src/v/cluster/scheduling/partition_allocator.cc
+++ b/src/v/cluster/scheduling/partition_allocator.cc
@@ -109,11 +109,16 @@ allocation_constraints partition_allocator::default_constraints() {
  * with partitions that cannot be re-accommodated on smaller peers).
  */
 std::error_code partition_allocator::check_cluster_limits(
-  const uint64_t new_partitions_replicas_requested) const {
+  const uint64_t new_partitions_replicas_requested,
+  const model::topic_namespace& topic) const {
     if (_members.local().nodes().empty()) {
         // Empty members table, we're probably running in a unit test
         return errc::success;
     }
+    if (allocation_node::is_internal_topic(_internal_kafka_topics, topic)) {
+        return errc::success;
+    }
+
     // Calculate how many partition-replicas already exist, so that we can
     // check if the new topic would take us past any limits.
     uint64_t existent_partitions{0};
@@ -169,15 +174,19 @@ std::error_code partition_allocator::check_cluster_limits(
 
     // Refuse to create a partition count that would violate the per-core
     // limit.
-    const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard());
+    const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard())
+                                - (broker_count * _partitions_reserve_shard0());
     if (proposed_total_partitions > core_limit) {
         vlog(
           clusterlog.warn,
           "Refusing to create {} partitions as total partition count {} would "
-          "exceed core limit {}",
+          "exceed the core-based limit {} (per-shard limit: {}, shard0 "
+          "reservation: {})",
           new_partitions_replicas_requested,
           proposed_total_partitions,
-          effective_cpu_count * _partitions_per_shard());
+          core_limit,
+          _partitions_per_shard(),
+          _partitions_reserve_shard0());
         return errc::topic_invalid_partitions_core_limit;
     }
 
@@ -243,7 +252,7 @@ partition_allocator::allocate(simple_allocation_request simple_req) {
     const uint64_t create_count
       = static_cast<uint64_t>(simple_req.additional_partitions)
        * static_cast<uint64_t>(simple_req.replication_factor);
-    auto cluster_errc = check_cluster_limits(create_count);
+    auto cluster_errc = check_cluster_limits(create_count, simple_req.tp_ns);
     if (cluster_errc) {
         co_return cluster_errc;
     }
@@ -276,7 +285,7 @@ partition_allocator::allocate(allocation_request request) {
         }
     }
 
-    auto cluster_errc = check_cluster_limits(create_count);
+    auto cluster_errc = check_cluster_limits(create_count, request._nt);
     if (cluster_errc) {
         co_return cluster_errc;
     }
diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h
index 921b7e2a6b23b..187646750b05c 100644
--- a/src/v/cluster/scheduling/partition_allocator.h
+++ b/src/v/cluster/scheduling/partition_allocator.h
@@ -18,6 +18,7 @@
 #include "cluster/scheduling/types.h"
 #include "config/property.h"
 #include "features/fwd.h"
+#include "model/metadata.h"
 
 namespace cluster {
 
@@ -152,7 +153,8 @@ class partition_allocator {
     // new_partitions_replicas_requested represents the total number of
     // partitions requested by a request. i.e. partitions * replicas requested.
     std::error_code check_cluster_limits(
-      const uint64_t new_partitions_replicas_requested) const;
+      const uint64_t new_partitions_replicas_requested,
+      const model::topic_namespace& topic) const;
 
     ss::future<result<allocation_units::pointer>>
     do_allocate(allocation_request);
diff --git a/src/v/cluster/tests/partition_allocator_fixture.h b/src/v/cluster/tests/partition_allocator_fixture.h
index e56d7807fefef..97726b9b34d32 100644
--- a/src/v/cluster/tests/partition_allocator_fixture.h
+++ b/src/v/cluster/tests/partition_allocator_fixture.h
@@ -32,7 +32,6 @@ struct partition_allocator_fixture {
 
     static constexpr uint32_t partitions_per_shard = 1000;
-    static constexpr uint32_t partitions_reserve_shard0 = 2;
 
     partition_allocator_fixture()
       : partition_allocator_fixture(std::nullopt, std::nullopt) {}
 
@@ -68,7 +67,7 @@ struct partition_allocator_fixture {
          broker.id(),
          broker.properties().cores,
          config::mock_binding(uint32_t{partitions_per_shard}),
-         config::mock_binding(uint32_t{partitions_reserve_shard0}),
+         partitions_reserve_shard0.bind(),
          kafka_internal_topics.bind()));
     }
 
@@ -138,6 +137,7 @@ struct partition_allocator_fixture {
     cluster::partition_allocator& allocator() { return _allocator.local(); }
 
     config::mock_property<std::vector<ss::sstring>> kafka_internal_topics{{}};
+    config::mock_property<uint32_t> partitions_reserve_shard0{2};
     model::topic_namespace tn{model::kafka_namespace, model::topic{"test"}};
     ss::sharded<cluster::members_table> members;
     ss::sharded<features::feature_table> features;
@@ -158,7 +158,7 @@ struct partition_allocator_fixture {
            config::mock_binding<std::optional<size_t>>(memory_per_partition),
            config::mock_binding<std::optional<int32_t>>(fds_per_partition),
            config::mock_binding(uint32_t{partitions_per_shard}),
-           config::mock_binding(uint32_t{partitions_reserve_shard0}),
+           partitions_reserve_shard0.bind(),
            kafka_internal_topics.bind(),
            config::mock_binding(true))
           .get();
diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc
index 13992e981a04e..25b6e7e1cfdaa 100644
--- a/src/v/cluster/tests/partition_allocator_tests.cc
+++ b/src/v/cluster/tests/partition_allocator_tests.cc
@@ -194,6 +194,28 @@ FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) {
       allocator().allocate(make_allocation_request(int_2, 1, 1)).get());
 }
 
+FIXTURE_TEST(
+  allocation_over_capacity_without_shard0, partition_allocator_fixture) {
+    // Disable shard0 reservations
+    partitions_reserve_shard0.update(0);
+
+    register_node(0, 6);
+    register_node(1, 6);
+    register_node(2, 6);
+
+    saturate_all_machines();
+    auto gr = allocator().state().last_group_id();
+    BOOST_REQUIRE(
+      allocator().allocate(make_allocation_request(1, 1)).get().has_error());
+    // group id hasn't changed
+    BOOST_REQUIRE_EQUAL(allocator().state().last_group_id(), gr);
+
+    // Make the topic internal and retry, should work.
+    kafka_internal_topics.update({tn.tp()});
+    BOOST_REQUIRE(allocator().allocate(make_allocation_request(1, 1)).get());
+    BOOST_REQUIRE_GT(allocator().state().last_group_id(), gr);
+}
+
 FIXTURE_TEST(max_allocation, partition_allocator_fixture) {
     register_node(0, 2);
     register_node(1, 2);
@@ -530,7 +552,7 @@ FIXTURE_TEST(updating_nodes_properties, partition_allocator_fixture) {
     BOOST_REQUIRE_EQUAL(
       it->second->max_capacity(),
       10 * partition_allocator_fixture::partitions_per_shard
-        - partition_allocator_fixture::partitions_reserve_shard0);
+        - partitions_reserve_shard0());
 }
 
 FIXTURE_TEST(change_replication_factor, partition_allocator_fixture) {