diff --git a/cmake/install_dependencies.cmake b/cmake/install_dependencies.cmake index 8083153..494ec32 100644 --- a/cmake/install_dependencies.cmake +++ b/cmake/install_dependencies.cmake @@ -15,3 +15,5 @@ find_package(ctre REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) include_directories(${yaml-cpp_INCLUDE_DIRS}) + +include_directories(third_party/pumba/include) \ No newline at end of file diff --git a/configs/config.yaml b/configs/config.yaml index b0d6f3b..1fb1b21 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -13,7 +13,8 @@ acceptor: ip_version: 4 load_balancing: - algorithm: round_robin + algorithm: consistent_hash + replicas: 5 endpoints: - ip: "127.0.0.1" port: 8001 diff --git a/src/lb/tcp/selectors.cpp b/src/lb/tcp/selectors.cpp index 01fc462..77258cc 100644 --- a/src/lb/tcp/selectors.cpp +++ b/src/lb/tcp/selectors.cpp @@ -1,21 +1,19 @@ #include #include #include +#include +#include namespace lb::tcp { Backend::Backend(const std::string& ip_address, int port) : value(boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip_address), port)) -{ - TRACE("Backend {}:{} init as ip", ip_address, port); -} +{} Backend::Backend(const std::string& url_string) : value(Url(url_string)) -{ - TRACE("Backend {} init as url", url_string); -} +{} bool Backend::IsUrl() const { @@ -50,6 +48,24 @@ bool Backend::operator==(const Backend& other) const return false; } +std::string Backend::ToString() const +{ + if (IsUrl()) { + return AsUrl().ToString(); + } else { + return AsEndpoint().address().to_string() + ":" + std::to_string(AsEndpoint().port()); + } +} + +Backend::Backend(const Backend::EndpointType& ep) + : value(ep) +{} + +Backend::Backend(const Backend::UrlType& url) + : value(url) +{} + + std::ostream& operator<<(std::ostream& out, const Backend& backend) { if (backend.IsIpEndpoint()) { @@ -96,6 +112,15 @@ SelectorPtr DetectSelector(const YAML::Node& node) return wrr; } break; + case SelectorType::CONSISTENT_HASH: { + if (!balancing_node["replicas"].as()) { + EXCEPTION("Missed replicas field!"); + } + std::size_t replicas_num = balancing_node["replicas"].as(); + SelectorPtr ch = std::make_shared(replicas_num); + ch->Configure(balancing_node); + return ch; + } default: { STACKTRACE("Selector {} is not implemented", name); } @@ -265,4 +290,79 @@ void WeightedRoundRobinSelector::AdvanceCounter() } } +// ============================ ConsistentHashSelector ============================ + +BackendCHTraits::HashType BackendCHTraits::GetHash(const Backend& backend) +{ + return std::hash{}(backend.ToString()); +} + +std::vector +BackendCHTraits::Replicate(const Backend& backend, std::size_t replicas) +{ + std::vector result; + result.reserve(replicas); + + for (int i = 0; i < replicas; ++i) { + result.emplace_back(GetHash(backend.ToString() + "#" + boost::lexical_cast(i))); + } + + return result; +} + + +ConsistentHashSelector::ConsistentHashSelector(std::size_t spawn_replicas) + : ring_(spawn_replicas) +{} + +void ConsistentHashSelector::Configure(const YAML::Node &balancing_node) +{ + if (!balancing_node["endpoints"].IsDefined()) { + STACKTRACE("Consistent hash endpoints node is missed"); + } + const YAML::Node& ep_node = balancing_node["endpoints"]; + if (!ep_node.IsSequence()) { + EXCEPTION("endpoints node must be a sequence"); + } + + for (const YAML::Node& ep : ep_node) { + if (ep["url"].IsDefined()) { + ring_.EmplaceNode(ep["url"].as()); + continue; + } + + if (!ep["ip"].IsDefined()) { + STACKTRACE("{} missed {} field", ep, "ip"); + } + if (!ep["port"].IsDefined()) { + STACKTRACE("{} missed {} field", ep, "port"); + } + + ring_.EmplaceNode(ep["ip"].as(), ep["port"].as()); + } + + +} + +Backend ConsistentHashSelector::SelectBackend(const boost::asio::ip::tcp::endpoint& client) +{ + boost::mutex::scoped_lock lock(mutex_); + Backend result = ring_.SelectNode(client); + return result; +} + +void ConsistentHashSelector::ExcludeBackend(const Backend& backend) +{ + boost::mutex::scoped_lock lock(mutex_); + ring_.EraseNode(backend); + if (ring_.Empty()) { + EXCEPTION("All backends are excluded!"); + } +} + +SelectorType ConsistentHashSelector::Type() const +{ + return SelectorType::CONSISTENT_HASH; +} + } // namespace lb::tcp diff --git a/src/lb/tcp/selectors.hpp b/src/lb/tcp/selectors.hpp index bf6fdaf..97936fa 100644 --- a/src/lb/tcp/selectors.hpp +++ b/src/lb/tcp/selectors.hpp @@ -7,6 +7,8 @@ #include #include #include +#include + namespace lb::tcp { @@ -18,6 +20,10 @@ class Backend { bool IsUrl() const; bool IsIpEndpoint() const; + Backend(const EndpointType& ep); + + Backend(const UrlType& url); + Backend(const std::string& ip_address, int port); Backend(const std::string& url_string); @@ -26,6 +32,8 @@ class Backend { const EndpointType& AsEndpoint() const; bool operator==(const Backend& other) const; + + std::string ToString() const; private: std::variant value; }; @@ -94,4 +102,32 @@ class WeightedRoundRobinSelector final : public ISelector { }; +struct BackendCHTraits { + using HashType = std::size_t; + + static HashType GetHash(const Backend& backend); + + static std::vector + Replicate(const Backend& backend, std::size_t replicas); +}; + + +class ConsistentHashSelector final : public ISelector { +public: + + ConsistentHashSelector(std::size_t spawn_replicas = 1); + + void Configure(const YAML::Node& config) override; + + Backend SelectBackend(const boost::asio::ip::tcp::endpoint& client_socket) override; + + void ExcludeBackend(const Backend& backend) override; + + SelectorType Type() const override; + +private: + boost::mutex mutex_; + pumba::ConsistentHashingRouter ring_; // guarded by mutex +}; + } // namespace lb::tcp \ No newline at end of file diff --git a/src/lb/url.cpp b/src/lb/url.cpp index 7223250..1d599f6 100644 --- a/src/lb/url.cpp +++ b/src/lb/url.cpp @@ -27,6 +27,7 @@ namespace lb } Url::Url(const std::string &string) + : whole_url(string) { auto match_results = MatchUrl(string); if (!match_results) @@ -72,6 +73,11 @@ namespace lb return fragment; } + const std::string& Url::ToString() const noexcept + { + return whole_url; + } + bool operator==(const Url &lhs, const Url &rhs) { return std::tie(lhs.Protocol(), lhs.Hostname(), lhs.Port(), lhs.Path(), lhs.Query(), lhs.Fragment()) diff --git a/src/lb/url.hpp b/src/lb/url.hpp index 114fe19..770c180 100644 --- a/src/lb/url.hpp +++ b/src/lb/url.hpp @@ -20,7 +20,10 @@ class Url { const std::string& Fragment() const noexcept; static bool IsUrl(const std::string& src); + + const std::string& ToString() const noexcept; private: + std::string whole_url; std::string protocol; std::string hostname; std::string port; diff --git a/third_party/pumba/CMakeLists.txt b/third_party/pumba/CMakeLists.txt new file mode 100644 index 0000000..393472a --- /dev/null +++ b/third_party/pumba/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(pumba INTERFACE) + +target_include_directories( + pumba + INTERFACE $ + +) diff --git a/third_party/pumba/include/pumba/pumba.hpp b/third_party/pumba/include/pumba/pumba.hpp new file mode 100644 index 0000000..8d2fdd0 --- /dev/null +++ b/third_party/pumba/include/pumba/pumba.hpp @@ -0,0 +1,6 @@ +#ifndef PUMBA_LIBRARY_INCLUDED +#define PUMBA_LIBRARY_INCLUDED + +#include + +#endif // macro PUMBA_LIBRARY_INCLUDED \ No newline at end of file diff --git a/third_party/pumba/include/pumba/router.hpp b/third_party/pumba/include/pumba/router.hpp new file mode 100644 index 0000000..9a33630 --- /dev/null +++ b/third_party/pumba/include/pumba/router.hpp @@ -0,0 +1,107 @@ +#pragma once + +#include +#include +#include + +namespace pumba { + +template +class ConsistentHashingRouter { +public: + + using HashType = typename HashTraits::HashType; + using NodeIt = typename std::list::iterator; + +public: + + explicit ConsistentHashingRouter(std::size_t replicas = 1) + : replicas_(replicas) + {} + + template + void EmplaceNode(Args&&... args) + { + physical_nodes_.emplace_back(std::forward(args)...); + NodeIt it = std::prev(physical_nodes_.end()); + SpawnReplicas(it); + } + + void InsertNode(NodeType&& node) + { + physical_nodes_.push_back(node); + NodeIt it = std::prev(physical_nodes_.end()); + SpawnReplicas(it); + } + + void EraseNode(NodeType&& node) { + if (auto it = ring_.find(HashTraits::GetHash(node)); it != ring_.end()) { + NodeIt node_it = it->second; + EraseReplicasOf(node_it); + physical_nodes_.erase(node_it); + } + } + + void EraseNode(const NodeType& node) { + if (auto it = ring_.find(HashTraits::GetHash(node)); it != ring_.end()) { + NodeIt node_it = it->second; + EraseReplicasOf(node_it); + physical_nodes_.erase(node_it); + } + } + + const NodeType& SelectNode(const NodeType& client) + { + if (Empty()) { + throw std::runtime_error("pumba::Router: no nodes on the ring"); + } + + HashType hash = HashTraits::GetHash(client); + auto result_it = ring_.lower_bound(hash); + if (result_it == ring_.end()) { + result_it = ring_.begin(); + } + return *(result_it->second); + } + + std::size_t PhysicalNodes() const noexcept + { + return physical_nodes_.size(); + } + + bool Empty() const noexcept + { + return physical_nodes_.empty(); + } + + std::size_t Replicas() const noexcept + { + return replicas_; + } +private: + + void EraseReplicasOf(NodeIt physical_node) + { + for (auto it = ring_.begin(); it != ring_.end();) { + if (it->second == physical_node) { + it = ring_.erase(it); + } else { + ++it; + } + } + } + + void SpawnReplicas(NodeIt physical_node) + { + for (const HashType& hash : HashTraits::Replicate(*physical_node, replicas_)) { + ring_[hash] = physical_node; + } + } + +private: + std::map ring_; + std::list physical_nodes_; + std::size_t replicas_; +}; + +} // namespace router \ No newline at end of file