From b593e80dd90abc33194b74830a61b171edb1c426 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 23 Dec 2024 10:25:34 -0700 Subject: [PATCH] clippy, new benchmark --- native/core/benches/shuffle_writer.rs | 31 +++++++++++++++---- .../src/execution/shuffle/shuffle_writer.rs | 12 +++---- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 865ca73b4..c9b003b6d 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -35,23 +35,42 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function("shuffle_writer: encode (no compression))", |b| { let batch = create_batch(8192, true); let mut buffer = vec![]; - let mut cursor = Cursor::new(&mut buffer); let ipc_time = Time::default(); - b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)); + b.iter(|| { + buffer.clear(); + let mut cursor = Cursor::new(&mut buffer); + write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time) + }); + }); + group.bench_function("shuffle_writer: encode and compress (lz4)", |b| { + let batch = create_batch(8192, true); + let mut buffer = vec![]; + let ipc_time = Time::default(); + b.iter(|| { + buffer.clear(); + let mut cursor = Cursor::new(&mut buffer); + write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time) + }); }); group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| { let batch = create_batch(8192, true); let mut buffer = vec![]; - let mut cursor = Cursor::new(&mut buffer); let ipc_time = Time::default(); - b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)); + b.iter(|| { + buffer.clear(); + let mut cursor = Cursor::new(&mut buffer); + write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time) + }); }); group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| { let batch = create_batch(8192, true); let mut buffer = vec![]; - let mut cursor = Cursor::new(&mut buffer); let ipc_time = Time::default(); - b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)); + b.iter(|| { + buffer.clear(); + let mut cursor = Cursor::new(&mut buffer); + write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time) + }); }); group.bench_function("shuffle_writer: end to end", |b| { let ctx = SessionContext::new(); diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 947f2572e..7b0d291ff 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -1578,9 +1578,9 @@ pub fn write_ipc_compressed( // write codec used match codec { - &CompressionCodec::Lz4Frame => output.write_all("LZ4_".as_bytes())?, - &CompressionCodec::Zstd(_) => output.write_all("ZSTD".as_bytes())?, - &CompressionCodec::None => output.write_all("NONE".as_bytes())?, + CompressionCodec::Lz4Frame => output.write_all("LZ4_".as_bytes())?, + CompressionCodec::Zstd(_) => output.write_all("ZSTD".as_bytes())?, + CompressionCodec::None => output.write_all("NONE".as_bytes())?, } let output = match codec { @@ -1602,10 +1602,8 @@ pub fn write_ipc_compressed( let mut reader = Cursor::new(ipc_encoded); let mut wtr = lz4_flex::frame::FrameEncoder::new(output); std::io::copy(&mut reader, &mut wtr)?; - let output = wtr - .finish() - .map_err(|e| DataFusionError::Execution(format!("lz4 compression error: {}", e)))?; - output + wtr.finish() + .map_err(|e| DataFusionError::Execution(format!("lz4 compression error: {}", e)))? } CompressionCodec::Zstd(level) => { let encoder = zstd::Encoder::new(output, *level)?;