From e41a9e0de527171b24161a9979795246045fd9b7 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 27 Jan 2024 11:19:17 +0800 Subject: [PATCH 01/14] Refactor read op, and introduce batchReadOp. --- .../bookkeeper/client/BatchedReadOp.java | 323 +++++++++++ .../client/ListenerBasedPendingReadOp.java | 2 +- .../bookkeeper/client/PendingReadOp.java | 534 +++++------------- .../apache/bookkeeper/client/ReadOpBase.java | 282 +++++++++ .../bookkeeper/client/TestParallelRead.java | 16 +- 5 files changed, 761 insertions(+), 396 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java new file mode 100644 index 00000000000..131b6d0c380 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -0,0 +1,323 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback; +import org.apache.bookkeeper.proto.checksum.DigestManager; +import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.MathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallback { + + private static final Logger LOG = LoggerFactory.getLogger(BatchedReadOp.class); + + final int maxCount; + final long maxSize; + + BatchedLedgerEntryRequest request; + + BatchedReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + int maxCount, + long maxSize, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, -1L, isRecoveryRead); + this.maxCount = maxCount; + this.maxSize = maxSize; + } + + @Override + void initiate() { + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = getLedgerMetadata().getEnsembleAt(startEntryId); + if (parallelRead) { + LOG.info("Batch read unsupported the parallelRead, fail back to sequence read."); + } + request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize); + request.read(); + } + + @Override + protected void submitCallback(int code) { + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } + + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), startEntryId, code); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + + request.close(); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(request.entries)); + } + } + + BatchedReadOp parallelRead(boolean enabled) { + this.parallelRead = enabled; + return this; + } + + @Override + public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + bufList.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + bufList.release(); + } + } + + void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, + maxCount, maxSize, this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, maxCount, maxSize, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } + + abstract class BatchedLedgerEntryRequest extends LedgerEntryRequest { + + //Indicate which ledger the BatchedLedgerEntryRequest is reading. + final long lId; + final int maxCount; + final long maxSize; + + final List entries; + + BatchedLedgerEntryRequest(List ensemble, long lId, long eId, int maxCount, long maxSize) { + super(ensemble, eId); + this.lId = lId; + this.maxCount = maxCount; + this.maxSize = maxSize; + this.entries = new ArrayList<>(maxCount); + } + + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + if (isComplete()) { + return false; + } + if (!complete.getAndSet(true)) { + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + ByteBuf content; + try { + content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer); + } catch (BKException.BKDigestMatchException e) { + clientCtx.getClientStats().getReadOpDmCounter().inc(); + logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", + BKException.Code.DigestMatchException); + return false; + } + rc = BKException.Code.OK; + /* + * The length is a long and it is the last field of the metadata of an entry. + * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. + */ + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); + entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8)); + entryImpl.setEntryBuf(content); + entries.add(entryImpl); + } + writeSet.recycle(); + return true; + } else { + writeSet.recycle(); + return false; + } + } + + @Override + public String toString() { + return String.format("L%d-E%d~%d s-%d", lh.getId(), eId, eId + maxCount, maxSize); + } + } + + class SequenceReadRequest extends BatchedLedgerEntryRequest { + + static final int NOT_FOUND = -1; + int nextReplicaIndexToReadFrom = 0; + final BitSet sentReplicas; + final BitSet erroredReplicas; + SequenceReadRequest(List ensemble, + long lId, + long eId, + int maxCount, + long maxSize) { + super(ensemble, lId, eId, maxCount, maxSize); + this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + } + + @Override + void read() { + sendNextRead(); + } + + private synchronized int getNextReplicaIndexToReadFrom() { + return nextReplicaIndexToReadFrom; + } + + private BitSet getSentToBitSet() { + BitSet b = new BitSet(ensemble.size()); + + for (int i = 0; i < sentReplicas.length(); i++) { + if (sentReplicas.get(i)) { + b.set(writeSet.get(i)); + } + } + return b; + } + + private boolean readsOutstanding() { + return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0; + } + + @Override + synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + return null; + } + + BitSet sentTo = getSentToBitSet(); + sentTo.and(heardFrom); + + // only send another read if we have had no successful response at all + // (even for other entries) from any of the other bookies we have sent the + // request to + if (sentTo.cardinality() == 0) { + clientCtx.getClientStats().getSpeculativeReadCounter().inc(); + return sendNextRead(); + } else { + return null; + } + } + + synchronized BookieId sendNextRead() { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + // we are done, the read has failed from all replicas, just fail the + // read + fail(firstError); + return null; + } + + // ToDo: pick replica with writable PCBC. ISSUE #1239 + // https://github.com/apache/bookkeeper/issues/1239 + int replica = nextReplicaIndexToReadFrom; + int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom); + nextReplicaIndexToReadFrom++; + + try { + BookieId to = ensemble.get(bookieIndex); + sendReadTo(bookieIndex, to, this); + sentToHosts.add(to); + sentReplicas.set(replica); + return to; + } catch (InterruptedException ie) { + LOG.error("Interrupted reading entry " + this, ie); + Thread.currentThread().interrupt(); + fail(BKException.Code.InterruptedException); + return null; + } + } + + @Override + synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); + int replica = writeSet.indexOf(bookieIndex); + if (replica == NOT_FOUND) { + LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble); + return; + } + erroredReplicas.set(replica); + if (isRecoveryRead && (numBookiesMissingEntry >= requiredBookiesMissingEntryForRecovery)) { + /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies report that they do not + * have the entry */ + fail(BKException.Code.NoSuchEntryException); + return; + } + + if (!readsOutstanding()) { + sendNextRead(); + } + } + + @Override + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + boolean completed = super.complete(bookieIndex, host, bufList); + if (completed) { + int numReplicasTried = getNextReplicaIndexToReadFrom(); + // Check if any speculative reads were issued and mark any slow bookies before + // the first successful speculative read as "slow" + for (int i = 0; i < numReplicasTried - 1; i++) { + int slowBookieIndex = writeSet.get(i); + BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId); + } + } + return completed; + } + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java index 6733b2e9ea9..fedb79696a9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java @@ -45,7 +45,7 @@ class ListenerBasedPendingReadOp extends PendingReadOp { @Override protected void submitCallback(int code) { - LedgerEntryRequest request; + SingleLedgerEntryRequest request; while (!seq.isEmpty() && (request = seq.getFirst()) != null) { if (!request.isComplete()) { return; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 73715859c0d..e3d1a57be8e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -21,27 +21,17 @@ package org.apache.bookkeeper.client; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; import java.util.BitSet; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.bookkeeper.client.BKException.BKDigestMatchException; -import org.apache.bookkeeper.client.api.LedgerEntries; -import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; @@ -54,85 +44,176 @@ * application as soon as it arrives rather than waiting for the whole thing. * */ -class PendingReadOp implements ReadEntryCallback, Runnable { +class PendingReadOp extends ReadOpBase implements ReadEntryCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); private ScheduledFuture speculativeTask = null; - protected final LinkedList seq; - private final CompletableFuture future; - private final Set heardFromHosts; - private final BitSet heardFromHostsBitSet; - private final Set sentToHosts = new HashSet(); - LedgerHandle lh; - final ClientContext clientCtx; + protected final LinkedList seq; - long numPendingEntries; - final long startEntryId; - final long endEntryId; - long requestTimeNanos; + PendingReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + long endEntryId, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead); + this.seq = new LinkedList<>(); + numPendingEntries = endEntryId - startEntryId + 1; + } + + protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { + if (speculativeTask != null) { + speculativeTask.cancel(mayInterruptIfRunning); + speculativeTask = null; + } + } + + public ScheduledFuture getSpeculativeTask() { + return speculativeTask; + } + + PendingReadOp parallelRead(boolean enabled) { + this.parallelRead = enabled; + return this; + } + + void initiate() { + long nextEnsembleChange = startEntryId, i = startEntryId; + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = null; + do { + if (i == nextEnsembleChange) { + ensemble = getLedgerMetadata().getEnsembleAt(i); + nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); + } + SingleLedgerEntryRequest entry; + if (parallelRead) { + entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); + } else { + entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); + } + seq.add(entry); + i++; + } while (i <= endEntryId); + // read the entries. + for (LedgerEntryRequest entry : seq) { + entry.read(); + if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); + } + } + } + + @Override + public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final SingleLedgerEntryRequest entry = (SingleLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + buffer.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + buffer.release(); + } + + if (numPendingEntries < 0) { + LOG.error("Read too many values for ledger {} : [{}, {}].", + ledgerId, startEntryId, endEntryId); + } + + } + + protected void submitCallback(int code) { + if (BKException.Code.OK == code) { + numPendingEntries--; + if (numPendingEntries != 0) { + return; + } + } - final int requiredBookiesMissingEntryForRecovery; - final boolean isRecoveryRead; + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } - boolean parallelRead = false; - final AtomicBoolean complete = new AtomicBoolean(false); - boolean allowFailFast = false; + cancelSpeculativeTask(true); - abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable { + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + long firstUnread = LedgerHandle.INVALID_ENTRY_ID; + Integer firstRc = null; + for (LedgerEntryRequest req : seq) { + if (!req.isComplete()) { + firstUnread = req.eId; + firstRc = req.rc; + break; + } + } + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), firstUnread, firstRc); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + seq.forEach(LedgerEntryRequest::close); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl))); + } + } - final AtomicBoolean complete = new AtomicBoolean(false); + void sendReadTo(int bookieIndex, BookieId to, SingleLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } - int rc = BKException.Code.OK; - int firstError = BKException.Code.OK; - int numBookiesMissingEntry = 0; + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } - final List ensemble; - final DistributionSchedule.WriteSet writeSet; + abstract class SingleLedgerEntryRequest extends LedgerEntryRequest { final LedgerEntryImpl entryImpl; - final long eId; - LedgerEntryRequest(List ensemble, long lId, long eId) { + SingleLedgerEntryRequest(List ensemble, long lId, long eId) { + super(ensemble, eId); this.entryImpl = LedgerEntryImpl.create(lId, eId); - this.ensemble = ensemble; - this.eId = eId; - - if (clientCtx.getConf().enableReorderReadSequence) { - writeSet = clientCtx.getPlacementPolicy() - .reorderReadSequence( - ensemble, - lh.getBookiesHealthInfo(), - lh.getWriteSetForReadOperation(eId)); - } else { - writeSet = lh.getWriteSetForReadOperation(eId); - } } @Override public void close() { - // this request has succeeded before, can't recycle writeSet again - if (complete.compareAndSet(false, true)) { - rc = BKException.Code.UnexpectedConditionException; - writeSet.recycle(); - } + super.close(); entryImpl.close(); } - /** - * Execute the read request. - */ - abstract void read(); - /** * Complete the read request from host. * - * @param bookieIndex - * bookie index - * @param host - * host that respond the read - * @param buffer - * the data buffer + * @param bookieIndex bookie index + * @param host host that respond the read + * @param buffer the data buffer * @return return true if we managed to complete the entry; - * otherwise return false if the read entry is not complete or it is already completed before + * otherwise return false if the read entry is not complete or it is already completed before */ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { ByteBuf content; @@ -141,7 +222,7 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { } try { content = lh.macManager.verifyDigestAndReturnData(eId, buffer); - } catch (BKDigestMatchException e) { + } catch (BKException.BKDigestMatchException e) { clientCtx.getClientStats().getReadOpDmCounter().inc(); logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException); return false; @@ -161,125 +242,9 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { return false; } } - - /** - * Fail the request with given result code rc. - * - * @param rc - * result code to fail the request. - * @return true if we managed to fail the entry; otherwise return false if it already failed or completed. - */ - boolean fail(int rc) { - if (complete.compareAndSet(false, true)) { - this.rc = rc; - submitCallback(rc); - return true; - } else { - return false; - } - } - - /** - * Log error errMsg and reattempt read from host. - * - * @param bookieIndex - * bookie index - * @param host - * host that just respond - * @param errMsg - * error msg to log - * @param rc - * read result code - */ - synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { - if (BKException.Code.OK == firstError - || BKException.Code.NoSuchEntryException == firstError - || BKException.Code.NoSuchLedgerExistsException == firstError) { - firstError = rc; - } else if (BKException.Code.BookieHandleNotAvailableException == firstError - && BKException.Code.NoSuchEntryException != rc - && BKException.Code.NoSuchLedgerExistsException != rc) { - // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is - // returned we need to update firstError to indicate that it might be a valid read but just - // failed. - firstError = rc; - } - if (BKException.Code.NoSuchEntryException == rc - || BKException.Code.NoSuchLedgerExistsException == rc) { - ++numBookiesMissingEntry; - if (LOG.isDebugEnabled()) { - LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", - lh.ledgerId, eId, host); - } - } else { - if (LOG.isInfoEnabled()) { - LOG.info("{} while reading L{} E{} from bookie: {}", - errMsg, lh.ledgerId, eId, host); - } - } - - lh.recordReadErrorOnBookie(bookieIndex); - } - - /** - * Send to next replica speculatively, if required and possible. - * This returns the host we may have sent to for unit testing. - * - * @param heardFromHostsBitSet - * the set of hosts that we already received responses. - * @return host we sent to if we sent. null otherwise. - */ - abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet); - - /** - * Whether the read request completed. - * - * @return true if the read request is completed. - */ - boolean isComplete() { - return complete.get(); - } - - /** - * Get result code of this entry. - * - * @return result code. - */ - int getRc() { - return rc; - } - - @Override - public String toString() { - return String.format("L%d-E%d", lh.getId(), eId); - } - - /** - * Issues a speculative request and indicates if more speculative - * requests should be issued. - * - * @return whether more speculative requests should be issued - */ - @Override - public ListenableFuture issueSpeculativeRequest() { - return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable() { - @Override - public Boolean call() throws Exception { - if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Send speculative read for {}. Hosts sent are {}, " - + " Hosts heard are {}, ensemble is {}.", - this, sentToHosts, heardFromHostsBitSet, ensemble); - } - return true; - } - return false; - } - }); - } } - class ParallelReadRequest extends LedgerEntryRequest { + class ParallelReadRequest extends SingleLedgerEntryRequest { int numPendings; @@ -326,7 +291,7 @@ BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) { } } - class SequenceReadRequest extends LedgerEntryRequest { + class SequenceReadRequest extends SingleLedgerEntryRequest { static final int NOT_FOUND = -1; int nextReplicaIndexToReadFrom = 0; @@ -456,205 +421,4 @@ boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) { return completed; } } - - PendingReadOp(LedgerHandle lh, - ClientContext clientCtx, - long startEntryId, - long endEntryId, - boolean isRecoveryRead) { - this.seq = new LinkedList<>(); - this.future = new CompletableFuture<>(); - this.lh = lh; - this.clientCtx = clientCtx; - this.startEntryId = startEntryId; - this.endEntryId = endEntryId; - this.isRecoveryRead = isRecoveryRead; - - this.allowFailFast = false; - numPendingEntries = endEntryId - startEntryId + 1; - requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize() - - getLedgerMetadata().getAckQuorumSize() + 1; - heardFromHosts = new HashSet<>(); - heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); - } - - CompletableFuture future() { - return future; - } - - protected LedgerMetadata getLedgerMetadata() { - return lh.getLedgerMetadata(); - } - - protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { - if (speculativeTask != null) { - speculativeTask.cancel(mayInterruptIfRunning); - speculativeTask = null; - } - } - - public ScheduledFuture getSpeculativeTask() { - return speculativeTask; - } - - PendingReadOp parallelRead(boolean enabled) { - this.parallelRead = enabled; - return this; - } - - void allowFailFastOnUnwritableChannel() { - allowFailFast = true; - } - - public void submit() { - clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this); - } - - void initiate() { - long nextEnsembleChange = startEntryId, i = startEntryId; - this.requestTimeNanos = MathUtils.nowInNano(); - List ensemble = null; - do { - if (i == nextEnsembleChange) { - ensemble = getLedgerMetadata().getEnsembleAt(i); - nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); - } - LedgerEntryRequest entry; - if (parallelRead) { - entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); - } else { - entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); - } - seq.add(entry); - i++; - } while (i <= endEntryId); - // read the entries. - for (LedgerEntryRequest entry : seq) { - entry.read(); - if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { - speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() - .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); - } - } - } - - @Override - public void run() { - initiate(); - } - - private static class ReadContext implements ReadEntryCallbackCtx { - final int bookieIndex; - final BookieId to; - final LedgerEntryRequest entry; - long lac = LedgerHandle.INVALID_ENTRY_ID; - - ReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) { - this.bookieIndex = bookieIndex; - this.to = to; - this.entry = entry; - } - - @Override - public void setLastAddConfirmed(long lac) { - this.lac = lac; - } - - @Override - public long getLastAddConfirmed() { - return lac; - } - } - - private static ReadContext createReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) { - return new ReadContext(bookieIndex, to, entry); - } - - void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) throws InterruptedException { - if (lh.throttler != null) { - lh.throttler.acquire(); - } - - if (isRecoveryRead) { - int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; - clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, - this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); - } else { - clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, - this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); - } - } - - @Override - public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { - final ReadContext rctx = (ReadContext) ctx; - final LedgerEntryRequest entry = rctx.entry; - - if (rc != BKException.Code.OK) { - entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); - return; - } - - heardFromHosts.add(rctx.to); - heardFromHostsBitSet.set(rctx.bookieIndex, true); - - buffer.retain(); - // if entry has completed don't handle twice - if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { - if (!isRecoveryRead) { - // do not advance LastAddConfirmed for recovery reads - lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); - } - submitCallback(BKException.Code.OK); - } else { - buffer.release(); - } - - if (numPendingEntries < 0) { - LOG.error("Read too many values for ledger {} : [{}, {}].", - ledgerId, startEntryId, endEntryId); - } - } - - protected void submitCallback(int code) { - if (BKException.Code.OK == code) { - numPendingEntries--; - if (numPendingEntries != 0) { - return; - } - } - - // ensure callback once - if (!complete.compareAndSet(false, true)) { - return; - } - - cancelSpeculativeTask(true); - - long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); - if (code != BKException.Code.OK) { - long firstUnread = LedgerHandle.INVALID_ENTRY_ID; - Integer firstRc = null; - for (LedgerEntryRequest req : seq) { - if (!req.isComplete()) { - firstUnread = req.eId; - firstRc = req.rc; - break; - } - } - LOG.error( - "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " - + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", - lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, - BKException.getMessage(code), firstUnread, firstRc); - clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); - // release the entries - seq.forEach(LedgerEntryRequest::close); - future.completeExceptionally(BKException.create(code)); - } else { - clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); - future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl))); - } - } - } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java new file mode 100644 index 00000000000..9c535384b5f --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java @@ -0,0 +1,282 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.BitSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ReadOpBase implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ReadOpBase.class); + + protected final CompletableFuture future; + protected final Set heardFromHosts; + protected final BitSet heardFromHostsBitSet; + protected final Set sentToHosts = new HashSet(); + LedgerHandle lh; + protected ClientContext clientCtx; + + protected final long startEntryId; + protected long requestTimeNanos; + + protected final int requiredBookiesMissingEntryForRecovery; + protected final boolean isRecoveryRead; + + protected boolean parallelRead = false; + protected final AtomicBoolean complete = new AtomicBoolean(false); + protected boolean allowFailFast = false; + long numPendingEntries; + final long endEntryId; + + protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, + boolean isRecoveryRead) { + this.lh = lh; + this.future = new CompletableFuture<>(); + this.startEntryId = startEntryId; + this.endEntryId = endEntryId; + this.isRecoveryRead = isRecoveryRead; + this.requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize() + - getLedgerMetadata().getAckQuorumSize() + 1; + this.heardFromHosts = new HashSet<>(); + this.heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); + this.allowFailFast = false; + this.clientCtx = clientCtx; + } + + protected LedgerMetadata getLedgerMetadata() { + return lh.getLedgerMetadata(); + } + + CompletableFuture future() { + return future; + } + + void allowFailFastOnUnwritableChannel() { + allowFailFast = true; + } + + public void submit() { + clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this); + } + + @Override + public void run() { + initiate(); + } + + abstract void initiate(); + + protected abstract void submitCallback(int code); + + abstract class LedgerEntryRequest implements SpeculativeRequestExecutor { + + final AtomicBoolean complete = new AtomicBoolean(false); + + int rc = BKException.Code.OK; + int firstError = BKException.Code.OK; + int numBookiesMissingEntry = 0; + + final long eId; + + final List ensemble; + final DistributionSchedule.WriteSet writeSet; + + + LedgerEntryRequest(List ensemble, final long eId) { + this.ensemble = ensemble; + this.eId = eId; + if (clientCtx.getConf().enableReorderReadSequence) { + writeSet = clientCtx.getPlacementPolicy() + .reorderReadSequence( + ensemble, + lh.getBookiesHealthInfo(), + lh.getWriteSetForReadOperation(eId)); + } else { + writeSet = lh.getWriteSetForReadOperation(eId); + } + } + + public void close() { + // this request has succeeded before, can't recycle writeSet again + if (complete.compareAndSet(false, true)) { + rc = BKException.Code.UnexpectedConditionException; + writeSet.recycle(); + } + } + + /** + * Execute the read request. + */ + abstract void read(); + + /** + * Fail the request with given result code rc. + * + * @param rc + * result code to fail the request. + * @return true if we managed to fail the entry; otherwise return false if it already failed or completed. + */ + boolean fail(int rc) { + if (complete.compareAndSet(false, true)) { + this.rc = rc; + writeSet.recycle(); + submitCallback(rc); + return true; + } else { + return false; + } + } + + /** + * Log error errMsg and reattempt read from host. + * + * @param bookieIndex + * bookie index + * @param host + * host that just respond + * @param errMsg + * error msg to log + * @param rc + * read result code + */ + synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + if (BKException.Code.OK == firstError + || BKException.Code.NoSuchEntryException == firstError + || BKException.Code.NoSuchLedgerExistsException == firstError) { + firstError = rc; + } else if (BKException.Code.BookieHandleNotAvailableException == firstError + && BKException.Code.NoSuchEntryException != rc + && BKException.Code.NoSuchLedgerExistsException != rc) { + // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is + // returned we need to update firstError to indicate that it might be a valid read but just + // failed. + firstError = rc; + } + if (BKException.Code.NoSuchEntryException == rc + || BKException.Code.NoSuchLedgerExistsException == rc) { + ++numBookiesMissingEntry; + if (LOG.isDebugEnabled()) { + LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", + lh.ledgerId, eId, host); + } + } else { + if (LOG.isInfoEnabled()) { + LOG.info("{} while reading L{} E{} from bookie: {}", + errMsg, lh.ledgerId, eId, host); + } + } + + lh.recordReadErrorOnBookie(bookieIndex); + } + + /** + * Send to next replica speculatively, if required and possible. + * This returns the host we may have sent to for unit testing. + * + * @param heardFromHostsBitSet + * the set of hosts that we already received responses. + * @return host we sent to if we sent. null otherwise. + */ + abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet); + + /** + * Whether the read request completed. + * + * @return true if the read request is completed. + */ + boolean isComplete() { + return complete.get(); + } + + /** + * Get result code of this entry. + * + * @return result code. + */ + int getRc() { + return rc; + } + + @Override + public String toString() { + return String.format("L%d-E%d", lh.getId(), eId); + } + + /** + * Issues a speculative request and indicates if more speculative + * requests should be issued. + * + * @return whether more speculative requests should be issued + */ + @Override + public ListenableFuture issueSpeculativeRequest() { + return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable() { + @Override + public Boolean call() throws Exception { + if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send speculative read for {}. Hosts sent are {}, " + + " Hosts heard are {}, ensemble is {}.", + this, sentToHosts, heardFromHostsBitSet, ensemble); + } + return true; + } + return false; + } + }); + } + } + + protected static class ReadContext implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx { + final int bookieIndex; + final BookieId to; + final PendingReadOp.LedgerEntryRequest entry; + long lac = LedgerHandle.INVALID_ENTRY_ID; + + ReadContext(int bookieIndex, BookieId to, PendingReadOp.LedgerEntryRequest entry) { + this.bookieIndex = bookieIndex; + this.to = to; + this.entry = entry; + } + + @Override + public void setLastAddConfirmed(long lac) { + this.lac = lac; + } + + @Override + public long getLastAddConfirmed() { + return lac; + } + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java index f2ada1e5dc8..129fa6915e1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java @@ -268,8 +268,8 @@ public void testLedgerEntryRequestComplete() throws Exception { PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false); pendingReadOp.parallelRead(true); pendingReadOp.initiate(); - PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0); - PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1); + PendingReadOp.SingleLedgerEntryRequest first = pendingReadOp.seq.get(0); + PendingReadOp.SingleLedgerEntryRequest second = pendingReadOp.seq.get(1); pendingReadOp.submitCallback(-105); @@ -287,13 +287,9 @@ public void testLedgerEntryRequestComplete() throws Exception { assertTrue(second.complete.get()); // Mock ledgerEntryImpl reuse - Method method = PendingReadOp.class.getDeclaredMethod("createReadContext", - int.class, BookieId.class, PendingReadOp.LedgerEntryRequest.class); - method.setAccessible(true); - ByteBuf byteBuf = Unpooled.buffer(10); pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); // byteBuf has been release assertEquals(byteBuf.refCnt(), 1); @@ -308,15 +304,15 @@ public void testLedgerEntryRequestComplete() throws Exception { // read entry failed twice, will not close twice pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext( 1, BookieId.parse("test"), first)); pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext( 1, BookieId.parse("test"), first)); // will not complete twice when completed byteBuf = Unpooled.buffer(10); pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext( 1, BookieId.parse("test"), first)); assertEquals(1, byteBuf.refCnt()); } From 46de2e48094e12108c939dfe0e300a9d1d57da15 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sat, 27 Jan 2024 11:27:17 +0800 Subject: [PATCH 02/14] Fix ci. --- .../org/apache/bookkeeper/client/TestParallelRead.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java index 129fa6915e1..423e02b4aad 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java @@ -33,7 +33,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import java.lang.reflect.Method; import java.util.Iterator; import java.util.List; import java.util.TreeMap; @@ -304,15 +303,15 @@ public void testLedgerEntryRequestComplete() throws Exception { // read entry failed twice, will not close twice pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - new ReadOpBase.ReadContext( 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - new ReadOpBase.ReadContext( 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); // will not complete twice when completed byteBuf = Unpooled.buffer(10); pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10), - new ReadOpBase.ReadContext( 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); assertEquals(1, byteBuf.refCnt()); } From 32b7da28538f34bb51e36658e9ab77dc7067b7a4 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Mon, 29 Jan 2024 11:50:33 +0800 Subject: [PATCH 03/14] Ledger handle introduces batch read API. --- .../bookkeeper/client/ClientInternalConf.java | 6 +- .../bookkeeper/client/LedgerHandle.java | 312 ++++++++++++++++++ .../bookkeeper/client/api/ReadHandle.java | 38 +++ .../bookkeeper/conf/ClientConfiguration.java | 7 + .../bookkeeper/test/BookieClientTest.java | 2 +- 5 files changed, 362 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java index cff66a3cb3e..fc83617cef6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java @@ -48,6 +48,8 @@ class ClientInternalConf { final boolean enableBookieFailureTracking; final boolean useV2WireProtocol; final boolean enforceMinNumFaultDomainsForWrite; + final boolean batchReadEnabled; + final int nettyMaxFrameSizeBytes; static ClientInternalConf defaultValues() { return fromConfig(new ClientConfiguration()); @@ -72,9 +74,9 @@ private ClientInternalConf(ClientConfiguration conf, this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout()); this.throttleValue = conf.getThrottleValue(); this.bookieFailureHistoryExpirationMSec = conf.getBookieFailureHistoryExpirationMSec(); - + this.batchReadEnabled = conf.isBatchReadEnabled(); + this.nettyMaxFrameSizeBytes = conf.getNettyMaxFrameSizeBytes(); this.disableEnsembleChangeFeature = featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName()); - this.delayEnsembleChange = conf.getDelayEnsembleChange(); this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges(); this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 9486b2e632c..efbe7a753db 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -103,6 +103,7 @@ public class LedgerHandle implements WriteHandle { final long ledgerId; final ExecutorService executor; long lastAddPushed; + boolean notSupportBatch; private enum HandleState { OPEN, @@ -641,6 +642,29 @@ public Enumeration readEntries(long firstEntry, long lastEntry) return SyncCallbackUtils.waitForResult(result); } + /** + * Read a sequence of entries synchronously. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @param failbackToSingleRead + * is fail back to single read. + * @see #asyncBatchReadEntries(long, int, long, boolean, ReadCallback, Object) + */ + public Enumeration batchReadEntries(long startEntry, int maxCount, long maxSize, + boolean failbackToSingleRead) + throws InterruptedException, BKException { + CompletableFuture> result = new CompletableFuture<>(); + + asyncBatchReadEntries(startEntry, maxCount, maxSize, failbackToSingleRead, new SyncReadCallback(result), null); + + return SyncCallbackUtils.waitForResult(result); + } + /** * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.
* This is the same of @@ -663,6 +687,28 @@ public Enumeration readUnconfirmedEntries(long firstEntry, long las return SyncCallbackUtils.waitForResult(result); } + + /** + * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.
+ * This is the same of + * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, boolean, ReadCallback, Object) } + * + * @param firstEntry + * id of first entry of sequence (included) + * @param maxCount + * id of last entry of sequence (included) + * @param maxSize + * + * @param failbackToSingleRead + */ + public Enumeration batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize, boolean failbackToSingleRead) + throws InterruptedException, BKException { + CompletableFuture> result = new CompletableFuture<>(); + + asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, failbackToSingleRead, new SyncReadCallback(result), null); + + return SyncCallbackUtils.waitForResult(result); + } /** * Read a sequence of entries asynchronously. @@ -694,6 +740,60 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } + + /** + * Read a sequence of entries in asynchronously. + * It send an RPC to get all entries instead of send multi RPC to get all entries. + * + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + * @param failbackToSingleRead + * failback to {@link #asyncReadEntriesInternal(long, long, ReadCallback, Object, boolean) } + * @param cb + * object implementing read callback interface + * @param ctx + * control object + */ + public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, boolean failbackToSingleRead, + ReadCallback cb, Object ctx) { + // Little sanity check + if (startEntry > lastAddConfirmed) { + LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}", + ledgerId, startEntry, lastAddConfirmed); + cb.readComplete(BKException.Code.ReadException, this, null, ctx); + return; + } + if (notSupportBatchRead()) { + if (failbackToSingleRead) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + LOG.error("Not support batch read not."); + cb.readComplete(BKException.Code.ReadException, this, null, ctx); + } + } else { + asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + if (rc == Code.BookieHandleNotAvailableException) { + notSupportBatch = true; + if (failbackToSingleRead) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } + }, ctx, false); + } + } /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. @@ -733,6 +833,57 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } + + /** + * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. + * It sends an RPC to get all entries instead of send multi RPC to get all entries. + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + * @param failbackToSingleRead + * failback to {@link #asyncReadEntriesInternal(long, long, ReadCallback, Object, boolean)} + * @param cb + * object implementing read callback interface + * @param ctx + * control object + */ + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, boolean failbackToSingleRead, + ReadCallback cb, Object ctx) { + // Little sanity check + if (startEntry < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); + cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx); + } + if (notSupportBatchRead()) { + if (failbackToSingleRead) { + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + LOG.error("Not support batch read not."); + cb.readComplete(BKException.Code.ReadException, this, null, ctx); + } + } else { + asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + if (rc == Code.BookieHandleNotAvailableException) { + notSupportBatch = true; + if (failbackToSingleRead) { + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } + }, ctx, false); + } + } /** * Read a sequence of entries asynchronously. @@ -759,6 +910,133 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return readEntriesInternalAsync(firstEntry, lastEntry, false); } + + /** + * Read a sequence of entries in asynchronously. + * It sends an RPC to get all entries instead of send multi RPC to get all entries. + * + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + * @param failbackToSingleRead + * failback to {@link #readEntriesInternalAsync(long, long, boolean) } + */ + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize, + boolean failbackToSingleRead) { + // Little sanity check + if (startEntry < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); + return FutureUtils.exception(new BKIncorrectParameterException()); + } + if (startEntry > lastAddConfirmed) { + LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}", + ledgerId, startEntry, lastAddConfirmed); + return FutureUtils.exception(new BKReadException()); + } + if (notSupportBatchRead()) { + if (failbackToSingleRead) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + return readEntriesInternalAsync(startEntry, lastEntry, false); + } else { + LOG.error("Not support batch read."); + return FutureUtils.exception(new BKReadException()); + } + } + CompletableFuture future = new CompletableFuture<>(); + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) + .whenComplete((entries, ex) -> { + if (ex != null) { + if (ex instanceof BKException.BKBookieHandleNotAvailableException) { + notSupportBatch = true; + if (failbackToSingleRead) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + readEntriesInternalAsync(startEntry, lastEntry, false).whenComplete((entries1, ex1) -> { + if (ex1 != null) { + future.completeExceptionally(ex1); + } else { + future.complete(entries1); + } + }); + } else { + future.completeExceptionally(ex); + } + } else { + future.completeExceptionally(ex); + } + } else { + future.complete(entries); + } + }); + return future; + } + + private boolean notSupportBatchRead() { + if (!clientCtx.getConf().batchReadEnabled) { + return true; + } + if (notSupportBatch) { + return true; + } + LedgerMetadata ledgerMetadata = getLedgerMetadata(); + return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize(); + } + + private CompletableFuture batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize, + boolean isRecoveryRead) { + int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes; + if (maxSize > nettyMaxFrameSizeBytes) { + LOG.info( + "The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.", + nettyMaxFrameSizeBytes); + maxSize = nettyMaxFrameSizeBytes; + } + if (maxSize <= 0) { + LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} to replace it.", nettyMaxFrameSizeBytes); + maxSize = nettyMaxFrameSizeBytes; + } + BatchedReadOp op = new BatchedReadOp(this, clientCtx, + startEntry, maxCount, maxSize, isRecoveryRead); + if (!clientCtx.isClientClosed()) { + // Waiting on the first one. + // This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive + // state later after N requests sent. + // Unfortunately it seems that alternatives are: + // - send reads one-by-one (up to the app) + // - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact) + // - block worker pool (not good) + // Even with this implementation one should be more concerned about OOME when all read responses arrive + // or about overloading bookies with these requests then about submission of many small requests. + // Naturally one of the solutions would be to submit smaller batches and in this case + // current implementation will prevent next batch from starting when bookie is + // unresponsive thus helpful enough. + if (clientCtx.getConf().waitForWriteSetMs >= 0) { + DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(startEntry); + try { + if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) { + op.allowFailFastOnUnwritableChannel(); + } + } finally { + ws.recycle(); + } + } + + if (isHandleWritable()) { + // Ledger handle in read/write mode: submit to OSE for ordered execution. + executeOrdered(op); + } else { + // Read-only ledger handle: bypass OSE and execute read directly in client thread. + // This avoids a context-switch to OSE thread and thus reduces latency. + op.run(); + } + } else { + op.future().completeExceptionally(BKException.create(ClientClosedException)); + } + return op.future(); + } /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. @@ -829,6 +1107,40 @@ public void onFailure(Throwable cause) { } } + void asyncBatchReadEntriesInternal(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx, boolean isRecoveryRead) { + if (!clientCtx.isClientClosed()) { + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, isRecoveryRead) + .whenCompleteAsync(new FutureEventListener() { + @Override + public void onSuccess(LedgerEntries entries) { + cb.readComplete( + Code.OK, + LedgerHandle.this, + IteratorUtils.asEnumeration( + Iterators.transform(entries.iterator(), le -> { + LedgerEntry entry = new LedgerEntry((LedgerEntryImpl) le); + le.close(); + return entry; + })), + ctx); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof BKException) { + BKException bke = (BKException) cause; + cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx); + } else { + cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, ctx); + } + } + }, clientCtx.getMainWorkerPool().chooseThread(ledgerId)); + } else { + cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx); + } + } + /* * Read the last entry in the ledger * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index d5e906d17d8..7ffccc4e997 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -44,6 +44,26 @@ public interface ReadHandle extends Handle { * @return an handle to the result of the operation */ CompletableFuture readAsync(long firstEntry, long lastEntry); + + /** + * Read a sequence of entries asynchronously. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @param failbackToSingleRead + * is fail back to single read. + * @return an handle to the result of the operation + */ + default CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize, + boolean failbackToSingleRead) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException()); + return future; + } /** * Read a sequence of entries synchronously. @@ -59,6 +79,24 @@ default LedgerEntries read(long firstEntry, long lastEntry) throws BKException, BKException.HANDLER); } + /** + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @param failbackToSingleRead + * is fail back to single read. + * @return the result of the operation + */ + default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize, boolean failbackToSingleRead) + throws BKException, InterruptedException { + return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize, failbackToSingleRead), + BKException.HANDLER); + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 297a2f62f47..d238441deda 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -202,6 +202,9 @@ public class ClientConfiguration extends AbstractConfiguration Date: Mon, 29 Jan 2024 11:57:34 +0800 Subject: [PATCH 04/14] address the comments. --- .../main/java/org/apache/bookkeeper/client/BatchedReadOp.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index 131b6d0c380..cd1be76033f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -62,7 +62,7 @@ void initiate() { this.requestTimeNanos = MathUtils.nowInNano(); List ensemble = getLedgerMetadata().getEnsembleAt(startEntryId); if (parallelRead) { - LOG.info("Batch read unsupported the parallelRead, fail back to sequence read."); + LOG.info("Batch read unsupported the parallelRead, failback to sequence read."); } request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize); request.read(); From bd017ea527639e27b2ae0b01313645e7bf38e10c Mon Sep 17 00:00:00 2001 From: horizonzy Date: Tue, 30 Jan 2024 15:13:53 +0800 Subject: [PATCH 05/14] Refactor. --- .../apache/bookkeeper/client/BatchedReadOp.java | 17 +++++++++-------- .../apache/bookkeeper/client/PendingReadOp.java | 13 +------------ .../apache/bookkeeper/client/ReadOpBase.java | 15 +++++++++++++-- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index cd1be76033f..4cf1c3cc6f4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.List; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; @@ -44,6 +45,8 @@ public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallbac final int maxCount; final long maxSize; + private ScheduledFuture speculativeTask = null; + BatchedLedgerEntryRequest request; BatchedReadOp(LedgerHandle lh, @@ -61,11 +64,12 @@ public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallbac void initiate() { this.requestTimeNanos = MathUtils.nowInNano(); List ensemble = getLedgerMetadata().getEnsembleAt(startEntryId); - if (parallelRead) { - LOG.info("Batch read unsupported the parallelRead, failback to sequence read."); - } request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize); request.read(); + if (clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), request); + } } @Override @@ -75,6 +79,8 @@ protected void submitCallback(int code) { return; } + cancelSpeculativeTask(true); + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); if (code != BKException.Code.OK) { LOG.error( @@ -93,11 +99,6 @@ protected void submitCallback(int code) { } } - BatchedReadOp parallelRead(boolean enabled) { - this.parallelRead = enabled; - return this; - } - @Override public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx) { final ReadContext rctx = (ReadContext) ctx; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index e3d1a57be8e..43ab26cb4c9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -47,7 +47,7 @@ class PendingReadOp extends ReadOpBase implements ReadEntryCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); - private ScheduledFuture speculativeTask = null; + protected boolean parallelRead = false; protected final LinkedList seq; PendingReadOp(LedgerHandle lh, @@ -60,17 +60,6 @@ class PendingReadOp extends ReadOpBase implements ReadEntryCallback { numPendingEntries = endEntryId - startEntryId + 1; } - protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { - if (speculativeTask != null) { - speculativeTask.cancel(mayInterruptIfRunning); - speculativeTask = null; - } - } - - public ScheduledFuture getSpeculativeTask() { - return speculativeTask; - } - PendingReadOp parallelRead(boolean enabled) { this.parallelRead = enabled; return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java index 9c535384b5f..cbd68ec657a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; @@ -39,6 +40,7 @@ public abstract class ReadOpBase implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ReadOpBase.class); + protected ScheduledFuture speculativeTask = null; protected final CompletableFuture future; protected final Set heardFromHosts; protected final BitSet heardFromHostsBitSet; @@ -52,12 +54,10 @@ public abstract class ReadOpBase implements Runnable { protected final int requiredBookiesMissingEntryForRecovery; protected final boolean isRecoveryRead; - protected boolean parallelRead = false; protected final AtomicBoolean complete = new AtomicBoolean(false); protected boolean allowFailFast = false; long numPendingEntries; final long endEntryId; - protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, boolean isRecoveryRead) { this.lh = lh; @@ -77,6 +77,17 @@ protected LedgerMetadata getLedgerMetadata() { return lh.getLedgerMetadata(); } + protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { + if (speculativeTask != null) { + speculativeTask.cancel(mayInterruptIfRunning); + speculativeTask = null; + } + } + + public ScheduledFuture getSpeculativeTask() { + return speculativeTask; + } + CompletableFuture future() { return future; } From 7f1b3316bbd4ef5de5eb78605ae1a1e7af5857f1 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Tue, 30 Jan 2024 15:13:53 +0800 Subject: [PATCH 06/14] Refactor. --- .../apache/bookkeeper/client/BatchedReadOp.java | 17 +++++++++-------- .../apache/bookkeeper/client/PendingReadOp.java | 13 +------------ .../apache/bookkeeper/client/ReadOpBase.java | 15 +++++++++++++-- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index cd1be76033f..4cf1c3cc6f4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.List; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; @@ -44,6 +45,8 @@ public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallbac final int maxCount; final long maxSize; + private ScheduledFuture speculativeTask = null; + BatchedLedgerEntryRequest request; BatchedReadOp(LedgerHandle lh, @@ -61,11 +64,12 @@ public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallbac void initiate() { this.requestTimeNanos = MathUtils.nowInNano(); List ensemble = getLedgerMetadata().getEnsembleAt(startEntryId); - if (parallelRead) { - LOG.info("Batch read unsupported the parallelRead, failback to sequence read."); - } request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize); request.read(); + if (clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), request); + } } @Override @@ -75,6 +79,8 @@ protected void submitCallback(int code) { return; } + cancelSpeculativeTask(true); + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); if (code != BKException.Code.OK) { LOG.error( @@ -93,11 +99,6 @@ protected void submitCallback(int code) { } } - BatchedReadOp parallelRead(boolean enabled) { - this.parallelRead = enabled; - return this; - } - @Override public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx) { final ReadContext rctx = (ReadContext) ctx; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index e3d1a57be8e..43ab26cb4c9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -47,7 +47,7 @@ class PendingReadOp extends ReadOpBase implements ReadEntryCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); - private ScheduledFuture speculativeTask = null; + protected boolean parallelRead = false; protected final LinkedList seq; PendingReadOp(LedgerHandle lh, @@ -60,17 +60,6 @@ class PendingReadOp extends ReadOpBase implements ReadEntryCallback { numPendingEntries = endEntryId - startEntryId + 1; } - protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { - if (speculativeTask != null) { - speculativeTask.cancel(mayInterruptIfRunning); - speculativeTask = null; - } - } - - public ScheduledFuture getSpeculativeTask() { - return speculativeTask; - } - PendingReadOp parallelRead(boolean enabled) { this.parallelRead = enabled; return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java index 9c535384b5f..cbd68ec657a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; @@ -39,6 +40,7 @@ public abstract class ReadOpBase implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ReadOpBase.class); + protected ScheduledFuture speculativeTask = null; protected final CompletableFuture future; protected final Set heardFromHosts; protected final BitSet heardFromHostsBitSet; @@ -52,12 +54,10 @@ public abstract class ReadOpBase implements Runnable { protected final int requiredBookiesMissingEntryForRecovery; protected final boolean isRecoveryRead; - protected boolean parallelRead = false; protected final AtomicBoolean complete = new AtomicBoolean(false); protected boolean allowFailFast = false; long numPendingEntries; final long endEntryId; - protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, boolean isRecoveryRead) { this.lh = lh; @@ -77,6 +77,17 @@ protected LedgerMetadata getLedgerMetadata() { return lh.getLedgerMetadata(); } + protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { + if (speculativeTask != null) { + speculativeTask.cancel(mayInterruptIfRunning); + speculativeTask = null; + } + } + + public ScheduledFuture getSpeculativeTask() { + return speculativeTask; + } + CompletableFuture future() { return future; } From 7ad75f757d5e44f322e437092c26b14cc17cb431 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Tue, 30 Jan 2024 15:21:42 +0800 Subject: [PATCH 07/14] fix ci. --- .../main/java/org/apache/bookkeeper/client/PendingReadOp.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 43ab26cb4c9..15d48c64351 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -25,7 +25,6 @@ import java.util.BitSet; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; From a86bd147cf9a233d1f5621b9444b5f8e6eb14d7b Mon Sep 17 00:00:00 2001 From: horizonzy Date: Tue, 30 Jan 2024 19:58:16 +0800 Subject: [PATCH 08/14] code clean. --- .../main/java/org/apache/bookkeeper/client/BatchedReadOp.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index 4cf1c3cc6f4..a6256563cf0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -45,8 +45,6 @@ public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallbac final int maxCount; final long maxSize; - private ScheduledFuture speculativeTask = null; - BatchedLedgerEntryRequest request; BatchedReadOp(LedgerHandle lh, From 689a67de089f0b8a33ce80b1cb3a139db1f8133e Mon Sep 17 00:00:00 2001 From: horizonzy Date: Tue, 30 Jan 2024 20:00:09 +0800 Subject: [PATCH 09/14] code clean. --- .../main/java/org/apache/bookkeeper/client/BatchedReadOp.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index a6256563cf0..4892882e1d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; From 07d4c21a5bedc94774ba1c1471ffcb3ac56828ed Mon Sep 17 00:00:00 2001 From: horizonzy Date: Wed, 31 Jan 2024 01:25:24 +0800 Subject: [PATCH 10/14] add test for batched read. --- .../bookkeeper/client/BatchedReadOp.java | 3 - .../bookkeeper/client/PendingReadOp.java | 1 - .../proto/BatchedReadEntryProcessor.java | 6 + .../bookkeeper/client/TestBatchedRead.java | 292 +++++++++++++ .../client/TestSpeculativeBatchRead.java | 406 ++++++++++++++++++ 5 files changed, 704 insertions(+), 4 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index 4cf1c3cc6f4..4892882e1d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; @@ -45,8 +44,6 @@ public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallbac final int maxCount; final long maxSize; - private ScheduledFuture speculativeTask = null; - BatchedLedgerEntryRequest request; BatchedReadOp(LedgerHandle lh, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 43ab26cb4c9..15d48c64351 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -25,7 +25,6 @@ import java.util.BitSet; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java index 700952042f0..fe755df1535 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java @@ -85,6 +85,12 @@ protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) { return ResponseBuilder.buildBatchedReadResponse((ByteBufList) data, (BatchedReadRequest) request); } + @Override + public String toString() { + BatchedReadRequest br = (BatchedReadRequest) request; + return String.format("BatchedReadEntry(%d, %d %d, %d)", br.getLedgerId(), br.getEntryId(), br.getMaxCount(), br.getMaxSize()); + } + protected void recycle() { request.recycle(); super.reset(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java new file mode 100644 index 00000000000..1bb95ed0478 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java @@ -0,0 +1,292 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import org.apache.bookkeeper.client.BKException.Code; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit tests for batch reading. + */ +public class TestBatchedRead extends BookKeeperClusterTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(TestBatchedRead.class); + + final DigestType digestType; + final byte[] passwd = "sequence-read".getBytes(); + + public TestBatchedRead() { + super(6); + baseClientConf.setUseV2WireProtocol(true); + this.digestType = DigestType.CRC32; + } + + long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntries) + throws Exception { + LedgerHandle lh = bkc.createLedger(ensemble, writeQuorum, ackQuorum, digestType, passwd); + for (int i = 0; i < numEntries; i++) { + lh.addEntry(("" + i).getBytes()); + } + lh.close(); + return lh.getId(); + } + + BatchedReadOp createReadOp(LedgerHandle lh, long startEntry, int count) { + return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count, 1024 * count, false); + } + + BatchedReadOp createRecoveryReadOp(LedgerHandle lh, long startEntry, int count) { + return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count, 1024 * count, true); + } + + @Test + public void testNormalRead() throws Exception { + int numEntries = 10; + long id = getLedgerToRead(5, 5, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + //read single entry + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, 1); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + + // read multiple entries + BatchedReadOp readOp = createReadOp(lh, 0, numEntries); + readOp.submit(); + Iterator iterator = readOp.future().get().iterator(); + + int numReads = 0; + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); + assertNotNull(entry); + assertEquals(numReads, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + ++numReads; + } + assertEquals(numEntries, numReads); + lh.close(); + } + + @Test + public void testReadWhenEnsembleNotEqualWQ() throws Exception { + int numEntries = 10; + long id = getLedgerToRead(5, 2, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + //read single entry + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, 1); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + + // read multiple entries, because the ensemble is not equals with write quorum, the return entries + // will less than max count. + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, numEntries); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + lh.close(); + } + + private static void expectFail(CompletableFuture future, int expectedRc) { + try { + result(future); + fail("Expect to fail"); + } catch (Exception e) { + assertTrue(e instanceof BKException); + BKException bke = (BKException) e; + assertEquals(expectedRc, bke.getCode()); + } + } + + @Test + public void testReadMissingEntries() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 5, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + // read single entry + BatchedReadOp readOp = createReadOp(lh, 10, 1); + readOp.submit(); + expectFail(readOp.future(), Code.NoSuchEntryException); + + // read multiple entries + readOp = createReadOp(lh, 8, 3); + readOp.submit(); + + int index = 8; + int numReads = 0; + Iterator iterator = readOp.future().get().iterator(); + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); + assertNotNull(entry); + assertEquals(index, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + ++index; + ++numReads; + } + assertEquals(2, numReads); + lh.close(); + } + + @Test + public void testFailRecoveryReadMissingEntryImmediately() throws Exception { + int numEntries = 1; + + long id = getLedgerToRead(5, 5, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(10); + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + // sleep two bookie + sleepBookie(ensemble.get(0), latch1); + sleepBookie(ensemble.get(1), latch2); + + BatchedReadOp readOp = createRecoveryReadOp(lh, 10, 1); + readOp.submit(); + // would fail immediately if found missing entries don't cover ack quorum + expectFail(readOp.future(), Code.NoSuchEntryException); + latch1.countDown(); + latch2.countDown(); + + lh.close(); + newBk.close(); + } + + @Test + public void testReadWithFailedBookies() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 3, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(5); + // kill two bookies + killBookie(ensemble.get(0)); + killBookie(ensemble.get(1)); + + // read multiple entries, because the ensemble is not equals with write quorum, the return entries + // will less than max count. + int numReads = 0; + for (int i = 0; i < numEntries;) { + BatchedReadOp readOp = createReadOp(lh, i, numEntries); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + if (!entries.hasNext()) { + i++; + continue; + } + while (entries.hasNext()) { + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + i++; + numReads++; + } + } + assertEquals(10, numReads); + lh.close(); + newBk.close(); + } + + @Test + public void testReadFailureWithFailedBookies() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 3, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(5); + // kill two bookies + killBookie(ensemble.get(0)); + killBookie(ensemble.get(1)); + killBookie(ensemble.get(2)); + + // read multiple entries + BatchedReadOp readOp = createReadOp(lh, 0, numEntries); + readOp.submit(); + expectFail(readOp.future(), Code.BookieHandleNotAvailableException); + + lh.close(); + newBk.close(); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java new file mode 100644 index 00000000000..2fb1f20792b --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java @@ -0,0 +1,406 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.test.TestStatsProvider; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.BitSet; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE; +import static org.apache.bookkeeper.client.BookKeeperClientStats.SPECULATIVE_READ_COUNT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * This unit test tests ledger fencing. + * + */ +public class TestSpeculativeBatchRead extends BookKeeperClusterTestCase { + private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeBatchRead.class); + + private final DigestType digestType; + byte[] passwd = "specPW".getBytes(); + + public TestSpeculativeBatchRead() { + super(10); + this.digestType = DigestType.CRC32; + } + + long getLedgerToRead(int ensemble, int quorum) throws Exception { + byte[] data = "Data for test".getBytes(); + LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType, passwd); + for (int i = 0; i < 10; i++) { + l.addEntry(data); + } + l.close(); + + return l.getId(); + } + + @SuppressWarnings("deprecation") + BookKeeperTestClient createClient(int specTimeout) throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setSpeculativeReadTimeout(specTimeout) + .setReadTimeout(30000) + .setUseV2WireProtocol(true) + .setReorderReadSequenceEnabled(true) + .setEnsemblePlacementPolicySlowBookies(true) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + return new BookKeeperTestClient(conf, new TestStatsProvider()); + } + + class LatchCallback implements ReadCallback { + CountDownLatch l = new CountDownLatch(1); + boolean success = false; + long startMillis = System.currentTimeMillis(); + long endMillis = Long.MAX_VALUE; + + public void readComplete(int rc, + LedgerHandle lh, + Enumeration seq, + Object ctx) { + endMillis = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Got response {} {}", rc, getDuration()); + } + success = rc == BKException.Code.OK; + l.countDown(); + } + + long getDuration() { + return endMillis - startMillis; + } + + void expectSuccess(int milliseconds) throws Exception { + boolean await = l.await(milliseconds, TimeUnit.MILLISECONDS); + System.out.println(await); + } + + void expectFail(int milliseconds) throws Exception { + assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS)); + assertFalse(success); + } + + void expectTimeout(int milliseconds) throws Exception { + assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS)); + } + } + + + /** + * Test basic speculative functionality. + * - Create 2 clients with read timeout disabled, one with spec + * read enabled, the other not. + * - create ledger + * - sleep second bookie in ensemble + * - read first entry, both should find on first bookie. + * - read second bookie, spec client should find on bookie three, + * non spec client should hang. + */ + @Test + public void testSpeculativeRead() throws Exception { + long id = getLedgerToRead(3, 2); + BookKeeperTestClient bknospec = createClient(0); // disabled + BookKeeperTestClient bkspec = createClient(2000); + + LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd); + LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd); + + // sleep second bookie + CountDownLatch sleepLatch = new CountDownLatch(1); + BookieId second = lnospec.getLedgerMetadata().getAllEnsembles().get(0L).get(1); + sleepBookie(second, sleepLatch); + + try { + // read first entry, both go to first bookie, should be fine + LatchCallback nospeccb = new LatchCallback(); + LatchCallback speccb = new LatchCallback(); + lnospec.asyncBatchReadEntries(0, 1, 1024, false, nospeccb, null); + lspec.asyncBatchReadEntries(0, 1, 1024, false, speccb, null); + nospeccb.expectSuccess(2000); + speccb.expectSuccess(2000); + + // read second entry, both look for second book, spec read client + // tries third bookie, nonspec client hangs as read timeout is very long. + nospeccb = new LatchCallback(); + speccb = new LatchCallback(); + lnospec.asyncReadEntries(1, 1, nospeccb, null); + lspec.asyncReadEntries(1, 1, speccb, null); + speccb.expectSuccess(4000); + nospeccb.expectTimeout(4000); + // Check that the second bookie is registered as slow at entryId 1 + RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy(); + assertTrue(rep.slowBookies.asMap().size() == 1); + + assertTrue( + "Stats should not reflect speculative reads if disabled", + bknospec.getTestStatsProvider() + .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() == 0); + assertTrue( + "Stats should reflect speculative reads", + bkspec.getTestStatsProvider() + .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() > 0); + + } finally { + sleepLatch.countDown(); + lspec.close(); + lnospec.close(); + bkspec.close(); + bknospec.close(); + } + } + + /** + * Test that if more than one replica is down, we can still read, as long as the quorum + * size is larger than the number of down replicas. + */ + @Test + public void testSpeculativeReadMultipleReplicasDown() throws Exception { + long id = getLedgerToRead(5, 5); + int timeout = 5000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookie 1, 2 & 4 + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2), sleepLatch); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(4), sleepLatch); + + try { + // read first entry, should complete faster than timeout + // as bookie 0 has the entry + LatchCallback latch0 = new LatchCallback(); + l.asyncBatchReadEntries(0, 1, 1024, false, latch0, null); + latch0.expectSuccess(timeout / 2); + + // second should have to hit two timeouts (bookie 1 & 2) + // bookie 3 has the entry + LatchCallback latch1 = new LatchCallback(); + l.asyncBatchReadEntries(1, 1, 1024, false, latch1, null); + latch1.expectTimeout(timeout); + latch1.expectSuccess(timeout * 2); + LOG.info("Timeout {} latch1 duration {}", timeout, latch1.getDuration()); + assertTrue("should have taken longer than two timeouts, but less than 3", + latch1.getDuration() >= timeout * 2 + && latch1.getDuration() < timeout * 3); + + // bookies 1 & 2 should be registered as slow bookies because of speculative reads + Set expectedSlowBookies = new HashSet<>(); + expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1)); + expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2)); + assertEquals(((RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy()).slowBookies.asMap().keySet(), + expectedSlowBookies); + + // third should not hit timeouts since bookies 1 & 2 are registered as slow + // bookie 3 has the entry + LatchCallback latch2 = new LatchCallback(); + l.asyncBatchReadEntries(2, 1, 1024, false, latch2, null); + latch2.expectSuccess(timeout); + + // fourth should have no timeout + // bookie 3 has the entry + LatchCallback latch3 = new LatchCallback(); + l.asyncBatchReadEntries(3, 1, 1024, false, latch3, null); + latch3.expectSuccess(timeout / 2); + + // fifth should hit one timeout, (bookie 4) + // bookie 0 has the entry + LatchCallback latch4 = new LatchCallback(); + l.asyncBatchReadEntries(4, 1, 1024, false, latch4, null); + latch4.expectTimeout(timeout / 2); + latch4.expectSuccess(timeout); + LOG.info("Timeout {} latch4 duration {}", timeout, latch4.getDuration()); + assertTrue("should have taken longer than one timeout, but less than 2", + latch4.getDuration() >= timeout + && latch4.getDuration() < timeout * 2); + + } finally { + sleepLatch.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Test that if after a speculative read is kicked off, the original read completes + * nothing bad happens. + */ + @Test + public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception { + long id = getLedgerToRead(2, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookies + CountDownLatch sleepLatch0 = new CountDownLatch(1); + CountDownLatch sleepLatch1 = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(0), sleepLatch0); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch1); + + try { + // read goes to first bookie, spec read timeout occurs, + // goes to second + LatchCallback latch0 = new LatchCallback(); + l.asyncBatchReadEntries(0, 1, 1024, false, latch0, null); + latch0.expectTimeout(timeout); + + // wake up first bookie + sleepLatch0.countDown(); + latch0.expectSuccess(timeout / 2); + + sleepLatch1.countDown(); + + // check we can read next entry without issue + LatchCallback latch1 = new LatchCallback(); + l.asyncBatchReadEntries(1, 1, 1024, false, latch1, null); + latch1.expectSuccess(timeout / 2); + + } finally { + sleepLatch0.countDown(); + sleepLatch1.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Unit test to check if the scheduled speculative task gets cancelled + * on successful read. + */ + @Test + public void testSpeculativeReadScheduledTaskCancel() throws Exception { + long id = getLedgerToRead(3, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + BatchedReadOp op = null; + try { + op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + op.initiate(); + op.future().get(); + } finally { + assertNull("Speculative Read tasks must be null", op.getSpeculativeTask()); + } + } + + /** + * Unit test for the speculative read scheduling method. + */ + @Test + public void testSpeculativeReadScheduling() throws Exception { + long id = getLedgerToRead(3, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + List ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L); + BitSet allHosts = new BitSet(ensemble.size()); + for (int i = 0; i < ensemble.size(); i++) { + allHosts.set(i, true); + } + BitSet noHost = new BitSet(ensemble.size()); + BitSet secondHostOnly = new BitSet(ensemble.size()); + secondHostOnly.set(1, true); + BatchedReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null; + try { + BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + // if we've already heard from all hosts, + // we only send the initial read + req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1,1024); + assertTrue("Should have sent to first", + req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0))); + assertNull("Should not have sent another", + req0.maybeSendSpeculativeRead(allHosts)); + + // if we have heard from some hosts, but not one we have sent to + // send again + req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2, 1, 1024); + assertTrue("Should have sent to third", + req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2))); + assertTrue("Should have sent to first", + req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0))); + + // if we have heard from some hosts, which includes one we sent to + // do not read again + req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4, 1, 1024); + assertTrue("Should have sent to second", + req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1))); + assertNull("Should not have sent another", + req4.maybeSendSpeculativeRead(secondHostOnly)); + } finally { + for (BatchedReadOp.LedgerEntryRequest req + : new BatchedReadOp.LedgerEntryRequest[] { req0, req2, req4 }) { + if (req != null) { + int i = 0; + while (!req.isComplete()) { + if (i++ > 10) { + break; // wait for up to 10 seconds + } + Thread.sleep(1000); + } + assertTrue("Request should be done", req.isComplete()); + } + } + + l.close(); + bkspec.close(); + } + } + + @Test + public void testSequenceReadLocalEnsemble() throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setSpeculativeReadTimeout(1000) + .setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class) + .setReorderReadSequenceEnabled(true) + .setEnsemblePlacementPolicySlowBookies(true) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper bkspec = new BookKeeperTestClient(conf, new TestStatsProvider()); + LedgerHandle l = bkspec.createLedger(1, 1, digestType, passwd); + List ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L); + BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120,false); + BatchedReadOp.LedgerEntryRequest req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024); + assertNotNull(req0.writeSet); + } +} From 09dc2153d652d2239bb4724238b8d73f440fea37 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Wed, 31 Jan 2024 01:38:19 +0800 Subject: [PATCH 11/14] fix ci. --- .../bookkeeper/client/LedgerHandle.java | 37 +++++++-------- .../bookkeeper/client/api/ReadHandle.java | 2 +- .../proto/BatchedReadEntryProcessor.java | 3 +- .../client/TestSpeculativeBatchRead.java | 45 +++++++++---------- 4 files changed, 42 insertions(+), 45 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index efbe7a753db..6a9ba395258 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -659,9 +659,9 @@ public Enumeration batchReadEntries(long startEntry, int maxCount, boolean failbackToSingleRead) throws InterruptedException, BKException { CompletableFuture> result = new CompletableFuture<>(); - + asyncBatchReadEntries(startEntry, maxCount, maxSize, failbackToSingleRead, new SyncReadCallback(result), null); - + return SyncCallbackUtils.waitForResult(result); } @@ -687,7 +687,7 @@ public Enumeration readUnconfirmedEntries(long firstEntry, long las return SyncCallbackUtils.waitForResult(result); } - + /** * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.
* This is the same of @@ -701,12 +701,13 @@ public Enumeration readUnconfirmedEntries(long firstEntry, long las * * @param failbackToSingleRead */ - public Enumeration batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize, boolean failbackToSingleRead) - throws InterruptedException, BKException { + public Enumeration batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize, + boolean failbackToSingleRead) throws InterruptedException, BKException { CompletableFuture> result = new CompletableFuture<>(); - - asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, failbackToSingleRead, new SyncReadCallback(result), null); - + + asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, failbackToSingleRead, + new SyncReadCallback(result), null); + return SyncCallbackUtils.waitForResult(result); } @@ -740,7 +741,7 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } - + /** * Read a sequence of entries in asynchronously. * It send an RPC to get all entries instead of send multi RPC to get all entries. @@ -833,7 +834,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } - + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. * It sends an RPC to get all entries instead of send multi RPC to get all entries. @@ -850,8 +851,8 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal * @param ctx * control object */ - public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, boolean failbackToSingleRead, - ReadCallback cb, Object ctx) { + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, + boolean failbackToSingleRead, ReadCallback cb, Object ctx) { // Little sanity check if (startEntry < 0) { LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); @@ -910,7 +911,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return readEntriesInternalAsync(firstEntry, lastEntry, false); } - + /** * Read a sequence of entries in asynchronously. * It sends an RPC to get all entries instead of send multi RPC to get all entries. @@ -973,7 +974,7 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC }); return future; } - + private boolean notSupportBatchRead() { if (!clientCtx.getConf().batchReadEnabled) { return true; @@ -984,7 +985,7 @@ private boolean notSupportBatchRead() { LedgerMetadata ledgerMetadata = getLedgerMetadata(); return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize(); } - + private CompletableFuture batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize, boolean isRecoveryRead) { int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes; @@ -1023,7 +1024,7 @@ private CompletableFuture batchReadEntriesInternalAsync(long star ws.recycle(); } } - + if (isHandleWritable()) { // Ledger handle in read/write mode: submit to OSE for ordered execution. executeOrdered(op); @@ -1125,7 +1126,7 @@ public void onSuccess(LedgerEntries entries) { })), ctx); } - + @Override public void onFailure(Throwable cause) { if (cause instanceof BKException) { @@ -1140,7 +1141,7 @@ public void onFailure(Throwable cause) { cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx); } } - + /* * Read the last entry in the ledger * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index 7ffccc4e997..289ae29bce6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -44,7 +44,7 @@ public interface ReadHandle extends Handle { * @return an handle to the result of the operation */ CompletableFuture readAsync(long firstEntry, long lastEntry); - + /** * Read a sequence of entries asynchronously. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java index fe755df1535..6db3e143519 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java @@ -88,7 +88,8 @@ protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) { @Override public String toString() { BatchedReadRequest br = (BatchedReadRequest) request; - return String.format("BatchedReadEntry(%d, %d %d, %d)", br.getLedgerId(), br.getEntryId(), br.getMaxCount(), br.getMaxSize()); + return String.format("BatchedReadEntry(%d, %d %d, %d)", br.getLedgerId(), br.getEntryId(), br.getMaxCount(), + br.getMaxSize()); } protected void recycle() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java index 2fb1f20792b..fdee8ba8b08 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java @@ -20,6 +20,21 @@ */ package org.apache.bookkeeper.client; +import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE; +import static org.apache.bookkeeper.client.BookKeeperClientStats.SPECULATIVE_READ_COUNT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.BitSet; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -31,22 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.BitSet; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE; -import static org.apache.bookkeeper.client.BookKeeperClientStats.SPECULATIVE_READ_COUNT; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - /** * This unit test tests ledger fencing. * @@ -121,7 +120,6 @@ void expectTimeout(int milliseconds) throws Exception { assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS)); } } - /** * Test basic speculative functionality. @@ -138,15 +136,15 @@ public void testSpeculativeRead() throws Exception { long id = getLedgerToRead(3, 2); BookKeeperTestClient bknospec = createClient(0); // disabled BookKeeperTestClient bkspec = createClient(2000); - + LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd); LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd); - + // sleep second bookie CountDownLatch sleepLatch = new CountDownLatch(1); BookieId second = lnospec.getLedgerMetadata().getAllEnsembles().get(0L).get(1); sleepBookie(second, sleepLatch); - + try { // read first entry, both go to first bookie, should be fine LatchCallback nospeccb = new LatchCallback(); @@ -155,7 +153,7 @@ public void testSpeculativeRead() throws Exception { lspec.asyncBatchReadEntries(0, 1, 1024, false, speccb, null); nospeccb.expectSuccess(2000); speccb.expectSuccess(2000); - + // read second entry, both look for second book, spec read client // tries third bookie, nonspec client hangs as read timeout is very long. nospeccb = new LatchCallback(); @@ -167,7 +165,7 @@ public void testSpeculativeRead() throws Exception { // Check that the second bookie is registered as slow at entryId 1 RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy(); assertTrue(rep.slowBookies.asMap().size() == 1); - + assertTrue( "Stats should not reflect speculative reads if disabled", bknospec.getTestStatsProvider() @@ -176,7 +174,6 @@ public void testSpeculativeRead() throws Exception { "Stats should reflect speculative reads", bkspec.getTestStatsProvider() .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() > 0); - } finally { sleepLatch.countDown(); lspec.close(); @@ -251,7 +248,6 @@ public void testSpeculativeReadMultipleReplicasDown() throws Exception { assertTrue("should have taken longer than one timeout, but less than 2", latch4.getDuration() >= timeout && latch4.getDuration() < timeout * 2); - } finally { sleepLatch.countDown(); l.close(); @@ -294,7 +290,6 @@ public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception { LatchCallback latch1 = new LatchCallback(); l.asyncBatchReadEntries(1, 1, 1024, false, latch1, null); latch1.expectSuccess(timeout / 2); - } finally { sleepLatch0.countDown(); sleepLatch1.countDown(); From ec91972e811c60cb690fb58707ecbff7027f53ce Mon Sep 17 00:00:00 2001 From: horizonzy Date: Wed, 31 Jan 2024 01:40:15 +0800 Subject: [PATCH 12/14] add comments. --- .../java/org/apache/bookkeeper/client/LedgerHandle.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 6a9ba395258..cc860d43ce6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -780,6 +780,8 @@ public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, b asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { @Override public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + //If the bookie server not support the batch read request, the bookie server will close the + // connection, then get the BookieHandleNotAvailableException. if (rc == Code.BookieHandleNotAvailableException) { notSupportBatch = true; if (failbackToSingleRead) { @@ -870,6 +872,8 @@ public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { @Override public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + //If the bookie server not support the batch read request, the bookie server will close the + // connection, then get the BookieHandleNotAvailableException. if (rc == Code.BookieHandleNotAvailableException) { notSupportBatch = true; if (failbackToSingleRead) { @@ -951,6 +955,8 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) .whenComplete((entries, ex) -> { if (ex != null) { + //If the bookie server not support the batch read request, the bookie server will close the + // connection, then get the BookieHandleNotAvailableException. if (ex instanceof BKException.BKBookieHandleNotAvailableException) { notSupportBatch = true; if (failbackToSingleRead) { From bf13fbf8897f218c393618e69007614d5d8693c9 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Wed, 31 Jan 2024 02:12:34 +0800 Subject: [PATCH 13/14] fix ci. --- .../apache/bookkeeper/client/TestSpeculativeBatchRead.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java index fdee8ba8b08..8f64cfc6989 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java @@ -342,7 +342,7 @@ public void testSpeculativeReadScheduling() throws Exception { BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); // if we've already heard from all hosts, // we only send the initial read - req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1,1024); + req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024); assertTrue("Should have sent to first", req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0))); assertNull("Should not have sent another", @@ -394,7 +394,7 @@ public void testSequenceReadLocalEnsemble() throws Exception { BookKeeper bkspec = new BookKeeperTestClient(conf, new TestStatsProvider()); LedgerHandle l = bkspec.createLedger(1, 1, digestType, passwd); List ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L); - BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120,false); + BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); BatchedReadOp.LedgerEntryRequest req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024); assertNotNull(req0.writeSet); } From 4495a70a31f1b9298179dd1c32ff8952674b31c7 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Sun, 4 Feb 2024 14:53:30 +0800 Subject: [PATCH 14/14] always failback to the single read. --- .../bookkeeper/client/LedgerHandle.java | 96 ++++++------------- .../bookkeeper/client/api/ReadHandle.java | 12 +-- .../client/TestSpeculativeBatchRead.java | 18 ++-- 3 files changed, 40 insertions(+), 86 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index cc860d43ce6..6a98af55032 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -651,16 +651,13 @@ public Enumeration readEntries(long firstEntry, long lastEntry) * the total entries count. * @param maxSize * the total entries size. - * @param failbackToSingleRead - * is fail back to single read. * @see #asyncBatchReadEntries(long, int, long, boolean, ReadCallback, Object) */ - public Enumeration batchReadEntries(long startEntry, int maxCount, long maxSize, - boolean failbackToSingleRead) + public Enumeration batchReadEntries(long startEntry, int maxCount, long maxSize) throws InterruptedException, BKException { CompletableFuture> result = new CompletableFuture<>(); - asyncBatchReadEntries(startEntry, maxCount, maxSize, failbackToSingleRead, new SyncReadCallback(result), null); + asyncBatchReadEntries(startEntry, maxCount, maxSize, new SyncReadCallback(result), null); return SyncCallbackUtils.waitForResult(result); } @@ -698,15 +695,13 @@ public Enumeration readUnconfirmedEntries(long firstEntry, long las * @param maxCount * id of last entry of sequence (included) * @param maxSize - * - * @param failbackToSingleRead + * the total entries size */ - public Enumeration batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize, - boolean failbackToSingleRead) throws InterruptedException, BKException { + public Enumeration batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize) + throws InterruptedException, BKException { CompletableFuture> result = new CompletableFuture<>(); - asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, failbackToSingleRead, - new SyncReadCallback(result), null); + asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new SyncReadCallback(result), null); return SyncCallbackUtils.waitForResult(result); } @@ -752,15 +747,12 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O * the entries count * @param maxSize * the total entries size - * @param failbackToSingleRead - * failback to {@link #asyncReadEntriesInternal(long, long, ReadCallback, Object, boolean) } * @param cb * object implementing read callback interface * @param ctx * control object */ - public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, boolean failbackToSingleRead, - ReadCallback cb, Object ctx) { + public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, Object ctx) { // Little sanity check if (startEntry > lastAddConfirmed) { LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}", @@ -769,13 +761,8 @@ public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, b return; } if (notSupportBatchRead()) { - if (failbackToSingleRead) { - long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); - asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); - } else { - LOG.error("Not support batch read not."); - cb.readComplete(BKException.Code.ReadException, this, null, ctx); - } + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); } else { asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { @Override @@ -784,12 +771,8 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration seq, // connection, then get the BookieHandleNotAvailableException. if (rc == Code.BookieHandleNotAvailableException) { notSupportBatch = true; - if (failbackToSingleRead) { - long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); - asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); - } else { - cb.readComplete(rc, lh, seq, ctx); - } + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); } else { cb.readComplete(rc, lh, seq, ctx); } @@ -846,28 +829,21 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal * the entries count * @param maxSize * the total entries size - * @param failbackToSingleRead - * failback to {@link #asyncReadEntriesInternal(long, long, ReadCallback, Object, boolean)} * @param cb * object implementing read callback interface * @param ctx * control object */ - public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, - boolean failbackToSingleRead, ReadCallback cb, Object ctx) { + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx) { // Little sanity check if (startEntry < 0) { LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx); } if (notSupportBatchRead()) { - if (failbackToSingleRead) { - long lastEntry = startEntry + maxCount - 1; - asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); - } else { - LOG.error("Not support batch read not."); - cb.readComplete(BKException.Code.ReadException, this, null, ctx); - } + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); } else { asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { @Override @@ -876,12 +852,8 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration seq, // connection, then get the BookieHandleNotAvailableException. if (rc == Code.BookieHandleNotAvailableException) { notSupportBatch = true; - if (failbackToSingleRead) { - long lastEntry = startEntry + maxCount - 1; - asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); - } else { - cb.readComplete(rc, lh, seq, ctx); - } + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); } else { cb.readComplete(rc, lh, seq, ctx); } @@ -926,12 +898,9 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr * the entries count * @param maxSize * the total entries size - * @param failbackToSingleRead - * failback to {@link #readEntriesInternalAsync(long, long, boolean) } */ @Override - public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize, - boolean failbackToSingleRead) { + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { // Little sanity check if (startEntry < 0) { LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); @@ -943,13 +912,8 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC return FutureUtils.exception(new BKReadException()); } if (notSupportBatchRead()) { - if (failbackToSingleRead) { - long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); - return readEntriesInternalAsync(startEntry, lastEntry, false); - } else { - LOG.error("Not support batch read."); - return FutureUtils.exception(new BKReadException()); - } + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + return readEntriesInternalAsync(startEntry, lastEntry, false); } CompletableFuture future = new CompletableFuture<>(); batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) @@ -959,18 +923,14 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC // connection, then get the BookieHandleNotAvailableException. if (ex instanceof BKException.BKBookieHandleNotAvailableException) { notSupportBatch = true; - if (failbackToSingleRead) { - long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); - readEntriesInternalAsync(startEntry, lastEntry, false).whenComplete((entries1, ex1) -> { - if (ex1 != null) { - future.completeExceptionally(ex1); - } else { - future.complete(entries1); - } - }); - } else { - future.completeExceptionally(ex); - } + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + readEntriesInternalAsync(startEntry, lastEntry, false).whenComplete((entries1, ex1) -> { + if (ex1 != null) { + future.completeExceptionally(ex1); + } else { + future.complete(entries1); + } + }); } else { future.completeExceptionally(ex); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index 289ae29bce6..e9bcddd0b39 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -54,12 +54,9 @@ public interface ReadHandle extends Handle { * the total entries count. * @param maxSize * the total entries size. - * @param failbackToSingleRead - * is fail back to single read. * @return an handle to the result of the operation */ - default CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize, - boolean failbackToSingleRead) { + default CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new UnsupportedOperationException()); return future; @@ -87,14 +84,11 @@ default LedgerEntries read(long firstEntry, long lastEntry) throws BKException, * the total entries count. * @param maxSize * the total entries size. - * @param failbackToSingleRead - * is fail back to single read. * @return the result of the operation */ - default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize, boolean failbackToSingleRead) + default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize) throws BKException, InterruptedException { - return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize, failbackToSingleRead), - BKException.HANDLER); + return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize), BKException.HANDLER); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java index 8f64cfc6989..3bf5e2d5e44 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java @@ -149,8 +149,8 @@ public void testSpeculativeRead() throws Exception { // read first entry, both go to first bookie, should be fine LatchCallback nospeccb = new LatchCallback(); LatchCallback speccb = new LatchCallback(); - lnospec.asyncBatchReadEntries(0, 1, 1024, false, nospeccb, null); - lspec.asyncBatchReadEntries(0, 1, 1024, false, speccb, null); + lnospec.asyncBatchReadEntries(0, 1, 1024, nospeccb, null); + lspec.asyncBatchReadEntries(0, 1, 1024, speccb, null); nospeccb.expectSuccess(2000); speccb.expectSuccess(2000); @@ -205,13 +205,13 @@ public void testSpeculativeReadMultipleReplicasDown() throws Exception { // read first entry, should complete faster than timeout // as bookie 0 has the entry LatchCallback latch0 = new LatchCallback(); - l.asyncBatchReadEntries(0, 1, 1024, false, latch0, null); + l.asyncBatchReadEntries(0, 1, 1024, latch0, null); latch0.expectSuccess(timeout / 2); // second should have to hit two timeouts (bookie 1 & 2) // bookie 3 has the entry LatchCallback latch1 = new LatchCallback(); - l.asyncBatchReadEntries(1, 1, 1024, false, latch1, null); + l.asyncBatchReadEntries(1, 1, 1024, latch1, null); latch1.expectTimeout(timeout); latch1.expectSuccess(timeout * 2); LOG.info("Timeout {} latch1 duration {}", timeout, latch1.getDuration()); @@ -229,19 +229,19 @@ public void testSpeculativeReadMultipleReplicasDown() throws Exception { // third should not hit timeouts since bookies 1 & 2 are registered as slow // bookie 3 has the entry LatchCallback latch2 = new LatchCallback(); - l.asyncBatchReadEntries(2, 1, 1024, false, latch2, null); + l.asyncBatchReadEntries(2, 1, 1024, latch2, null); latch2.expectSuccess(timeout); // fourth should have no timeout // bookie 3 has the entry LatchCallback latch3 = new LatchCallback(); - l.asyncBatchReadEntries(3, 1, 1024, false, latch3, null); + l.asyncBatchReadEntries(3, 1, 1024, latch3, null); latch3.expectSuccess(timeout / 2); // fifth should hit one timeout, (bookie 4) // bookie 0 has the entry LatchCallback latch4 = new LatchCallback(); - l.asyncBatchReadEntries(4, 1, 1024, false, latch4, null); + l.asyncBatchReadEntries(4, 1, 1024, latch4, null); latch4.expectTimeout(timeout / 2); latch4.expectSuccess(timeout); LOG.info("Timeout {} latch4 duration {}", timeout, latch4.getDuration()); @@ -277,7 +277,7 @@ public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception { // read goes to first bookie, spec read timeout occurs, // goes to second LatchCallback latch0 = new LatchCallback(); - l.asyncBatchReadEntries(0, 1, 1024, false, latch0, null); + l.asyncBatchReadEntries(0, 1, 1024, latch0, null); latch0.expectTimeout(timeout); // wake up first bookie @@ -288,7 +288,7 @@ public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception { // check we can read next entry without issue LatchCallback latch1 = new LatchCallback(); - l.asyncBatchReadEntries(1, 1, 1024, false, latch1, null); + l.asyncBatchReadEntries(1, 1, 1024, latch1, null); latch1.expectSuccess(timeout / 2); } finally { sleepLatch0.countDown();