diff --git a/config/config.md b/config/config.md
index 6a48ed717e69..8e1fb5837aec 100644
--- a/config/config.md
+++ b/config/config.md
@@ -83,6 +83,7 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries when reading the WAL.<br/>**It's only used when the provider is `kafka`**.<br/>This option ensures that when Kafka messages are deleted, the system can still successfully replay memtable data without throwing an out-of-range error.<br/>However, enabling this option might lead to unexpected data loss, as the system will skip over missing entries instead of treating them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
@@ -409,6 +410,7 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries when reading the WAL.<br/>**It's only used when the provider is `kafka`**.<br/>This option ensures that when Kafka messages are deleted, the system can still successfully replay memtable data without throwing an out-of-range error.<br/>However, enabling this option might lead to unexpected data loss, as the system will skip over missing entries instead of treating them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 22670f857c06..85b7793ef65a 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -213,6 +213,17 @@ create_index = true
## **It's only used when the provider is `kafka`**.
dump_index_interval = "60s"
+## Ignore missing entries when reading the WAL.
+## **It's only used when the provider is `kafka`**.
+##
+## This option ensures that when Kafka messages are deleted, the system
+## can still successfully replay memtable data without throwing an
+## out-of-range error.
+## However, enabling this option might lead to unexpected data loss,
+## as the system will skip over missing entries instead of treating
+## them as critical errors.
+overwrite_entry_start_id = false
+
# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 4c40d2dde6db..81f4ee47f03b 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -237,6 +237,17 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"
+## Ignore missing entries when reading the WAL.
+## **It's only used when the provider is `kafka`**.
+##
+## This option ensures that when Kafka messages are deleted, the system
+## can still successfully replay memtable data without throwing an
+## out-of-range error.
+## However, enabling this option might lead to unexpected data loss,
+## as the system will skip over missing entries instead of treating
+## them as critical errors.
+overwrite_entry_start_id = false
+
# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index 27f693204014..405d13d7d42d 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -46,6 +46,8 @@ pub struct DatanodeKafkaConfig {
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
+ /// Ignore missing entries when reading the WAL.
+ pub overwrite_entry_start_id: bool,
}
impl Default for DatanodeKafkaConfig {
@@ -60,6 +62,7 @@ impl Default for DatanodeKafkaConfig {
auto_create_topics: true,
create_index: true,
dump_index_interval: Duration::from_secs(60),
+ overwrite_entry_start_id: false,
}
}
}
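
For readers wiring the option up programmatically rather than through the TOML files, the sketch below builds a `DatanodeKafkaConfig` with the new flag enabled, relying only on the `Default` impl shown in this diff. The `use` path is an assumption about how the `common-wal` crate exposes the type; adjust it to the actual module layout.

```rust
// A minimal sketch, assuming the struct is reachable at this path
// (it is defined in src/common/wal/src/config/kafka/datanode.rs).
use common_wal::config::kafka::DatanodeKafkaConfig;

fn main() {
    // Keep every default from the `Default` impl above and opt into
    // skipping missing entries instead of failing WAL replay.
    let config = DatanodeKafkaConfig {
        overwrite_entry_start_id: true,
        ..Default::default()
    };
    assert!(config.overwrite_entry_start_id);
}
```
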
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index f084f20f022d..23493812a416 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -49,6 +49,8 @@ pub struct KafkaLogStore {
max_batch_bytes: usize,
/// The consumer wait timeout.
consumer_wait_timeout: Duration,
+ /// Ignore missing entries when reading the WAL.
+ overwrite_entry_start_id: bool,
}
impl KafkaLogStore {
@@ -64,6 +66,7 @@ impl KafkaLogStore {
client_manager,
max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
consumer_wait_timeout: config.consumer_wait_timeout,
+ overwrite_entry_start_id: config.overwrite_entry_start_id,
})
}
}
@@ -205,7 +208,7 @@ impl LogStore for KafkaLogStore {
async fn read(
&self,
provider: &Provider,
- entry_id: EntryId,
+ mut entry_id: EntryId,
index: Option,
) -> Result> {
let provider = provider
@@ -225,6 +228,25 @@ impl LogStore for KafkaLogStore {
.client()
.clone();
+ if self.overwrite_entry_start_id {
+ let start_offset =
+ client
+ .get_offset(OffsetAt::Earliest)
+ .await
+ .context(GetOffsetSnafu {
+ topic: &provider.topic,
+ })?;
+
+ if entry_id as i64 <= start_offset {
+ warn!(
+ "The entry_id: {} is less than start_offset: {}, topic: {}. Overwriting entry_id with start_offset",
+ entry_id, start_offset, &provider.topic
+ );
+
+ entry_id = start_offset as u64;
+ }
+ }
+
// Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
// The read operation terminates when this record is consumed.
// Warning: the `get_offset` returns the end offset of the latest record. For our usage, it should be decremented.
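
To make the new read-path behavior concrete, here is a minimal, self-contained sketch of the decision `KafkaLogStore::read` now makes before consuming the topic. `resolve_start_entry_id` and `earliest_offset` are illustrative names, not GreptimeDB APIs; `earliest_offset` stands in for the value returned by `get_offset(OffsetAt::Earliest)` in the diff.

```rust
/// Sketch of the entry-id overwrite decision guarded by
/// `overwrite_entry_start_id` in `KafkaLogStore::read`.
fn resolve_start_entry_id(
    requested_entry_id: u64,
    earliest_offset: i64,
    overwrite_entry_start_id: bool,
) -> u64 {
    if overwrite_entry_start_id && (requested_entry_id as i64) <= earliest_offset {
        // The requested entries were already deleted from the topic
        // (e.g. by Kafka retention), so start replaying from the earliest
        // offset that still exists instead of returning an out-of-range error.
        earliest_offset as u64
    } else {
        requested_entry_id
    }
}

fn main() {
    // Entries before offset 100 were purged from the topic.
    assert_eq!(resolve_start_entry_id(42, 100, true), 100);
    // With the flag disabled, the requested id is left untouched and the
    // later fetch will fail with an out-of-range error as before.
    assert_eq!(resolve_start_entry_id(42, 100, false), 42);
    println!("overwrite_entry_start_id sketch ok");
}
```

Because the check only runs when the flag is set, the default (`false`) keeps the old fail-fast behavior, so the data loss described in the docs can only occur when an operator opts in explicitly.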