Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc][WIP] Detect "double release" and "use after release" bugs with recycled objects #22110

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>,
ReferenceCounted {

private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
private static final Recycler<EntryImpl> RECYCLER = new Recycler<>() {
@Override
protected EntryImpl newObject(Handle<EntryImpl> handle) {
return new EntryImpl(handle);
Expand All @@ -47,56 +47,61 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private Runnable onDeallocate;

public static EntryImpl create(LedgerEntry ledgerEntry) {
EntryImpl entry = RECYCLER.get();
EntryImpl entry = getEntryFromRecycler();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerEntry.getLedgerId();
entry.entryId = ledgerEntry.getEntryId();
entry.data = ledgerEntry.getEntryBuffer();
entry.data.retain();
entry.setRefCnt(1);
return entry;
}

private static EntryImpl getEntryFromRecycler() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻 factoring this out makes it easier to follow (and add other common behaviors in one place later if needed - basic DRY stuff).

EntryImpl entry = RECYCLER.get();
if (entry.refCnt() != 1) {
throw new IllegalStateException(
"EntryImpl should be obtained from the recycler with refCnt == 1. refCnt = " + entry.refCnt()
+ " instance = " + entry);
}
lhotari marked this conversation as resolved.
Show resolved Hide resolved
return entry;
}

@VisibleForTesting
public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
EntryImpl entry = RECYCLER.get();
EntryImpl entry = getEntryFromRecycler();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.data = Unpooled.wrappedBuffer(data);
entry.setRefCnt(1);
return entry;
}

public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
EntryImpl entry = getEntryFromRecycler();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.data = data;
entry.data.retain();
entry.setRefCnt(1);
return entry;
}

public static EntryImpl create(PositionImpl position, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
EntryImpl entry = getEntryFromRecycler();
entry.timestamp = System.nanoTime();
entry.ledgerId = position.getLedgerId();
entry.entryId = position.getEntryId();
entry.data = data;
entry.data.retain();
entry.setRefCnt(1);
return entry;
}

public static EntryImpl create(EntryImpl other) {
EntryImpl entry = RECYCLER.get();
EntryImpl entry = getEntryFromRecycler();
entry.timestamp = System.nanoTime();
entry.ledgerId = other.ledgerId;
entry.entryId = other.entryId;
entry.data = other.data.retainedDuplicate();
entry.setRefCnt(1);
return entry;
}

Expand All @@ -121,16 +126,19 @@ public void onDeallocate(Runnable r) {
}

public long getTimestamp() {
checkRefCount();
return timestamp;
}

@Override
public ByteBuf getDataBuffer() {
checkRefCount();
return data;
}

