Skip to content

Commit

Permalink
clippy, new benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 23, 2024
1 parent a3fb105 commit b593e80
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
31 changes: 25 additions & 6 deletions native/core/benches/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,42 @@ fn criterion_benchmark(c: &mut Criterion) {
group.bench_function("shuffle_writer: encode (no compression))", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let mut cursor = Cursor::new(&mut buffer);
let ipc_time = Time::default();
b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time));
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (lz4)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let mut cursor = Cursor::new(&mut buffer);
let ipc_time = Time::default();
b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time));
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let mut cursor = Cursor::new(&mut buffer);
let ipc_time = Time::default();
b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time));
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)
});
});
group.bench_function("shuffle_writer: end to end", |b| {
let ctx = SessionContext::new();
Expand Down
12 changes: 5 additions & 7 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1578,9 +1578,9 @@ pub fn write_ipc_compressed<W: Write + Seek>(

// write codec used
match codec {
&CompressionCodec::Lz4Frame => output.write_all("LZ4_".as_bytes())?,
&CompressionCodec::Zstd(_) => output.write_all("ZSTD".as_bytes())?,
&CompressionCodec::None => output.write_all("NONE".as_bytes())?,
CompressionCodec::Lz4Frame => output.write_all("LZ4_".as_bytes())?,
CompressionCodec::Zstd(_) => output.write_all("ZSTD".as_bytes())?,
CompressionCodec::None => output.write_all("NONE".as_bytes())?,
}

let output = match codec {
Expand All @@ -1602,10 +1602,8 @@ pub fn write_ipc_compressed<W: Write + Seek>(
let mut reader = Cursor::new(ipc_encoded);
let mut wtr = lz4_flex::frame::FrameEncoder::new(output);
std::io::copy(&mut reader, &mut wtr)?;
let output = wtr
.finish()
.map_err(|e| DataFusionError::Execution(format!("lz4 compression error: {}", e)))?;
output
wtr.finish()
.map_err(|e| DataFusionError::Execution(format!("lz4 compression error: {}", e)))?
}
CompressionCodec::Zstd(level) => {
let encoder = zstd::Encoder::new(output, *level)?;
Expand Down

0 comments on commit b593e80

Please sign in to comment.