Skip to content

Commit

Permalink
Implement isEnsembleAdheringToPlacementPolicy in RegionAwareEnsembleP…
Browse files Browse the repository at this point in the history
…lacementPolicy.
  • Loading branch information
druidliu committed Nov 17, 2023
1 parent 192f88b commit f514989
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -644,12 +645,75 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
@Override
public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
int writeQuorumSize, int ackQuorumSize) {
/**
* TODO: have to implement actual logic for this method for
* RegionAwareEnsemblePlacementPolicy. For now return true value.
*
* - https://github.com/apache/bookkeeper/issues/1898
*/
if (CollectionUtils.isEmpty(ensembleList)) {
return PlacementPolicyAdherence.FAIL;
}

int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;

int ensembleSize = ensembleList.size();
Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
BookieId bookie;
for (int i = 0; i < ensembleList.size(); i++) {
regionsInQuorum.clear();
for (int j = 0; j < writeQuorumSize; j++) {
bookie = ensembleList.get((i + j) % ensembleSize);
if (knownBookies.containsKey(bookie)) {
String region = getLocalRegion(knownBookies.get(bookie));
if (regionsInQuorum.containsKey(region)) {
regionsInQuorum.get(region).add(bookie);
} else {
Set<BookieId> bookieSet = new HashSet<>();
bookieSet.add(bookie);
regionsInQuorum.put(region, bookieSet);
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("bookie {} is not in the list of knownBookies", bookie);
}
}

if (regionsInQuorum.isEmpty()) {
return PlacementPolicyAdherence.FAIL;
}

if (regionsInQuorum.size() < 2) {
// fall back to use the ensemblePlacementPolicy in specific region
String region = regionsInQuorum.keySet().iterator().next();
Set<BookieId> bookieIds = regionsInQuorum.get(region);

TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = policyWithinRegion.isEnsembleAdheringToPlacementPolicy(
new ArrayList<>(bookieIds), bookieIds.size(), 1);
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
if (LOG.isDebugEnabled()) {
LOG.debug("For ensemble {}, write set starting at {} are all from one region, "
+ "fall back to RackawareEnsemblePlacementPolicy and fail.", ensembleList, i);
}
return PlacementPolicyAdherence.FAIL;
}
continue;
}

if (effectiveMinRegionsForDurability > 0 && regionsInQuorum.size() < effectiveMinRegionsForDurability) {
if (LOG.isDebugEnabled()) {
LOG.debug("For ensemble {}, write set starting at {} are from {} regions, "
+ "less than effectiveMinRegionsForDurability: {}.",
ensembleList, i, regionsInQuorum.size(), effectiveMinRegionsForDurability);
}
return PlacementPolicyAdherence.FAIL;
}

if (regionsInQuorum.size() < writeQuorumSize) {
// each writeQuorum should be different regions
if (LOG.isDebugEnabled()) {
LOG.debug("For ensemble: {}, write set starting at {} are from {} regions, "
+ "less than writeQuorumSize {}.",
ensembleList, i, regionsInQuorum.size(), writeQuorumSize);
}
return PlacementPolicyAdherence.FAIL;
}
}

return PlacementPolicyAdherence.MEETS_STRICT;
}
}
Loading

0 comments on commit f514989

Please sign in to comment.