@Override
public byte[] getData() {
checkRefCount();
byte[] array = new byte[data.readableBytes()];
data.getBytes(data.readerIndex(), array);
return array;
Expand All @@ -139,33 +147,39 @@ public byte[] getData() {
// Only for test
@Override
public byte[] getDataAndRelease() {
checkRefCount();
byte[] array = getData();
release();
return array;
}

@Override
public int getLength() {
checkRefCount();
return data.readableBytes();
}

@Override
public PositionImpl getPosition() {
checkRefCount();
return new PositionImpl(ledgerId, entryId);
}

@Override
public long getLedgerId() {
checkRefCount();
return ledgerId;
}

@Override
public long getEntryId() {
checkRefCount();
return entryId;
}

@Override
public int compareTo(EntryImpl other) {
checkRefCount();
if (this.ledgerId != other.ledgerId) {
return this.ledgerId < other.ledgerId ? -1 : 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
*/

public abstract class AbstractCASReferenceCounted implements ReferenceCounted {
private static final boolean refCountCheckOnAccess =
Boolean.parseBoolean(System.getProperty("pulsar.refcount.check.on_access", "true"));

private static final AtomicIntegerFieldUpdater<AbstractCASReferenceCounted> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractCASReferenceCounted.class, "refCnt");

Expand All @@ -42,13 +45,6 @@ public final int refCnt() {
return refCnt;
}

/**
* An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly.
*/
protected final void setRefCnt(int refCnt) {
refCntUpdater.set(this, refCnt);
}

@Override
public ReferenceCounted retain() {
return retain0(1);
Expand All @@ -60,7 +56,7 @@ public ReferenceCounted retain(int increment) {
}

private ReferenceCounted retain0(int increment) {
for (;;) {
for (; ; ) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;

Expand Down Expand Up @@ -91,7 +87,7 @@ public boolean release(int decrement) {
}

private boolean release0(int decrement) {
for (;;) {
for (; ; ) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
Expand All @@ -111,4 +107,16 @@ private boolean release0(int decrement) {
* Called once {@link #refCnt()} is equals 0.
*/
protected abstract void deallocate();

/**
* Validate that the instance hasn't been released before accessing fields.
* This is a sanity check to ensure that we don't read fields from deallocated objects.
*/
protected void checkRefCount() {
if (refCountCheckOnAccess && refCnt() < 1) {
throw new IllegalReferenceCountException(
"Possible double release bug (refCnt=" + refCnt() + "). " + getClass().getSimpleName()
+ " has been deallocated. ");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -92,6 +91,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.AbstractValidatingReferenceCounted;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand Down Expand Up @@ -1357,11 +1357,12 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
}
}

static class ChunkedMessageCtx extends AbstractReferenceCounted {
static class ChunkedMessageCtx extends AbstractValidatingReferenceCounted {
protected MessageIdImpl firstChunkMessageId;
protected MessageIdImpl lastChunkMessageId;

public ChunkMessageIdImpl getChunkMessageId() {
checkRefCount();
return new ChunkMessageIdImpl(firstChunkMessageId, lastChunkMessageId);
}

Expand All @@ -1374,8 +1375,10 @@ protected ProducerImpl.ChunkedMessageCtx newObject(
};

public static ChunkedMessageCtx get(int totalChunks) {
ChunkedMessageCtx chunkedMessageCtx = RECYCLER.get();
chunkedMessageCtx.setRefCnt(totalChunks);
ChunkedMessageCtx chunkedMessageCtx = getAndCheck(RECYCLER);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is setting ref count to 1 and then retaining N-1 less performant than setting ref count to N?

OR

Is setting ref count to 1 and then retaining N-1 more functionally correct than setting ref count to N?

if (totalChunks > 1) {
chunkedMessageCtx.retain(totalChunks - 1);
}
return chunkedMessageCtx;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
package org.apache.pulsar.common.api.raw;

import io.netty.buffer.ByteBuf;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCounted;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.AbstractValidatingReferenceCounted;

/**
* Class representing a reference-counted object that requires explicit deallocation.
*/
public class ReferenceCountedMessageMetadata extends AbstractReferenceCounted {
public class ReferenceCountedMessageMetadata extends AbstractValidatingReferenceCounted {

private static final Recycler<ReferenceCountedMessageMetadata> RECYCLER = //
new Recycler<ReferenceCountedMessageMetadata>() {
Expand All @@ -46,10 +46,9 @@ private ReferenceCountedMessageMetadata(Recycler.Handle<ReferenceCountedMessageM
}

public static ReferenceCountedMessageMetadata get(ByteBuf parsedBuf) {
ReferenceCountedMessageMetadata ref = RECYCLER.get();
ReferenceCountedMessageMetadata ref = getAndCheck(RECYCLER);
ref.parsedBuf = parsedBuf;
ref.parsedBuf.retain();
ref.setRefCnt(1);
return ref;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import org.apache.pulsar.common.util.AbstractValidatingReferenceCounted;

/**
* ByteBuf holder that contains 2 buffers.
*/
public final class ByteBufPair extends AbstractReferenceCounted {
public final class ByteBufPair extends AbstractValidatingReferenceCounted {

private ByteBuf b1;
private ByteBuf b2;
Expand Down Expand Up @@ -62,22 +62,24 @@ private ByteBufPair(Handle<ByteBufPair> recyclerHandle) {
* @return
*/
public static ByteBufPair get(ByteBuf b1, ByteBuf b2) {
ByteBufPair buf = RECYCLER.get();
buf.setRefCnt(1);
ByteBufPair buf = getAndCheck(RECYCLER);
buf.b1 = b1;
buf.b2 = b2;
return buf;
}

public ByteBuf getFirst() {
checkRefCount();
return b1;
}

public ByteBuf getSecond() {
checkRefCount();
return b2;
}

public int readableBytes() {
checkRefCount();
return b1.readableBytes() + b2.readableBytes();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util;

import io.netty.util.AbstractReferenceCounted;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.Recycler;

public abstract class AbstractValidatingReferenceCounted extends AbstractReferenceCounted {
private static final boolean refCountCheckOnAccess =
Boolean.parseBoolean(System.getProperty("pulsar.refcount.check.on_access", "true"));
/**
* Validate that the instance hasn't been released before accessing fields.
* This is a sanity check to ensure that we don't read fields from deallocated objects.
*/
protected void checkRefCount() {
if (refCountCheckOnAccess && refCnt() < 1) {
throw new IllegalReferenceCountException(
"Possible double release bug (refCnt=" + refCnt() + "). " + getClass().getSimpleName()
+ " has been deallocated. ");
}
}

public static <T extends AbstractReferenceCounted> T getAndCheck(Recycler<T> recycler) {
T object = recycler.get();
if (object.refCnt() != 1) {
throw new IllegalReferenceCountException(object.getClass().getSimpleName()
+ " should be obtained from the recycler with refCnt == 1, (refCnt=" + object.refCnt()
+ ") instance=" + object);
}
return object;
}
}
Loading