diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index 72858f188f7..1fb17ca3ef1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -58,20 +58,40 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso boolean enforceMinNumRacksPerWriteQuorum, boolean ignoreLocalNodeInPlacementPolicy, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { + return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, + reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, false, + statsLogger, bookieAddressResolver); + } + + @Override + protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsResolver, + HashedWheelTimer timer, + boolean reorderReadsRandom, + int stabilizePeriodSeconds, + int reorderThresholdPendingRequests, + boolean isWeighted, + int maxWeightMultiple, + int minNumRacksPerWriteQuorum, + boolean enforceMinNumRacksPerWriteQuorum, + boolean ignoreLocalNodeInPlacementPolicy, + boolean useHostnameResolveLocalNodePlacementPolicy, + StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { if (stabilizePeriodSeconds > 0) { super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, - ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver); + ignoreLocalNodeInPlacementPolicy, useHostnameResolveLocalNodePlacementPolicy, + statsLogger, bookieAddressResolver); slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability); slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, - enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, statsLogger, - bookieAddressResolver); + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, + useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver); } else { super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, - enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, statsLogger, - bookieAddressResolver); + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, + useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver); slave = null; } return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 6ec9e5b1589..7f219854ed8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -90,6 +90,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP protected int minNumRacksPerWriteQuorum; protected boolean enforceMinNumRacksPerWriteQuorum; protected boolean ignoreLocalNodeInPlacementPolicy; + protected boolean useHostnameResolveLocalNodePlacementPolicy; public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering"; @@ -144,6 +145,41 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP topology = new NetworkTopologyImpl(); } + /** + * Initialize the policy. + * + * @param dnsResolver + * @param timer + * @param reorderReadsRandom + * @param stabilizePeriodSeconds + * @param reorderThresholdPendingRequests + * @param isWeighted + * @param maxWeightMultiple + * @param minNumRacksPerWriteQuorum + * @param enforceMinNumRacksPerWriteQuorum + * @param ignoreLocalNodeInPlacementPolicy + * @param statsLogger + * @param bookieAddressResolver + * @return initialized ensemble placement policy + */ + protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dnsResolver, + HashedWheelTimer timer, + boolean reorderReadsRandom, + int stabilizePeriodSeconds, + int reorderThresholdPendingRequests, + boolean isWeighted, + int maxWeightMultiple, + int minNumRacksPerWriteQuorum, + boolean enforceMinNumRacksPerWriteQuorum, + boolean ignoreLocalNodeInPlacementPolicy, + StatsLogger statsLogger, + BookieAddressResolver bookieAddressResolver) { + return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, + reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, + false, statsLogger, bookieAddressResolver); + } + /** * Initialize the policy. * @@ -160,6 +196,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns int minNumRacksPerWriteQuorum, boolean enforceMinNumRacksPerWriteQuorum, boolean ignoreLocalNodeInPlacementPolicy, + boolean useHostnameResolveLocalNodePlacementPolicy, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead."); @@ -195,6 +232,7 @@ public Integer getSample() { this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum; this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum; this.ignoreLocalNodeInPlacementPolicy = ignoreLocalNodeInPlacementPolicy; + this.useHostnameResolveLocalNodePlacementPolicy = useHostnameResolveLocalNodePlacementPolicy; // create the network topology if (stabilizePeriodSeconds > 0) { @@ -206,7 +244,9 @@ public Integer getSample() { BookieNode bn = null; if (!ignoreLocalNodeInPlacementPolicy) { try { - bn = createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress()); + String hostname = useHostnameResolveLocalNodePlacementPolicy + ? InetAddress.getLocalHost().getCanonicalHostName() : InetAddress.getLocalHost().getHostAddress(); + bn = createDummyLocalBookieNode(hostname); } catch (IOException e) { LOG.error("Failed to get local host address : ", e); } @@ -303,6 +343,7 @@ public Long load(BookieId key) throws Exception { conf.getMinNumRacksPerWriteQuorum(), conf.getEnforceMinNumRacksPerWriteQuorum(), conf.getIgnoreLocalNodeInPlacementPolicy(), + conf.getUseHostnameResolveLocalNodePlacementPolicy(), statsLogger, bookieAddressResolver); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index 43969b8fde3..5fcfd0f94f5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -140,7 +140,8 @@ public void handleBookiesThatJoined(Set joinedBookies) { .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, - this.ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver) + this.ignoreLocalNodeInPlacementPolicy, + this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } @@ -201,7 +202,8 @@ public void onBookieRackChange(List bookieAddressList) { this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, - this.ignoreLocalNodeInPlacementPolicy, statsLogger, + this.ignoreLocalNodeInPlacementPolicy, + this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); perRegionPlacement.put(newRegion, newRegionPlacement); @@ -242,7 +244,8 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, - this.ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver) + this.ignoreLocalNodeInPlacementPolicy, this.ignoreLocalNodeInPlacementPolicy, + statsLogger, bookieAddressResolver) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 8aaa8bbff40..297a2f62f47 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -161,6 +161,9 @@ public class ClientConfiguration extends AbstractConfigurationempty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + addrs.add(addr6.toBookieId()); + addrs.add(addr7.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + int ensembleSize = 3; + int writeQuorumSize = 3; + int ackQuorumSize = 2; + + Set excludeBookies = new HashSet<>(); + + for (int i = 0; i < 50000; ++i) { + EnsemblePlacementPolicy.PlacementResult> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List ensemble = ensembleResponse.getResult(); + if (!ensemble.contains(addr1.toBookieId())) { + fail("Failed to select bookie located on the same rack with bookie client"); + } + if (ensemble.contains(addr2.toBookieId()) && ensemble.contains(addr3.toBookieId())) { + fail("addr2 and addr3 is same rack."); + } + } + + //addr4 shutdown. + addrs.remove(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + for (int i = 0; i < 50000; ++i) { + EnsemblePlacementPolicy.PlacementResult> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List ensemble = ensembleResponse.getResult(); + if (!ensemble.contains(addr1.toBookieId())) { + fail("Failed to select bookie located on the same rack with bookie client"); + } + } + + } + @Test public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception { int numOfRacksToCreate = 6;