[BP-62] LedgerHandle introduces batch read API. #4195

Merged: 17 commits, Feb 5, 2024
@@ -48,6 +48,8 @@ class ClientInternalConf {
final boolean enableBookieFailureTracking;
final boolean useV2WireProtocol;
final boolean enforceMinNumFaultDomainsForWrite;
final boolean batchReadEnabled;
final int nettyMaxFrameSizeBytes;

static ClientInternalConf defaultValues() {
return fromConfig(new ClientConfiguration());
@@ -72,9 +74,9 @@ private ClientInternalConf(ClientConfiguration conf,
this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
this.throttleValue = conf.getThrottleValue();
this.bookieFailureHistoryExpirationMSec = conf.getBookieFailureHistoryExpirationMSec();

this.batchReadEnabled = conf.isBatchReadEnabled();
this.nettyMaxFrameSizeBytes = conf.getNettyMaxFrameSizeBytes();
this.disableEnsembleChangeFeature = featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());

this.delayEnsembleChange = conf.getDelayEnsembleChange();
this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges();
this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
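
Editorial note, not part of this diff: a minimal client-side sketch of opting in to the new batch read path through ClientConfiguration. The setBatchReadEnabled(...) setter is assumed from the isBatchReadEnabled() getter read above and may differ in the final API; the other calls are existing configuration APIs, and exception handling is omitted.

    import org.apache.bookkeeper.client.BookKeeper;
    import org.apache.bookkeeper.conf.ClientConfiguration;

    ClientConfiguration conf = new ClientConfiguration();
    conf.setMetadataServiceUri("zk+hierarchical://localhost:2181/ledgers");
    conf.setBatchReadEnabled(true);                   // assumed setter for the new client flag
    conf.setNettyMaxFrameSizeBytes(5 * 1024 * 1024);  // also caps the maxSize of a batch response
    BookKeeper bk = new BookKeeper(conf);
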
@@ -103,6 +103,7 @@ public class LedgerHandle implements WriteHandle {
final long ledgerId;
final ExecutorService executor;
long lastAddPushed;
boolean notSupportBatch;

private enum HandleState {
OPEN,
@@ -641,6 +642,26 @@ public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
return SyncCallbackUtils.waitForResult(result);
}

/**
* Read a sequence of entries synchronously.
*
* @param startEntry
*          start entry id
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size of the returned entries, in bytes
* @see #asyncBatchReadEntries(long, int, long, ReadCallback, Object)
*/
public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int maxCount, long maxSize)
throws InterruptedException, BKException {
CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();

asyncBatchReadEntries(startEntry, maxCount, maxSize, new SyncReadCallback(result), null);

return SyncCallbackUtils.waitForResult(result);
}
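
Editorial sketch, not part of this diff: calling the synchronous API above from application code, with lh an already-open LedgerHandle and InterruptedException/BKException handling omitted. Only the method added in this diff plus existing LedgerEntry accessors are used.

    // Read at most 100 entries, bounded to roughly 1 MiB in total, starting at entry 0.
    Enumeration<LedgerEntry> entries = lh.batchReadEntries(0L, 100, 1024 * 1024);
    while (entries.hasMoreElements()) {
        LedgerEntry entry = entries.nextElement();
        System.out.println("entry " + entry.getEntryId() + " -> " + entry.getLength() + " bytes");
    }
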

/**
* Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.<br>
* This is the same of
@@ -664,6 +685,27 @@ public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long las
return SyncCallbackUtils.waitForResult(result);
}

/**
* Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.<br>
* This is the same of
* {@link #asyncBatchReadUnconfirmedEntries(long, int, long, ReadCallback, Object) }
*
* @param firstEntry
*          id of first entry of sequence (included)
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size of the returned entries, in bytes
*/
public Enumeration<LedgerEntry> batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize)
throws InterruptedException, BKException {
CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();

asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new SyncReadCallback(result), null);

return SyncCallbackUtils.waitForResult(result);
}
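
Editorial sketch, not part of this diff: a tailing reader using the unconfirmed variant, where lh is an open LedgerHandle, nextEntry tracks the reader position, and handle(...) is application-defined. Entries beyond the reader's view of LastAddConfirmed must already be persisted on the bookies for the read to succeed.

    Enumeration<LedgerEntry> entries = lh.batchReadUnconfirmedEntries(nextEntry, 50, 512 * 1024);
    while (entries.hasMoreElements()) {
        LedgerEntry e = entries.nextElement();
        nextEntry = e.getEntryId() + 1;
        handle(e);
    }
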

/**
* Read a sequence of entries asynchronously.
*
@@ -695,6 +737,50 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}

/**
* Read a sequence of entries asynchronously.
* It sends a single RPC to fetch all requested entries instead of one RPC per entry.
*
* @param startEntry
*          id of first entry of sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size of the returned entries, in bytes
* @param cb
*          object implementing read callback interface
* @param ctx
*          control object
*/
public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, Object ctx) {
// Little sanity check
if (startEntry > lastAddConfirmed) {
LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}",
ledgerId, startEntry, lastAddConfirmed);
cb.readComplete(BKException.Code.ReadException, this, null, ctx);
return;
}
if (notSupportBatchRead()) {
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
// If the bookie server does not support the batch read request, it will close the
// connection and the client will get a BookieHandleNotAvailableException.
if (rc == Code.BookieHandleNotAvailableException) {
notSupportBatch = true;
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
cb.readComplete(rc, lh, seq, ctx);
}
}
}, ctx, false);
}
}
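
Editorial sketch, not part of this diff: using the callback variant above, with lh an open LedgerHandle and process(...) an application-defined handler. ReadCallback has a single abstract method, so a lambda can be passed.

    lh.asyncBatchReadEntries(0L, 100, 1024 * 1024, (rc, handle, seq, ctx) -> {
        if (rc != BKException.Code.OK) {
            System.err.println("batch read failed: " + BKException.getMessage(rc));
            return;
        }
        while (seq.hasMoreElements()) {
            process(seq.nextElement());
        }
    }, null);
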

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* <br>This is the same of
@@ -734,6 +820,48 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* It sends a single RPC to fetch all requested entries instead of one RPC per entry.
* @param startEntry
*          id of first entry of sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size of the returned entries, in bytes
* @param cb
*          object implementing read callback interface
* @param ctx
*          control object
*/
public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb,
Object ctx) {
// Little sanity check
if (startEntry < 0) {
LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry);
cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx);
return;
}
if (notSupportBatchRead()) {
long lastEntry = startEntry + maxCount - 1;
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
// If the bookie server does not support the batch read request, it will close the
// connection and the client will get a BookieHandleNotAvailableException.
if (rc == Code.BookieHandleNotAvailableException) {
notSupportBatch = true;
long lastEntry = startEntry + maxCount - 1;
asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
} else {
cb.readComplete(rc, lh, seq, ctx);
}
}
}, ctx, false);
}
}

/**
* Read a sequence of entries asynchronously.
*
@@ -760,6 +888,123 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
return readEntriesInternalAsync(firstEntry, lastEntry, false);
}

/**
* Read a sequence of entries asynchronously.
* It sends a single RPC to fetch all requested entries instead of one RPC per entry.
*
* @param startEntry
*          id of first entry of sequence
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size of the returned entries, in bytes
*/
@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
// Little sanity check
if (startEntry < 0) {
LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry);
return FutureUtils.exception(new BKIncorrectParameterException());
}
if (startEntry > lastAddConfirmed) {
LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}",
ledgerId, startEntry, lastAddConfirmed);
return FutureUtils.exception(new BKReadException());
}
if (notSupportBatchRead()) {
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
return readEntriesInternalAsync(startEntry, lastEntry, false);
}
CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false)
.whenComplete((entries, ex) -> {
if (ex != null) {
// If the bookie server does not support the batch read request, it will close the
// connection and the client will get a BookieHandleNotAvailableException.
if (ex instanceof BKException.BKBookieHandleNotAvailableException) {
notSupportBatch = true;
long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed);
readEntriesInternalAsync(startEntry, lastEntry, false).whenComplete((entries1, ex1) -> {
if (ex1 != null) {
future.completeExceptionally(ex1);
} else {
future.complete(entries1);
}
});
} else {
future.completeExceptionally(ex);
}
} else {
future.complete(entries);
}
});
return future;
}
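
Editorial sketch, not part of this diff: consuming the futures-based API above, awaiting the future inline for brevity inside a method assumed to declare throws Exception. The returned LedgerEntries wraps pooled buffers, so it is closed once consumed.

    try (LedgerEntries entries = lh.batchReadAsync(0L, 100, 1024 * 1024).get()) {
        java.util.Iterator<org.apache.bookkeeper.client.api.LedgerEntry> it = entries.iterator();
        while (it.hasNext()) {
            org.apache.bookkeeper.client.api.LedgerEntry e = it.next();
            System.out.println("entry " + e.getEntryId() + ": " + e.getLength() + " bytes");
        }
    }
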

private boolean notSupportBatchRead() {
if (!clientCtx.getConf().batchReadEnabled) {
return true;
}
if (notSupportBatch) {
return true;
}
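// Batch reads fetch a contiguous range of entries from a single bookie per request, so they
// are only attempted when the ledger is not striped, i.e. ensembleSize == writeQuorumSize.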
LedgerMetadata ledgerMetadata = getLedgerMetadata();
return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize();
}

private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
boolean isRecoveryRead) {
int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes;
if (maxSize > nettyMaxFrameSizeBytes) {
LOG.info(
"The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.",
nettyMaxFrameSizeBytes);
maxSize = nettyMaxFrameSizeBytes;
}
if (maxSize <= 0) {
LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} to replace it.", nettyMaxFrameSizeBytes);
maxSize = nettyMaxFrameSizeBytes;
}
BatchedReadOp op = new BatchedReadOp(this, clientCtx,
startEntry, maxCount, maxSize, isRecoveryRead);
if (!clientCtx.isClientClosed()) {
// Waiting on the first one.
// This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive
// state later after N requests sent.
// Unfortunately it seems that alternatives are:
// - send reads one-by-one (up to the app)
// - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact)
// - block worker pool (not good)
// Even with this implementation one should be more concerned about OOME when all read responses arrive
// or about overloading bookies with these requests than about submission of many small requests.
// Naturally one of the solutions would be to submit smaller batches and in this case
// current implementation will prevent next batch from starting when bookie is
// unresponsive thus helpful enough.
if (clientCtx.getConf().waitForWriteSetMs >= 0) {
DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(startEntry);
try {
if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
ws.recycle();
}
}

if (isHandleWritable()) {
// Ledger handle in read/write mode: submit to OSE for ordered execution.
executeOrdered(op);
} else {
// Read-only ledger handle: bypass OSE and execute read directly in client thread.
// This avoids a context-switch to OSE thread and thus reduces latency.
op.run();
}
} else {
op.future().completeExceptionally(BKException.create(ClientClosedException));
}
return op.future();
}

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* <br>This is the same of
@@ -829,6 +1074,40 @@ public void onFailure(Throwable cause) {
}
}

void asyncBatchReadEntriesInternal(long startEntry, int maxCount, long maxSize, ReadCallback cb,
Object ctx, boolean isRecoveryRead) {
if (!clientCtx.isClientClosed()) {
batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, isRecoveryRead)
.whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
@Override
public void onSuccess(LedgerEntries entries) {
cb.readComplete(
Code.OK,
LedgerHandle.this,
IteratorUtils.asEnumeration(
Iterators.transform(entries.iterator(), le -> {
LedgerEntry entry = new LedgerEntry((LedgerEntryImpl) le);
le.close();
return entry;
})),
ctx);
}

@Override
public void onFailure(Throwable cause) {
if (cause instanceof BKException) {
BKException bke = (BKException) cause;
cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx);
} else {
cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, ctx);
}
}
}, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
} else {
cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx);
}
}

/*
* Read the last entry in the ledger
*
@@ -45,6 +45,23 @@ public interface ReadHandle extends Handle {
*/
CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);

/**
* Read a sequence of entries asynchronously.
*
* @param startEntry
*          start entry id
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size of the returned entries, in bytes
* @return a handle to the result of the operation
*/
default CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
future.completeExceptionally(new UnsupportedOperationException());
return future;
}

/**
* Read a sequence of entries synchronously.
*
@@ -59,6 +76,21 @@ default LedgerEntries read(long firstEntry, long lastEntry) throws BKException,
BKException.HANDLER);
}

/**
* Read a sequence of entries synchronously.
*
* @param startEntry
*          start entry id
* @param maxCount
*          the maximum number of entries to read
* @param maxSize
*          the maximum total size of the returned entries, in bytes
* @return the result of the operation
*/
default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize)
throws BKException, InterruptedException {
return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize), BKException.HANDLER);
}
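
Editorial sketch, not part of this diff: exercising the new default methods through the org.apache.bookkeeper.client.api handle types. The open-builder calls are existing API; conf, ledgerId, password, and consume(...) are placeholders, and the enclosing method is assumed to declare throws Exception. Handles that do not override batchRead/batchReadAsync fail with UnsupportedOperationException via the defaults above.

    import java.util.Iterator;
    import org.apache.bookkeeper.client.api.BookKeeper;
    import org.apache.bookkeeper.client.api.LedgerEntries;
    import org.apache.bookkeeper.client.api.LedgerEntry;
    import org.apache.bookkeeper.client.api.ReadHandle;

    BookKeeper bk = BookKeeper.newBuilder(conf).build();
    ReadHandle rh = bk.newOpenLedgerOp()
            .withLedgerId(ledgerId)
            .withPassword(password)
            .withRecovery(false)
            .execute()
            .get();
    try (LedgerEntries entries = rh.batchRead(0L, 100, 1024 * 1024)) {
        Iterator<LedgerEntry> it = entries.iterator();
        while (it.hasNext()) {
            consume(it.next().getEntryBytes());
        }
    }
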

/**
* Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
* <br>This is the same of