Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: check callback to avoid std::bad_function_call exception #456

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/AckGroupingTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,19 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
* @param[in] msgId ID of the message to be ACKed.
* @param[in] callback the callback that is triggered when the message is acknowledged
*/
virtual void addAcknowledge(const MessageId& msgId, ResultCallback callback) { callback(ResultOk); }
virtual void addAcknowledge(const MessageId& msgId, ResultCallback callback) {
if (callback)
callback(ResultOk);
}

/**
* Adding message ID list into ACK group for individual ACK.
* @param[in] msgIds of the message to be ACKed.
* @param[in] callback the callback that is triggered when the messages are acknowledged
*/
virtual void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback callback) {
callback(ResultOk);
if (callback)
callback(ResultOk);
}

/**
Expand All @@ -88,7 +92,8 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
* @param[in] callback the callback that is triggered when the message is acknowledged
*/
virtual void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {
callback(ResultOk);
if (callback)
callback(ResultOk);
}

/**
Expand Down
91 changes: 49 additions & 42 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@ using namespace pulsar;
static std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";

class ReaderTest : public ::testing::TestWithParam<bool> {
class ReaderTest : public ::testing::TestWithParam<std::tuple<bool, bool>> {
public:
void initTopic(std::string topicName) {
if (isMultiTopic_) {
// call admin api to make it partitioned
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
std::string url = adminUrl + "admin/v2/" + (isNonPersistentTopic ? "non-" : "") + "persistent/public/default/" + topicName + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);
}
}

protected:
bool isMultiTopic_ = GetParam();
std::string fullTopicName(const std::string & topicName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testPartitionIndex uses TEST but not TEST_F or TEST_P. If you're going to call this method in those tests, please change TEST to TEST_P

return (isNonPersistentTopic ? "non-" : "") + "persistent://public/default/" + topicName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return (isNonPersistentTopic ? "non-" : "") + "persistent://public/default/" + topicName;
return std::string(isNonPersistentTopic ? "non-" : "") + "persistent://public/default/" + topicName;

Only std::string can be used to call operator+. Otherwise the build will fail with "Invalid operands to binary expression ('const char *' and 'const char[29]') "

}

bool isNonPersistentTopic = std::get<0>(GetParam());
bool isMultiTopic_ = std::get<1>(GetParam());
};

TEST_P(ReaderTest, testSimpleReader) {
Expand All @@ -62,10 +67,11 @@ TEST_P(ReaderTest, testSimpleReader) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -95,10 +101,11 @@ TEST_P(ReaderTest, testAsyncRead) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -140,7 +147,7 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -150,7 +157,7 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

for (int i = 0; i < 10; i++) {
Message msg;
Expand All @@ -174,7 +181,7 @@ TEST_P(ReaderTest, testMultipleReaders) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -184,10 +191,10 @@ TEST_P(ReaderTest, testMultipleReaders) {

ReaderConfiguration readerConf;
Reader reader1;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader1));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader1));

Reader reader2;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader2));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader2));

for (int i = 0; i < 10; i++) {
Message msg;
Expand Down Expand Up @@ -221,7 +228,7 @@ TEST_P(ReaderTest, testReaderOnLastMessage) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -231,7 +238,7 @@ TEST_P(ReaderTest, testReaderOnLastMessage) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader));

for (int i = 10; i < 20; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -261,7 +268,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -271,7 +278,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

MessageId lastMessageId;

Expand All @@ -287,7 +294,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {
}

// Create another reader starting on msgid4
ASSERT_EQ(ResultOk, client.createReader(topicName, lastMessageId, readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), lastMessageId, readerConf, reader));

for (int i = 5; i < 10; i++) {
Message msg;
Expand Down Expand Up @@ -319,7 +326,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelayMs(1000);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -334,7 +341,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

std::string lastMessageId;

Expand All @@ -352,7 +359,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {
// Create another reader starting on msgid4
auto msgId4 = MessageId::deserialize(lastMessageId);
Reader reader2;
ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, reader2));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), msgId4, readerConf, reader2));

for (int i = 5; i < 11; i++) {
Message msg;
Expand Down Expand Up @@ -382,12 +389,12 @@ TEST_P(ReaderTest, testReaderReachEndOfTopic) {
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelayMs(1000);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

// 2. create reader, and expect hasMessageAvailable return false since no message produced.
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader));

bool hasMessageAvailable;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
Expand Down Expand Up @@ -457,12 +464,12 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
Producer producer;
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

// 2. create reader, and expect hasMessageAvailable return false since no message produced.
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader));

bool hasMessageAvailable;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
Expand Down Expand Up @@ -511,7 +518,7 @@ TEST(ReaderTest, testPartitionIndex) {
"ReaderTestPartitionIndex-par-topic-" + std::to_string(time(nullptr));

int res = makePutRequest(
adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2");
adminUrl + "admin/v2/" + (isNonPersistentTopic ? "non-" : "") + "persistent/public/default/" + partitionedTopic + "/partitions", "2");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

const std::string partition0 = partitionedTopic + "-partition-0";
Expand All @@ -520,14 +527,14 @@ TEST(ReaderTest, testPartitionIndex) {
ReaderConfiguration readerConf;
Reader readers[3];
ASSERT_EQ(ResultOk,
client.createReader(nonPartitionedTopic, MessageId::earliest(), readerConf, readers[0]));
ASSERT_EQ(ResultOk, client.createReader(partition0, MessageId::earliest(), readerConf, readers[1]));
ASSERT_EQ(ResultOk, client.createReader(partition1, MessageId::earliest(), readerConf, readers[2]));
client.createReader(fullTopicName(nonPartitionedTopic), MessageId::earliest(), readerConf, readers[0]));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(partition0), MessageId::earliest(), readerConf, readers[1]));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(partition1), MessageId::earliest(), readerConf, readers[2]));

Producer producers[3];
ASSERT_EQ(ResultOk, client.createProducer(nonPartitionedTopic, producers[0]));
ASSERT_EQ(ResultOk, client.createProducer(partition0, producers[1]));
ASSERT_EQ(ResultOk, client.createProducer(partition1, producers[2]));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(nonPartitionedTopic), producers[0]));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(partition0), producers[1]));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(partition1), producers[2]));

for (auto& producer : producers) {
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build()));
Expand Down Expand Up @@ -555,7 +562,7 @@ TEST_P(ReaderTest, testSubscriptionNameSetting) {
ReaderConfiguration readerConf;
readerConf.setInternalSubscriptionName(subName);
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());

Expand All @@ -575,7 +582,7 @@ TEST_P(ReaderTest, testSetSubscriptionNameAndPrefix) {
readerConf.setInternalSubscriptionName(subName);
readerConf.setSubscriptionRolePrefix("my-prefix");
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());

Expand All @@ -594,13 +601,13 @@ TEST_P(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
ReaderConfiguration readerConf1;
readerConf1.setInternalSubscriptionName(subscriptionName);
Reader reader1;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf1, reader1));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf1, reader1));

ReaderConfiguration readerConf2;
readerConf2.setInternalSubscriptionName(subscriptionName);
Reader reader2;
ASSERT_EQ(ResultConsumerBusy,
client.createReader(topicName, MessageId::earliest(), readerConf2, reader2));
client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf2, reader2));

reader1.close();
reader2.close();
Expand All @@ -616,7 +623,7 @@ TEST_P(ReaderTest, testIsConnected) {
Reader reader;
ASSERT_FALSE(reader.isConnected());

ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), {}, reader));
ASSERT_TRUE(reader.isConnected());

ASSERT_EQ(ResultOk, reader.close());
Expand All @@ -633,7 +640,7 @@ TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) {
ProducerConfiguration producerConf;
producerConf.setBatchingMaxMessages(3);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

std::vector<MessageId> messageIds;
constexpr int numMessages = 7;
Expand All @@ -657,14 +664,14 @@ TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) {
bool hasMessageAvailable;

for (size_t i = 0; i < messageIds.size() - 1; i++) {
ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds[i], {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), messageIds[i], {}, reader));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
EXPECT_TRUE(hasMessageAvailable);
}

// The start message ID is exclusive by default, so when we start at the last message, there should be no
// message available.
ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds.back(), {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), messageIds.back(), {}, reader));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
EXPECT_FALSE(hasMessageAvailable);
client.close();
Expand All @@ -678,7 +685,7 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

MessageId seekMessageId;
for (int i = 0; i < 5; i++) {
Expand All @@ -690,7 +697,7 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
}

Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), {}, reader));

reader.seek(seekMessageId);

Expand Down Expand Up @@ -888,5 +895,5 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
}
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Combine(::testing::Values(true, false), ::testing::Values(true, false)));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
Loading