diff --git a/.github/workflows/bk-ci.yml b/.github/workflows/bk-ci.yml index e8d77821147..c9f9226dd80 100644 --- a/.github/workflows/bk-ci.yml +++ b/.github/workflows/bk-ci.yml @@ -200,6 +200,10 @@ jobs: path: surefire-reports retention-days: 7 + - name: print JVM thread dumps when cancelled + if: cancelled() + run: ./dev/ci-tool print_thread_dumps + integration-tests: name: Integration Tests runs-on: ubuntu-latest diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java index 1fb3d47b3e0..5343357198a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java @@ -46,7 +46,17 @@ void populateValueAndReset(int digest, ByteBuf buf) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len); } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len); + } + + @Override + boolean acceptsMemoryAddressBuffer() { + return Crc32cIntChecksum.acceptsMemoryAddressBuffer(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java index 21be2651a7a..0d18312cfc8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java @@ -34,6 +34,7 @@ interface CRC32Digest { long getValueAndReset(); void update(ByteBuf buf, int offset, int len); + void update(byte[] buffer, int offset, int len); } private static final FastThreadLocal crc = new FastThreadLocal() { @@ -62,14 +63,25 @@ void populateValueAndReset(int digest, ByteBuf buf) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { crc.get().update(data, offset, len); return 0; } + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + crc.get().update(buffer, offset, len); + return 0; + } + @Override boolean isInt32Digest() { // This is stored as 8 bytes return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return DirectMemoryCRC32Digest.isSupported(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index eab33945b1e..1e78e4075eb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -20,10 +20,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.FastThreadLocal; import java.security.GeneralSecurityException; @@ -34,6 +32,7 @@ import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.ByteBufVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +52,25 @@ public abstract class DigestManager { final long ledgerId; final boolean useV2Protocol; private final ByteBufAllocator allocator; + private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback; abstract int getMacCodeLength(); - abstract int update(int digest, ByteBuf buffer, int offset, int len); + abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len); + + abstract int internalUpdate(int digest, byte[] buffer, int offset, int len); + + final int update(int digest, ByteBuf buffer, int offset, int len) { + if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) { + return internalUpdate(digest, buffer, offset, len); + } else if (buffer.hasArray()) { + return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len); + } else { + UpdateContext updateContext = new UpdateContext(digest); + ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext); + return updateContext.digest; + } + } abstract void populateValueAndReset(int digest, ByteBuf buffer); @@ -69,6 +83,7 @@ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allo this.useV2Protocol = useV2Protocol; this.macCodeLength = getMacCodeLength(); this.allocator = allocator; + this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback(); } public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType, @@ -136,22 +151,7 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long // Compute checksum over the headers int digest = update(0, buf, buf.readerIndex(), buf.readableBytes()); - - // don't unwrap slices - final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf - ? data.unwrap() : data; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.safeRelease(data); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf cbb = (CompositeByteBuf) unwrapped; - for (int i = 0; i < cbb.numComponents(); i++) { - ByteBuf b = cbb.component(i); - digest = update(digest, b, b.readerIndex(), b.readableBytes()); - } - } else { - digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - } + digest = update(digest, data, data.readerIndex(), data.readableBytes()); populateValueAndReset(digest, buf); @@ -159,11 +159,11 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long buf.readerIndex(0); if (isSmallEntry) { - buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - unwrapped.release(); + buf.writeBytes(data, data.readerIndex(), data.readableBytes()); + data.release(); return buf; } else { - return ByteBufList.get(buf, unwrapped); + return ByteBufList.get(buf, data); } } @@ -176,25 +176,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA headersBuffer.writeLong(length); int digest = update(0, headersBuffer, 0, METADATA_LENGTH); - - // don't unwrap slices - final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf - ? data.unwrap() : data; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(data); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped); - for (int i = 0; i < cbb.numComponents(); i++) { - ByteBuf b = cbb.component(i); - digest = update(digest, b, b.readerIndex(), b.readableBytes()); - } - } else { - digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - } + digest = update(digest, data, data.readerIndex(), data.readableBytes()); populateValueAndReset(digest, headersBuffer); - - return ByteBufList.get(headersBuffer, unwrapped); + return ByteBufList.get(headersBuffer, data); } /** @@ -373,4 +357,34 @@ public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) thr long length = dataReceived.readLong(); return new RecoveryData(lastAddConfirmed, length); } + + private static class UpdateContext { + int digest; + + UpdateContext(int digest) { + this.digest = digest; + } + } + + private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback { + + @Override + public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) { + // recursively visit the sub buffer and update the digest + context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength); + } + + @Override + public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) { + // update the digest with the array + context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength); + } + + @Override + public boolean acceptsMemoryAddress(UpdateContext context) { + return DigestManager.this.acceptsMemoryAddressBuffer(); + } + } + + abstract boolean acceptsMemoryAddressBuffer(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java index eda223ef7f3..07a2bdf464f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java @@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) { crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length); } else if (buf.hasArray()) { // Use the internal method to update from array based - crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length); + crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length); } else { // Fallback to data copy if buffer is not contiguous byte[] b = new byte[length]; buf.getBytes(index, b, 0, length); - crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length); + crcValue = updateArray(crcValue, b, 0, length); } } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } + private static int updateArray(int crcValue, byte[] buf, int offset, int length) + throws IllegalAccessException, InvocationTargetException { + return (int) updateBytes.invoke(null, crcValue, buf, offset, length); + } + + @Override + public void update(byte[] buffer, int offset, int len) { + try { + crcValue = updateArray(crcValue, buffer, offset, len); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + private static final Method updateByteBuffer; private static final Method updateBytes; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java index b15499f0cc5..e2fff9bd7ca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java @@ -38,7 +38,12 @@ int getMacCodeLength() { } @Override - int update(int digest, ByteBuf buffer, int offset, int len) { + int internalUpdate(int digest, ByteBuf buffer, int offset, int len) { + return 0; + } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { return 0; } @@ -49,4 +54,9 @@ void populateValueAndReset(int digest, ByteBuf buffer) {} boolean isInt32Digest() { return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java index c04c411c6c7..f9fda5a531d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java @@ -96,13 +96,24 @@ void populateValueAndReset(int digest, ByteBuf buffer) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { mac.get().update(data.slice(offset, len).nioBuffer()); return 0; } + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + mac.get().update(buffer, offset, len); + return 0; + } + @Override boolean isInt32Digest() { return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java index 3d48f0ef7da..7635e3e9f20 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java @@ -39,4 +39,9 @@ public long getValueAndReset() { public void update(ByteBuf buf, int offset, int len) { crc.update(buf.slice(offset, len).nioBuffer()); } + + @Override + public void update(byte[] buffer, int offset, int len) { + crc.update(buffer, offset, len); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java index 324588d852b..b363e4da636 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -133,43 +132,14 @@ public static ByteBufList get() { * Append a {@link ByteBuf} at the end of this {@link ByteBufList}. */ public void add(ByteBuf buf) { - final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf - ? buf.unwrap() : buf; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(buf); - - if (unwrapped instanceof CompositeByteBuf) { - ((CompositeByteBuf) unwrapped).forEach(b -> { - ReferenceCountUtil.retain(b); - buffers.add(b); - }); - ReferenceCountUtil.release(unwrapped); - } else { - buffers.add(unwrapped); - } + buffers.add(buf); } /** * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}. */ public void prepend(ByteBuf buf) { - // don't unwrap slices - final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf - ? buf.unwrap() : buf; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(buf); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf composite = (CompositeByteBuf) unwrapped; - for (int i = composite.numComponents() - 1; i >= 0; i--) { - ByteBuf b = composite.component(i); - ReferenceCountUtil.retain(b); - buffers.add(0, b); - } - ReferenceCountUtil.release(unwrapped); - } else { - buffers.add(0, unwrapped); - } + buffers.add(0, buf); } /** @@ -285,7 +255,7 @@ public ByteBufList retain() { @Override protected void deallocate() { for (int i = 0; i < buffers.size(); i++) { - ReferenceCountUtil.release(buffers.get(i)); + buffers.get(i).release(); } buffers.clear(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java new file mode 100644 index 00000000000..32e9c8c55a4 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java @@ -0,0 +1,1132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ByteProcessor; +import io.netty.util.concurrent.FastThreadLocal; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +/** + * This class visits the possible wrapped child buffers of a Netty {@link ByteBuf} for a given offset and length. + *

+ * The Netty ByteBuf API does not provide a method to visit the wrapped child buffers. The + * {@link ByteBuf#unwrap()} method is not suitable for this purpose as it loses the + * {@link ByteBuf#readerIndex()} state, resulting in incorrect offset and length information. + *

+ * Despite Netty not having a public API for visiting the sub buffers, it is possible to achieve this using + * the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} method. This class uses this method to visit the + * wrapped child buffers by providing a suitable {@link ByteBuf} implementation. This implementation supports + * the role of the destination buffer for the getBytes call. It requires implementing the + * {@link ByteBuf#setBytes(int, ByteBuf, int, int)} and {@link ByteBuf#setBytes(int, byte[], int, int)} methods + * and other methods required by getBytes such as {@link ByteBuf#hasArray()}, {@link ByteBuf#hasMemoryAddress()}, + * {@link ByteBuf#nioBufferCount()} and {@link ByteBuf#capacity()}. + * All other methods in the internal ByteBuf implementation are not supported and will throw an exception. + * This is to ensure correctness and to fail fast if some ByteBuf implementation is not following the expected + * and supported interface contract. + */ +public class ByteBufVisitor { + private static final int DEFAULT_VISIT_MAX_DEPTH = 10; + + private ByteBufVisitor() { + // prevent instantiation + } + + /** + * This method traverses the potential nested composite buffers of the provided buffer, given a specific offset and + * length. The traversal continues until it encounters a buffer that is backed by an array or a memory address, + * which allows for the inspection of individual buffer segments without the need for data duplication. + * If no such wrapped buffer is found, the callback function is invoked with the original buffer, offset, + * and length as parameters. + * + * @param buffer the buffer to visit + * @param offset the offset for the buffer + * @param length the length for the buffer + * @param callback the callback to call for each visited buffer + * @param context the context to pass to the callback + */ + public static void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback callback, + T context) { + visitBuffers(buffer, offset, length, callback, context, DEFAULT_VISIT_MAX_DEPTH); + } + + /** + * The callback interface for visiting buffers. + * In case of a heap buffer that is backed by an byte[] array, the visitArray method is called. This + * is due to the internal implementation detail of the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} + * method for heap buffers. + */ + public interface ByteBufVisitorCallback { + void visitBuffer(T context, ByteBuf visitBuffer, int visitIndex, int visitLength); + void visitArray(T context, byte[] visitArray, int visitIndex, int visitLength); + default boolean preferArrayOrMemoryAddress(T context) { + return true; + } + default boolean acceptsMemoryAddress(T context) { + return false; + } + } + + /** + * See @{@link #visitBuffers(ByteBuf, int, int, ByteBufVisitorCallback, Object)}. This method + * allows to specify the maximum depth of recursion for visiting wrapped buffers. + */ + public static void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback callback, + T context, int maxDepth) { + if (length == 0) { + // skip visiting empty buffers + return; + } + InternalContext internalContext = new InternalContext<>(); + internalContext.maxDepth = maxDepth; + internalContext.callbackContext = context; + internalContext.callback = callback; + internalContext.recursivelyVisitBuffers(buffer, offset, length); + } + + private static final int TL_COPY_BUFFER_SIZE = 64 * 1024; + private static final FastThreadLocal TL_COPY_BUFFER = new FastThreadLocal() { + @Override + protected byte[] initialValue() { + return new byte[TL_COPY_BUFFER_SIZE]; + } + }; + + private static class InternalContext { + int depth; + int maxDepth; + ByteBuf parentBuffer; + int parentOffset; + int parentLength; + T callbackContext; + ByteBufVisitorCallback callback; + GetBytesCallbackByteBuf callbackByteBuf = new GetBytesCallbackByteBuf(this); + + void recursivelyVisitBuffers(ByteBuf visitBuffer, int visitIndex, int visitLength) { + // visit the wrapped buffers recursively if the buffer is not backed by an array or memory address + // and the max depth has not been reached + if (depth < maxDepth && !visitBuffer.hasMemoryAddress() && !visitBuffer.hasArray()) { + parentBuffer = visitBuffer; + parentOffset = visitIndex; + parentLength = visitLength; + depth++; + // call getBytes to trigger the wrapped buffer visit + visitBuffer.getBytes(visitIndex, callbackByteBuf, 0, visitLength); + depth--; + } else { + passBufferToCallback(visitBuffer, visitIndex, visitLength); + } + } + + void handleBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (visitLength == 0) { + // skip visiting empty buffers + return; + } + if (visitBuffer == parentBuffer && visitIndex == parentOffset && visitLength == parentLength) { + // further recursion would cause unnecessary recursion up to the max depth of recursion + passBufferToCallback(visitBuffer, visitIndex, visitLength); + } else { + // use the doRecursivelyVisitBuffers method to visit the wrapped buffer, possibly recursively + recursivelyVisitBuffers(visitBuffer, visitIndex, visitLength); + } + } + + private void passBufferToCallback(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (callback.preferArrayOrMemoryAddress(callbackContext)) { + if (visitBuffer.hasArray()) { + handleArray(visitBuffer.array(), visitBuffer.arrayOffset() + visitIndex, visitLength); + } else if (visitBuffer.hasMemoryAddress() && callback.acceptsMemoryAddress(callbackContext)) { + callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength); + } else if (callback.acceptsMemoryAddress(callbackContext) && visitBuffer.isDirect() + && visitBuffer.alloc().isDirectBufferPooled()) { + // read-only buffers need to be copied before they can be directly accessed + ByteBuf copyBuffer = visitBuffer.copy(visitIndex, visitLength); + callback.visitBuffer(callbackContext, copyBuffer, 0, visitLength); + copyBuffer.release(); + } else { + // fallback to reading the visited buffer into the copy buffer in a loop + byte[] copyBuffer = TL_COPY_BUFFER.get(); + int remaining = visitLength; + int currentOffset = visitIndex; + while (remaining > 0) { + int readLen = Math.min(remaining, copyBuffer.length); + visitBuffer.getBytes(currentOffset, copyBuffer, 0, readLen); + handleArray(copyBuffer, 0, readLen); + remaining -= readLen; + currentOffset += readLen; + } + } + } else { + callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength); + } + } + + void handleArray(byte[] visitArray, int visitIndex, int visitLength) { + if (visitLength == 0) { + // skip visiting empty arrays + return; + } + // pass array to callback + callback.visitArray(callbackContext, visitArray, visitIndex, visitLength); + } + } + + /** + * A ByteBuf implementation that can be used as the destination buffer for + * a {@link ByteBuf#getBytes(int, ByteBuf)} for visiting the wrapped child buffers. + */ + static class GetBytesCallbackByteBuf extends ByteBuf { + private final InternalContext internalContext; + + GetBytesCallbackByteBuf(InternalContext internalContext) { + this.internalContext = internalContext; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + internalContext.handleBuffer(src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + internalContext.handleArray(src, srcIndex, length); + return this; + } + + @Override + public boolean hasArray() { + // return false so that the wrapped buffer is visited + return false; + } + + @Override + public boolean hasMemoryAddress() { + // return false so that the wrapped buffer is visited + return false; + } + + @Override + public int nioBufferCount() { + // return 0 so that the wrapped buffer is visited + return 0; + } + + @Override + public int capacity() { + // should return sufficient capacity for the total length + return Integer.MAX_VALUE; + } + + @Override + public ByteBuf capacity(int newCapacity) { + throw new UnsupportedOperationException(); + } + + @Override + public int maxCapacity() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBufAllocator alloc() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteOrder order() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf unwrap() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDirect() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadOnly() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf asReadOnly() { + throw new UnsupportedOperationException(); + } + + @Override + public int readerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readerIndex(int readerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int writerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writerIndex(int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setIndex(int readerIndex, int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int readableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public int writableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public int maxWritableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadable() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadable(int size) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWritable() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWritable(int size) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf clear() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf markReaderIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf resetReaderIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf markWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf resetWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf discardReadBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf discardSomeReadBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf ensureWritable(int minWritableBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getUnsignedByte(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShortLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedShort(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedShortLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getIntLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getUnsignedInt(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getUnsignedIntLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLongLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public char getChar(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CharSequence getCharSequence(int index, int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBoolean(int index, boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setByte(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setShort(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setShortLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setMedium(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setInt(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setIntLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setLong(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setLongLE(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setChar(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setFloat(int index, float value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setDouble(int index, double value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setZero(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int setCharSequence(int index, CharSequence sequence, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean readBoolean() { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() { + throw new UnsupportedOperationException(); + } + + @Override + public short readUnsignedByte() { + throw new UnsupportedOperationException(); + } + + @Override + public short readShort() { + throw new UnsupportedOperationException(); + } + + @Override + public short readShortLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShort() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShortLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readMedium() { + throw new UnsupportedOperationException(); + } + + @Override + public int readMediumLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedMedium() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedMediumLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readInt() { + throw new UnsupportedOperationException(); + } + + @Override + public int readIntLE() { + throw new UnsupportedOperationException(); + } + + @Override + public long readUnsignedInt() { + throw new UnsupportedOperationException(); + } + + @Override + public long readUnsignedIntLE() { + throw new UnsupportedOperationException(); + } + + @Override + public long readLong() { + throw new UnsupportedOperationException(); + } + + @Override + public long readLongLE() { + throw new UnsupportedOperationException(); + } + + @Override + public char readChar() { + throw new UnsupportedOperationException(); + } + + @Override + public float readFloat() { + throw new UnsupportedOperationException(); + } + + @Override + public double readDouble() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readSlice(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readRetainedSlice(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(byte[] dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CharSequence readCharSequence(int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int readBytes(FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf skipBytes(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBoolean(boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeByte(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeShort(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeShortLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeMedium(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeMediumLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeInt(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeIntLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeLong(long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeLongLE(long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeChar(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeFloat(float value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeDouble(double value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(InputStream in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeZero(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int writeCharSequence(CharSequence sequence, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int indexOf(int fromIndex, int toIndex, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(int length, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(int index, int length, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByte(ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByteDesc(ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf slice() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedSlice() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf slice(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf duplicate() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedDuplicate() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + throw new UnsupportedOperationException(); + } + + + @Override + public byte[] array() { + throw new UnsupportedOperationException(); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException(); + } + @Override + public long memoryAddress() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString(Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString(int index, int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(ByteBuf buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain(int increment) { + throw new UnsupportedOperationException(); + } + + @Override + public int refCnt() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf touch() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf touch(Object hint) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(int decrement) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + '@' + Integer.toHexString(System.identityHashCode(this)); + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return obj == this; + } + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java new file mode 100644 index 00000000000..6252bb71be9 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.proto.checksum; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import com.scurrilous.circe.checksum.IntHash; +import com.scurrilous.circe.checksum.Java8IntHash; +import com.scurrilous.circe.checksum.Java9IntHash; +import com.scurrilous.circe.checksum.JniIntHash; +import com.scurrilous.circe.crc.Sse42Crc32C; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import org.apache.bookkeeper.proto.BookieProtoEncoding; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.ByteBufVisitor; +import org.apache.commons.lang3.RandomUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * This test class was added to reproduce a bug in the checksum calculation when + * the payload is a CompositeByteBuf and this buffer has a reader index state other than 0. + * The reader index state gets lost in the unwrapping process. + * + * There were at least 2 different bugs. One that occured when the + * payload was >= BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD and the other when + * it was < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD. + * This test covers both useV2Protocol=true and useV2Protocol=false since the bug was triggered differently. + * + * The bug has been fixed and this test is here to make sure it doesn't happen again. + */ +@RunWith(Parameterized.class) +public class CompositeByteBufUnwrapBugReproduceTest { + final byte[] testPayLoad; + final int defaultBufferPrefixLength; + private final boolean useV2Protocol; + + // set to 0 to 3 to run a single scenario for debugging purposes + private static final int RUN_SINGLE_SCENARIO_FOR_DEBUGGING = -1; + + @Parameterized.Parameters + public static Collection testScenarios() { + List scenarios = Arrays.asList(new Object[][] { + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, true}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, false}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, true}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, false} + }); + if (RUN_SINGLE_SCENARIO_FOR_DEBUGGING >= 0) { + // pick a single scenario for debugging + scenarios = scenarios.subList(RUN_SINGLE_SCENARIO_FOR_DEBUGGING, 1); + } + return scenarios; + } + + public CompositeByteBufUnwrapBugReproduceTest(int payloadSize, boolean useV2Protocol) { + this.testPayLoad = createTestPayLoad(payloadSize); + this.defaultBufferPrefixLength = payloadSize / 7; + this.useV2Protocol = useV2Protocol; + } + + private static byte[] createTestPayLoad(int payloadSize) { + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payloadSize; i++) { + payload[i] = (byte) i; + } + return payload; + } + + + /** + * A DigestManager that uses the given IntHash implementation for testing. + */ + static class TestIntHashDigestManager extends DigestManager { + private final IntHash intHash; + + public TestIntHashDigestManager(IntHash intHash, long ledgerId, boolean useV2Protocol, + ByteBufAllocator allocator) { + super(ledgerId, useV2Protocol, allocator); + this.intHash = intHash; + } + + @Override + int getMacCodeLength() { + return 4; + } + + @Override + boolean isInt32Digest() { + return true; + } + + @Override + void populateValueAndReset(int digest, ByteBuf buf) { + buf.writeInt(digest); + } + + @Override + int internalUpdate(int digest, ByteBuf data, int offset, int len) { + return intHash.resume(digest, data, offset, len); + } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + return intHash.resume(digest, buffer, offset, len); + } + + @Override + boolean acceptsMemoryAddressBuffer() { + return intHash.acceptsMemoryAddressBuffer(); + } + } + + @Test + public void shouldCalculateChecksumForCompositeBuffer() { + ByteBuf testPayload = Unpooled.wrappedBuffer(testPayLoad); + byte[] referenceOutput = computeDigestAndPackageForSending(new Java8IntHash(), testPayload.retainedDuplicate()); + assertDigestAndPackageMatchesReference(new Java8IntHash(), testPayload, referenceOutput); + assertDigestAndPackageMatchesReference(new Java9IntHash(), testPayload, referenceOutput); + if (Sse42Crc32C.isSupported()) { + assertDigestAndPackageMatchesReference(new JniIntHash(), testPayload, referenceOutput); + } + testPayload.release(); + } + + private void assertDigestAndPackageMatchesReference(IntHash intHash, ByteBuf payload, byte[] referenceOutput) { + assertDigestAndPackageScenario(intHash, payload.retainedDuplicate(), referenceOutput, testPayLoad, + "plain payload, no wrapping"); + + ByteBuf payload2 = wrapWithPrefixAndCompositeByteBufWithReaderIndexState(payload.retainedDuplicate(), + defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload2, referenceOutput, testPayLoad, + "payload with prefix wrapped in CompositeByteBuf with readerIndex state"); + + ByteBuf payload3 = wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate( + payload.retainedDuplicate(), defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload3, referenceOutput, testPayLoad, + "payload with prefix wrapped in 2 layers of CompositeByteBuf with readerIndex state in the outer " + + "composite. In addition, the outer composite is duplicated twice."); + + ByteBuf payload4 = wrapInCompositeByteBufAndSlice(payload.retainedDuplicate(), defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload4, referenceOutput, testPayLoad, + "payload with prefix wrapped in CompositeByteBuf and sliced"); + } + + private void assertDigestAndPackageScenario(IntHash intHash, ByteBuf payload, byte[] referenceOutput, + byte[] testPayLoadArray, + String scenario) { + // this validates that the readable bytes in the payload match the TEST_PAYLOAD content + assertArrayEquals(testPayLoadArray, ByteBufUtil.getBytes(payload.duplicate()), + "input is invalid for scenario '" + scenario + "'"); + + ByteBuf visitedCopy = Unpooled.buffer(payload.readableBytes()); + ByteBufVisitor.visitBuffers(payload, payload.readerIndex(), payload.readableBytes(), + new ByteBufVisitor.ByteBufVisitorCallback() { + @Override + public void visitBuffer(Void context, ByteBuf visitBuffer, int visitIndex, int visitLength) { + visitedCopy.writeBytes(visitBuffer, visitIndex, visitLength); + } + + @Override + public void visitArray(Void context, byte[] visitArray, int visitIndex, int visitLength) { + visitedCopy.writeBytes(visitArray, visitIndex, visitLength); + } + }, null); + + assertArrayEquals(ByteBufUtil.getBytes(visitedCopy), testPayLoadArray, + "visited copy is invalid for scenario '" + scenario + "'. Bug in ByteBufVisitor?"); + + // compute the digest and package + byte[] output = computeDigestAndPackageForSending(intHash, payload.duplicate()); + if (referenceOutput == null) { + referenceOutput = + computeDigestAndPackageForSending(new Java8IntHash(), Unpooled.wrappedBuffer(testPayLoadArray)); + } + // this validates that the output matches the reference output + assertArrayEquals(referenceOutput, output, "output is invalid for scenario '" + scenario + "'"); + } + + private byte[] computeDigestAndPackageForSending(IntHash intHash, ByteBuf data) { + DigestManager digestManager = new TestIntHashDigestManager(intHash, 1, useV2Protocol, ByteBufAllocator.DEFAULT); + ReferenceCounted packagedBuffer = + digestManager.computeDigestAndPackageForSending(1, 0, data.readableBytes(), data, + MacDigestManager.EMPTY_LEDGER_KEY, BookieProtocol.FLAG_NONE); + return packagedBufferToBytes(packagedBuffer); + } + + ByteBuf wrapWithPrefixAndCompositeByteBufWithReaderIndexState(ByteBuf payload, int bufferPrefixLength) { + // create a new buffer with a prefix and the actual payload + ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes()); + prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength)); + prefixedPayload.writeBytes(payload); + + // wrap the buffer in a composite buffer + CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + outerComposite.addComponent(true, prefixedPayload); + + // set reader index state. this is the state that gets lost in the unwrapping process + outerComposite.readerIndex(bufferPrefixLength); + + return outerComposite; + } + + ByteBuf wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate(ByteBuf payload, + int bufferPrefixLength) { + // create a new buffer with a prefix and the actual payload + ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes()); + prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength)); + prefixedPayload.writeBytes(payload); + + CompositeByteBuf innerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + innerComposite.addComponent(true, prefixedPayload); + innerComposite.addComponent(true, Unpooled.EMPTY_BUFFER); + + // wrap the buffer in a composite buffer + CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + outerComposite.addComponent(true, innerComposite); + outerComposite.addComponent(true, Unpooled.EMPTY_BUFFER); + + // set reader index state. this is the state that gets lost in the unwrapping process + outerComposite.readerIndex(bufferPrefixLength); + + return outerComposite.duplicate().duplicate(); + } + + ByteBuf wrapInCompositeByteBufAndSlice(ByteBuf payload, int bufferPrefixLength) { + // create a composite buffer + CompositeByteBuf compositeWithPrefix = ByteBufAllocator.DEFAULT.compositeBuffer(); + compositeWithPrefix.addComponent(true, Unpooled.wrappedBuffer(RandomUtils.nextBytes(bufferPrefixLength))); + compositeWithPrefix.addComponent(true, payload); + + // return a slice of the composite buffer so that it returns the payload + return compositeWithPrefix.slice(bufferPrefixLength, payload.readableBytes()); + } + + private static byte[] packagedBufferToBytes(ReferenceCounted packagedBuffer) { + byte[] output; + if (packagedBuffer instanceof ByteBufList) { + ByteBufList bufList = (ByteBufList) packagedBuffer; + output = new byte[bufList.readableBytes()]; + bufList.getBytes(output); + for (int i = 0; i < bufList.size(); i++) { + bufList.getBuffer(i).release(); + } + } else if (packagedBuffer instanceof ByteBuf) { + output = ByteBufUtil.getBytes((ByteBuf) packagedBuffer); + packagedBuffer.release(); + } else { + throw new RuntimeException("Unexpected type: " + packagedBuffer.getClass()); + } + return output; + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java index ac7aca77226..88de17d0a9d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -87,39 +86,6 @@ public void testDouble() throws Exception { assertEquals(b2.refCnt(), 0); } - @Test - public void testComposite() throws Exception { - ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); - b1.writerIndex(b1.capacity()); - ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); - b2.writerIndex(b2.capacity()); - - CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer(); - composite.addComponent(b1); - composite.addComponent(b2); - - ByteBufList buf = ByteBufList.get(composite); - - // composite is unwrapped into two parts - assertEquals(2, buf.size()); - // and released - assertEquals(composite.refCnt(), 0); - - assertEquals(256, buf.readableBytes()); - assertEquals(b1, buf.getBuffer(0)); - assertEquals(b2, buf.getBuffer(1)); - - assertEquals(buf.refCnt(), 1); - assertEquals(b1.refCnt(), 1); - assertEquals(b2.refCnt(), 1); - - buf.release(); - - assertEquals(buf.refCnt(), 0); - assertEquals(b1.refCnt(), 0); - assertEquals(b2.refCnt(), 0); - } - @Test public void testClone() throws Exception { ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java index 65a77b1492b..d90f8b7ea5d 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java @@ -71,12 +71,30 @@ public static int resumeChecksum(int previousChecksum, ByteBuf payload) { /** * Computes incremental checksum with input previousChecksum and input payload * - * @param previousChecksum : previously computed checksum - * @param payload - * @return + * @param previousChecksum the previously computed checksum + * @param payload the data for which the checksum is to be computed + * @param offset the starting position in the payload + * @param len the number of bytes to include in the checksum computation + * @return the updated checksum */ public static int resumeChecksum(int previousChecksum, ByteBuf payload, int offset, int len) { return CRC32C_HASH.resume(previousChecksum, payload, offset, len); } + /** + * Computes incremental checksum with input previousChecksum and input payload + * + * @param previousChecksum the previously computed checksum + * @param payload the data for which the checksum is to be computed + * @param offset the starting position in the payload + * @param len the number of bytes to include in the checksum computation + * @return the updated checksum + */ + public static int resumeChecksum(int previousChecksum, byte[] payload, int offset, int len) { + return CRC32C_HASH.resume(previousChecksum, payload, offset, len); + } + + public static boolean acceptsMemoryAddressBuffer() { + return CRC32C_HASH.acceptsMemoryAddressBuffer(); + } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java index e8922e3a16b..be98ae19be1 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java @@ -28,4 +28,8 @@ public interface IntHash { int resume(int current, ByteBuf buffer); int resume(int current, ByteBuf buffer, int offset, int len); + + int resume(int current, byte[] buffer, int offset, int len); + + boolean acceptsMemoryAddressBuffer(); } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java index fd548bc4de4..2825c610b11 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java @@ -53,4 +53,14 @@ public int resume(int current, ByteBuf buffer, int offset, int len) { return hash.resume(current, buffer.slice(offset, len).nioBuffer()); } } + + @Override + public int resume(int current, byte[] buffer, int offset, int len) { + return hash.resume(current, buffer, offset, len); + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java index 31af153666e..2e779a92766 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java @@ -19,7 +19,6 @@ package com.scurrilous.circe.checksum; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.util.concurrent.FastThreadLocal; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -77,7 +76,7 @@ public int calculate(ByteBuf buffer, int offset, int len) { return resume(0, buffer, offset, len); } - private int resume(int current, long address, int offset, int length) { + private int updateDirectByteBuffer(int current, long address, int offset, int length) { try { return (int) UPDATE_DIRECT_BYTEBUFFER.invoke(null, current, address, offset, offset + length); } catch (IllegalAccessException | InvocationTargetException e) { @@ -85,7 +84,20 @@ private int resume(int current, long address, int offset, int length) { } } - private int resume(int current, byte[] array, int offset, int length) { + @Override + public int resume(int current, byte[] array, int offset, int length) { + // the bit-wise complementing of the input and output is explained in the resume method below + current = ~current; + current = updateBytes(current, array, offset, length); + return ~current; + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return true; + } + + private static int updateBytes(int current, byte[] array, int offset, int length) { try { return (int) UPDATE_BYTES.invoke(null, current, array, offset, offset + length); } catch (IllegalAccessException | InvocationTargetException e) { @@ -100,33 +112,37 @@ public int resume(int current, ByteBuf buffer) { @Override public int resume(int current, ByteBuf buffer, int offset, int len) { - int negCrc = ~current; + // The input value is bit-wise complemented for two reasons: + // 1. The CRC32C algorithm is designed to start with a seed value where all bits are set to 1 (0xffffffff). + // When 0 is initially passed in, ~0 results in the correct initial value (0xffffffff). + // 2. The CRC32C algorithm complements the final value as the last step. This method will always complement + // the return value. Therefore, when the algorithm is used iteratively, it is necessary to complement + // the input value to continue calculations. + // This allows the algorithm to be used incrementally without needing separate initialization and + // finalization steps. + current = ~current; if (buffer.hasMemoryAddress()) { - negCrc = resume(negCrc, buffer.memoryAddress(), offset, len); + current = updateDirectByteBuffer(current, buffer.memoryAddress(), offset, len); } else if (buffer.hasArray()) { int arrayOffset = buffer.arrayOffset() + offset; - negCrc = resume(negCrc, buffer.array(), arrayOffset, len); - } else if (buffer instanceof CompositeByteBuf) { - CompositeByteBuf compositeByteBuf = (CompositeByteBuf) buffer; - int loopedCurrent = current; - for (int i = 0; i < compositeByteBuf.numComponents(); i ++) { - loopedCurrent = resume(loopedCurrent, compositeByteBuf.component(i)); - } - return loopedCurrent; + current = updateBytes(current, buffer.array(), arrayOffset, len); } else { byte[] b = TL_BUFFER.get(); int toRead = len; int loopOffset = offset; while (toRead > 0) { int length = Math.min(toRead, b.length); - buffer.slice(loopOffset, length).readBytes(b, 0, length); - negCrc = resume(negCrc, b, 0, length); + buffer.getBytes(loopOffset, b, 0, length); + current = updateBytes(current, b, 0, length); toRead -= length; loopOffset += length; } } - return ~negCrc; + // The current value is complemented to align with the finalization step of the CRC32C algorithm. + // If there is a subsequent resume step, the value will be complemented again to initiate the next step + // as described in the comments in the beginning of this method. + return ~current; } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java index e8e87bf6b1a..dc5bed0fc1c 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java @@ -51,4 +51,14 @@ public int resume(int current, ByteBuf buffer, int offset, int len) { return hash.resume(current, buffer.slice(offset, len).nioBuffer()); } } + + @Override + public int resume(int current, byte[] buffer, int offset, int len) { + return hash.resume(current, buffer, offset, len); + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return true; + } }