Skip to content

Commit

Permalink
reuse buffer within decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 22, 2024
1 parent db1649c commit fd90ccf
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ case class NativeBatchDecoderIterator(
private var nextBatch: Option[ColumnarBatch] = None
private var finished = false;
private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
private var dataBuf: ByteBuffer = _
private val native = new Native()
private val nativeUtil = new NativeUtil()

Expand Down Expand Up @@ -104,16 +105,20 @@ case class NativeBatchDecoderIterator(
val fieldCount = longBuf.getLong.toInt

// read body
// TODO avoid allocating a new buffer for each batch
val buffer = ByteBuffer.allocateDirect(compressedLength - 8)
while (buffer.hasRemaining && channel.read(buffer) >= 0) {}
val bytesToRead = compressedLength - 8
if (dataBuf == null || dataBuf.capacity() < bytesToRead) {
dataBuf = ByteBuffer.allocateDirect(bytesToRead * 2)
}
dataBuf.clear()
dataBuf.limit(bytesToRead)
while (dataBuf.hasRemaining && channel.read(dataBuf) >= 0) {}

// make native call to decode batch
val startTime = System.nanoTime()
nextBatch = nativeUtil.getNextBatch(
fieldCount,
(arrayAddrs, schemaAddrs) => {
native.decodeShuffleBlock(buffer, arrayAddrs, schemaAddrs)
native.decodeShuffleBlock(dataBuf, arrayAddrs, schemaAddrs)
})
decodeTime.add(System.nanoTime() - startTime)

Expand Down

0 comments on commit fd90ccf

Please sign in to comment.