Skip to content

Commit

Permalink
Merge pull request redpanda-data#23662 from WillemKauf/tombstones_config
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemKauf authored Nov 5, 2024
2 parents f515d8e + 817afe9 commit 8a949ed
Show file tree
Hide file tree
Showing 35 changed files with 693 additions and 65 deletions.
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
std::nullopt,
false,
std::nullopt,
tristate<std::chrono::milliseconds>{},
};

auto random_initial_revision_id
Expand Down
8 changes: 8 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "model/namespace.h"
#include "model/timestamp.h"
#include "storage/types.h"
#include "utils/tristate.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -318,6 +319,11 @@ metadata_cache::get_default_record_value_subject_name_strategy() const {
return pandaproxy::schema_registry::subject_name_strategy::topic_name;
}

std::optional<std::chrono::milliseconds>
metadata_cache::get_default_delete_retention_ms() const {
return config::shard_local_cfg().tombstone_retention_ms();
}

topic_properties metadata_cache::get_default_properties() const {
topic_properties tp;
tp.compression = {get_default_compression()};
Expand All @@ -335,6 +341,8 @@ topic_properties metadata_cache::get_default_properties() const {
get_default_retention_local_target_bytes()};
tp.retention_local_target_ms = tristate<std::chrono::milliseconds>{
get_default_retention_local_target_ms()};
tp.delete_retention_ms = tristate<std::chrono::milliseconds>{
get_default_delete_retention_ms()};

return tp;
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ class metadata_cache {
bool get_default_record_value_schema_id_validation() const;
pandaproxy::schema_registry::subject_name_strategy
get_default_record_value_subject_name_strategy() const;
std::optional<std::chrono::milliseconds>
get_default_delete_retention_ms() const;

topic_properties get_default_properties() const;
std::optional<partition_assignment>
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/topic_configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ storage::ntp_config topic_configuration::make_ntp_config(
.cloud_topic_enabled = properties.cloud_topic_enabled,
.iceberg_translation_interval_ms
= properties.iceberg_translation_interval_ms,
.tombstone_retention_ms = properties.delete_retention_ms,
});
}
return {
Expand Down
16 changes: 11 additions & 5 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"flush_ms: {}, "
"flush_bytes: {}, "
"remote_label: {}, iceberg_enabled: {}, "
"leaders_preference: {} ",
"iceberg_translation_interval_ms: {} ",
"leaders_preference: {}, "
"iceberg_translation_interval_ms: {}, "
"delete_retention_ms: {}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -77,7 +78,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.remote_label,
properties.iceberg_enabled,
properties.leaders_preference,
properties.iceberg_translation_interval_ms);
properties.iceberg_translation_interval_ms,
properties.delete_retention_ms);

if (config::shard_local_cfg().development_enable_cloud_topics()) {
fmt::print(
Expand Down Expand Up @@ -123,7 +125,8 @@ bool topic_properties::has_overrides() const {
|| flush_bytes.has_value() || remote_label.has_value()
|| (iceberg_enabled != storage::ntp_config::default_iceberg_enabled)
|| leaders_preference.has_value()
|| iceberg_translation_interval_ms.has_value();
|| iceberg_translation_interval_ms.has_value()
|| delete_retention_ms.is_engaged();

if (config::shard_local_cfg().development_enable_cloud_topics()) {
return overrides
Expand Down Expand Up @@ -166,6 +169,7 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.iceberg_enabled = iceberg_enabled;
ret.cloud_topic_enabled = cloud_topic_enabled;
ret.iceberg_translation_interval_ms = iceberg_translation_interval_ms;
ret.tombstone_retention_ms = delete_retention_ms;
return ret;
}

Expand Down Expand Up @@ -257,7 +261,9 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
false,
std::nullopt,
false,
std::nullopt};
std::nullopt,
tristate<std::chrono::milliseconds>{disable_tristate},
};
}

} // namespace reflection
11 changes: 8 additions & 3 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ struct topic_properties
bool iceberg_enabled,
std::optional<config::leaders_preference> leaders_preference,
bool cloud_topic_enabled,
std::optional<std::chrono::milliseconds> iceberg_translation_interval)
std::optional<std::chrono::milliseconds> iceberg_translation_interval,
tristate<std::chrono::milliseconds> delete_retention_ms)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -116,7 +117,8 @@ struct topic_properties
, iceberg_enabled(iceberg_enabled)
, leaders_preference(std::move(leaders_preference))
, cloud_topic_enabled(cloud_topic_enabled)
, iceberg_translation_interval_ms(iceberg_translation_interval) {}
, iceberg_translation_interval_ms(iceberg_translation_interval)
, delete_retention_ms(delete_retention_ms) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -191,6 +193,8 @@ struct topic_properties

