From 397b6d46f6f8b413aa132d55a3da6c2523f08106 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 23 Feb 2024 17:44:11 +0200 Subject: [PATCH 1/4] WIP Add checks for preventing hard-to-debug double release bugs with recycled objects --- .../bookkeeper/mledger/impl/EntryImpl.java | 35 +++++++++----- .../util/AbstractCASReferenceCounted.java | 26 ++++++---- .../pulsar/client/impl/ProducerImpl.java | 9 ++-- .../raw/ReferenceCountedMessageMetadata.java | 8 ++-- .../pulsar/common/protocol/ByteBufPair.java | 11 +++-- .../AbstractValidatingReferenceCounted.java | 48 +++++++++++++++++++ 6 files changed, 105 insertions(+), 32 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 6512399173f0a..b4e201ad18367 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -31,7 +31,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable, ReferenceCounted { - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER = new Recycler<>() { @Override protected EntryImpl newObject(Handle handle) { return new EntryImpl(handle); @@ -47,56 +47,60 @@ protected EntryImpl newObject(Handle handle) { private Runnable onDeallocate; public static EntryImpl create(LedgerEntry ledgerEntry) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerEntry.getLedgerId(); entry.entryId = ledgerEntry.getEntryId(); entry.data = ledgerEntry.getEntryBuffer(); entry.data.retain(); - entry.setRefCnt(1); + return entry; + } + + private static EntryImpl getEntryFromRecycler() { + EntryImpl entry = RECYCLER.get(); + if (entry.refCnt() != 0) { + throw new IllegalStateException("EntryImpl should be obtained from the recycler with refCnt == 0"); + } + entry.retain(); return entry; } @VisibleForTesting public static EntryImpl create(long ledgerId, long entryId, byte[] data) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; entry.data = Unpooled.wrappedBuffer(data); - entry.setRefCnt(1); return entry; } public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; entry.data = data; entry.data.retain(); - entry.setRefCnt(1); return entry; } public static EntryImpl create(PositionImpl position, ByteBuf data) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = position.getLedgerId(); entry.entryId = position.getEntryId(); entry.data = data; entry.data.retain(); - entry.setRefCnt(1); return entry; } public static EntryImpl create(EntryImpl other) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = other.ledgerId; entry.entryId = other.entryId; entry.data = other.data.retainedDuplicate(); - entry.setRefCnt(1); return entry; } @@ -121,16 +125,19 @@ public void onDeallocate(Runnable r) { } public long getTimestamp() { + checkRefCount(); return timestamp; } @Override public ByteBuf getDataBuffer() { + checkRefCount(); return data; } @Override public byte[] getData() { + checkRefCount(); byte[] array = new byte[data.readableBytes()]; data.getBytes(data.readerIndex(), array); return array; @@ -139,6 +146,7 @@ public byte[] getData() { // Only for test @Override public byte[] getDataAndRelease() { + checkRefCount(); byte[] array = getData(); release(); return array; @@ -146,26 +154,31 @@ public byte[] getDataAndRelease() { @Override public int getLength() { + checkRefCount(); return data.readableBytes(); } @Override public PositionImpl getPosition() { + checkRefCount(); return new PositionImpl(ledgerId, entryId); } @Override public long getLedgerId() { + checkRefCount(); return ledgerId; } @Override public long getEntryId() { + checkRefCount(); return entryId; } @Override public int compareTo(EntryImpl other) { + checkRefCount(); if (this.ledgerId != other.ledgerId) { return this.ledgerId < other.ledgerId ? -1 : 1; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java index 8b0e25f1348ca..a60d14e9b567e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java @@ -32,6 +32,9 @@ */ public abstract class AbstractCASReferenceCounted implements ReferenceCounted { + private static final boolean refCountCheckOnAccess = + Boolean.parseBoolean(System.getProperty("pulsar.refcount.check.on_access", "true")); + private static final AtomicIntegerFieldUpdater refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractCASReferenceCounted.class, "refCnt"); @@ -42,13 +45,6 @@ public final int refCnt() { return refCnt; } - /** - * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly. - */ - protected final void setRefCnt(int refCnt) { - refCntUpdater.set(this, refCnt); - } - @Override public ReferenceCounted retain() { return retain0(1); @@ -60,7 +56,7 @@ public ReferenceCounted retain(int increment) { } private ReferenceCounted retain0(int increment) { - for (;;) { + for (; ; ) { int refCnt = this.refCnt; final int nextCnt = refCnt + increment; @@ -91,7 +87,7 @@ public boolean release(int decrement) { } private boolean release0(int decrement) { - for (;;) { + for (; ; ) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); @@ -111,4 +107,16 @@ private boolean release0(int decrement) { * Called once {@link #refCnt()} is equals 0. */ protected abstract void deallocate(); + + /** + * Validate that the instance hasn't been released before accessing fields. + * This is a sanity check to ensure that we don't read fields from deallocated objects. + */ + protected void checkRefCount() { + if (refCountCheckOnAccess && refCnt() < 1) { + throw new IllegalReferenceCountException( + "Possible double release bug (refCnt=" + refCnt() + "). " + getClass().getSimpleName() + + " has been deallocated. "); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 4908d10f330b3..ff22803c32bff 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -32,7 +32,6 @@ import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; @@ -92,6 +91,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.AbstractValidatingReferenceCounted; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RelativeTimeUtil; @@ -1357,11 +1357,12 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { } } - static class ChunkedMessageCtx extends AbstractReferenceCounted { + static class ChunkedMessageCtx extends AbstractValidatingReferenceCounted { protected MessageIdImpl firstChunkMessageId; protected MessageIdImpl lastChunkMessageId; public ChunkMessageIdImpl getChunkMessageId() { + checkRefCount(); return new ChunkMessageIdImpl(firstChunkMessageId, lastChunkMessageId); } @@ -1374,8 +1375,8 @@ protected ProducerImpl.ChunkedMessageCtx newObject( }; public static ChunkedMessageCtx get(int totalChunks) { - ChunkedMessageCtx chunkedMessageCtx = RECYCLER.get(); - chunkedMessageCtx.setRefCnt(totalChunks); + ChunkedMessageCtx chunkedMessageCtx = getAndCheck(RECYCLER); + chunkedMessageCtx.retain(totalChunks); return chunkedMessageCtx; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java index 3d7c0f4c54f82..5af024d67c4d6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java @@ -19,15 +19,15 @@ package org.apache.pulsar.common.api.raw; import io.netty.buffer.ByteBuf; -import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.ReferenceCounted; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.util.AbstractValidatingReferenceCounted; /** * Class representing a reference-counted object that requires explicit deallocation. */ -public class ReferenceCountedMessageMetadata extends AbstractReferenceCounted { +public class ReferenceCountedMessageMetadata extends AbstractValidatingReferenceCounted { private static final Recycler RECYCLER = // new Recycler() { @@ -46,10 +46,10 @@ private ReferenceCountedMessageMetadata(Recycler.Handle recyclerHandle) { * @return */ public static ByteBufPair get(ByteBuf b1, ByteBuf b2) { - ByteBufPair buf = RECYCLER.get(); - buf.setRefCnt(1); + ByteBufPair buf = getAndCheck(RECYCLER); + buf.retain(); buf.b1 = b1; buf.b2 = b2; return buf; } public ByteBuf getFirst() { + checkRefCount(); return b1; } public ByteBuf getSecond() { + checkRefCount(); return b2; } public int readableBytes() { + checkRefCount(); return b1.readableBytes() + b2.readableBytes(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java new file mode 100644 index 0000000000000..2410ca26e3148 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.IllegalReferenceCountException; +import io.netty.util.Recycler; + +public abstract class AbstractValidatingReferenceCounted extends AbstractReferenceCounted { + private static final boolean refCountCheckOnAccess = + Boolean.parseBoolean(System.getProperty("pulsar.refcount.check.on_access", "true")); + /** + * Validate that the instance hasn't been released before accessing fields. + * This is a sanity check to ensure that we don't read fields from deallocated objects. + */ + protected void checkRefCount() { + if (refCountCheckOnAccess && refCnt() < 1) { + throw new IllegalReferenceCountException( + "Possible double release bug (refCnt=" + refCnt() + "). " + getClass().getSimpleName() + + " has been deallocated. "); + } + } + + public static T getAndCheck(Recycler recycler) { + T object = recycler.get(); + if (object.refCnt() != 0) { + throw new IllegalReferenceCountException(object.getClass().getSimpleName() + + " should be obtained from the recycler with refCnt == 0, instance=" + object); + } + return object; + } +} From 373e3430d7952ee876688f43f6582a5f35f80906 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 23 Feb 2024 22:47:08 +0200 Subject: [PATCH 2/4] refCnt starts at 1 --- .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java | 7 ++++--- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- .../common/api/raw/ReferenceCountedMessageMetadata.java | 1 - .../org/apache/pulsar/common/protocol/ByteBufPair.java | 1 - .../common/util/AbstractValidatingReferenceCounted.java | 5 +++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index b4e201ad18367..89461e323ea7f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -58,10 +58,11 @@ public static EntryImpl create(LedgerEntry ledgerEntry) { private static EntryImpl getEntryFromRecycler() { EntryImpl entry = RECYCLER.get(); - if (entry.refCnt() != 0) { - throw new IllegalStateException("EntryImpl should be obtained from the recycler with refCnt == 0"); + if (entry.refCnt() != 1) { + throw new IllegalStateException( + "EntryImpl should be obtained from the recycler with refCnt == 1. refCnt = " + entry.refCnt() + + " instance = " + entry); } - entry.retain(); return entry; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index ff22803c32bff..84593ec854b7d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1376,7 +1376,7 @@ protected ProducerImpl.ChunkedMessageCtx newObject( public static ChunkedMessageCtx get(int totalChunks) { ChunkedMessageCtx chunkedMessageCtx = getAndCheck(RECYCLER); - chunkedMessageCtx.retain(totalChunks); + chunkedMessageCtx.retain(totalChunks - 1); return chunkedMessageCtx; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java index 5af024d67c4d6..01737a84ff129 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java @@ -49,7 +49,6 @@ public static ReferenceCountedMessageMetadata get(ByteBuf parsedBuf) { ReferenceCountedMessageMetadata ref = getAndCheck(RECYCLER); ref.parsedBuf = parsedBuf; ref.parsedBuf.retain(); - ref.retain(); return ref; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index dbb7d7a0fdc09..705ebb91526e5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -63,7 +63,6 @@ private ByteBufPair(Handle recyclerHandle) { */ public static ByteBufPair get(ByteBuf b1, ByteBuf b2) { ByteBufPair buf = getAndCheck(RECYCLER); - buf.retain(); buf.b1 = b1; buf.b2 = b2; return buf; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java index 2410ca26e3148..346c08e0f08e3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java @@ -39,9 +39,10 @@ protected void checkRefCount() { public static T getAndCheck(Recycler recycler) { T object = recycler.get(); - if (object.refCnt() != 0) { + if (object.refCnt() != 1) { throw new IllegalReferenceCountException(object.getClass().getSimpleName() - + " should be obtained from the recycler with refCnt == 0, instance=" + object); + + " should be obtained from the recycler with refCnt == 1, (refCnt=" + object.refCnt() + + ") instance=" + object); } return object; } From dc3e1a9f9c4ac509977c2a0379063b795c2290c8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 23 Feb 2024 22:49:12 +0200 Subject: [PATCH 3/4] Fix retain logic for ChunkedMessageCtx to match setRefCnt(totalChunks) --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 84593ec854b7d..ef49337b18547 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1376,7 +1376,9 @@ protected ProducerImpl.ChunkedMessageCtx newObject( public static ChunkedMessageCtx get(int totalChunks) { ChunkedMessageCtx chunkedMessageCtx = getAndCheck(RECYCLER); - chunkedMessageCtx.retain(totalChunks - 1); + if (totalChunks > 1) { + chunkedMessageCtx.retain(totalChunks - 1); + } return chunkedMessageCtx; } From d59f9ab10ac4991047dad9b933824d4e0ae78667 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 26 Feb 2024 07:26:50 +0200 Subject: [PATCH 4/4] Remove refCnt==0 check since object might be new or reused when it gets here --- .../apache/bookkeeper/mledger/impl/EntryImpl.java | 6 +----- .../mledger/util/AbstractCASReferenceCounted.java | 4 ++++ .../util/AbstractValidatingReferenceCounted.java | 13 +++++++------ 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 89461e323ea7f..8a6cdb5372aab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -58,11 +58,7 @@ public static EntryImpl create(LedgerEntry ledgerEntry) { private static EntryImpl getEntryFromRecycler() { EntryImpl entry = RECYCLER.get(); - if (entry.refCnt() != 1) { - throw new IllegalStateException( - "EntryImpl should be obtained from the recycler with refCnt == 1. refCnt = " + entry.refCnt() - + " instance = " + entry); - } + entry.resetRefCnt(); return entry; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java index a60d14e9b567e..15fbcb0eea52d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java @@ -108,6 +108,10 @@ private boolean release0(int decrement) { */ protected abstract void deallocate(); + public final void resetRefCnt() { + refCntUpdater.set(this, 1); + } + /** * Validate that the instance hasn't been released before accessing fields. * This is a sanity check to ensure that we don't read fields from deallocated objects. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java index 346c08e0f08e3..82ed5ad7d9520 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java @@ -25,6 +25,7 @@ public abstract class AbstractValidatingReferenceCounted extends AbstractReferenceCounted { private static final boolean refCountCheckOnAccess = Boolean.parseBoolean(System.getProperty("pulsar.refcount.check.on_access", "true")); + /** * Validate that the instance hasn't been released before accessing fields. * This is a sanity check to ensure that we don't read fields from deallocated objects. @@ -37,13 +38,13 @@ protected void checkRefCount() { } } - public static T getAndCheck(Recycler recycler) { + public final void resetRefCnt() { + setRefCnt(1); + } + + public static T getAndCheck(Recycler recycler) { T object = recycler.get(); - if (object.refCnt() != 1) { - throw new IllegalReferenceCountException(object.getClass().getSimpleName() - + " should be obtained from the recycler with refCnt == 1, (refCnt=" + object.refCnt() - + ") instance=" + object); - } + object.resetRefCnt(); return object; } }