Skip to content

Commit

Permalink
LedgerHandle: eliminate unnecessasary synchronization on LedgerHandle…
Browse files Browse the repository at this point in the history
….getLength() (#4516)
  • Loading branch information
eolivelli authored Nov 16, 2024
1 parent af8baa1 commit c6f27f0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
Expand Down Expand Up @@ -137,7 +138,7 @@ private enum HandleState {
*/
private int stickyBookieIndex;

long length;
final AtomicLong length;
final DigestManager macManager;
final DistributionSchedule distributionSchedule;
final RateLimiter throttler;
Expand Down Expand Up @@ -188,10 +189,10 @@ private enum HandleState {
LedgerMetadata metadata = versionedMetadata.getValue();
if (metadata.isClosed()) {
lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
length = metadata.getLength();
length = new AtomicLong(metadata.getLength());
} else {
lastAddConfirmed = lastAddPushed = INVALID_ENTRY_ID;
length = 0;
length = new AtomicLong();
}

this.pendingAddsSequenceHead = lastAddConfirmed;
Expand Down Expand Up @@ -365,7 +366,7 @@ boolean setLedgerMetadata(Versioned<LedgerMetadata> expected, Versioned<LedgerMe
LedgerMetadata metadata = versionedMetadata.getValue();
if (metadata.isClosed()) {
lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
length = metadata.getLength();
length.set(metadata.getLength());
}
return true;
} else {
Expand Down Expand Up @@ -422,9 +423,8 @@ DigestManager getDigestManager() {
* @param delta
* @return the length of the ledger after the addition
*/
synchronized long addToLength(long delta) {
this.length += delta;
return this.length;
long addToLength(long delta) {
return length.addAndGet(delta);
}

/**
Expand All @@ -433,8 +433,8 @@ synchronized long addToLength(long delta) {
* @return the length of the ledger in bytes
*/
@Override
public synchronized long getLength() {
return this.length;
public long getLength() {
return this.length.get();
}

/**
Expand Down Expand Up @@ -559,7 +559,7 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc

// taking the length must occur after draining, as draining changes the length
lastEntry = lastAddPushed = LedgerHandle.this.lastAddConfirmed;
finalLength = LedgerHandle.this.length;
finalLength = LedgerHandle.this.length.get();
handleState = HandleState.CLOSED;
}

Expand Down Expand Up @@ -1649,7 +1649,7 @@ synchronized void updateLastConfirmed(long lac, long len) {
lacUpdateMissesCounter.inc();
}
lastAddPushed = Math.max(lastAddPushed, lac);
length = Math.max(length, len);
length.accumulateAndGet(len, (current, value) -> Math.max(current, value));
}

/**
Expand Down Expand Up @@ -1985,7 +1985,7 @@ public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback cb, f
isClosed = metadata.isClosed();
if (isClosed) {
lastAddConfirmed = metadata.getLastEntryId();
length = metadata.getLength();
length.set(metadata.getLength());
}
}
if (isClosed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
lh.lastAddPushed = lh.lastAddConfirmed = Math.max(data.getLastAddConfirmed(),
(lastEnsembleEntryId - 1));

lh.length = data.getLength();
lh.length.set(data.getLength());
lh.pendingAddsSequenceHead = lh.lastAddConfirmed;
startEntryToRead = endEntryToRead = lh.lastAddConfirmed;
}
Expand Down Expand Up @@ -192,7 +192,7 @@ public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object c
* be added again when processing the call to add it.
*/
synchronized (lh) {
lh.length = entry.getLength() - (long) data.length;
lh.length.set(entry.getLength() - (long) data.length);
// check whether entry id is expected, so we won't overwritten any entries by mistake
if (entry.getEntryId() != lh.lastAddPushed + 1) {
LOG.error("Unexpected to recovery add entry {} as entry {} for ledger {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ CompletableFuture<Versioned<LedgerMetadata>> closeRecovered() {
long lac, len;
synchronized (this) {
lac = lastAddConfirmed;
len = length;
len = length.get();
}
LOG.info("Closing recovered ledger {} at entry {}", getId(), lac);
CompletableFuture<Versioned<LedgerMetadata>> f = new MetadataUpdateLoop(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void asyncClose(CloseCallback cb, Object ctx) {
metadata = LedgerMetadataBuilder.from(metadata)
.withClosedState()
.withLastEntryId(lastEntry)
.withLength(length)
.withLength(length.get())
.build();
setLedgerMetadata(getVersionedLedgerMetadata(), new Versioned<>(metadata, new LongVersion(1L)));

Expand Down

0 comments on commit c6f27f0

Please sign in to comment.