Skip to content

Commit

Permalink
Fix: Added more tests, fixed some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
bersen66 committed May 15, 2024
1 parent 7dc8f29 commit ff08648
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 500 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@

build
CMakeUserPresets.json
.vscode
.vscode
tests/functional/__pycache__
Testing
22 changes: 11 additions & 11 deletions configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,7 @@ load_balancing:


# Example config of least connections
algorithm: least_connections
endpoints:
- ip: "127.0.0.1"
port: 8081
- ip: "127.0.0.2"
port: 8082
- ip: "127.0.0.3"
port: 8083

# Example config of least response time
# algorithm: least_response_time
# algorithm: least_connections
# endpoints:
# - ip: "127.0.0.1"
# port: 8081
Expand All @@ -45,6 +35,16 @@ load_balancing:
# - ip: "127.0.0.3"
# port: 8083

# Example config of least response time
algorithm: least_response_time
endpoints:
- ip: "127.0.0.1"
port: 8081
- ip: "127.0.0.2"
port: 8082
- ip: "127.0.0.3"
port: 8083

# Example config of round robin
# algorithm: round_robin
# endpoints:
Expand Down
2 changes: 1 addition & 1 deletion lbbuild.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ function BuildApp() {
conan install . --output-folder=build --build=missing --settings=build_type=$BUILD_TYPE
cd build
cmake .. -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DCMAKE_TOOLCHAIN_FILE=conan_toolchain.cmake
cmake --build . -j 4
cmake --build . -j 3
}

if ! command -v conan &> /dev/null
Expand Down
3 changes: 2 additions & 1 deletion src/lb/formatters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ namespace YAML {class Node;}
template <> struct fmt::formatter<boost::stacktrace::stacktrace> : ostream_formatter{};
template <> struct fmt::formatter<YAML::Node> : ostream_formatter{};
template <> struct fmt::formatter<lb::tcp::Backend> : ostream_formatter{};
template <> struct fmt::formatter<boost::beast::http::request<boost::beast::http::string_body>> : ostream_formatter{};
template <> struct fmt::formatter<boost::beast::http::request<boost::beast::http::string_body>> : ostream_formatter{};
template <> struct fmt::formatter<boost::beast::http::response<boost::beast::http::string_body>> : ostream_formatter{};
100 changes: 82 additions & 18 deletions src/lb/tcp/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <lb/tcp/connector.hpp>
#include <lb/tcp/session.hpp>

using SocketType = lb::tcp::HttpSession::SocketType;

namespace lb::tcp {

Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector)
Expand All @@ -10,38 +12,100 @@ Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector)
, selector(selector)
{}

SessionPtr MakeSession(SelectorPtr& selector, boost::asio::ip::tcp::socket client_socket, boost::asio::ip::tcp::socket server_socket, Backend backend)

class LeastConnectionsCallbacks : public StateNotifier
{
public:
LeastConnectionsCallbacks(Backend backend, SelectorPtr selector)
: StateNotifier()
, backend(std::move(backend))
, selector(std::dynamic_pointer_cast<LeastConnectionsSelector>(selector))
{}

void OnConnect() override
{
selector->IncreaseConnectionCount(backend);
}

void OnDisconnect() override
{
selector->DecreaseConnectionCount(backend);
}

using StateNotifier::StateNotifier;
private:
Backend backend;
std::shared_ptr<LeastConnectionsSelector> selector;
};


class LeastResponseTimeCallbacks : public StateNotifier
{
public:
using TimeType = decltype(std::chrono::high_resolution_clock::now());
public:
LeastResponseTimeCallbacks(Backend backend, SelectorPtr selector)
: StateNotifier()
, backend(std::move(backend))
, selector(std::dynamic_pointer_cast<LeastResponseTimeSelector>(selector))
{}

void OnRequestSent() override
{
response_begin = std::chrono::high_resolution_clock::now();
}

switch(selector->Type()) {
case SelectorType::LEAST_CONNECTIONS: {
std::shared_ptr<LeastConnectionsSelector> lc_selector = std::dynamic_pointer_cast<LeastConnectionsSelector>(selector);
return std::make_shared<LeastConnectionsHttpSession>(std::move(client_socket), std::move(server_socket), lc_selector, backend);
} break;
case SelectorType::LEAST_RESPONSE_TIME: {
std::shared_ptr<LeastResponseTimeSelector> lrt_selector = std::dynamic_pointer_cast<LeastResponseTimeSelector>(selector);
return std::make_shared<LeastResponseTimeHttpSession>(std::move(client_socket), std::move(server_socket), lrt_selector, backend);
} break;
default: {
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket));
}
void OnResponseReceive() override
{
response_end = std::chrono::high_resolution_clock::now();
std::chrono::duration<long, std::nano> duration = response_end - response_begin;
selector->AddResponseTime(backend, duration.count());
}

using StateNotifier::StateNotifier;
private:
Backend backend;
std::shared_ptr<LeastResponseTimeSelector> selector;
TimeType response_begin;
TimeType response_end;
};

