diff --git a/tests/c_benchmarks/Socket.cpp b/tests/c_benchmarks/Socket.cpp index 0b8637e95..227db5cbc 100644 --- a/tests/c_benchmarks/Socket.cpp +++ b/tests/c_benchmarks/Socket.cpp @@ -120,7 +120,8 @@ void Socket::connect(const std::string &remoteAddress, unsigned short remotePort void Socket::send(const void *buffer, int bufferLen) noexcept(false) { - if (::send(mFileDescriptor, buffer, bufferLen, 0) < 0) { + // MSG_NOSIGNAL to avoid SIGPIPE + if (::send(mFileDescriptor, buffer, bufferLen, MSG_NOSIGNAL) < 0) { throw SocketException("Send failed (send())", true); } } @@ -129,7 +130,7 @@ int Socket::recv(void *buffer, int bufferLen) noexcept(false) { int rtn = ::recv(mFileDescriptor, buffer, bufferLen, 0); if (rtn < 0) { - throw SocketException("Received failed (recv())", true); + throw SocketException("Receive failed (recv())", true); } return rtn; diff --git a/tests/c_benchmarks/TCPSocket.cpp b/tests/c_benchmarks/TCPSocket.cpp index 51ac9c65c..30a94f3a0 100644 --- a/tests/c_benchmarks/TCPSocket.cpp +++ b/tests/c_benchmarks/TCPSocket.cpp @@ -36,3 +36,11 @@ TCPSocket::TCPSocket(const std::string &remoteAddress, unsigned short remotePort TCPSocket::TCPSocket(int newConnSD) : Socket(newConnSD) { } + +void TCPSocket::setRecvTimeout(std::chrono::duration duration) +{ + struct timeval timeout; + timeout.tv_sec = std::chrono::duration_cast(duration).count(); + timeout.tv_usec = std::chrono::duration_cast(duration).count(); + setsockopt (mFileDescriptor, SOL_SOCKET, SO_RCVTIMEO, (char*) &timeout, sizeof (timeout)); +} diff --git a/tests/c_benchmarks/TCPSocket.hpp b/tests/c_benchmarks/TCPSocket.hpp index bffc7cee3..12040e705 100644 --- a/tests/c_benchmarks/TCPSocket.hpp +++ b/tests/c_benchmarks/TCPSocket.hpp @@ -23,6 +23,9 @@ #define QPID_DISPATCH_TCPSOCKET_HPP #include "Socket.hpp" + +#include + class TCPSocket : public Socket { private: @@ -36,6 +39,7 @@ class TCPSocket : public Socket { socket.mFileDescriptor = -1; }; + void setRecvTimeout(std::chrono::duration duration); }; #endif // QPID_DISPATCH_TCPSOCKET_HPP diff --git a/tests/c_benchmarks/bm_tcp_adapter.cpp b/tests/c_benchmarks/bm_tcp_adapter.cpp index 77452ed7f..fda4b5736 100644 --- a/tests/c_benchmarks/bm_tcp_adapter.cpp +++ b/tests/c_benchmarks/bm_tcp_adapter.cpp @@ -44,6 +44,8 @@ extern "C" { void qd_error_initialize(); } // extern "C" +static const bool VERBOSE = false; + static unsigned short findFreePort() { TCPServerSocket serverSocket(0); @@ -180,31 +182,54 @@ class LatencyMeasure int echoStringLen = echoString.length(); public: + inline TCPSocket createSocket(benchmark::State &state, unsigned short echoServerPort) { + for (int i = 0; i < 30; i++) { + try { + TCPSocket sock = try_to_connect(servAddress, echoServerPort); + sock.setRecvTimeout(std::chrono::seconds(10)); + // run few times outside benchmark to clean the pipes first + latencyMeasureSendReceive(state, sock); + latencyMeasureSendReceive(state, sock); + latencyMeasureSendReceive(state, sock); + return sock; + } catch (SocketException&) { + if(VERBOSE) printf("latencyMeasureLoop: recv err\n"); + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + throw std::runtime_error("Could not get working socket"); + } inline void latencyMeasureLoop(benchmark::State &state, unsigned short echoServerPort) { - { - TCPSocket sock = try_to_connect(servAddress, echoServerPort); - latencyMeasureSendReceive(state, sock); // run once outside benchmark to clean the pipes first + TCPSocket sock = createSocket(state, echoServerPort); - for (auto _ : state) { - latencyMeasureSendReceive(state, sock); + if(VERBOSE) printf("latencyMeasureLoop: BEGIN state loop\n"); + + for (auto _ : state) { + if (!latencyMeasureSendReceive(state, sock)) { + state.SkipWithError("unable to read from socket"); + break; } } + + if(VERBOSE) printf("latencyMeasureLoop: END state loop\n"); } - inline void latencyMeasureSendReceive(benchmark::State &state, TCPSocket &sock) + inline bool latencyMeasureSendReceive(benchmark::State &state, TCPSocket &sock) { + // todo: ensure we always receive what we sent, cannot build a bunch of in-flight values sock.send(echoString.c_str(), echoStringLen); int totalBytesReceived = 0; while (totalBytesReceived < echoStringLen) { int bytesReceived = sock.recv(echoBuffer, RCVBUFSIZE); if (bytesReceived <= 0) { - state.SkipWithError("unable to read from socket"); + return false; // we failed, bail out } totalBytesReceived += bytesReceived; echoBuffer[bytesReceived] = '\0'; } + return true; } }; @@ -257,7 +282,7 @@ class DispatchRouterSubprocessTcpLatencyTest int pid; public: - DispatchRouterSubprocessTcpLatencyTest(std::string configName) + explicit DispatchRouterSubprocessTcpLatencyTest(std::string configName) { pid = fork(); if (pid == 0) { @@ -361,3 +386,62 @@ static void BM_TCPEchoServerLatency2QDRSubprocess(benchmark::State &state) } BENCHMARK(BM_TCPEchoServerLatency2QDRSubprocess)->Unit(benchmark::kMillisecond); + +static void BM_TCPEchoServerLatencyNQDRSubprocess(benchmark::State &state) +{ + EchoServerThread est; + + int N = state.range(0); + unsigned short tcpConnector = est.port(); + unsigned short tcpListener = findFreePort(); + + std::vector listeners{}; // first one is unused + listeners.reserve(N); + for (int i = 0; i < N; ++i) { + listeners.push_back(findFreePort()); + } + + std::string configName_1 = "BM_TCPEchoServerLatencyNQDRSubprocess_first.conf"; + std::stringstream router_config_1 = multiRouterTcpConfig("QDRL1", {}, {listeners[1]}, tcpConnector, 0); + writeRouterConfig(configName_1, router_config_1); + + std::string configName_2 = "BM_TCPEchoServerLatencyNQDRSubprocess_last.conf"; + std::stringstream router_config_2 = + multiRouterTcpConfig("QDRL2", {listeners[listeners.size() - 1]}, {}, 0, tcpListener); + writeRouterConfig(configName_2, router_config_2); + + DispatchRouterSubprocessTcpLatencyTest qdr1{configName_1}; + DispatchRouterSubprocessTcpLatencyTest qdr2{configName_2}; + + // interior routers + std::vector interior; + interior.reserve(N - 2); + for (int i = 1; i < N - 1; i++) { + std::stringstream ss; + ss << "BM_TCPEchoServerLatencyNQDRSubprocess_" << i << ".conf"; + std::stringstream id; + id << "ROUTER" << i; + std::string configName = ss.str(); + std::stringstream router_config = multiRouterTcpConfig(id.str(), {listeners[i]}, {listeners[i + 1]}, 0, 0); + writeRouterConfig(configName, router_config); + + interior.emplace_back(configName); // this ends up starting router subprocesses; behold magic of C++ + } + + { + LatencyMeasure lm; + lm.latencyMeasureLoop(state, tcpListener); + } +} + +BENCHMARK(BM_TCPEchoServerLatencyNQDRSubprocess) + ->Unit(benchmark::kMillisecond) + ->Arg(2) + ->Arg(3) + ->Arg(4) + ->Arg(5) + ->Arg(6) + ->Arg(7) + ->Arg(8) + ->Arg(9) + ; \ No newline at end of file diff --git a/tests/c_benchmarks/echo_server.hpp b/tests/c_benchmarks/echo_server.hpp index 50ff59c04..3765c2909 100644 --- a/tests/c_benchmarks/echo_server.hpp +++ b/tests/c_benchmarks/echo_server.hpp @@ -48,13 +48,16 @@ class EchoServer } } - // will handle one TCP client and then it will return + // will handle TCP clients one at a time and it will return on failure (when server socket is closed, most likely) void run() { - try { - HandleTCPClient(servSock.accept()); - } catch (SocketException &e) { - std::cerr << e.what() << std::endl; + while(true) { + try { + HandleTCPClient(servSock.accept()); + } catch (SocketException &e) { + std::cerr << e.what() << std::endl; // TODO: error is expected now, make it silent + break; + } } } @@ -87,16 +90,15 @@ class EchoServerThread unsigned short echoServerPort; std::thread u; + EchoServer es{0}; + public: EchoServerThread() { u = std::thread([this]() { - EchoServer es(0); echoServerPort = es.port(); portLatch.notify(); es.run(); - echoServerLatch.wait(); - es.stop(); }); portLatch.wait(); @@ -104,7 +106,7 @@ class EchoServerThread ~EchoServerThread() { - echoServerLatch.notify(); + es.stop(); // if recv failed, echo server may be stuck waiting to accept(); call stop() here u.join(); } diff --git a/tests/c_unittests/helpers.hpp b/tests/c_unittests/helpers.hpp index 4bc92b7c5..ccaf8e8ce 100644 --- a/tests/c_unittests/helpers.hpp +++ b/tests/c_unittests/helpers.hpp @@ -33,7 +33,14 @@ // assertions without stack traces when running outside doctest #ifndef QDR_DOCTEST // https://stackoverflow.com/questions/3767869/adding-message-to-assert -#define REQUIRE(condition) assert(condition) +#define REQUIRE(condition) \ +do { \ + if (! (condition)) { \ + std::cerr << "Assertion `" #condition "` failed in " << __FILE__ \ + << " line " << __LINE__ << std::endl; \ + std::terminate(); \ + } \ +} while (false) #define REQUIRE_MESSAGE(condition, message) \ do { \ if (! (condition)) { \