Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] BM_TCPEchoServerLatencyNQDRSubprocess benchmark #326

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions tests/c_benchmarks/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ void Socket::connect(const std::string &remoteAddress, unsigned short remotePort

void Socket::send(const void *buffer, int bufferLen) noexcept(false)
{
if (::send(mFileDescriptor, buffer, bufferLen, 0) < 0) {
// MSG_NOSIGNAL to avoid SIGPIPE
if (::send(mFileDescriptor, buffer, bufferLen, MSG_NOSIGNAL) < 0) {
throw SocketException("Send failed (send())", true);
}
}
Expand All @@ -129,7 +130,7 @@ int Socket::recv(void *buffer, int bufferLen) noexcept(false)
{
int rtn = ::recv(mFileDescriptor, buffer, bufferLen, 0);
if (rtn < 0) {
throw SocketException("Received failed (recv())", true);
throw SocketException("Receive failed (recv())", true);
}

return rtn;
Expand Down
8 changes: 8 additions & 0 deletions tests/c_benchmarks/TCPSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ TCPSocket::TCPSocket(const std::string &remoteAddress, unsigned short remotePort
TCPSocket::TCPSocket(int newConnSD) : Socket(newConnSD)
{
}

void TCPSocket::setRecvTimeout(std::chrono::duration<int64_t> duration)
{
struct timeval timeout;
timeout.tv_sec = std::chrono::duration_cast<std::chrono::seconds>(duration).count();
timeout.tv_usec = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
setsockopt (mFileDescriptor, SOL_SOCKET, SO_RCVTIMEO, (char*) &timeout, sizeof (timeout));
}
4 changes: 4 additions & 0 deletions tests/c_benchmarks/TCPSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#define QPID_DISPATCH_TCPSOCKET_HPP

#include "Socket.hpp"

#include <chrono>

class TCPSocket : public Socket
{
private:
Expand All @@ -36,6 +39,7 @@ class TCPSocket : public Socket
{
socket.mFileDescriptor = -1;
};
void setRecvTimeout(std::chrono::duration<int64_t> duration);
};

#endif // QPID_DISPATCH_TCPSOCKET_HPP
100 changes: 92 additions & 8 deletions tests/c_benchmarks/bm_tcp_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ extern "C" {
void qd_error_initialize();
} // extern "C"

static const bool VERBOSE = false;

static unsigned short findFreePort()
{
TCPServerSocket serverSocket(0);
Expand Down Expand Up @@ -180,31 +182,54 @@ class LatencyMeasure
int echoStringLen = echoString.length();

public:
inline TCPSocket createSocket(benchmark::State &state, unsigned short echoServerPort) {
for (int i = 0; i < 30; i++) {
try {
TCPSocket sock = try_to_connect(servAddress, echoServerPort);
sock.setRecvTimeout(std::chrono::seconds(10));
// run few times outside benchmark to clean the pipes first
latencyMeasureSendReceive(state, sock);
latencyMeasureSendReceive(state, sock);
latencyMeasureSendReceive(state, sock);
return sock;
} catch (SocketException&) {
if(VERBOSE) printf("latencyMeasureLoop: recv err\n");
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
throw std::runtime_error("Could not get working socket");
}
inline void latencyMeasureLoop(benchmark::State &state, unsigned short echoServerPort)
{
{
TCPSocket sock = try_to_connect(servAddress, echoServerPort);
latencyMeasureSendReceive(state, sock); // run once outside benchmark to clean the pipes first
TCPSocket sock = createSocket(state, echoServerPort);

for (auto _ : state) {
latencyMeasureSendReceive(state, sock);
if(VERBOSE) printf("latencyMeasureLoop: BEGIN state loop\n");

for (auto _ : state) {
if (!latencyMeasureSendReceive(state, sock)) {
state.SkipWithError("unable to read from socket");
break;
}
}

if(VERBOSE) printf("latencyMeasureLoop: END state loop\n");
}

inline void latencyMeasureSendReceive(benchmark::State &state, TCPSocket &sock)
inline bool latencyMeasureSendReceive(benchmark::State &state, TCPSocket &sock)
{
// todo: ensure we always receive what we sent, cannot build a bunch of in-flight values
sock.send(echoString.c_str(), echoStringLen);

int totalBytesReceived = 0;
while (totalBytesReceived < echoStringLen) {
int bytesReceived = sock.recv(echoBuffer, RCVBUFSIZE);
if (bytesReceived <= 0) {
state.SkipWithError("unable to read from socket");
return false; // we failed, bail out
}
totalBytesReceived += bytesReceived;
echoBuffer[bytesReceived] = '\0';
}
return true;
}
};

Expand Down Expand Up @@ -257,7 +282,7 @@ class DispatchRouterSubprocessTcpLatencyTest
int pid;

public:
DispatchRouterSubprocessTcpLatencyTest(std::string configName)
explicit DispatchRouterSubprocessTcpLatencyTest(std::string configName)
{
pid = fork();
if (pid == 0) {
Expand Down Expand Up @@ -361,3 +386,62 @@ static void BM_TCPEchoServerLatency2QDRSubprocess(benchmark::State &state)
}

BENCHMARK(BM_TCPEchoServerLatency2QDRSubprocess)->Unit(benchmark::kMillisecond);

static void BM_TCPEchoServerLatencyNQDRSubprocess(benchmark::State &state)
{
EchoServerThread est;

int N = state.range(0);
unsigned short tcpConnector = est.port();
unsigned short tcpListener = findFreePort();

std::vector<unsigned short> listeners{}; // first one is unused
listeners.reserve(N);
for (int i = 0; i < N; ++i) {
listeners.push_back(findFreePort());
}

std::string configName_1 = "BM_TCPEchoServerLatencyNQDRSubprocess_first.conf";
std::stringstream router_config_1 = multiRouterTcpConfig("QDRL1", {}, {listeners[1]}, tcpConnector, 0);
writeRouterConfig(configName_1, router_config_1);

std::string configName_2 = "BM_TCPEchoServerLatencyNQDRSubprocess_last.conf";
std::stringstream router_config_2 =
multiRouterTcpConfig("QDRL2", {listeners[listeners.size() - 1]}, {}, 0, tcpListener);
writeRouterConfig(configName_2, router_config_2);

DispatchRouterSubprocessTcpLatencyTest qdr1{configName_1};
DispatchRouterSubprocessTcpLatencyTest qdr2{configName_2};

// interior routers
std::vector<DispatchRouterSubprocessTcpLatencyTest> interior;
interior.reserve(N - 2);
for (int i = 1; i < N - 1; i++) {
std::stringstream ss;
ss << "BM_TCPEchoServerLatencyNQDRSubprocess_" << i << ".conf";
std::stringstream id;
id << "ROUTER" << i;
std::string configName = ss.str();
std::stringstream router_config = multiRouterTcpConfig(id.str(), {listeners[i]}, {listeners[i + 1]}, 0, 0);
writeRouterConfig(configName, router_config);

interior.emplace_back(configName); // this ends up starting router subprocesses; behold magic of C++
}

{
LatencyMeasure lm;
lm.latencyMeasureLoop(state, tcpListener);
}
}

BENCHMARK(BM_TCPEchoServerLatencyNQDRSubprocess)
->Unit(benchmark::kMillisecond)
->Arg(2)
->Arg(3)
->Arg(4)
->Arg(5)
->Arg(6)
->Arg(7)
->Arg(8)
->Arg(9)
;
20 changes: 11 additions & 9 deletions tests/c_benchmarks/echo_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,16 @@ class EchoServer
}
}

// will handle one TCP client and then it will return
// will handle TCP clients one at a time and it will return on failure (when server socket is closed, most likely)
void run()
{
try {
HandleTCPClient(servSock.accept());
} catch (SocketException &e) {
std::cerr << e.what() << std::endl;
while(true) {
try {
HandleTCPClient(servSock.accept());
} catch (SocketException &e) {
std::cerr << e.what() << std::endl; // TODO: error is expected now, make it silent
break;
}
}
}

Expand Down Expand Up @@ -87,24 +90,23 @@ class EchoServerThread
unsigned short echoServerPort;
std::thread u;

EchoServer es{0};

public:
EchoServerThread()
{
u = std::thread([this]() {
EchoServer es(0);
echoServerPort = es.port();
portLatch.notify();
es.run();
echoServerLatch.wait();
es.stop();
});

portLatch.wait();
}

~EchoServerThread()
{
echoServerLatch.notify();
es.stop(); // if recv failed, echo server may be stuck waiting to accept(); call stop() here
u.join();
}

Expand Down
9 changes: 8 additions & 1 deletion tests/c_unittests/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@
// assertions without stack traces when running outside doctest
#ifndef QDR_DOCTEST
// https://stackoverflow.com/questions/3767869/adding-message-to-assert
#define REQUIRE(condition) assert(condition)
#define REQUIRE(condition) \
do { \
if (! (condition)) { \
std::cerr << "Assertion `" #condition "` failed in " << __FILE__ \
<< " line " << __LINE__ << std::endl; \
std::terminate(); \
} \
} while (false)
#define REQUIRE_MESSAGE(condition, message) \
do { \
if (! (condition)) { \
Expand Down