Skip to content

Commit

Permalink
fix: mitigate memory spike during startup (#3418)
Browse files Browse the repository at this point in the history
* fix: fix memory spike during startup

* fix: allocate a region write ctx for each wal entry
  • Loading branch information
niebayes authored Mar 1, 2024
1 parent 376409b commit 7d30c24
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,9 @@ pub(crate) async fn replay_memtable<S: LogStore>(
// Last entry id should start from flushed entry id since there might be no
// data in the WAL.
let mut last_entry_id = flushed_entry_id;
let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone());

let replay_from_entry_id = flushed_entry_id + 1;
let mut stale_entry_found = false;

let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?;
while let Some(res) = wal_stream.next().await {
let (entry_id, entry) = res?;
Expand All @@ -417,8 +416,10 @@ pub(crate) async fn replay_memtable<S: LogStore>(
}
);
}

last_entry_id = last_entry_id.max(entry_id);

let mut region_write_ctx =
RegionWriteCtx::new(region_id, version_control, wal_options.clone());
for mutation in entry.mutations {
rows_replayed += mutation
.rows
Expand All @@ -427,11 +428,11 @@ pub(crate) async fn replay_memtable<S: LogStore>(
.unwrap_or(0);
region_write_ctx.push_mutation(mutation.op_type, mutation.rows, OptionOutputTx::none());
}
}

// set next_entry_id and write to memtable.
region_write_ctx.set_next_entry_id(last_entry_id + 1);
region_write_ctx.write_memtable();
// set next_entry_id and write to memtable.
region_write_ctx.set_next_entry_id(last_entry_id + 1);
region_write_ctx.write_memtable();
}

if allow_stale_entries && stale_entry_found {
wal.obsolete(region_id, flushed_entry_id, wal_options)
Expand Down

0 comments on commit 7d30c24

Please sign in to comment.