SessionPtr MakeSession(SelectorPtr& selector,
SocketType client_socket,
SocketType server_socket,
Backend backend)
{
switch (selector->Type()) {
case SelectorType::LEAST_CONNECTIONS:
{
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket),
std::make_unique<LeastConnectionsCallbacks>(std::move(backend), selector));
} break;

case SelectorType::LEAST_RESPONSE_TIME:
{
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket),
std::make_unique<LeastResponseTimeCallbacks>(std::move(backend), selector));
} break;
default:
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket));
}
}


void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
void Connector::MakeAndRunSession(SocketType client_socket)
{
// TODO: selection of backend
DEBUG("In connector");
Backend backend = selector->SelectBackend(client_socket.remote_endpoint());

if (backend.IsIpEndpoint()) {
DEBUG("Is ip endpoint");
auto server_socket = std::make_shared<boost::asio::ip::tcp::socket>(client_socket.get_executor());
auto server_socket = std::make_shared<SocketType>(client_socket.get_executor());

server_socket->async_connect(
backend.AsEndpoint(),
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)] (const boost::system::error_code& error) mutable
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)]
(const boost::system::error_code& error) mutable
{
if (error) {
ERROR("{}", error.message());
Expand All @@ -56,7 +120,7 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
});
} else if (backend.IsUrl()) {
DEBUG("Is url");
auto server_socket = std::make_shared<boost::asio::ip::tcp::socket>(client_socket.get_executor());
auto server_socket = std::make_shared<SocketType>(client_socket.get_executor());

const auto& url = backend.AsUrl();
DEBUG("URL: hostname: {}, port: {}", url.Hostname(), url.Port());
Expand Down
3 changes: 1 addition & 2 deletions src/lb/tcp/connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ class Connector {
Connector& operator=(Connector&&) = delete;

void MakeAndRunSession(boost::asio::ip::tcp::socket client);
private:
void HandleAsyncResolve(const boost::system::error_code& ec, ResolverResults results, boost::asio::ip::tcp::socket client_socket);

private:
boost::asio::io_context& ioc;
boost::asio::ip::tcp::resolver resolver;
Expand Down
13 changes: 12 additions & 1 deletion src/lb/tcp/selectors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ SelectorPtr DetectSelector(const YAML::Node& node)
// ============================ RoundRobinSelector ============================
void RoundRobinSelector::Configure(const YAML::Node &balancing_node)
{
INFO("Configuring RoundRobinSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Round-robin endpoints node is missed");
}
Expand Down Expand Up @@ -219,6 +220,7 @@ std::size_t ReadWeight(const YAML::Node& ep)

void WeightedRoundRobinSelector::Configure(const YAML::Node &balancing_node)
{
INFO("Configuring WeightedRoundRobinSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Weighted-round-robin endpoints node is missed");
}
Expand Down Expand Up @@ -314,6 +316,7 @@ void WeightedRoundRobinSelector::AdvanceCounter()

void IpHashSelector::Configure(const YAML::Node& balancing_node)
{
INFO("Configuring IpHashSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Ip-hash endpoints node is missed");
}
Expand Down Expand Up @@ -375,7 +378,10 @@ SelectorType IpHashSelector::Type() const

BackendCHTraits::HashType BackendCHTraits::GetHash(const Backend& backend)
{
return std::hash<std::string>{}(backend.ToString());

static std::hash<std::string> hash{};
std::size_t res = hash(backend.ToString());
return res;
}

std::vector<BackendCHTraits::HashType>
Expand All @@ -398,6 +404,7 @@ ConsistentHashSelector::ConsistentHashSelector(std::size_t spawn_replicas)

void ConsistentHashSelector::Configure(const YAML::Node &balancing_node)
{
INFO("Configuring ConsistentHashSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Consistent hash endpoints node is missed");
}
Expand Down Expand Up @@ -429,13 +436,15 @@ Backend ConsistentHashSelector::SelectBackend(const boost::asio::ip::tcp::endpoi
{
boost::mutex::scoped_lock lock(mutex_);
Backend result = ring_.SelectNode(client);
DEBUG("Selected: {}", result);
return result;
}

void ConsistentHashSelector::ExcludeBackend(const Backend& backend)
{
boost::mutex::scoped_lock lock(mutex_);
ring_.EraseNode(backend);
DEBUG("Excluded: {}", backend);
if (ring_.Empty()) {
EXCEPTION("All backends are excluded!");
}
Expand All @@ -449,6 +458,7 @@ SelectorType ConsistentHashSelector::Type() const
// ============================ LeastConnectionsSelector ============================

void LeastConnectionsSelector::Configure(const YAML::Node& config) {
INFO("Configuring LeastConnectionsSelector");
if (!config["endpoints"].IsDefined()) {
STACKTRACE("Least connections endpoints node is missed");
}
Expand Down Expand Up @@ -546,6 +556,7 @@ void LeastConnectionsSelector::DecreaseConnectionCount(const Backend& backend)

void LeastResponseTimeSelector::Configure(const YAML::Node& config)
{
INFO("Configuring LeastResponseTimeSelector");
if (!config["endpoints"].IsDefined()) {
STACKTRACE("Least response time endpoints node is missed");
}
Expand Down
Loading

0 comments on commit ff08648

Please sign in to comment.