From abb5981676a870c2d3b17b62d5b96f3a3f4a4d55 Mon Sep 17 00:00:00 2001 From: Hang Chen Date: Wed, 20 Sep 2023 15:14:25 +0800 Subject: [PATCH] Rackaware placement policy support local node awareness by hostname (#4057) ### Motivation Rack-aware placement policies enable preference for bookies that reside in the same rack as the bookie client. - Initiate local node by resolving the rack information https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L207-L215 - When generating new ensembles for a ledger, the selecting algorithm will set the localNode's rack to `curRack` and select one bookie from `curRack` first https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L420-L426 However, when resolving the local node's rack information, we use IP to resolve the rack name, which is unfriendly with k8s deployment. https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L209 In k8s deployment, we usually use the hostname as bookieId and Pulsar broker name instead of IP, because the IP will be changed when the pods migrated to other nodes. ### Modification In order not to bring break change to the current behavior, I introduced a flag `useHostnameResolveLocalNodePlacementPolicy` in the BookKeeper client configuration to control whether to use the hostname to resolve the bookie client's local node rack information. The flag is `false` by default, which is the same behavior as the current logic. Due to this PR doesn't introduce any break changes, I think we can cherry-pick it back to the patch releases (branch-4.14, branch-4.15 and branch-4.16) --- .../RackawareEnsemblePlacementPolicy.java | 30 +++++-- .../RackawareEnsemblePlacementPolicyImpl.java | 43 +++++++++- .../RegionAwareEnsemblePlacementPolicy.java | 9 +- .../bookkeeper/conf/ClientConfiguration.java | 19 +++++ .../TestRackawareEnsemblePlacementPolicy.java | 84 +++++++++++++++++++ 5 files changed, 176 insertions(+), 9 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index 72858f188f7..1fb17ca3ef1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -58,20 +58,40 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso boolean enforceMinNumRacksPerWriteQuorum, boolean ignoreLocalNodeInPlacementPolicy, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { + return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, + reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, false, + statsLogger, bookieAddressResolver); + } + + @Override + protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsResolver, + HashedWheelTimer timer, + boolean reorderReadsRandom, + int stabilizePeriodSeconds, + int reorderThresholdPendingRequests, + boolean isWeighted, + int maxWeightMultiple, + int minNumRacksPerWriteQuorum, + boolean enforceMinNumRacksPerWriteQuorum, + boolean ignoreLocalNodeInPlacementPolicy, + boolean useHostnameResolveLocalNodePlacementPolicy, + StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { if (stabilizePeriodSeconds > 0) { super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, - ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver); + ignoreLocalNodeInPlacementPolicy, useHostnameResolveLocalNodePlacementPolicy, + statsLogger, bookieAddressResolver); slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability); slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, - enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, statsLogger, - bookieAddressResolver); + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, + useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver); } else { super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, - enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, statsLogger, - bookieAddressResolver); + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, + useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver); slave = null; } return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 6ec9e5b1589..7f219854ed8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -90,6 +90,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP protected int minNumRacksPerWriteQuorum; protected boolean enforceMinNumRacksPerWriteQuorum; protected boolean ignoreLocalNodeInPlacementPolicy; + protected boolean useHostnameResolveLocalNodePlacementPolicy; public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering"; @@ -144,6 +145,41 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP topology = new NetworkTopologyImpl(); } + /** + * Initialize the policy. + * + * @param dnsResolver + * @param timer + * @param reorderReadsRandom + * @param stabilizePeriodSeconds + * @param reorderThresholdPendingRequests + * @param isWeighted + * @param maxWeightMultiple + * @param minNumRacksPerWriteQuorum + * @param enforceMinNumRacksPerWriteQuorum + * @param ignoreLocalNodeInPlacementPolicy + * @param statsLogger + * @param bookieAddressResolver + * @return initialized ensemble placement policy + */ + protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dnsResolver, + HashedWheelTimer timer, + boolean reorderReadsRandom, + int stabilizePeriodSeconds, + int reorderThresholdPendingRequests, + boolean isWeighted, + int maxWeightMultiple, + int minNumRacksPerWriteQuorum, + boolean enforceMinNumRacksPerWriteQuorum, + boolean ignoreLocalNodeInPlacementPolicy, + StatsLogger statsLogger, + BookieAddressResolver bookieAddressResolver) { + return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, + reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, + enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, + false, statsLogger, bookieAddressResolver); + } + /** * Initialize the policy. * @@ -160,6 +196,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns int minNumRacksPerWriteQuorum, boolean enforceMinNumRacksPerWriteQuorum, boolean ignoreLocalNodeInPlacementPolicy, + boolean useHostnameResolveLocalNodePlacementPolicy, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead."); @@ -195,6 +232,7 @@ public Integer getSample() { this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum; this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum; this.ignoreLocalNodeInPlacementPolicy = ignoreLocalNodeInPlacementPolicy; + this.useHostnameResolveLocalNodePlacementPolicy = useHostnameResolveLocalNodePlacementPolicy; // create the network topology if (stabilizePeriodSeconds > 0) { @@ -206,7 +244,9 @@ public Integer getSample() { BookieNode bn = null; if (!ignoreLocalNodeInPlacementPolicy) { try { - bn = createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress()); + String hostname = useHostnameResolveLocalNodePlacementPolicy + ? InetAddress.getLocalHost().getCanonicalHostName() : InetAddress.getLocalHost().getHostAddress(); + bn = createDummyLocalBookieNode(hostname); } catch (IOException e) { LOG.error("Failed to get local host address : ", e); } @@ -303,6 +343,7 @@ public Long load(BookieId key) throws Exception { conf.getMinNumRacksPerWriteQuorum(), conf.getEnforceMinNumRacksPerWriteQuorum(), conf.getIgnoreLocalNodeInPlacementPolicy(), + conf.getUseHostnameResolveLocalNodePlacementPolicy(), statsLogger, bookieAddressResolver); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index 43969b8fde3..5fcfd0f94f5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -140,7 +140,8 @@ public void handleBookiesThatJoined(Set joinedBookies) { .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, - this.ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver) + this.ignoreLocalNodeInPlacementPolicy, + this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } @@ -201,7 +202,8 @@ public void onBookieRackChange(List bookieAddressList) { this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, - this.ignoreLocalNodeInPlacementPolicy, statsLogger, + this.ignoreLocalNodeInPlacementPolicy, + this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); perRegionPlacement.put(newRegion, newRegionPlacement); @@ -242,7 +244,8 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, - this.ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver) + this.ignoreLocalNodeInPlacementPolicy, this.ignoreLocalNodeInPlacementPolicy, + statsLogger, bookieAddressResolver) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 8aaa8bbff40..297a2f62f47 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -161,6 +161,9 @@ public class ClientConfiguration extends AbstractConfigurationempty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + // Update cluster + Set addrs = new HashSet(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + addrs.add(addr6.toBookieId()); + addrs.add(addr7.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + + int ensembleSize = 3; + int writeQuorumSize = 3; + int ackQuorumSize = 2; + + Set excludeBookies = new HashSet<>(); + + for (int i = 0; i < 50000; ++i) { + EnsemblePlacementPolicy.PlacementResult> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List ensemble = ensembleResponse.getResult(); + if (!ensemble.contains(addr1.toBookieId())) { + fail("Failed to select bookie located on the same rack with bookie client"); + } + if (ensemble.contains(addr2.toBookieId()) && ensemble.contains(addr3.toBookieId())) { + fail("addr2 and addr3 is same rack."); + } + } + + //addr4 shutdown. + addrs.remove(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet()); + for (int i = 0; i < 50000; ++i) { + EnsemblePlacementPolicy.PlacementResult> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List ensemble = ensembleResponse.getResult(); + if (!ensemble.contains(addr1.toBookieId())) { + fail("Failed to select bookie located on the same rack with bookie client"); + } + } + + } + @Test public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception { int numOfRacksToCreate = 6;