Skip to content

Commit

Permalink
prepare for review
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 21, 2024
1 parent dec355f commit d3aafef
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 11 deletions.
3 changes: 0 additions & 3 deletions native/core/src/execution/shuffle/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3363,9 +3363,6 @@ pub fn process_sorted_row_partition(
let codec = CompressionCodec::Zstd(1);
written += write_ipc_compressed(&batch, &mut cursor, &codec, &ipc_time)?;

// TODO document this more - this is important
written += 8;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;
}
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1601,7 +1601,7 @@ pub fn write_ipc_compressed<W: Write + Seek>(

timer.stop();

Ok(compressed_length as usize)
Ok((end_pos - start_pos) as usize)
}

pub fn read_ipc_compressed(bytes: &[u8]) -> Result<RecordBatch> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class CometBlockStoreShuffleReader[K, C](

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
var currentReadIterator: ShuffleBatchDecoderIterator = null
var currentReadIterator: NativeBatchDecoderIterator = null

// Closes last read iterator after the task is finished.
// We need to close read iterator during iterating input streams,
Expand All @@ -92,14 +92,13 @@ class CometBlockStoreShuffleReader[K, C](
}
}

var currentDecoder: ShuffleBatchDecoderIterator = null
val recordIter: Iterator[(Int, ColumnarBatch)] = fetchIterator
.flatMap(blockIdAndStream => {
if (currentDecoder != null) {
currentDecoder.close()
if (currentReadIterator != null) {
currentReadIterator.close()
}
currentDecoder = ShuffleBatchDecoderIterator(blockIdAndStream._2, context)
currentDecoder
currentReadIterator = NativeBatchDecoderIterator(blockIdAndStream._2, context)
currentReadIterator
})
.map(b => (0, b))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.comet.vector.NativeUtil
* native ShuffleWriterExec and then calls native code to decompress and decode the shuffle blocks
and uses Arrow FFI to return the Arrow record batch.
*/
case class ShuffleBatchDecoderIterator(var in: InputStream, taskContext: TaskContext)
case class NativeBatchDecoderIterator(var in: InputStream, taskContext: TaskContext)
extends Iterator[ColumnarBatch] {
private val SPARK_LZ4_MAGIC = Array[Byte](76, 90, 52, 66, 108, 111, 99, 107) // "LZ4Block"
private var nextBatch: Option[ColumnarBatch] = None
Expand Down

0 comments on commit d3aafef

Please sign in to comment.