From 30af78700fb51c18b45eeebe8d613db092bb2b3c Mon Sep 17 00:00:00 2001 From: ozewr <94829827+ozewr@users.noreply.github.com> Date: Mon, 19 Aug 2024 20:11:08 +0800 Subject: [PATCH] feat: Implement the Buf to avoid extra memory allocation (#4585) * feat: Implement the Buf to avoid extra memory allocation * fmt toml * fmt code * mv entry.into_buffer to raw_entry_buffer * less reuse opendal * remove todo #4065 * Update src/mito2/src/wal/entry_reader.rs Co-authored-by: Weny Xu * fmt code --------- Co-authored-by: ozewr Co-authored-by: Weny Xu --- src/mito2/src/wal/entry_reader.rs | 16 ++++++++++++---- src/object-store/src/lib.rs | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs index 59a8fd8d46b3..27525155fdb6 100644 --- a/src/mito2/src/wal/entry_reader.rs +++ b/src/mito2/src/wal/entry_reader.rs @@ -15,6 +15,7 @@ use api::v1::WalEntry; use async_stream::stream; use futures::StreamExt; +use object_store::Buffer; use prost::Message; use snafu::{ensure, ResultExt}; use store_api::logstore::entry::Entry; @@ -28,13 +29,20 @@ pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> let entry_id = raw_entry.entry_id(); let region_id = raw_entry.region_id(); ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id }); - // TODO(weny): implement the [Buf] for return value, avoid extra memory allocation. - let bytes = raw_entry.into_bytes(); - let wal_entry = WalEntry::decode(bytes.as_slice()).context(DecodeWalSnafu { region_id })?; - + let buffer = into_buffer(raw_entry); + let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?; Ok((entry_id, wal_entry)) } +fn into_buffer(raw_entry: Entry) -> Buffer { + match raw_entry { + Entry::Naive(entry) => Buffer::from(entry.data), + Entry::MultiplePart(entry) => { + Buffer::from_iter(entry.parts.into_iter().map(bytes::Bytes::from)) + } + } +} + /// [WalEntryReader] provides the ability to read and decode entries from the underlying store. /// /// Notes: It will consume the inner stream and only allow invoking the `read` at once. diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index d94194ffdb79..797e75f42d54 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -14,7 +14,7 @@ pub use opendal::raw::{normalize_path as raw_normalize_path, Access, HttpClient}; pub use opendal::{ - services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, + services, Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, Result, Writer, };