Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-4.16] Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 (#4196) #4205

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/bk-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ jobs:
path: surefire-reports
retention-days: 7

- name: print JVM thread dumps when cancelled
if: cancelled()
run: ./dev/ci-tool print_thread_dumps

integration-tests:
name: Integration Tests
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,17 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len);
}

@Override
boolean acceptsMemoryAddressBuffer() {
return Crc32cIntChecksum.acceptsMemoryAddressBuffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ interface CRC32Digest {
long getValueAndReset();

void update(ByteBuf buf, int offset, int len);
void update(byte[] buffer, int offset, int len);
}

private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
Expand Down Expand Up @@ -62,14 +63,25 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
crc.get().update(data, offset, len);
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
crc.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
// This is stored as 8 bytes
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return DirectMemoryCRC32Digest.isSupported();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException;
Expand All @@ -34,6 +32,7 @@
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.ByteBufVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,10 +52,25 @@ public abstract class DigestManager {
final long ledgerId;
final boolean useV2Protocol;
private final ByteBufAllocator allocator;
private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback;

abstract int getMacCodeLength();

abstract int update(int digest, ByteBuf buffer, int offset, int len);
abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);

abstract int internalUpdate(int digest, byte[] buffer, int offset, int len);

final int update(int digest, ByteBuf buffer, int offset, int len) {
if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) {
return internalUpdate(digest, buffer, offset, len);
} else if (buffer.hasArray()) {
return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len);
} else {
UpdateContext updateContext = new UpdateContext(digest);
ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext);
return updateContext.digest;
}
}

abstract void populateValueAndReset(int digest, ByteBuf buffer);

Expand All @@ -69,6 +83,7 @@ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allo
this.useV2Protocol = useV2Protocol;
this.macCodeLength = getMacCodeLength();
this.allocator = allocator;
this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback();
}

public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
Expand Down Expand Up @@ -136,34 +151,19 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long

// Compute checksum over the headers
int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.safeRelease(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());

populateValueAndReset(digest, buf);

// Reset the reader index to the beginning
buf.readerIndex(0);

if (isSmallEntry) {
buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
unwrapped.release();
buf.writeBytes(data, data.readerIndex(), data.readableBytes());
data.release();
return buf;
} else {
return ByteBufList.get(buf, unwrapped);
return ByteBufList.get(buf, data);
}
}

Expand All @@ -176,25 +176,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA
headersBuffer.writeLong(length);

int digest = update(0, headersBuffer, 0, METADATA_LENGTH);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer, unwrapped);
return ByteBufList.get(headersBuffer, data);
}

/**
Expand Down Expand Up @@ -373,4 +357,34 @@ public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) thr
long length = dataReceived.readLong();
return new RecoveryData(lastAddConfirmed, length);
}

private static class UpdateContext {
int digest;

UpdateContext(int digest) {
this.digest = digest;
}
}

private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback<UpdateContext> {

@Override
public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) {
// recursively visit the sub buffer and update the digest
context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength);
}

@Override
public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) {
// update the digest with the array
context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength);
}

@Override
public boolean acceptsMemoryAddress(UpdateContext context) {
return DigestManager.this.acceptsMemoryAddressBuffer();
}
}

abstract boolean acceptsMemoryAddressBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) {
crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length);
} else if (buf.hasArray()) {
// Use the internal method to update from array based
crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length);
crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length);
} else {
// Fallback to data copy if buffer is not contiguous
byte[] b = new byte[length];
buf.getBytes(index, b, 0, length);
crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length);
crcValue = updateArray(crcValue, b, 0, length);
}
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static int updateArray(int crcValue, byte[] buf, int offset, int length)
throws IllegalAccessException, InvocationTargetException {
return (int) updateBytes.invoke(null, crcValue, buf, offset, length);
}

@Override
public void update(byte[] buffer, int offset, int len) {
try {
crcValue = updateArray(crcValue, buffer, offset, len);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static final Method updateByteBuffer;
private static final Method updateBytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ int getMacCodeLength() {
}

@Override
int update(int digest, ByteBuf buffer, int offset, int len) {
int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return 0;
}

Expand All @@ -49,4 +54,9 @@ void populateValueAndReset(int digest, ByteBuf buffer) {}
boolean isInt32Digest() {
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,24 @@ void populateValueAndReset(int digest, ByteBuf buffer) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
mac.get().update(data.slice(offset, len).nioBuffer());
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
mac.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
return false;
}

@Override
boolean acceptsMemoryAddressBuffer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public long getValueAndReset() {
public void update(ByteBuf buf, int offset, int len) {
crc.update(buf.slice(offset, len).nioBuffer());
}

@Override
public void update(byte[] buffer, int offset, int len) {
crc.update(buffer, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -133,43 +132,14 @@ private static ByteBufList get() {
* Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
*/
public void add(ByteBuf buf) {
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> {
ReferenceCountUtil.retain(b);
buffers.add(b);
});
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(unwrapped);
}
buffers.add(buf);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
public void prepend(ByteBuf buf) {
// don't unwrap slices
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
for (int i = composite.numComponents() - 1; i >= 0; i--) {
ByteBuf b = composite.component(i);
ReferenceCountUtil.retain(b);
buffers.add(0, b);
}
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(0, unwrapped);
}
buffers.add(0, buf);
}

/**
Expand Down Expand Up @@ -285,7 +255,7 @@ public ByteBufList retain() {
@Override
protected void deallocate() {
for (int i = 0; i < buffers.size(); i++) {
ReferenceCountUtil.release(buffers.get(i));
buffers.get(i).release();
}

buffers.clear();
Expand Down
Loading
Loading