Merge pull request redpanda-data#23231 from WillemKauf/tombstone_implementation

[CORE-7229] `storage`: add tombstone deletion implementation to local storage compaction
WillemKauf authored Sep 18, 2024
2 parents d416647 + c4fa727 commit 2e1cdb6
Showing 33 changed files with 1,126 additions and 99 deletions.
3 changes: 3 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
@@ -96,6 +96,7 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloud) {
model::timestamp::min(),
1,
log->stm_manager()->max_collectible_offset(),
+ std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
@@ -516,6 +517,7 @@ TEST_P(CloudStorageEndToEndManualTest, TestTimequeryAfterArchivalGC) {
model::timestamp::min(),
1, // max_bytes_in_log
log->stm_manager()->max_collectible_offset(),
+ std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
@@ -809,6 +811,7 @@ TEST_P(EndToEndFixture, TestCloudStorageTimequery) {
model::timestamp::max(),
0,
log->stm_manager()->max_collectible_offset(),
+ std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
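The std::nullopt argument added at each of these call sites (and in the test fixtures below) is threaded into the housekeeping/GC configuration between max_collectible_offset and the priority class; it appears to carry the new optional tombstone retention period, with std::nullopt leaving tombstone deletion disabled. A hedged sketch of a call site that enables it — the parameter's name and type are assumptions based on the argument order visible in this diff, not confirmed by it:

    // Sketch only: same constructor as above, but with a tombstone retention
    // period supplied instead of std::nullopt (assumed to be an
    // std::optional<std::chrono::milliseconds>-style parameter).
    storage::housekeeping_config housekeeping_conf(
      model::timestamp::min(),
      std::nullopt,                                  // max_bytes_in_log
      log->stm_manager()->max_collectible_offset(),
      std::chrono::hours{24},                        // assumed: tombstone retention
      ss::default_priority_class(),
      as);
    partition->log()->housekeeping(housekeeping_conf).get();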
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/read_replica_test.cc
@@ -119,8 +119,8 @@ FIXTURE_TEST(test_read_replica_basic_sync, read_replica_e2e_fixture) {
topic_name, model::partition_id(0), next)
.get();
BOOST_REQUIRE(!consumed_records.empty());
- for (const auto& [k, v] : consumed_records) {
-     BOOST_REQUIRE_EQUAL(k, ssx::sformat("key{}", next()));
+ for (const auto& kv : consumed_records) {
+     BOOST_REQUIRE_EQUAL(kv.key, ssx::sformat("key{}", next()));
next += model::offset(1);
}
}
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/async_data_uploader_test.cc
@@ -167,6 +167,7 @@ class async_data_uploader_fixture : public redpanda_thread_fixture {
model::timestamp::min(),
std::nullopt,
max_collect_offset,
+ std::nullopt,
ss::default_priority_class(),
as);

1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc
@@ -235,6 +235,7 @@ struct reupload_fixture : public archiver_fixture {
model::timestamp::max(),
std::nullopt,
max_collectible,
+ std::nullopt,
ss::default_priority_class(),
abort_source})
.get();
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/ntp_archiver_test.cc
@@ -941,6 +941,7 @@ FIXTURE_TEST(
model::timestamp::now(),
std::nullopt,
model::offset{999},
+ std::nullopt,
ss::default_priority_class(),
as))
.get0();
1 change: 1 addition & 0 deletions src/v/cluster/tests/manual_log_deletion_test.cc
@@ -115,6 +115,7 @@ struct manual_deletion_fixture : public raft_test_fixture {
retention_timestamp,
100_MiB,
model::offset::max(),
+ std::nullopt,
ss::default_priority_class(),
as,
storage::ntp_sanitizer_config{.sanitize_only = true}))
2 changes: 2 additions & 0 deletions src/v/cluster/tests/tx_compaction_utils.h
@@ -107,6 +107,7 @@ class tx_executor {
model::timestamp::now().value() - ret_duration.count()),
std::nullopt,
log->stm_manager()->max_collectible_offset(),
+ std::nullopt,
ss::default_priority_class(),
dummy_as,
})
@@ -228,6 +229,7 @@ class tx_executor {
model::timestamp::min(),
std::nullopt,
model::offset::max(),
+ std::nullopt,
ss::default_priority_class(),
as);
// Compacts until a single sealed segment remains, other than the
1 change: 1 addition & 0 deletions src/v/kafka/server/tests/group_tx_compaction_test.cc
@@ -305,6 +305,7 @@ ss::future<> run_workload(
model::timestamp::max(),
std::nullopt,
log->stm_manager()->max_collectible_offset(),
+ std::nullopt,
ss::default_priority_class(),
dummy_as,
})
24 changes: 17 additions & 7 deletions src/v/kafka/server/tests/produce_consume_utils.cc
@@ -76,11 +76,16 @@ kafka_produce_transport::produce_partition_requests(
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));
kafka::produce_request::partition partition;
- for (auto& [k, v] : records) {
+ for (auto& kv : records) {
+     const auto& k = kv.key;
+     const auto& v_opt = kv.val;
iobuf key_buf;
key_buf.append(k.data(), k.size());
- iobuf val_buf;
- val_buf.append(v.data(), v.size());
+ std::optional<iobuf> val_buf;
+ if (v_opt.has_value()) {
+     const auto& v = v_opt.value();
+     val_buf = iobuf::from({v.data(), v.size()});
+ }
builder.add_raw_kv(std::move(key_buf), std::move(val_buf));
}
if (ts.has_value()) {
@@ -169,10 +174,15 @@ ss::future<pid_to_kvs_map_t> kafka_consume_transport::consume(
auto& records_for_partition = ret[partition.partition_index];
for (auto& r : records) {
iobuf_const_parser key_buf(r.key());
- iobuf_const_parser val_buf(r.value());
- records_for_partition.emplace_back(kv_t{
-     key_buf.read_string(key_buf.bytes_left()),
-     val_buf.read_string(val_buf.bytes_left())});
+ auto key_str = key_buf.read_string(key_buf.bytes_left());
+ if (r.is_tombstone()) {
+     records_for_partition.emplace_back(key_str);
+ } else {
+     iobuf_const_parser val_buf(r.value());
+     auto val_str = val_buf.read_string(
+         val_buf.bytes_left());
+     records_for_partition.emplace_back(key_str, val_str);
+ }
}
}
}
21 changes: 16 additions & 5 deletions src/v/kafka/server/tests/produce_consume_utils.h
@@ -20,23 +20,29 @@ namespace tests {

struct kv_t {
ss::sstring key;
- ss::sstring val;
-
+ std::optional<ss::sstring> val;
friend std::ostream& operator<<(std::ostream& o, const kv_t& kv);

kv_t(ss::sstring k, ss::sstring v)
: key(std::move(k))
, val(std::move(v)) {}

+ kv_t(ss::sstring k)
+   : key(std::move(k))
+   , val(std::nullopt) {}
+
friend bool operator==(const kv_t& l, const kv_t& r) {
return std::tie(l.key, l.val) == std::tie(r.key, r.val);
}

+ bool is_tombstone() const { return !val.has_value(); }
+
static std::vector<kv_t> sequence(
size_t start,
size_t num_records,
std::optional<size_t> val_start = std::nullopt,
- size_t key_cardinality = 0) {
+ size_t key_cardinality = 0,
+ bool produce_tombstones = false) {
size_t vstart = val_start.value_or(start);
std::vector<kv_t> records;
records.reserve(num_records);
@@ -45,8 +51,13 @@ struct kv_t {
if (key_cardinality > 0) {
key = key % key_cardinality;
}
- records.emplace_back(
-     ssx::sformat("key{}", key), ssx::sformat("val{}", vstart + i));
+ auto key_str = ssx::sformat("key{}", key);
+ if (produce_tombstones) {
+     records.emplace_back(std::move(key_str));
+ } else {
+     records.emplace_back(
+         std::move(key_str), ssx::sformat("val{}", vstart + i));
+ }
}
return records;
}
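Taken together with the produce/consume changes above, kv_t can now round-trip tombstones: a kv_t constructed with only a key is produced as a record with a null value, and the consumer rebuilds it from record::is_tombstone(). A brief usage sketch, assuming produce/consume transport wrappers along the lines of this file's helpers (the method names here are illustrative, not quoted from the diff):

    // Sketch: produce ten tombstones (key0..key9) and verify they come back
    // as kv_t values with no payload.
    auto tombstones = tests::kv_t::sequence(
      0, 10, std::nullopt, /*key_cardinality=*/0, /*produce_tombstones=*/true);
    producer.produce_to_partition(topic, model::partition_id(0), tombstones).get();

    auto consumed = consumer.consume_from_partition(
      topic, model::partition_id(0), model::offset(0)).get();
    for (const auto& kv : consumed) {
        BOOST_REQUIRE(kv.is_tombstone()); // val == std::nullopt, key survives
    }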
3 changes: 2 additions & 1 deletion src/v/model/record.h
@@ -228,6 +228,7 @@ class record {
iobuf release_value() { return std::exchange(_value, {}); }
iobuf share_value() { return _value.share(0, _value.size_bytes()); }
bool has_value() const { return _val_size >= 0; }
+ bool is_tombstone() const { return !has_value(); }

const std::vector<record_header>& headers() const { return _headers; }
std::vector<record_header>& headers() { return _headers; }
@@ -360,7 +361,7 @@ class record_batch_attributes final {
record_batch_attributes& operator|=(model::compression c) {
// clang-format off
_attributes |=
- static_cast<std::underlying_type_t<model::compression>>(c)
+ static_cast<std::underlying_type_t<model::compression>>(c)
& record_batch_attributes::compression_mask;
// clang-format on
return *this;
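record::is_tombstone() is simply the inverse of has_value(): a record whose value is null (negative encoded length, as opposed to a present-but-empty value) is a tombstone. A minimal sketch of constructing one with the same builder API the produce path above uses — add_raw_kv taking std::optional<iobuf> is shown in this diff; the surrounding scaffolding is assumed:

    // Sketch: a null value (std::nullopt), not an empty iobuf, marks a tombstone.
    storage::record_batch_builder builder(
      model::record_batch_type::raft_data, model::offset(0));
    iobuf key;
    key.append("key0", 4);
    builder.add_raw_kv(std::move(key), std::nullopt);

    auto batch = std::move(builder).build();
    for (const auto& r : batch.copy_records()) {
        assert(r.is_tombstone()); // !has_value(), i.e. the value size is < 0
    }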
1 change: 1 addition & 0 deletions src/v/raft/tests/append_entries_test.cc
@@ -544,6 +544,7 @@ FIXTURE_TEST(test_collected_log_recovery, raft_test_fixture) {
first_ts,
100_MiB,
model::offset::max(),
+ std::nullopt,
ss::default_priority_class(),
as,
storage::ntp_sanitizer_config{.sanitize_only = true}))
49 changes: 33 additions & 16 deletions src/v/storage/disk_log_impl.cc
@@ -579,15 +579,16 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
result);
has_self_compacted = true;
}
- // Remove any of the beginning segments that may only have one or no
+ // Remove any of the beginning segments that don't have any
// compactible records. They would be no-ops to compact.
while (!segs.empty()) {
if (segs.front()->may_have_compactible_records()) {
break;
}
- // For all intents and purposes, these segments are already compacted.
+ // For all intents and purposes, these segments are already cleanly
+ // compacted.
auto seg = segs.front();
- seg->mark_as_finished_windowed_compaction();
+ internal::mark_segment_as_finished_window_compaction(seg, true);
segs.pop_front();
}
if (segs.empty()) {
@@ -645,13 +646,18 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
if (cfg.asrc) {
cfg.asrc->check();
}
+ // A segment is considered "clean" if it has been fully indexed (all
+ // keys are de-duplicated)
+ const bool is_clean_compacted = seg->offsets().get_base_offset()
+     >= idx_start_offset;
if (seg->offsets().get_base_offset() > map.max_offset()) {
// The map was built from newest to oldest segments within this
// sliding range. If we see a new segment whose offsets are all
// higher than those indexed, it may be because the segment is
// entirely comprised of non-data batches. Mark it as compacted so
// we can progress through compactions.
- seg->mark_as_finished_windowed_compaction();
+ internal::mark_segment_as_finished_window_compaction(
+     seg, is_clean_compacted);
vlog(
gclog.debug,
"[{}] treating segment as compacted, offsets fall above highest "
@@ -663,8 +669,14 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
}
if (!seg->may_have_compactible_records()) {
// All data records are already compacted away. Skip to avoid a
- // needless rewrite.
- seg->mark_as_finished_windowed_compaction();
+ // needless rewrite. But, still flush index in case we are clean
+ // compacted to persist clean_compact_timestamp
+ internal::mark_segment_as_finished_window_compaction(
+     seg, is_clean_compacted);
+ if (is_clean_compacted) {
+     co_await seg->index().flush();
+ }
+
vlog(
gclog.trace,
"[{}] treating segment as compacted, either all non-data "
@@ -673,6 +685,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
seg->filename());
continue;
}

// TODO: implement a segment replacement strategy such that each term
// tries to write only one segment (or more if the term had a large
// amount of data), rather than replacing N segments with N segments.
@@ -758,11 +771,15 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
seg->index().swap_index_state(std::move(new_idx));
seg->force_set_commit_offset_from_index();
seg->release_batch_cache_index();

+ // Mark the segment as completed window compaction, and possibly set the
+ // clean_compact_timestamp in it's index.
+ internal::mark_segment_as_finished_window_compaction(
+     seg, is_clean_compacted);

co_await seg->index().flush();
co_await ss::rename_file(
cmp_idx_tmpname.string(), cmp_idx_name.string());

- seg->mark_as_finished_windowed_compaction();
_probe->segment_compacted();
_probe->add_compaction_removed_bytes(
ssize_t(size_before) - ssize_t(size_after));
@@ -774,7 +791,9 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
vlog(
gclog.debug, "[{}] Final compacted segment {}", config().ntp(), seg);
}

_last_compaction_window_start_offset = idx_start_offset;

co_return true;
}

@@ -886,15 +905,10 @@ ss::future<compaction_result> disk_log_impl::do_compact_adjacent_segments(
auto segments = std::vector<ss::lw_shared_ptr<segment>>(
range.first, range.second);

- bool all_window_compacted = true;
- for (const auto& seg : segments) {
-     if (!seg->finished_windowed_compaction()) {
-         all_window_compacted = false;
-         break;
-     }
- }
+ const bool all_window_compacted = std::ranges::all_of(
+     segments, &segment::finished_windowed_compaction);

- auto all_segments_self_compacted = std::ranges::all_of(
+ const bool all_segments_self_compacted = std::ranges::all_of(
segments, &segment::finished_self_compaction);

if (unlikely(!all_segments_self_compacted)) {
@@ -945,6 +959,9 @@ ss::future<compaction_result> disk_log_impl::do_compact_adjacent_segments(
// size is already contained in the partition size probe
replacement->mark_as_compacted_segment();
if (all_window_compacted) {
+ // replacement's _clean_compact_timestamp will have been set in
+ // make_concatenated_segment if both segments were cleanly compacted
+ // already.
replacement->mark_as_finished_windowed_compaction();
}
_probe->add_initial_segment(*replacement.get());
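Every call site above now routes through internal::mark_segment_as_finished_window_compaction rather than calling seg->mark_as_finished_windowed_compaction() directly. The helper's body is not shown in this diff; judging from its call sites and the comments above, a plausible sketch (the index setter's name is an assumption):

    // Inferred sketch, not the committed implementation: mark a segment as
    // having finished windowed compaction and, if everything up to and
    // including it has been deduplicated ("clean"), stamp the time in its
    // index so tombstone retention can be measured from that point.
    void mark_segment_as_finished_window_compaction(
      ss::lw_shared_ptr<storage::segment> seg, bool is_clean_compacted) {
        seg->mark_as_finished_windowed_compaction();
        if (is_clean_compacted) {
            seg->index().maybe_set_clean_compact_timestamp(
              model::timestamp::now()); // setter name assumed
        }
    }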
14 changes: 11 additions & 3 deletions src/v/storage/index_state.cc
@@ -159,8 +159,9 @@ std::ostream& operator<<(std::ostream& o, const index_state& s) {
<< ", non_data_timestamps:" << s.non_data_timestamps
<< ", broker_timestamp:" << s.broker_timestamp
<< ", num_compactible_records_appended:"
- << s.num_compactible_records_appended << ", index("
- << s.relative_offset_index.size() << ","
+ << s.num_compactible_records_appended
+ << ", clean_compact_timestamp:" << s.clean_compact_timestamp
+ << ", index(" << s.relative_offset_index.size() << ","
<< s.relative_time_index.size() << "," << s.position_index.size()
<< ")}";
}
@@ -182,6 +183,7 @@ void index_state::serde_write(iobuf& out) const {
write(tmp, non_data_timestamps);
write(tmp, broker_timestamp);
write(tmp, num_compactible_records_appended);
+ write(tmp, clean_compact_timestamp);

crc::crc32c crc;
crc_extend_iobuf(crc, tmp);
@@ -278,6 +280,11 @@ void read_nested(
} else {
st.num_compactible_records_appended = std::nullopt;
}
+ if (hdr._version >= index_state::clean_compact_timestamp_version) {
+     read_nested(p, st.clean_compact_timestamp, 0U);
+ } else {
+     st.clean_compact_timestamp = std::nullopt;
+ }
}

index_state index_state::copy() const { return *this; }
@@ -357,7 +364,8 @@ index_state::index_state(const index_state& o) noexcept
, with_offset(o.with_offset)
, non_data_timestamps(o.non_data_timestamps)
, broker_timestamp(o.broker_timestamp)
- , num_compactible_records_appended(o.num_compactible_records_appended) {}
+ , num_compactible_records_appended(o.num_compactible_records_appended)
+ , clean_compact_timestamp(o.clean_compact_timestamp) {}

namespace serde_compat {
uint64_t index_state_serde::checksum(const index_state& r) {
9 changes: 8 additions & 1 deletion src/v/storage/index_state.h
@@ -74,10 +74,11 @@ class offset_time_index {
1 byte - non_data_timestamps
*/
struct index_state
- : serde::envelope<index_state, serde::version<7>, serde::compat_version<4>> {
+ : serde::envelope<index_state, serde::version<8>, serde::compat_version<4>> {
static constexpr auto monotonic_timestamps_version = 5;
static constexpr auto broker_timestamp_version = 6;
static constexpr auto num_compactible_records_version = 7;
+ static constexpr auto clean_compact_timestamp_version = 8;

static index_state make_empty_index(offset_delta_time with_offset);

@@ -131,6 +132,12 @@ struct index_state
// support this field, and we can't conclude anything.
std::optional<size_t> num_compactible_records_appended{0};

+ // If set, the timestamp at which every record up to and including
+ // those in this segment were first compacted via sliding window.
+ // If not yet set, sliding window compaction has not yet been applied to
+ // every previous record in the log.
+ std::optional<model::timestamp> clean_compact_timestamp{std::nullopt};
+
size_t size() const;

bool empty() const;
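clean_compact_timestamp is what makes per-segment tombstone removal safe to decide: dropping a tombstone too early could "resurrect" an older value for the same key, so removal must wait until (a) every earlier record has been deduplicated against the tombstone and (b) a configured retention period has elapsed since then. A hedged sketch of that eligibility test, with the retention knob's name assumed:

    // Sketch: may tombstones in this segment be removed during compaction?
    // "tombstone_retention_ms" is an assumed name for the new setting.
    bool may_remove_tombstones(
      const storage::index_state& idx,
      std::chrono::milliseconds tombstone_retention_ms,
      model::timestamp now) {
        // Not yet cleanly compacted: older duplicates of these keys may
        // still exist behind us, so the tombstones must be kept.
        if (!idx.clean_compact_timestamp.has_value()) {
            return false;
        }
        auto elapsed_ms = now.value() - idx.clean_compact_timestamp->value();
        return elapsed_ms >= tombstone_retention_ms.count();
    }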