Skip to content

Commit

Permalink
revert reuse buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 21, 2024
1 parent f218870 commit 64d6ab0
Showing 1 changed file with 6 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ case class ShuffleBatchDecoderIterator(var in: InputStream, taskContext: TaskCon
private var nextBatch: Option[ColumnarBatch] = None
private var finished = false;
private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
private var messageBuf = new Array[Byte](8192 * 512)
private val native = new Native()
private val nativeUtil = new NativeUtil()

Expand Down Expand Up @@ -89,14 +88,14 @@ case class ShuffleBatchDecoderIterator(var in: InputStream, taskContext: TaskCon
val fieldCount = longBuf.getLong.toInt

// read body
ensureCapacity(compressedLength)
fillBuffer(in, messageBuf, compressedLength)
val buffer = new Array[Byte](compressedLength)
fillBuffer(in, buffer)

// make native call to decode batch
nextBatch = nativeUtil.getNextBatch(
fieldCount,
(arrayAddrs, schemaAddrs) => {
native.decodeShuffleBlock(messageBuf, arrayAddrs, schemaAddrs)
native.decodeShuffleBlock(buffer, arrayAddrs, schemaAddrs)
})

true
Expand All @@ -112,19 +111,10 @@ case class ShuffleBatchDecoderIterator(var in: InputStream, taskContext: TaskCon
}
}

private def ensureCapacity(requiredSize: Int): Unit = {
if (messageBuf.length < requiredSize) {
val newSize = Math.max(messageBuf.length * 2, requiredSize)
val newBuffer = new Array[Byte](newSize)
Array.copy(messageBuf, 0, newBuffer, 0, messageBuf.length)
messageBuf = newBuffer
}
}

private def fillBuffer(in: InputStream, buffer: Array[Byte], len: Int): Unit = {
private def fillBuffer(in: InputStream, buffer: Array[Byte]): Unit = {
var bytesRead = 0
while (bytesRead < len) {
val result = in.read(buffer, bytesRead, len - bytesRead)
while (bytesRead < buffer.length) {
val result = in.read(buffer, bytesRead, buffer.length - bytesRead)
if (result == -1) throw new EOFException()
bytesRead += result
}
Expand Down

0 comments on commit 64d6ab0

Please sign in to comment.