From bc3a933679635936f02953d4db62ca92ae2881fa Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 20 Dec 2024 20:58:03 +0800 Subject: [PATCH] [improve][client] Make replicateSubscriptionState nullable (#23757) Signed-off-by: Zixuan Liu --- .../pulsar/broker/service/ServerCnx.java | 4 +- .../broker/service/SubscriptionOption.java | 2 +- .../nonpersistent/NonPersistentTopic.java | 4 +- .../persistent/PersistentSubscription.java | 19 +++- .../service/persistent/PersistentTopic.java | 15 ++- .../client/api/ReplicateSubscriptionTest.java | 96 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../impl/conf/ConsumerConfigurationData.java | 14 ++- .../client/impl/ConsumerBuilderImplTest.java | 36 ++++++- .../pulsar/common/protocol/Commands.java | 8 +- 10 files changed, 175 insertions(+), 25 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a1dcd7af6b06f..7dcc44c325a0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1196,8 +1196,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ? subscribe.getStartMessageRollbackDurationSec() : -1; final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; - final boolean isReplicated = subscribe.hasReplicateSubscriptionState() - && subscribe.isReplicateSubscriptionState(); + final Boolean isReplicated = + subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null; final boolean forceTopicCreation = subscribe.isForceTopicCreation(); final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index af56d023616b4..328e7618f8cd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -46,7 +46,7 @@ public class SubscriptionOption { private boolean readCompacted; private CommandSubscribe.InitialPosition initialPosition; private long startMessageRollbackDurationSec; - private boolean replicatedSubscriptionStateArg; + private Boolean replicatedSubscriptionStateArg; private KeySharedMeta keySharedMeta; private Optional> subscriptionProperties; private long consumerEpoch; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 0a725183bde9f..a8eaf82fa6dfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -256,7 +256,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), - option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), + option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null), option.getSchemaType()); } @@ -280,7 +280,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St MessageId startMessageId, Map metadata, boolean readCompacted, long resetStartMessageBackInSec, - boolean replicateSubscriptionState, + Boolean replicateSubscriptionState, KeySharedMeta keySharedMeta, Map subscriptionProperties, SchemaType schemaType) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 538d131ea7cc3..afe18ff548126 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -132,7 +132,11 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile Map subscriptionProperties; private volatile CompletableFuture fenceFuture; private volatile CompletableFuture inProgressResetCursorFuture; + private volatile Boolean replicatedControlled; + static Map getBaseCursorProperties(Boolean isReplicated) { + return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : + NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; static { REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } @@ -146,19 +150,21 @@ static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated) { + Boolean replicated) { this(topic, subscriptionName, cursor, replicated, Collections.emptyMap()); } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); - this.setReplicated(replicated); + this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); + if (replicated != null) { + this.setReplicated(replicated); + } this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() @@ -197,6 +203,7 @@ public boolean isReplicated() { } public boolean setReplicated(boolean replicated) { + replicatedControlled = replicated; ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -1439,4 +1446,8 @@ public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl pos private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); + @VisibleForTesting + public Boolean getReplicatedControlled() { + return replicatedControlled; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 31ed54bbb43d6..46ae137a739dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -572,7 +572,7 @@ public CompletableFuture unloadSubscription(@Nonnull String subName) { } private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { Objects.requireNonNull(compactedTopic); if (isCompactionSubscription(subscriptionName)) { return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); @@ -886,7 +886,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St Map metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean replicatedSubscriptionStateArg, + Boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch, @@ -897,12 +897,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { - boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; - - if (replicatedSubscriptionState + if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); - replicatedSubscriptionState = false; } if (subType == SubType.Key_Shared @@ -971,7 +968,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St CompletableFuture subscriptionFuture = isDurable ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionState, subscriptionProperties) + replicatedSubscriptionStateArg, subscriptionProperties) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted, subscriptionProperties); @@ -1068,7 +1065,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs private CompletableFuture getDurableSubscription(String subscriptionName, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean replicated, + Boolean replicated, Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { @@ -1099,7 +1096,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { return; } } - if (replicated && !subscription.isReplicated()) { + if (replicated != null && replicated && !subscription.isReplicated()) { // Flip the subscription state subscription.setReplicated(replicated); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java new file mode 100644 index 0000000000000..327081bf1b9c8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ReplicateSubscriptionTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + } + + @DataProvider + public Object[] replicateSubscriptionState() { + return new Object[]{ + Boolean.TRUE, + Boolean.FALSE, + null + }; + } + + @Test(dataProvider = "replicateSubscriptionState") + public void testReplicateSubscriptionState(Boolean replicateSubscriptionState) + throws Exception { + String topic = "persistent://my-property/my-ns/" + System.nanoTime(); + String subName = "sub-" + System.nanoTime(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName); + if (replicateSubscriptionState != null) { + consumerBuilder.replicateSubscriptionState(replicateSubscriptionState); + } + ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder; + assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState); + @Cleanup + Consumer ignored = consumerBuilder.subscribe(); + CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic); + assertThat(topicIfExists) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(optionalTopic -> { + assertTrue(optionalTopic.isPresent()); + Topic topicRef = optionalTopic.get(); + Subscription subscription = topicRef.getSubscription(subName); + assertNotNull(subscription); + assertTrue(subscription instanceof PersistentSubscription); + PersistentSubscription persistentSubscription = (PersistentSubscription) subscription; + assertEquals(persistentSubscription.getReplicatedControlled(), replicateSubscriptionState); + return true; + }); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c6f4779db3f46..998e8d70676b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -847,7 +847,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { synchronized (this) { ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, - conf.isReplicateSubscriptionState(), + conf.getReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 2bb7ef79c647a..88a0aa8ec1361 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Sets; import io.swagger.annotations.ApiModelProperty; import java.io.Serializable; @@ -381,7 +382,8 @@ public int getMaxPendingChuckedMessage() { value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated" + " clusters." ) - private boolean replicateSubscriptionState = false; + @JsonProperty(access = JsonProperty.Access.READ_WRITE) + private Boolean replicateSubscriptionState; private boolean resetIncludeHead = false; @@ -437,4 +439,14 @@ public ConsumerConfigurationData clone() { throw new RuntimeException("Failed to clone ConsumerConfigurationData"); } } + + /** + * Backward compatibility with the old `replicateSubscriptionState` field. + * @deprecated Using {@link #getReplicateSubscriptionState()} instead. + */ + @JsonIgnore + @Deprecated + public boolean isReplicateSubscriptionState() { + return replicateSubscriptionState != null && replicateSubscriptionState; + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 3fe136630462f..bb18d78b03692 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -501,7 +501,7 @@ public void testLoadConf() throws Exception { assertTrue(configurationData.isRetryEnable()); assertFalse(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2); - assertTrue(configurationData.isReplicateSubscriptionState()); + assertEquals(configurationData.getReplicateSubscriptionState(), Boolean.TRUE); assertTrue(configurationData.isResetIncludeHead()); assertTrue(configurationData.isBatchIndexAckEnabled()); assertTrue(configurationData.isAckReceiptEnabled()); @@ -561,7 +561,7 @@ public void testLoadConfNotModified() { assertFalse(configurationData.isRetryEnable()); assertTrue(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60); - assertFalse(configurationData.isReplicateSubscriptionState()); + assertNull(configurationData.getReplicateSubscriptionState()); assertFalse(configurationData.isResetIncludeHead()); assertFalse(configurationData.isBatchIndexAckEnabled()); assertFalse(configurationData.isAckReceiptEnabled()); @@ -581,6 +581,38 @@ public void testLoadConfNotModified() { assertNull(configurationData.getPayloadProcessor()); } + @Test + public void testReplicateSubscriptionState() { + ConsumerBuilderImpl consumerBuilder = createConsumerBuilder(); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + + consumerBuilder.replicateSubscriptionState(true); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE); + + consumerBuilder.replicateSubscriptionState(false); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE); + + Map conf = new HashMap<>(); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + + conf.put("replicateSubscriptionState", true); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE); + + conf.put("replicateSubscriptionState", false); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE); + + conf.put("replicateSubscriptionState", null); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + } + private ConsumerBuilderImpl createConsumerBuilder() { ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES); Map properties = new HashMap<>(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index c352da0c871ed..9fae9d1160e1c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -580,7 +580,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, Boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, @@ -591,7 +591,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, Boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, Map subscriptionProperties, long consumerEpoch) { @@ -607,9 +607,11 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setDurable(isDurable) .setReadCompacted(readCompacted) .setInitialPosition(subscriptionInitialPosition) - .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) .setConsumerEpoch(consumerEpoch); + if (isReplicated != null) { + subscribe.setReplicateSubscriptionState(isReplicated); + } if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>();