From b91d537e242aba2d6b313b61555c869abd2b1877 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Fri, 22 Nov 2024 00:09:34 -0800 Subject: [PATCH 1/2] bugfix: check callback to avoid std::bad_function_call exception --- lib/AckGroupingTracker.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h index 97d0d85f..0494bb77 100644 --- a/lib/AckGroupingTracker.h +++ b/lib/AckGroupingTracker.h @@ -71,7 +71,10 @@ class AckGroupingTracker : public std::enable_shared_from_this Date: Sat, 23 Nov 2024 13:35:11 -0800 Subject: [PATCH 2/2] modified ReaderTest to test Reader for both persistent and non-persistent topics --- tests/ReaderTest.cc | 91 ++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 42 deletions(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 92fdf624..6e9efd81 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -37,12 +37,12 @@ using namespace pulsar; static std::string serviceUrl = "pulsar://localhost:6650"; static const std::string adminUrl = "http://localhost:8080/"; -class ReaderTest : public ::testing::TestWithParam { +class ReaderTest : public ::testing::TestWithParam> { public: void initTopic(std::string topicName) { if (isMultiTopic_) { // call admin api to make it partitioned - std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions"; + std::string url = adminUrl + "admin/v2/" + (isNonPersistentTopic ? "non-" : "") + "persistent/public/default/" + topicName + "/partitions"; int res = makePutRequest(url, "5"); LOG_INFO("res = " << res); ASSERT_FALSE(res != 204 && res != 409); @@ -50,7 +50,12 @@ class ReaderTest : public ::testing::TestWithParam { } protected: - bool isMultiTopic_ = GetParam(); + std::string fullTopicName(const std::string & topicName) { + return (isNonPersistentTopic ? "non-" : "") + "persistent://public/default/" + topicName; + } + + bool isNonPersistentTopic = std::get<0>(GetParam()); + bool isMultiTopic_ = std::get<1>(GetParam()); }; TEST_P(ReaderTest, testSimpleReader) { @@ -62,10 +67,11 @@ TEST_P(ReaderTest, testSimpleReader) { ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader)); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -95,10 +101,11 @@ TEST_P(ReaderTest, testAsyncRead) { ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader)); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -140,7 +147,7 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -150,7 +157,7 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) { ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader)); for (int i = 0; i < 10; i++) { Message msg; @@ -174,7 +181,7 @@ TEST_P(ReaderTest, testMultipleReaders) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -184,10 +191,10 @@ TEST_P(ReaderTest, testMultipleReaders) { ReaderConfiguration readerConf; Reader reader1; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader1)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader1)); Reader reader2; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader2)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader2)); for (int i = 0; i < 10; i++) { Message msg; @@ -221,7 +228,7 @@ TEST_P(ReaderTest, testReaderOnLastMessage) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -231,7 +238,7 @@ TEST_P(ReaderTest, testReaderOnLastMessage) { ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader)); for (int i = 10; i < 20; i++) { std::string content = "my-message-" + std::to_string(i); @@ -261,7 +268,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -271,7 +278,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) { ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader)); MessageId lastMessageId; @@ -287,7 +294,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) { } // Create another reader starting on msgid4 - ASSERT_EQ(ResultOk, client.createReader(topicName, lastMessageId, readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), lastMessageId, readerConf, reader)); for (int i = 5; i < 10; i++) { Message msg; @@ -319,7 +326,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) { ProducerConfiguration producerConf; producerConf.setBatchingEnabled(true); producerConf.setBatchingMaxPublishDelayMs(1000); - ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -334,7 +341,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) { ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader)); std::string lastMessageId; @@ -352,7 +359,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) { // Create another reader starting on msgid4 auto msgId4 = MessageId::deserialize(lastMessageId); Reader reader2; - ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, reader2)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), msgId4, readerConf, reader2)); for (int i = 5; i < 11; i++) { Message msg; @@ -382,12 +389,12 @@ TEST_P(ReaderTest, testReaderReachEndOfTopic) { ProducerConfiguration producerConf; producerConf.setBatchingEnabled(true); producerConf.setBatchingMaxPublishDelayMs(1000); - ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer)); // 2. create reader, and expect hasMessageAvailable return false since no message produced. ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader)); bool hasMessageAvailable; ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); @@ -457,12 +464,12 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { Producer producer; ProducerConfiguration producerConf; producerConf.setBatchingEnabled(false); - ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer)); // 2. create reader, and expect hasMessageAvailable return false since no message produced. ReaderConfiguration readerConf; Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader)); bool hasMessageAvailable; ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); @@ -511,7 +518,7 @@ TEST(ReaderTest, testPartitionIndex) { "ReaderTestPartitionIndex-par-topic-" + std::to_string(time(nullptr)); int res = makePutRequest( - adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2"); + adminUrl + "admin/v2/" + (isNonPersistentTopic ? "non-" : "") + "persistent/public/default/" + partitionedTopic + "/partitions", "2"); ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; const std::string partition0 = partitionedTopic + "-partition-0"; @@ -520,14 +527,14 @@ TEST(ReaderTest, testPartitionIndex) { ReaderConfiguration readerConf; Reader readers[3]; ASSERT_EQ(ResultOk, - client.createReader(nonPartitionedTopic, MessageId::earliest(), readerConf, readers[0])); - ASSERT_EQ(ResultOk, client.createReader(partition0, MessageId::earliest(), readerConf, readers[1])); - ASSERT_EQ(ResultOk, client.createReader(partition1, MessageId::earliest(), readerConf, readers[2])); + client.createReader(fullTopicName(nonPartitionedTopic), MessageId::earliest(), readerConf, readers[0])); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(partition0), MessageId::earliest(), readerConf, readers[1])); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(partition1), MessageId::earliest(), readerConf, readers[2])); Producer producers[3]; - ASSERT_EQ(ResultOk, client.createProducer(nonPartitionedTopic, producers[0])); - ASSERT_EQ(ResultOk, client.createProducer(partition0, producers[1])); - ASSERT_EQ(ResultOk, client.createProducer(partition1, producers[2])); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(nonPartitionedTopic), producers[0])); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(partition0), producers[1])); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(partition1), producers[2])); for (auto& producer : producers) { ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build())); @@ -555,7 +562,7 @@ TEST_P(ReaderTest, testSubscriptionNameSetting) { ReaderConfiguration readerConf; readerConf.setInternalSubscriptionName(subName); Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader)); ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName()); @@ -575,7 +582,7 @@ TEST_P(ReaderTest, testSetSubscriptionNameAndPrefix) { readerConf.setInternalSubscriptionName(subName); readerConf.setSubscriptionRolePrefix("my-prefix"); Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader)); ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName()); @@ -594,13 +601,13 @@ TEST_P(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) { ReaderConfiguration readerConf1; readerConf1.setInternalSubscriptionName(subscriptionName); Reader reader1; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf1, reader1)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf1, reader1)); ReaderConfiguration readerConf2; readerConf2.setInternalSubscriptionName(subscriptionName); Reader reader2; ASSERT_EQ(ResultConsumerBusy, - client.createReader(topicName, MessageId::earliest(), readerConf2, reader2)); + client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf2, reader2)); reader1.close(); reader2.close(); @@ -616,7 +623,7 @@ TEST_P(ReaderTest, testIsConnected) { Reader reader; ASSERT_FALSE(reader.isConnected()); - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), {}, reader)); ASSERT_TRUE(reader.isConnected()); ASSERT_EQ(ResultOk, reader.close()); @@ -633,7 +640,7 @@ TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) { ProducerConfiguration producerConf; producerConf.setBatchingMaxMessages(3); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer)); std::vector messageIds; constexpr int numMessages = 7; @@ -657,14 +664,14 @@ TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) { bool hasMessageAvailable; for (size_t i = 0; i < messageIds.size() - 1; i++) { - ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds[i], {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), messageIds[i], {}, reader)); ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); EXPECT_TRUE(hasMessageAvailable); } // The start message ID is exclusive by default, so when we start at the last message, there should be no // message available. - ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds.back(), {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), messageIds.back(), {}, reader)); ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); EXPECT_FALSE(hasMessageAvailable); client.close(); @@ -678,7 +685,7 @@ TEST_P(ReaderTest, testReceiveAfterSeek) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer)); MessageId seekMessageId; for (int i = 0; i < 5; i++) { @@ -690,7 +697,7 @@ TEST_P(ReaderTest, testReceiveAfterSeek) { } Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), {}, reader)); reader.seek(seekMessageId); @@ -888,5 +895,5 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) { } } -INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false)); +INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Combine(::testing::Values(true, false), ::testing::Values(true, false))); INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));