Skip to content

Commit

Permalink
Allow no longer to receive given topic messages in PatternMultiTopics…
Browse files Browse the repository at this point in the history
…ConsumerImpl.
  • Loading branch information
Denovo1998 committed Dec 30, 2024
1 parent 5a3a1f1 commit 98b978f
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -1170,4 +1172,207 @@ public void testPatternQuote(boolean partitioned) throws Exception {
admin.topics().delete(topicName, false);
}
}

@Test(timeOut = 20000)
public void testBlockAndUnBlockGivenTopics() throws Exception {
String baseTopicName = "persistent://my-property/my-ns/testBlockAndUnBlockGivenTopics-" + System.currentTimeMillis();
Pattern pattern = Pattern.compile(baseTopicName + ".*");

// create 3 topics.
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-1")
.create();
Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-2")
.create();
Producer<String> producer3 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-3")
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();

// wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;

// verify consumer get methods.
assertSame(consumerImpl.getPattern().pattern(), pattern.pattern());
assertEquals(consumerImpl.getPartitionedTopics().size(), 0);

sendMessage(producer1, "msg1-1", consumer);
sendMessage(producer2, "msg2-1", consumer);
sendMessage(producer3, "msg3-1", consumer);

// add block topics.
Set<String> blockTopics = new HashSet<>();
blockTopics.add(baseTopicName + "-2");
blockTopics.add(baseTopicName + "-3");
((PatternMultiTopicsConsumerImpl<String>) consumer).blockTopics(blockTopics);

// waiting for topics to be blocked.
Thread.sleep(2000);

sendMessage(producer1, "msg1-2", consumer);
producer2.send("msg2-2");
producer3.send("msg3-2");

// await to check if msg2-2 and msg3-2 is not received in 5 seconds.
Awaitility.await().during(5, TimeUnit.SECONDS).until(() -> {
Message<String> receivedMessage = consumer.receive(100, TimeUnit.MILLISECONDS);
if (receivedMessage != null
&& (receivedMessage.getValue().equals("msg2-2") || receivedMessage.getValue().equals("msg3-2"))) {
throw new AssertionError("Received message which was supposed to be blocked");
}
return receivedMessage == null
|| (!receivedMessage.getValue().equals("msg2-2") && !receivedMessage.getValue().equals("msg3-2"));
});

((PatternMultiTopicsConsumerImpl<String>) consumer).unBlockTopics(blockTopics);

receivedAndAckedMessage(consumer);
receivedAndAckedMessage(consumer);

producer2.send("msg2-3");
receivedAndAckedMessage(consumer, "msg2-3");
producer3.send("msg3-3");
receivedAndAckedMessage(consumer, "msg3-3");
}

@Test(timeOut = 20000)
public void testRecheckTopicsAfterTopicBlocked() throws Exception {
String baseTopicName = "persistent://my-property/my-ns/testBlockAndUnBlockGivenTopics-"
+ System.currentTimeMillis();
Pattern pattern = Pattern.compile(baseTopicName + ".*");

// create 2 topics.
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-1")
.create();
Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-2")
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();

// wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;

// verify consumer get methods.
assertSame(consumerImpl.getPattern().pattern(), pattern.pattern());
assertEquals(consumerImpl.getPartitionedTopics().size(), 0);

sendMessage(producer1, "msg1-1", consumer);
sendMessage(producer2, "msg2-1", consumer);

// add block topics.
Set<String> blockTopics = new HashSet<>();
blockTopics.add(baseTopicName + "-2");
((PatternMultiTopicsConsumerImpl<String>) consumer).blockTopics(blockTopics);

// waiting for topic2 to be blocked.
Thread.sleep(2000);

producer2.send("msg2-2");

((PatternMultiTopicsConsumerImpl<String>) consumer).recheckTopicsChange();

// await to check if msg2-2 is not received in 5 seconds.
Awaitility.await().during(5, TimeUnit.SECONDS).until(() -> {
Message<String> receivedMessage = consumer.receive(100, TimeUnit.MILLISECONDS);
if (receivedMessage != null && receivedMessage.getValue().equals("msg2-2")) {
throw new AssertionError("Received message which was supposed to be blocked");
}
return receivedMessage == null || !receivedMessage.getValue().equals("msg2-2");
});

((PatternMultiTopicsConsumerImpl<String>) consumer).unBlockTopics(blockTopics);

receivedAndAckedMessage(consumer, "msg2-2");
}

