Skip to content

Commit

Permalink
Feat: Implemented consistent-hashing load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
bersen66 committed Apr 28, 2024
1 parent ef75ea0 commit ed9f0ef
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cmake/install_dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ find_package(ctre REQUIRED)

include_directories(${Boost_INCLUDE_DIRS})
include_directories(${yaml-cpp_INCLUDE_DIRS})

include_directories(third_party/pumba/include)
3 changes: 2 additions & 1 deletion configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ acceptor:
ip_version: 4

load_balancing:
algorithm: round_robin
algorithm: consistent_hash
replicas: 5
endpoints:
- ip: "127.0.0.1"
port: 8001
Expand Down
112 changes: 106 additions & 6 deletions src/lb/tcp/selectors.cpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
#include <unordered_map>
#include <lb/tcp/selectors.hpp>
#include <lb/logging.hpp>
#include <pumba/pumba.hpp>
#include <boost/lexical_cast.hpp>

namespace lb::tcp
{

Backend::Backend(const std::string& ip_address, int port)
: value(boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip_address), port))
{
TRACE("Backend {}:{} init as ip", ip_address, port);
}
{}

Backend::Backend(const std::string& url_string)
: value(Url(url_string))
{
TRACE("Backend {} init as url", url_string);
}
{}

bool Backend::IsUrl() const
{
Expand Down Expand Up @@ -50,6 +48,24 @@ bool Backend::operator==(const Backend& other) const
return false;
}

std::string Backend::ToString() const
{
if (IsUrl()) {
return AsUrl().ToString();
} else {
return AsEndpoint().address().to_string() + ":" + std::to_string(AsEndpoint().port());
}
}

Backend::Backend(const Backend::EndpointType& ep)
: value(ep)
{}

Backend::Backend(const Backend::UrlType& url)
: value(url)
{}


std::ostream& operator<<(std::ostream& out, const Backend& backend)
{
if (backend.IsIpEndpoint()) {
Expand Down Expand Up @@ -96,6 +112,15 @@ SelectorPtr DetectSelector(const YAML::Node& node)
return wrr;
}
break;
case SelectorType::CONSISTENT_HASH: {
if (!balancing_node["replicas"].as<std::size_t>()) {
EXCEPTION("Missed replicas field!");
}
std::size_t replicas_num = balancing_node["replicas"].as<std::size_t>();
SelectorPtr ch = std::make_shared<ConsistentHashSelector>(replicas_num);
ch->Configure(balancing_node);
return ch;
}
default: {
STACKTRACE("Selector {} is not implemented", name);
}
Expand Down Expand Up @@ -265,4 +290,79 @@ void WeightedRoundRobinSelector::AdvanceCounter()
}
}

// ============================ ConsistentHashSelector ============================

BackendCHTraits::HashType BackendCHTraits::GetHash(const Backend& backend)
{
return std::hash<std::string>{}(backend.ToString());
}

std::vector<BackendCHTraits::HashType>
BackendCHTraits::Replicate(const Backend& backend, std::size_t replicas)
{
std::vector<HashType> result;
result.reserve(replicas);

for (int i = 0; i < replicas; ++i) {
result.emplace_back(GetHash(backend.ToString() + "#" + boost::lexical_cast<std::string>(i)));
}

return result;
}


ConsistentHashSelector::ConsistentHashSelector(std::size_t spawn_replicas)
: ring_(spawn_replicas)
{}

void ConsistentHashSelector::Configure(const YAML::Node &balancing_node)
{
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Consistent hash endpoints node is missed");
}
const YAML::Node& ep_node = balancing_node["endpoints"];
if (!ep_node.IsSequence()) {
EXCEPTION("endpoints node must be a sequence");
}

for (const YAML::Node& ep : ep_node) {
if (ep["url"].IsDefined()) {
ring_.EmplaceNode(ep["url"].as<std::string>());
continue;
}

if (!ep["ip"].IsDefined()) {
STACKTRACE("{} missed {} field", ep, "ip");
}
if (!ep["port"].IsDefined()) {
STACKTRACE("{} missed {} field", ep, "port");
}

ring_.EmplaceNode(ep["ip"].as<std::string>(), ep["port"].as<int>());
}


}

Backend ConsistentHashSelector::SelectBackend(const boost::asio::ip::tcp::endpoint& client)
{
boost::mutex::scoped_lock lock(mutex_);
Backend result = ring_.SelectNode(client);
return result;
}

void ConsistentHashSelector::ExcludeBackend(const Backend& backend)
{
boost::mutex::scoped_lock lock(mutex_);
ring_.EraseNode(backend);
if (ring_.Empty()) {
EXCEPTION("All backends are excluded!");
}
}

SelectorType ConsistentHashSelector::Type() const
{
return SelectorType::CONSISTENT_HASH;
}

} // namespace lb::tcp
36 changes: 36 additions & 0 deletions src/lb/tcp/selectors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/thread/mutex.hpp>
#include <lb/url.hpp>
#include <pumba/pumba.hpp>


