[BP-62] LedgerHandle introduces batch read API. (#4195)
### Motivation
This is the fifth PR for the batch read (#4051) feature.

It introduces the batch read API on LedgerHandle.

This PR is based on #4190; please merge that one first.
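A rough usage sketch of the new synchronous API (illustrative only: the metadata service URI, ledger id, digest type, and password are placeholders, and the `setBatchReadEnabled` setter name is assumed from the `isBatchReadEnabled()` accessor referenced in the diff below):

```java
import java.util.Enumeration;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;

public class BatchReadExample {
    public static void main(String[] args) throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri("zk+hierarchical://127.0.0.1:2181/ledgers"); // placeholder
        conf.setBatchReadEnabled(true); // assumed client-side switch for the batch read feature

        try (BookKeeper bk = new BookKeeper(conf)) {
            LedgerHandle lh = bk.openLedger(42L, BookKeeper.DigestType.CRC32C, "password".getBytes());

            // Fetch up to 100 entries (capped at 1 MiB in total) starting at entry 0 with a
            // single RPC; the client falls back to per-entry reads if the bookie does not
            // support batch reads.
            Enumeration<LedgerEntry> entries = lh.batchReadEntries(0L, 100, 1024 * 1024);
            while (entries.hasMoreElements()) {
                LedgerEntry entry = entries.nextElement();
                System.out.println("entry " + entry.getEntryId() + ": " + entry.getLength() + " bytes");
            }
            lh.close();
        }
    }
}
```

The asynchronous variants (`asyncBatchReadEntries`, `asyncBatchReadUnconfirmedEntries`, `batchReadAsync`) take the same `startEntry`/`maxCount`/`maxSize` triple.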
horizonzy authored Feb 5, 2024
1 parent e1d72cf commit 2209734
Showing 8 changed files with 1,023 additions and 3 deletions.
@@ -48,6 +48,8 @@ class ClientInternalConf {
final boolean enableBookieFailureTracking;
final boolean useV2WireProtocol;
final boolean enforceMinNumFaultDomainsForWrite;
final boolean batchReadEnabled;
final int nettyMaxFrameSizeBytes;

static ClientInternalConf defaultValues() {
return fromConfig(new ClientConfiguration());
@@ -72,9 +74,9 @@ private ClientInternalConf(ClientConfiguration conf,
this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
this.throttleValue = conf.getThrottleValue();
this.bookieFailureHistoryExpirationMSec = conf.getBookieFailureHistoryExpirationMSec();

this.batchReadEnabled = conf.isBatchReadEnabled();
this.nettyMaxFrameSizeBytes = conf.getNettyMaxFrameSizeBytes();
this.disableEnsembleChangeFeature = featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());

this.delayEnsembleChange = conf.getDelayEnsembleChange();
this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges();
this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
@@ -103,6 +103,7 @@ public class LedgerHandle implements WriteHandle {
final long ledgerId;
final ExecutorService executor;
long lastAddPushed;
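// Flipped to true the first time a bookie indicates it does not support batch reads;
// subsequent batch read calls then fall back to the regular per-entry read path.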
boolean notSupportBatch;

private enum HandleState {
OPEN,
@@ -641,6 +642,26 @@ public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
return SyncCallbackUtils.waitForResult(result);
}

/**
* Read a sequence of entries synchronously.
*
* @param startEntry
*          id of the first entry of the sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size, in bytes, of the entries to read
* @see #asyncBatchReadEntries(long, int, long, ReadCallback, Object)
*/
public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int maxCount, long maxSize)
throws InterruptedException, BKException {
CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();

asyncBatchReadEntries(startEntry, maxCount, maxSize, new SyncReadCallback(result), null);

return SyncCallbackUtils.waitForResult(result);
}

/**
* Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.<br>
* This is the same of
@@ -664,6 +685,27 @@ public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long las
return SyncCallbackUtils.waitForResult(result);
}

/**
* Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.<br>
* This is the same as
* {@link #asyncBatchReadUnconfirmedEntries(long, int, long, ReadCallback, Object) }
*
* @param firstEntry
*          id of the first entry of the sequence (included)
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size, in bytes, of the entries to read
*/
public Enumeration<LedgerEntry> batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize)
throws InterruptedException, BKException {
CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();

asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new SyncReadCallback(result), null);

return SyncCallbackUtils.waitForResult(result);
}

/**
* Read a sequence of entries asynchronously.
*
@@ -695,6 +737,50 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}

/**
* Read a sequence of entries asynchronously.
* It sends a single RPC to fetch the entries instead of issuing one RPC per entry.
*
* @param startEntry
*          id of the first entry of the sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size, in bytes, of the entries to read
* @param cb
*          object implementing read callback interface
* @param ctx
*          control object
*/
public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, Object ctx) {
// Little sanity check
if (startEntry > lastAddConfirmed) {
LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}",
ledgerId, startEntry, lastAddConfirmed);
cb.readComplete(BKException.Code.ReadException, this, null, ctx);
return;
}
if (notSupportBatchRead()) {
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
//If the bookie server does not support the batch read request, it will close the
// connection and the client will get a BookieHandleNotAvailableException.
if (rc == Code.BookieHandleNotAvailableException) {
notSupportBatch = true;
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
cb.readComplete(rc, lh, seq, ctx);
}
}
}, ctx, false);
}
}
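// Illustrative call with made-up values: read up to 50 entries, at most 1 MiB in total,
// starting at entry 0; the callback receives either the batched result or, on fallback,
// the per-entry result.
//   lh.asyncBatchReadEntries(0L, 50, 1024 * 1024,
//       (rc, handle, seq, readCtx) -> { /* consume seq when rc == Code.OK */ }, null);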

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* <br>This is the same of
@@ -734,6 +820,48 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* It sends a single RPC to fetch the entries instead of issuing one RPC per entry.
*
* @param startEntry
*          id of the first entry of the sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size, in bytes, of the entries to read
* @param cb
*          object implementing read callback interface
* @param ctx
*          control object
*/
public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb,
Object ctx) {
// Little sanity check
if (startEntry < 0) {
LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry);
cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx);
return;
}
if (notSupportBatchRead()) {
long lastEntry = startEntry + maxCount - 1;
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
//If the bookie server does not support the batch read request, it will close the
// connection and the client will get a BookieHandleNotAvailableException.
if (rc == Code.BookieHandleNotAvailableException) {
notSupportBatch = true;
long lastEntry = startEntry + maxCount - 1;
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
cb.readComplete(rc, lh, seq, ctx);
}
}
}, ctx, false);
}
}

/**
* Read a sequence of entries asynchronously.
*
@@ -760,6 +888,123 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
return readEntriesInternalAsync(firstEntry, lastEntry, false);
}

/**
* Read a sequence of entries asynchronously.
* It sends a single RPC to fetch the entries instead of issuing one RPC per entry.
*
* @param startEntry
*          id of the first entry of the sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size, in bytes, of the entries to read
*/
@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
// Little sanity check
if (startEntry < 0) {
LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry);
return FutureUtils.exception(new BKIncorrectParameterException());
}
if (startEntry > lastAddConfirmed) {
LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}",
ledgerId, startEntry, lastAddConfirmed);
return FutureUtils.exception(new BKReadException());
}
if (notSupportBatchRead()) {
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
return readEntriesInternalAsync(startEntry, lastEntry, false);
}
CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false)
.whenComplete((entries, ex) -> {
if (ex != null) {
//If the bookie server does not support the batch read request, it will close the
// connection and the client will get a BookieHandleNotAvailableException.
if (ex instanceof BKException.BKBookieHandleNotAvailableException) {
notSupportBatch = true;
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
readEntriesInternalAsync(startEntry, lastEntry, false).whenComplete((entries1, ex1) -> {
if (ex1 != null) {
future.completeExceptionally(ex1);
} else {
future.complete(entries1);
}
});
} else {
future.completeExceptionally(ex);
}
} else {
future.complete(entries);
}
});
return future;
}

private boolean notSupportBatchRead() {
if (!clientCtx.getConf().batchReadEnabled) {
return true;
}
if (notSupportBatch) {
return true;
}
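// Batch reads need consecutive entries to live on the same set of bookies; with striping
// (ensembleSize != writeQuorumSize) they do not, so fall back to per-entry reads.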
LedgerMetadata ledgerMetadata = getLedgerMetadata();
return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize();
}

private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
boolean isRecoveryRead) {
int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes;
if (maxSize > nettyMaxFrameSizeBytes) {
LOG.info(
"The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.",
nettyMaxFrameSizeBytes);
maxSize = nettyMaxFrameSizeBytes;
}
if (maxSize <= 0) {
LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} to replace it.", nettyMaxFrameSizeBytes);
maxSize = nettyMaxFrameSizeBytes;
}
BatchedReadOp op = new BatchedReadOp(this, clientCtx,
startEntry, maxCount, maxSize, isRecoveryRead);
if (!clientCtx.isClientClosed()) {
// Waiting on the first one.
// This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive
// state later after N requests sent.
// Unfortunately it seems that alternatives are:
// - send reads one-by-one (up to the app)
// - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact)
// - block worker pool (not good)
// Even with this implementation one should be more concerned about OOME when all read responses arrive
// or about overloading bookies with these requests than about the submission of many small requests.
// Naturally, one of the solutions would be to submit smaller batches, and in this case the
// current implementation will prevent the next batch from starting when a bookie is
// unresponsive, which is helpful enough.
if (clientCtx.getConf().waitForWriteSetMs >= 0) {
DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(startEntry);
try {
if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
ws.recycle();
}
}

if (isHandleWritable()) {
// Ledger handle in read/write mode: submit to OSE for ordered execution.
executeOrdered(op);
} else {
// Read-only ledger handle: bypass OSE and execute read directly in client thread.
// This avoids a context-switch to OSE thread and thus reduces latency.
op.run();
}
} else {
op.future().completeExceptionally(BKException.create(ClientClosedException));
}
return op.future();
}

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* <br>This is the same of
Expand Down Expand Up @@ -829,6 +1074,40 @@ public void onFailure(Throwable cause) {
}
}

void asyncBatchReadEntriesInternal(long startEntry, int maxCount, long maxSize, ReadCallback cb,
Object ctx, boolean isRecoveryRead) {
if (!clientCtx.isClientClosed()) {
batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, isRecoveryRead)
.whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
@Override
public void onSuccess(LedgerEntries entries) {
cb.readComplete(
Code.OK,
LedgerHandle.this,
IteratorUtils.asEnumeration(
Iterators.transform(entries.iterator(), le -> {
LedgerEntry entry = new LedgerEntry((LedgerEntryImpl) le);
le.close();
return entry;
})),
ctx);
}

@Override
public void onFailure(Throwable cause) {
if (cause instanceof BKException) {
BKException bke = (BKException) cause;
cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx);
} else {
cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, ctx);
}
}
}, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
} else {
cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx);
}
}

/*
* Read the last entry in the ledger
*
@@ -45,6 +45,23 @@ public interface ReadHandle extends Handle {
*/
CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);

/**
* Read a sequence of entries asynchronously in a single batch request.
*
* @param startEntry
*          id of the first entry of the sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size, in bytes, of the entries to read
* @return a handle to the result of the operation
*/
default CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
future.completeExceptionally(new UnsupportedOperationException());
return future;
}

/**
* Read a sequence of entries synchronously.
*
@@ -59,6 +76,21 @@ default LedgerEntries read(long firstEntry, long lastEntry) throws BKException,
BKException.HANDLER);
}

/**
* Read a sequence of entries synchronously in a single batch request.
*
* @param startEntry
*          id of the first entry of the sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size, in bytes, of the entries to read
* @return the result of the operation
*/
default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize)
throws BKException, InterruptedException {
return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize), BKException.HANDLER);
}
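// Illustrative call with made-up values: fetch up to 20 entries, at most 64 KiB in total,
// starting at entry 0, in one batched request:
//   try (LedgerEntries entries = readHandle.batchRead(0L, 20, 64 * 1024)) {
//       entries.forEach(e -> System.out.println(e.getEntryId()));
//   }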

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* <br>This is the same of