Skip to content

Commit

Permalink
[improve][client] Make replicateSubscriptionState nullable (apache#23757
Browse files Browse the repository at this point in the history
)

Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit bc3a933)
  • Loading branch information
nodece authored and srinath-ctds committed Dec 31, 2024
1 parent 92e6f3c commit 8d79588
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
&& subscribe.isReplicateSubscriptionState();
final Boolean isReplicated =
subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null;
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class SubscriptionOption {
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private long startMessageRollbackDurationSec;
private boolean replicatedSubscriptionStateArg;
private Boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(),
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(),
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
}
Expand All @@ -277,7 +277,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
String consumerName, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
Boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
SchemaType schemaType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;
private volatile Boolean replicatedControlled;

static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
Expand All @@ -143,19 +151,21 @@ static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated) {
Boolean replicated) {
this(topic, subscriptionName, cursor, replicated, Collections.emptyMap());
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.setReplicated(replicated);
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
if (replicated != null) {
this.setReplicated(replicated);
}
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
Expand Down Expand Up @@ -194,6 +204,7 @@ public boolean isReplicated() {
}

public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
Expand Down Expand Up @@ -1436,4 +1447,8 @@ public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl pos

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);

@VisibleForTesting
public Boolean getReplicatedControlled() {
return replicatedControlled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
&& topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) {
Expand Down Expand Up @@ -891,7 +891,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicatedSubscriptionStateArg,
Boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch,
Expand All @@ -902,12 +902,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
}

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;

if (replicatedSubscriptionState
if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}

if (subType == SubType.Key_Shared
Expand Down Expand Up @@ -976,7 +973,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
replicatedSubscriptionStateArg, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted, subscriptionProperties);

Expand Down Expand Up @@ -1073,7 +1070,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicated,
Boolean replicated,
Map<String, String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
Expand Down Expand Up @@ -1104,7 +1101,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
return;
}
}
if (replicated && !subscription.isReplicated()) {
if (replicated != null && replicated && !subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ReplicateSubscriptionTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
}

@DataProvider
public Object[] replicateSubscriptionState() {
return new Object[]{
Boolean.TRUE,
Boolean.FALSE,
null
};
}

@Test(dataProvider = "replicateSubscriptionState")
public void testReplicateSubscriptionState(Boolean replicateSubscriptionState)
throws Exception {
String topic = "persistent://my-property/my-ns/" + System.nanoTime();
String subName = "sub-" + System.nanoTime();
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName);
if (replicateSubscriptionState != null) {
consumerBuilder.replicateSubscriptionState(replicateSubscriptionState);
}
ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder;
assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState);
@Cleanup
Consumer<String> ignored = consumerBuilder.subscribe();
CompletableFuture<Optional<Topic>> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic);
assertThat(topicIfExists)
.succeedsWithin(3, TimeUnit.SECONDS)
.matches(optionalTopic -> {
assertTrue(optionalTopic.isPresent());
Topic topicRef = optionalTopic.get();
Subscription subscription = topicRef.getSubscription(subName);
assertNotNull(subscription);
assertTrue(subscription instanceof PersistentSubscription);
PersistentSubscription persistentSubscription = (PersistentSubscription) subscription;
assertEquals(persistentSubscription.getReplicatedControlled(), replicateSubscriptionState);
return true;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
synchronized (this) {
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(),
conf.getReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
Expand Down Expand Up @@ -381,7 +382,8 @@ public int getMaxPendingChuckedMessage() {
value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated"
+ " clusters."
)
private boolean replicateSubscriptionState = false;
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Boolean replicateSubscriptionState;

private boolean resetIncludeHead = false;

Expand Down Expand Up @@ -437,4 +439,14 @@ public ConsumerConfigurationData<T> clone() {
throw new RuntimeException("Failed to clone ConsumerConfigurationData");
}
}

/**
* Backward compatibility with the old `replicateSubscriptionState` field.
* @deprecated Using {@link #getReplicateSubscriptionState()} instead.
*/
@JsonIgnore
@Deprecated
public boolean isReplicateSubscriptionState() {
return replicateSubscriptionState != null && replicateSubscriptionState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public void testLoadConf() throws Exception {
assertTrue(configurationData.isRetryEnable());
assertFalse(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
assertTrue(configurationData.isReplicateSubscriptionState());
assertEquals(configurationData.getReplicateSubscriptionState(), Boolean.TRUE);
assertTrue(configurationData.isResetIncludeHead());
assertTrue(configurationData.isBatchIndexAckEnabled());
assertTrue(configurationData.isAckReceiptEnabled());
Expand Down Expand Up @@ -561,7 +561,7 @@ public void testLoadConfNotModified() {
assertFalse(configurationData.isRetryEnable());
assertTrue(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
assertFalse(configurationData.isReplicateSubscriptionState());
assertNull(configurationData.getReplicateSubscriptionState());
assertFalse(configurationData.isResetIncludeHead());
assertFalse(configurationData.isBatchIndexAckEnabled());
assertFalse(configurationData.isAckReceiptEnabled());
Expand All @@ -581,6 +581,38 @@ public void testLoadConfNotModified() {
assertNull(configurationData.getPayloadProcessor());
}

@Test
public void testReplicateSubscriptionState() {
ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());

consumerBuilder.replicateSubscriptionState(true);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE);

consumerBuilder.replicateSubscriptionState(false);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE);

Map<String, Object> conf = new HashMap<>();
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());

conf.put("replicateSubscriptionState", true);
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE);

conf.put("replicateSubscriptionState", false);
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE);

conf.put("replicateSubscriptionState", null);
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
}

private ConsumerBuilderImpl<byte[]> createConsumerBuilder() {
ConsumerBuilderImpl<byte[]> consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES);
Map<String, String> properties = new HashMap<>();
Expand Down
Loading

0 comments on commit 8d79588

Please sign in to comment.