namespace lb::tcp {

Expand All @@ -18,6 +20,10 @@ class Backend {
bool IsUrl() const;
bool IsIpEndpoint() const;

Backend(const EndpointType& ep);

Backend(const UrlType& url);

Backend(const std::string& ip_address, int port);

Backend(const std::string& url_string);
Expand All @@ -26,6 +32,8 @@ class Backend {
const EndpointType& AsEndpoint() const;

bool operator==(const Backend& other) const;

std::string ToString() const;
private:
std::variant<EndpointType, UrlType> value;
};
Expand Down Expand Up @@ -94,4 +102,32 @@ class WeightedRoundRobinSelector final : public ISelector {
};


struct BackendCHTraits {
using HashType = std::size_t;

static HashType GetHash(const Backend& backend);

static std::vector<HashType>
Replicate(const Backend& backend, std::size_t replicas);
};


class ConsistentHashSelector final : public ISelector {
public:

ConsistentHashSelector(std::size_t spawn_replicas = 1);

void Configure(const YAML::Node& config) override;

Backend SelectBackend(const boost::asio::ip::tcp::endpoint& client_socket) override;

void ExcludeBackend(const Backend& backend) override;

SelectorType Type() const override;

private:
boost::mutex mutex_;
pumba::ConsistentHashingRouter<Backend, BackendCHTraits> ring_; // guarded by mutex
};

} // namespace lb::tcp
6 changes: 6 additions & 0 deletions src/lb/url.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace lb
}

Url::Url(const std::string &string)
: whole_url(string)
{
auto match_results = MatchUrl(string);
if (!match_results)
Expand Down Expand Up @@ -72,6 +73,11 @@ namespace lb
return fragment;
}

const std::string& Url::ToString() const noexcept
{
return whole_url;
}

bool operator==(const Url &lhs, const Url &rhs)
{
return std::tie(lhs.Protocol(), lhs.Hostname(), lhs.Port(), lhs.Path(), lhs.Query(), lhs.Fragment())
Expand Down
3 changes: 3 additions & 0 deletions src/lb/url.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ class Url {
const std::string& Fragment() const noexcept;

static bool IsUrl(const std::string& src);

const std::string& ToString() const noexcept;
private:
std::string whole_url;
std::string protocol;
std::string hostname;
std::string port;
Expand Down
7 changes: 7 additions & 0 deletions third_party/pumba/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
add_library(pumba INTERFACE)

target_include_directories(
pumba
INTERFACE $<BUILD_INTERFACE: ${CMAKE_CURRENT_SOURCE_DIR}/include>

)
6 changes: 6 additions & 0 deletions third_party/pumba/include/pumba/pumba.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#ifndef PUMBA_LIBRARY_INCLUDED
#define PUMBA_LIBRARY_INCLUDED

#include <pumba/router.hpp>

#endif // macro PUMBA_LIBRARY_INCLUDED
107 changes: 107 additions & 0 deletions third_party/pumba/include/pumba/router.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#pragma once

#include <map>
#include <list>
#include <stdexcept>

namespace pumba {

template<typename NodeType, typename HashTraits>
class ConsistentHashingRouter {
public:

using HashType = typename HashTraits::HashType;
using NodeIt = typename std::list<NodeType>::iterator;

public:

explicit ConsistentHashingRouter(std::size_t replicas = 1)
: replicas_(replicas)
{}

template<typename... Args>
void EmplaceNode(Args&&... args)
{
physical_nodes_.emplace_back(std::forward<Args>(args)...);
NodeIt it = std::prev(physical_nodes_.end());
SpawnReplicas(it);
}

void InsertNode(NodeType&& node)
{
physical_nodes_.push_back(node);
NodeIt it = std::prev(physical_nodes_.end());
SpawnReplicas(it);
}

void EraseNode(NodeType&& node) {
if (auto it = ring_.find(HashTraits::GetHash(node)); it != ring_.end()) {
NodeIt node_it = it->second;
EraseReplicasOf(node_it);
physical_nodes_.erase(node_it);
}
}

void EraseNode(const NodeType& node) {
if (auto it = ring_.find(HashTraits::GetHash(node)); it != ring_.end()) {
NodeIt node_it = it->second;
EraseReplicasOf(node_it);
physical_nodes_.erase(node_it);
}
}

const NodeType& SelectNode(const NodeType& client)
{
if (Empty()) {
throw std::runtime_error("pumba::Router: no nodes on the ring");
}

HashType hash = HashTraits::GetHash(client);
auto result_it = ring_.lower_bound(hash);
if (result_it == ring_.end()) {
result_it = ring_.begin();
}
return *(result_it->second);
}

std::size_t PhysicalNodes() const noexcept
{
return physical_nodes_.size();
}

bool Empty() const noexcept
{
return physical_nodes_.empty();
}

std::size_t Replicas() const noexcept
{
return replicas_;
}
private:

void EraseReplicasOf(NodeIt physical_node)
{
for (auto it = ring_.begin(); it != ring_.end();) {
if (it->second == physical_node) {
it = ring_.erase(it);
} else {
++it;
}
}
}

void SpawnReplicas(NodeIt physical_node)
{
for (const HashType& hash : HashTraits::Replicate(*physical_node, replicas_)) {
ring_[hash] = physical_node;
}
}

private:
std::map<HashType, NodeIt> ring_;
std::list<NodeType> physical_nodes_;
std::size_t replicas_;
};

} // namespace router

0 comments on commit ed9f0ef

Please sign in to comment.