std::optional<std::chrono::milliseconds> iceberg_translation_interval_ms;

tristate<std::chrono::milliseconds> delete_retention_ms{disable_tristate};

bool is_compacted() const;
bool has_overrides() const;
bool requires_remote_erase() const;
Expand Down Expand Up @@ -236,7 +240,8 @@ struct topic_properties
iceberg_enabled,
leaders_preference,
cloud_topic_enabled,
iceberg_translation_interval_ms);
iceberg_translation_interval_ms,
delete_retention_ms);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
27 changes: 27 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ topic_table::apply(create_topic_cmd cmd, model::offset offset) {
schema_id_validation_validator::ec);
}

if (!topic_multi_property_validation(cmd.value.cfg.properties)) {
return ss::make_ready_future<std::error_code>(
errc::topic_invalid_config);
}

std::optional<model::initial_revision_id> remote_revision
= cmd.value.cfg.properties.remote_topic_properties
? std::make_optional(
Expand Down Expand Up @@ -796,6 +801,22 @@ std::error_code topic_table::validate_force_reconfigurable_partitions(
return result;
}

bool topic_table::topic_multi_property_validation(
const topic_properties& properties) const {
// delete.retention.ms validation. Cannot be enabled alongside tiered
// storage.
if (!properties.delete_retention_ms.is_disabled()) {
if (
properties.shadow_indexing.has_value()
&& properties.shadow_indexing.value()
!= model::shadow_indexing_mode::disabled) {
return false;
}
}

return true;
}

template<typename T>
void incremental_update(
std::optional<T>& property, property_update<std::optional<T>> override) {
Expand Down Expand Up @@ -1028,6 +1049,8 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
incremental_update(
updated_properties.iceberg_translation_interval_ms,
overrides.iceberg_translation_interval_ms);
incremental_update(
updated_properties.delete_retention_ms, overrides.delete_retention_ms);

auto& properties = tp->second.get_configuration().properties;
// no configuration change, no need to generate delta
Expand All @@ -1039,6 +1062,10 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
co_return schema_id_validation_validator::ec;
}

if (!topic_multi_property_validation(updated_properties)) {
co_return make_error_code(errc::topic_invalid_config);
}

// Apply the changes
properties = std::move(updated_properties);

Expand Down
9 changes: 9 additions & 0 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,15 @@ class topic_table {
std::error_code validate_force_reconfigurable_partition(
const ntp_with_majority_loss&) const;

// Validation for the final property configuration from a
// update_topic_properties_cmd application. Allows user to perform
// validations that depend on more than one topic property.
//
// Returns true if the configured topic_properties is valid, and false
// otherwise.
bool
topic_multi_property_validation(const topic_properties& properties) const;

underlying_t _topics;
lifecycle_markers_t _lifecycle_markers;
disabled_partitions_t _disabled_partitions;
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ struct incremental_topic_updates
leaders_preference;
property_update<std::optional<std::chrono::milliseconds>>
iceberg_translation_interval_ms;
property_update<tristate<std::chrono::milliseconds>> delete_retention_ms;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
Expand Down Expand Up @@ -674,7 +675,8 @@ struct incremental_topic_updates
leaders_preference,
remote_read,
remote_write,
iceberg_translation_interval_ms);
iceberg_translation_interval_ms,
delete_retention_ms);
}

friend std::ostream&
Expand Down
4 changes: 3 additions & 1 deletion src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "model/tests/randoms.h"
#include "random/generators.h"
#include "test_utils/randoms.h"
#include "utils/tristate.h"

namespace compat {

Expand Down Expand Up @@ -653,7 +654,8 @@ struct instance_generator<cluster::topic_properties> {
false,
std::nullopt,
false,
std::nullopt};
std::nullopt,
tristate<std::chrono::milliseconds>{disable_tristate}};
}

static std::vector<cluster::topic_properties> limits() { return {}; }
Expand Down
2 changes: 2 additions & 0 deletions src/v/compat/cluster_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ inline void rjson_serialize(
w,
"iceberg_translation_interval_ms",
tps.iceberg_translation_interval_ms);
write_member(w, "delete_retention_ms", tps.delete_retention_ms);
w.EndObject();
}

Expand Down Expand Up @@ -705,6 +706,7 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) {
rd,
"iceberg_translation_interval_ms",
obj.iceberg_translation_interval_ms);
read_member(rd, "delete_retention_ms", obj.delete_retention_ms);
}

inline void rjson_serialize(
Expand Down
11 changes: 10 additions & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,6 @@ configuration::configuration()
.visibility = visibility::user},
{},
validate_connection_rate)

