Skip to content

Commit

Permalink
[improve] [broker] Phase 1 of PIP-370 support disable create topics o…
Browse files Browse the repository at this point in the history
…n remote cluster through replication (#23169)
  • Loading branch information
poorbarcode authored Aug 23, 2024
1 parent 0a5cb51 commit 44f9860
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 8 deletions.
10 changes: 10 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,16 @@ replicatorPrefix=pulsar.repl
# due to missing ZooKeeper watch (disable with value 0)
replicationPolicyCheckDurationSeconds=600

# Whether the internal replication of the local cluster will trigger topic auto-creation on the remote cluster.
# 1. After enabling namespace-level Geo-Replication: whether the local broker will create topics on the remote
# cluster automatically when calling `pulsar-admin topics create-partitioned-topic`.
# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether the local broker will create topics on
# the remote cluster.
# 3. Whether the internal Geo-Replicator in the local cluster will trigger non-persistent topic auto-creation for
# remote clusters.
# It is not a dynamic config, the default value is "true" to preserve backward-compatible behavior.
createTopicToRemoteClusterForReplication=true

# Default message retention time.
# 0 means retention is disabled. -1 means data is not removed by time quota.
defaultRetentionTimeInMinutes=0
Expand Down
10 changes: 10 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,16 @@ replicationProducerQueueSize=1000
# due to missing ZooKeeper watch (disable with value 0)
replicationPolicyCheckDurationSeconds=600

# Whether the internal replication of the local cluster will trigger topic auto-creation on the remote cluster.
# 1. After enabling namespace-level Geo-Replication: whether the local broker will create topics on the remote
# cluster automatically when calling `pulsar-admin topics create-partitioned-topic`.
# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether the local broker will create topics on
# the remote cluster.
# 3. Whether the internal Geo-Replicator in the local cluster will trigger non-persistent topic auto-creation for
# remote clusters.
# It is not a dynamic config, the default value is "true" to preserve backward-compatible behavior.
createTopicToRemoteClusterForReplication=true

# Default message retention time. 0 means retention is disabled. -1 means data is not removed by time quota
defaultRetentionTimeInMinutes=0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2889,6 +2889,11 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
+ "inconsistency due to missing ZooKeeper watch (disable with value 0)"
)
private int replicationPolicyCheckDurationSeconds = 600;
@FieldContext(
category = CATEGORY_REPLICATION,
doc = "Whether the internal replicator will trigger topic auto-creation on the remote cluster."
)
private boolean createTopicToRemoteClusterForReplication = true;
@Deprecated
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,15 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
.thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
if (!createLocalTopicOnly && topicName.isGlobal()) {
if (!createLocalTopicOnly && topicName.isGlobal()
&& pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
log.info("[{}] Successfully created partitioned for topic {} for the remote clusters",
clientAppId());
} else {
log.info("[{}] Skip creating partitioned for topic {} for the remote clusters",
clientAppId(), topicName);
}
log.info("[{}] Successfully created partitions for topic {} in cluster {}",
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3332,6 +3332,12 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
}
return FutureUtil.waitForAll(futures);
}).thenCompose(__ -> {
if (!pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
log.info("[{}] Skip creating partitioned for topic {} for the remote clusters {}",
clientAppId(), topicName, replicationClusters.stream().filter(v ->
!pulsar().getConfig().getClusterName().equals(v)).collect(Collectors.toList()));
return CompletableFuture.completedFuture(null);
}
// Sync to create partitioned topic on the remote cluster if needed.
TopicName topicNameWithoutPartition = TopicName.get(topicName.getPartitionedTopicName());
return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public String getRemoteCluster() {
return remoteCluster;
}

protected CompletableFuture<Void> prepareCreateProducer() {
return CompletableFuture.completedFuture(null);
}

public void startProducer() {
// Guarantee only one task call "producerBuilder.createAsync()".
Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting);
Expand All @@ -185,12 +189,15 @@ public void startProducer() {
}

log.info("[{}] Starting replicator", replicatorId);

// Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
// the remote cluster.
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
builderImpl.getConf().setNonPartitionedTopicExpected(true);
producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
prepareCreateProducer().thenCompose(ignore -> {
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
builderImpl.getConf().setNonPartitionedTopicExpected(true);
return producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
});
}).exceptionally(ex -> {
Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected);
if (setDisconnectedRes.getLeft()) {
Expand All @@ -215,6 +222,7 @@ public void startProducer() {
}
return null;
});

}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class GeoPersistentReplicator extends PersistentReplicator {
Expand All @@ -50,6 +52,33 @@ protected String getProducerName() {
return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster;
}

@Override
protected CompletableFuture<Void> prepareCreateProducer() {
if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> topicCheckFuture = new CompletableFuture<>();
replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, false)
.whenComplete((metadata, ex) -> {
if (ex == null) {
if (metadata.partitions == 0) {
topicCheckFuture.complete(null);
} else {
String errorMsg = String.format("{} Can not create the replicator due to the partitions in the"
+ " remote cluster is not 0, but is %s",
replicatorId, metadata.partitions);
log.error(errorMsg);
topicCheckFuture.completeExceptionally(
new PulsarClientException.NotAllowedException(errorMsg));
}
} else {
topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
}
});
return topicCheckFuture;
}
}