@Test(timeOut = 20000)
public void testBlockUnExistsTopic() throws Exception {
String baseTopicName = "persistent://my-property/my-ns/testBlockAndUnBlockGivenTopics-"
+ System.currentTimeMillis();
Pattern pattern = Pattern.compile(baseTopicName + ".*");

Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-1")
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();

// wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;

// verify consumer get methods.
assertSame(consumerImpl.getPattern().pattern(), pattern.pattern());
assertEquals(consumerImpl.getPartitionedTopics().size(), 0);

sendMessage(producer1, "msg1-1", consumer);

// add block topics.
Set<String> blockTopics = new HashSet<>();
blockTopics.add(baseTopicName + "-2");
((PatternMultiTopicsConsumerImpl<String>) consumer).blockTopics(blockTopics);

// waiting for topic2 to be blocked.
Thread.sleep(2000);

Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-2")
.create();

// Blocking a non-existent topic will ignore the block request. At this time, messages can still be consumed.
sendMessage(producer2, "msg2-1", consumer);
}

private <T> void sendMessage(Producer<T> producer, T sendMessage,
Consumer<T> consumer) throws PulsarClientException {
producer.send(sendMessage);
receivedAndAckedMessage(consumer, sendMessage);
}

private <T> void receivedAndAckedMessage(Consumer<T> consumer, T sendMessage) throws PulsarClientException {
Message<T> message = consumer.receive();
assertEquals(message.getValue(), sendMessage);
consumer.acknowledge(message);
}

private <T> void receivedAndAckedMessage(Consumer<T> consumer) throws PulsarClientException {
Message<T> message = consumer.receive();
consumer.acknowledge(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.constraints.NotNull;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T

private PatternConsumerUpdateQueue updateTaskQueue;

private final Set<String> blockedTopics = new HashSet<>();

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
Expand Down Expand Up @@ -171,17 +174,28 @@ CompletableFuture<Void> recheckTopicsChange() {

final List<String> oldTopics = new ArrayList<>(getPartitions());
return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
topicsChangeListener, oldTopics, subscription);
topicsChangeListener, oldTopics, subscription, blockedTopics);
}
});
}

@VisibleForTesting
static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern,
java.util.function.Consumer<String> topicsHashSetter,
GetTopicsResult getTopicsResult,
TopicsChangedListener topicsChangedListener,
List<String> oldTopics,
String subscriptionForLog) {
return updateSubscriptions(topicsPattern, topicsHashSetter, getTopicsResult, topicsChangedListener, oldTopics,
subscriptionForLog, Collections.emptySet());
}

static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern,
java.util.function.Consumer<String> topicsHashSetter,
GetTopicsResult getTopicsResult,
TopicsChangedListener topicsChangedListener,
List<String> oldTopics,
String subscriptionForLog, Set<String> blockedTopics) {
topicsHashSetter.accept(getTopicsResult.getTopicsHash());
if (!getTopicsResult.isChanged()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -197,6 +211,10 @@ static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern,
final List<CompletableFuture<?>> listenersCallback = new ArrayList<>(2);
Set<String> topicsAdded = TopicList.minus(newTopics, oldTopics);
Set<String> topicsRemoved = TopicList.minus(oldTopics, newTopics);
if (!blockedTopics.isEmpty()) {
topicsAdded.removeAll(blockedTopics);
}

if (log.isDebugEnabled()) {
log.debug("Pattern consumer [{}] Recheck pattern consumer's topics. topicsAdded: {}, topicsRemoved: {}",
subscriptionForLog, topicsAdded, topicsRemoved);
Expand Down Expand Up @@ -264,6 +282,9 @@ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics)
unsubscribeList.add(unsubscribeFuture);
partialRemoved.add(topicName.getPartitionedTopicName());
partialRemovedForLog.add(topicName.toString());
} else {
// If the topic to be blocked does not exist, it is simply ignored.
blockedTopics.remove(tp);
}
}
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -425,5 +446,24 @@ protected void handleSubscribeOneTopicError(String topicName,
subscribeFuture.completeExceptionally(error);
}

protected void blockTopics(@NotNull Set<String> topicNames) {
if (!topicNames.isEmpty()) {
blockedTopics.addAll(topicNames);
updateTaskQueue.appendTopicsRemovedOp(topicNames);
}
}

protected void unBlockTopics(@NotNull Set<String> topicNames) {
topicNames.retainAll(blockedTopics);
if (!topicNames.isEmpty()) {
updateTaskQueue.appendTopicsAddedOp(topicNames);
blockedTopics.removeAll(topicNames);
}
}

protected Set<String> getBlockedTopics() {
return blockedTopics;
}

private static final Logger log = LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);
}

0 comments on commit 98b978f

Please sign in to comment.