, transactional_id_expiration_ms(
*this,
"transactional_id_expiration_ms",
Expand Down Expand Up @@ -945,6 +944,16 @@ configuration::configuration()
"How often to trigger background compaction.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
10s)
, tombstone_retention_ms(
*this,
"tombstone_retention_ms",
"The retention time for tombstone records in a compacted topic. Cannot "
"be enabled at the same time as any of `cloud_storage_enabled`, "
"`cloud_storage_enable_remote_read`, or "
"`cloud_storage_enable_remote_write`.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
std::nullopt,
validate_tombstone_retention_ms)
, log_disable_housekeeping_for_tests(
*this,
"log_disable_housekeeping_for_tests",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ struct configuration final : public config_store {
// same as log.retention.ms in kafka
retention_duration_property log_retention_ms;
property<std::chrono::milliseconds> log_compaction_interval_ms;
// same as delete.retention.ms in kafka
property<std::optional<std::chrono::milliseconds>> tombstone_retention_ms;
property<bool> log_disable_housekeeping_for_tests;
property<bool> log_compaction_use_sliding_window;
// same as retention.size in kafka - TODO: size not implemented
Expand Down
36 changes: 36 additions & 0 deletions src/v/config/validators.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "config/configuration.h"
#include "model/namespace.h"
#include "model/validation.h"
#include "serde/rw/chrono.h"
#include "ssx/sformat.h"
#include "utils/inet_address_wrapper.h"

Expand Down Expand Up @@ -247,4 +248,39 @@ validate_cloud_storage_api_endpoint(const std::optional<ss::sstring>& os) {
return std::nullopt;
}

std::optional<ss::sstring> validate_tombstone_retention_ms(
const std::optional<std::chrono::milliseconds>& ms) {
if (ms.has_value()) {
// For simplicity's sake, cloud storage enable/read/write permissions
// cannot be enabled at the same time as tombstone_retention_ms at the
// cluster level, to avoid the case in which redpanda refuses to create
// new, misconfigured topics due to cluster defaults
const auto& cloud_storage_enabled
= config::shard_local_cfg().cloud_storage_enabled;
const auto& cloud_storage_remote_write
= config::shard_local_cfg().cloud_storage_enable_remote_write;
const auto& cloud_storage_remote_read
= config::shard_local_cfg().cloud_storage_enable_remote_read;
if (
cloud_storage_enabled() || cloud_storage_remote_write()
|| cloud_storage_remote_read()) {
return fmt::format(
"cannot set {} if any of ({}, {}, {}) are enabled at the cluster "
"level",
config::shard_local_cfg().tombstone_retention_ms.name(),
cloud_storage_enabled.name(),
cloud_storage_remote_write.name(),
cloud_storage_remote_read.name());
}

if (ms.value() < 1ms || ms.value() > serde::max_serializable_ms) {
return fmt::format(
"tombstone_retention_ms should be in range: [1, {}]",
serde::max_serializable_ms);
}
}

return std::nullopt;
}

}; // namespace config
3 changes: 3 additions & 0 deletions src/v/config/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ validate_audit_excluded_topics(const std::vector<ss::sstring>&);
std::optional<ss::sstring>
validate_cloud_storage_api_endpoint(const std::optional<ss::sstring>& os);

std::optional<ss::sstring> validate_tombstone_retention_ms(
const std::optional<std::chrono::milliseconds>& ms);

}; // namespace config
17 changes: 16 additions & 1 deletion src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ create_topic_properties_update(
std::apply(apply_op(op_t::none), update.custom_properties.serde_fields());

static_assert(
std::tuple_size_v<decltype(update.properties.serde_fields())> == 31,
std::tuple_size_v<decltype(update.properties.serde_fields())> == 32,
"If you added a property, please decide on it's default alter config "
"policy, and handle the update in the loop below");
static_assert(
Expand Down Expand Up @@ -115,6 +115,13 @@ create_topic_properties_update(
= update.properties.get_shadow_indexing();
update_properties_shadow_indexing.op = op_t::none;

/*
Likewise, delete.retention.ms should be prevented from being changed
unless explicitly requested, due to tight coupling with shadow indexing
properties.
*/
update.properties.delete_retention_ms.op = op_t::none;

// Now that the defaults are set, continue to set properties from the
// request

Expand Down Expand Up @@ -349,6 +356,14 @@ create_topic_properties_update(
}
throw validation_error("Cloud topics is not enabled");
}
if (cfg.name == topic_property_delete_retention_ms) {
parse_and_set_tristate(
update.properties.delete_retention_ms,
cfg.value,
kafka::config_resource_operation::set,
delete_retention_ms_validator{});
continue;
}

if (cfg.name == topic_property_iceberg_translation_interval_ms) {
parse_and_set_optional_duration(
Expand Down
Loading

0 comments on commit 8a949ed

Please sign in to comment.