@Override
protected boolean replicateEntries(List<Entry> entries) {
boolean atLeastOneMessageSentForReplication = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class DisabledCreateTopicToRemoteClusterForReplicationTest extends OneWayReplicatorTestBase {

@Override
@BeforeClass(alwaysRun = true, timeOut = 300000)
public void setup() throws Exception {
super.setup();
admin1.namespaces().setRetention(replicatedNamespace, new RetentionPolicies(300, 1024));
admin2.namespaces().setRetention(replicatedNamespace, new RetentionPolicies(300, 1024));
admin1.namespaces().setRetention(nonReplicatedNamespace, new RetentionPolicies(300, 1024));
admin2.namespaces().setRetention(nonReplicatedNamespace, new RetentionPolicies(300, 1024));
}

@Override
@AfterClass(alwaysRun = true, timeOut = 300000)
public void cleanup() throws Exception {
super.cleanup();
}

@Override
protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
config.setCreateTopicToRemoteClusterForReplication(false);
config.setReplicationStartAt("earliest");
}

@Test
public void testCreatePartitionedTopicWithNsReplication() throws Exception {
String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", "");
admin1.namespaces().createNamespace(ns);
admin2.namespaces().createNamespace(ns);
admin1.namespaces().setRetention(ns, new RetentionPolicies(3600, -1));
admin2.namespaces().setRetention(ns, new RetentionPolicies(3600, -1));

// Create non-partitioned topic.
// Enable replication.
final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
final String part1 = TopicName.get(tp).getPartition(0).toString();
admin1.topics().createPartitionedTopic(tp, 1);
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));

// Trigger and wait for replicator starts.
String msgValue = "msg-1";
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(tp).create();
producer1.send(msgValue);
producer1.close();
Awaitility.await().untilAsserted(() -> {
PersistentTopic topicPart1 = (PersistentTopic) broker1.getTopic(part1, false).join().get();
assertFalse(topicPart1.getReplicators().isEmpty());
});

// Verify: there is no topic with the same name on the remote cluster.
try {
admin2.topics().getPartitionedTopicMetadata(tp);
fail("Expected a not found ex");
} catch (PulsarAdminException.NotFoundException ex) {
// expected.
}

// Verify: after creating the topic on the remote cluster, all things are fine.
admin2.topics().createPartitionedTopic(tp, 1);
Consumer<String> consumer2 = client2.newConsumer(Schema.STRING).topic(tp).isAckReceiptEnabled(true)
.subscriptionName("s1").subscribe();
assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue);
consumer2.close();

// cleanup.
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)));
Awaitility.await().untilAsserted(() -> {
PersistentTopic topicPart1 = (PersistentTopic) broker1.getTopic(part1, false).join().get();
assertTrue(topicPart1.getReplicators().isEmpty());
});
admin1.topics().deletePartitionedTopic(tp, false);
admin2.topics().deletePartitionedTopic(tp, false);
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}

@Test
public void testEnableTopicReplication() throws Exception {
String ns = nonReplicatedNamespace;

// Create non-partitioned topic.
// Enable replication.
final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
final String part1 = TopicName.get(tp).getPartition(0).toString();
admin1.topics().createPartitionedTopic(tp, 1);
admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1, cluster2));

// Trigger and wait for replicator starts.
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
p1.send("msg-1");
p1.close();
Awaitility.await().untilAsserted(() -> {
PersistentTopic topicPart1 = (PersistentTopic) broker1.getTopic(part1, false).join().get();
assertFalse(topicPart1.getReplicators().isEmpty());
});

// Verify: there is no topic with the same name on the remote cluster.
try {
admin2.topics().getPartitionedTopicMetadata(tp);
fail("Expected a not found ex");
} catch (PulsarAdminException.NotFoundException ex) {
// expected.
}

// Verify: after creating the topic on the remote cluster, all things are fine.
admin2.topics().createPartitionedTopic(tp, 1);
waitReplicatorStarted(part1);

// cleanup.
admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
Awaitility.await().untilAsserted(() -> {
PersistentTopic topicPart1 = (PersistentTopic) broker1.getTopic(part1, false).join().get();
assertTrue(topicPart1.getReplicators().isEmpty());
});
admin1.topics().deletePartitionedTopic(tp, false);
admin2.topics().deletePartitionedTopic(tp, false);
}

@Test
public void testNonPartitionedTopic() throws Exception {
String ns = nonReplicatedNamespace;

// Create non-partitioned topic.
// Enable replication.
final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
admin1.topics().createNonPartitionedTopic(tp);
admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1, cluster2));

// Trigger and wait for replicator starts.
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
p1.send("msg-1");
p1.close();
Awaitility.await().untilAsserted(() -> {
PersistentTopic topicPart1 = (PersistentTopic) broker1.getTopic(tp, false).join().get();
assertFalse(topicPart1.getReplicators().isEmpty());
});

// Verify: there is no topic with the same name on the remote cluster.
try {
admin2.topics().getPartitionedTopicMetadata(tp);
fail("Expected a not found ex");
} catch (PulsarAdminException.NotFoundException ex) {
// expected.
}

// Verify: after creating the topic on the remote cluster, all things are fine.
admin2.topics().createNonPartitionedTopic(tp);
waitReplicatorStarted(tp);

// cleanup.
admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
Awaitility.await().untilAsserted(() -> {
PersistentTopic topicPart1 = (PersistentTopic) broker1.getTopic(tp, false).join().get();
assertTrue(topicPart1.getReplicators().isEmpty());
});
admin1.topics().delete(tp, false);
admin2.topics().delete(tp, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ public void testInitialize() throws Exception {
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200);
assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(), true);
}
}
Loading

0 comments on commit 44f9860

Please sign in to comment.