Skip to content

Commit

Permalink
Fix consumer might not subscribe after a reconnection (apache#438)
Browse files Browse the repository at this point in the history
Fixes apache#436

### Motivation

When a consumer starts grabbing the connection, it registers a timer after the operation timeout. When that
timer is expired, it will fail the connection and cancel the connection timer. However, it results a race
condition that:
  1. The consumer's connection is closed (e.g. the keep alive timer failed)
  2. The connection timer is registered on the executor and will trigger the reconnection after 100ms
  3. The connection timer is cancelled, then the reconnection won't start.

### Modifications

Cancel the `creationTimer_` once `HandlerBase#start` succeeded first
time. Add `testReconnectWhenFirstConnectTimedOut` to cover this case.
  • Loading branch information
BewareMyPower authored Aug 13, 2024
1 parent 2a69168 commit 5940cb5
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 6 deletions.
1 change: 1 addition & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
const size_t poolIndex_;

friend class PulsarFriend;
friend class ConsumerTest;

void checkServerError(ServerError error, const std::string& message);

Expand Down
23 changes: 17 additions & 6 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
#include "HandlerBase.h"

#include <chrono>

#include "AsioDefines.h"
#include "Backoff.h"
#include "ClientConnection.h"
#include "ClientImpl.h"
Expand Down Expand Up @@ -63,6 +66,7 @@ void HandlerBase::start() {
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
auto self = weakSelf.lock();
if (self && !error) {
LOG_WARN("Cancel the pending reconnection due to the start timeout");
connectionFailed(ResultTimeout);
ASIO_ERROR ignored;
timer_->cancel(ignored);
Expand Down Expand Up @@ -118,13 +122,21 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
}
auto self = shared_from_this();
auto cnxFuture = getConnection(client, assignedBrokerUrl);
cnxFuture.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
using namespace std::chrono;
auto before = high_resolution_clock::now();
cnxFuture.addListener([this, self, before](Result result, const ClientConnectionPtr& cnx) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx).addListener([this, self](Result result, bool) {
connectionOpened(cnx).addListener([this, self, before](Result result, bool) {
// Do not use bool, only Result.
reconnectionPending_ = false;
if (result != ResultOk && isResultRetryable(result)) {
if (result == ResultOk) {
connectionTimeMs_ =
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
// Prevent the creationTimer_ from cancelling the timer_ in future
ASIO_ERROR ignored;
creationTimer_->cancel(ignored);
LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
} else if (isResultRetryable(result)) {
scheduleReconnection();
}
});
Expand Down Expand Up @@ -194,8 +206,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig

void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const boost::optional<std::string>& assignedBrokerUrl) {
if (ec) {
LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec << "]");
return;
LOG_INFO(getName() << "Ignoring timer cancelled event, code[" << ec << "]");
} else {
epoch_++;
grabCnx(assignedBrokerUrl);
Expand Down
2 changes: 2 additions & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
ClientConnectionWeakPtr connection_;
std::string redirectedClusterURI_;
std::atomic<long> firstRequestIdAfterConnect_{-1L};
std::atomic<long> connectionTimeMs_{0}; // only for tests

friend class ClientConnection;
friend class PulsarFriend;
friend class ConsumerTest;
};
} // namespace pulsar
#endif //_PULSAR_HANDLER_BASE_HEADER_
32 changes: 32 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
#include "ConsumerTest.h"

#include <gtest/gtest.h>
#include <pulsar/Client.h>

Expand All @@ -26,6 +28,7 @@
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

Expand Down Expand Up @@ -1467,4 +1470,33 @@ TEST(ConsumerTest, testMultiConsumerListenerAndAck) {
client.close();
}

// When a consumer starts grabbing the connection, it registers a timer after the operation timeout. When that
// timer is expired, it will fail the connection and cancel the connection timer. However, it results a race
// condition that:
// 1. The consumer's connection is closed (e.g. the keep alive timer failed)
// 2. The connection timer is registered on the executor and will trigger the reconnection after 100ms
// 3. The connection timer is cancelled, then the reconnection won't start.
TEST(ConsumerTest, testReconnectWhenFirstConnectTimedOut) {
ClientConfiguration conf;
conf.setOperationTimeoutSeconds(1);
Client client{lookupUrl, conf};

auto topic = "consumer-test-reconnect-when-first-connect-timed-out" + std::to_string(time(nullptr));
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));

auto timer = ConsumerTest::scheduleCloseConnection(consumer, std::chrono::seconds(1));
ASSERT_TRUE(timer != nullptr);
timer->wait();

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build()));

Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
ASSERT_EQ("msg", msg.getDataAsString());
client.close();
}

} // namespace pulsar
23 changes: 23 additions & 0 deletions tests/ConsumerTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
#include <chrono>
#include <memory>
#include <stdexcept>
#include <string>

#include "lib/ClientConnection.h"
#include "lib/ConsumerImpl.h"
#include "lib/ExecutorService.h"

using std::string;

Expand All @@ -28,5 +33,23 @@ class ConsumerTest {
static int getNumOfMessagesInQueue(const Consumer& consumer) {
return consumer.impl_->getNumOfPrefetchedMessages();
}

template <typename T>
static DeadlineTimerPtr scheduleCloseConnection(const Consumer& consumer, T delaySinceStartGrabCnx) {
auto impl = std::dynamic_pointer_cast<ConsumerImpl>(consumer.impl_);
if (!impl) {
throw std::runtime_error("scheduleCloseConnection can only be called on ConsumerImpl");
}

auto cnx = impl->getCnx().lock();
if (!cnx) {
return nullptr;
}
auto timer = cnx->executor_->createDeadlineTimer();
timer->expires_from_now(delaySinceStartGrabCnx -
std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); });
return timer;
}
};
} // namespace pulsar

0 comments on commit 5940cb5

Please sign in to comment.