diff --git a/configure.ac b/configure.ac index a23c3bcf4..9768447b8 100644 --- a/configure.ac +++ b/configure.ac @@ -337,6 +337,20 @@ if test "x$iperf3_cv_header_tcp_info_snd_wnd" = "xyes"; then AC_DEFINE([HAVE_TCP_INFO_SND_WND], [1], [Have tcpi_snd_wnd field in tcp_info.]) fi +# Check for MSG_ZEROCOPY (mostly on Linux) +AC_CACHE_CHECK([MSG_ZEROCOPY send option], +[iperf3_cv_header_msg_zerocopy], +AC_COMPILE_IFELSE( + [AC_LANG_PROGRAM([[#include + #include + #include ]], + [[int foo = MSG_ZEROCOPY;]])], + iperf3_cv_header_msg_zerocopy=yes, + iperf3_cv_header_msg_zerocopy=no)) +if test "x$iperf3_cv_header_msg_zerocopy" = "xyes"; then + AC_DEFINE([HAVE_MSG_ZEROCOPY], [1], [Have MSG_ZEROCOPY send option.]) +fi + # Check if we need -lrt for clock_gettime AC_SEARCH_LIBS(clock_gettime, [rt posix4]) # Check for clock_gettime support diff --git a/src/iperf.h b/src/iperf.h index 527e549ed..497852223 100644 --- a/src/iperf.h +++ b/src/iperf.h @@ -83,6 +83,37 @@ typedef atomic_uint_fast64_t atomic_iperf_size_t; typedef unsigned int uint #endif // __vxworks or __VXWORKS__ +#if defined(HAVE_MSG_ZEROCOPY) && defined(HAVE_POLL_H) +#define SUPPORTED_MSG_ZEROCOPY 1 + +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 60 +#endif + +// FIXME: supposed to be in ? + +#ifndef SO_EE_ORIGIN_ZEROCOPY +#define SO_EE_ORIGIN_ZEROCOPY 5 +#endif + +#ifndef SO_EE_CODE_ZEROCOPY_COPIED +#define SO_EE_CODE_ZEROCOPY_COPIED 1 +#endif + +struct sock_extended_err +{ + uint32_t ee_errno; /* error number */ + uint8_t ee_origin; /* where the error originated */ + uint8_t ee_type; /* type */ + uint8_t ee_code; /* code */ + uint8_t ee_pad; /* padding */ + uint32_t ee_info; /* additional information */ + uint32_t ee_data; /* other data */ + /* More data may follow */ +}; + +#endif /* HAVE_MSG_ZEROCOPY && HAVE_POLL_H */ + struct iperf_interval_results { atomic_iperf_size_t bytes_transferred; /* bytes transferred in this interval */ @@ -230,6 +261,14 @@ struct iperf_stream int (*rcv2) (struct iperf_stream * stream); int (*snd2) (struct iperf_stream * stream); +#if defined(SUPPORTED_MSG_ZEROCOPY) + /* used when sending using MSG_ZEROCOPY */ + long completions; + long expected_completions; + int zerocopied; + uint32_t next_completion; +#endif /* SUPPORTED_MSG_ZEROCOPY */ + // struct iperf_stream *next; SLIST_ENTRY(iperf_stream) streams; @@ -331,7 +370,7 @@ struct iperf_test int verbose; /* -V option - verbose mode */ int json_output; /* -J option - JSON output */ int json_stream; /* --json-stream */ - int zerocopy; /* -Z option - use sendfile */ + int zerocopy; /* -Z option - use sendfile or MSG_ZEROCOPY for TCP, MSG_ZEROCOPY for UDP */ int debug; /* -d option - enable debug */ enum debug_level debug_level; /* -d option option - level of debug messages to show */ int get_server_output; /* --get-server-output */ @@ -459,4 +498,8 @@ extern int gerror; /* error value from getaddrinfo(3), for use in internal error /* In Reverse mode, maximum number of packets to wait for "accept" response - to handle out of order packets */ #define MAX_REVERSE_OUT_OF_ORDER_PACKETS 2 +/* Zerocopy methood use - sendfile() or MSG_ZEROCOPY (for UDP only MSG_ZEROCOPY is supported) */ +#define ZEROCOPY_SENDFILE 1 +#define ZEROCOPY_MSG_ZEROCOPY 2 + #endif /* !__IPERF_H */ diff --git a/src/iperf_api.c b/src/iperf_api.c index 4c73e8328..092d4c071 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -69,8 +69,8 @@ #include #endif /* HAVE_SETPROCESSAFFINITYMASK */ -#include "net.h" #include "iperf.h" +#include "net.h" #include "iperf_api.h" #include "iperf_udp.h" #include "iperf_tcp.h" @@ -702,7 +702,7 @@ iperf_has_zerocopy( void ) void iperf_set_test_zerocopy(struct iperf_test *ipt, int zerocopy) { - ipt->zerocopy = (zerocopy && has_sendfile()); + ipt->zerocopy = (zerocopy && (ipt->protocol->id == Pudp ? 1 : has_sendfile())); } void @@ -1104,7 +1104,11 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv) #if defined(HAVE_FLOWLABEL) {"flowlabel", required_argument, NULL, 'L'}, #endif /* HAVE_FLOWLABEL */ +#if defined(SUPPORTED_MSG_ZEROCOPY) + {"zerocopy", optional_argument, NULL, 'Z'}, +#else {"zerocopy", no_argument, NULL, 'Z'}, +#endif /* SUPPORTED_MSG_ZEROCOPY */ {"omit", required_argument, NULL, 'O'}, {"file", required_argument, NULL, 'F'}, {"repeating-payload", no_argument, NULL, OPT_REPEATING_PAYLOAD}, @@ -1467,11 +1471,22 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv) TAILQ_INSERT_TAIL(&test->xbind_addrs, xbe, link); break; case 'Z': +#if defined(SUPPORTED_MSG_ZEROCOPY) + if (optarg && strcmp(optarg, "")) { + if (!strcmp(optarg, "z")) + test->zerocopy = ZEROCOPY_MSG_ZEROCOPY; + else { + i_errno = IENOSENDFILE; + return -1; + } + } else +#endif /* SUPPORTED_MSG_ZEROCOPY */ if (!has_sendfile()) { i_errno = IENOSENDFILE; return -1; + } else { + test->zerocopy = ZEROCOPY_SENDFILE; } - test->zerocopy = 1; client_flag = 1; break; case OPT_REPEATING_PAYLOAD: @@ -1744,6 +1759,28 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv) return -1; } +#if defined(SUPPORTED_MSG_ZEROCOPY) + // UDP supports "zero copy" only using MSG_ZEROCOPY + if (test->protocol->id == Pudp && test->zerocopy) + test->zerocopy = ZEROCOPY_MSG_ZEROCOPY; + // Zero copy for TCP use sendfile() + if (test->zerocopy && test->protocol->id != Pudp && !has_sendfile()) { + i_errno = IENOSENDFILE; + return -1; + } + // Using MSG_ZEROCOPY is not supported when disk file is used + if (test->diskfile_name != (char*) 0 && test->zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + i_errno = IEDISKFILEZEROCOPY; + return -1; + } +#else + // Zero copy is supported only by TCP + if (test->zerocopy && test->protocol->id != Ptcp) { + i_errno = IENOSENDFILE; + return -1; + } +#endif /* SUPPORTED_MSG_ZEROCOPY */ + if (blksize == 0) { if (test->protocol->id == Pudp) blksize = 0; /* try to dynamically determine from MSS */ @@ -4464,6 +4501,12 @@ iperf_new_stream(struct iperf_test *test, int s, int sender) sp->snd = test->protocol->send; sp->rcv = test->protocol->recv; +#if defined(SUPPORTED_MSG_ZEROCOPY) + // Note: sp->next_completion is not initialized to 1, since the first + // SO_EE_ORIGIN_ZEROCOPY messge value is zero (and not 1 as expected). + sp->zerocopied = -1; +#endif /* SUPPORTED_MSG_ZEROCOPY */ + if (test->diskfile_name != (char*) 0) { sp->diskfile_fd = open(test->diskfile_name, sender ? O_RDONLY : (O_WRONLY|O_CREAT|O_TRUNC), S_IRUSR|S_IWUSR); if (sp->diskfile_fd == -1) { diff --git a/src/iperf_api.h b/src/iperf_api.h index 131314243..3666b0195 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -420,6 +420,7 @@ enum { IESNDTIMEOUT = 33, // Illegal message send timeout IEUDPFILETRANSFER = 34, // Cannot transfer file using UDP IESERVERAUTHUSERS = 35, // Cannot access authorized users file + IEDISKFILEZEROCOPY = 36, // Sending disk file using MSG_ZEROCOPY is not supported /* Test errors */ IENEWTEST = 100, // Unable to create a new test (check perror) IEINITTEST = 101, // Test initialization failed (check perror) diff --git a/src/iperf_client_api.c b/src/iperf_client_api.c index 7ad4c939b..fe79ff3fa 100644 --- a/src/iperf_client_api.c +++ b/src/iperf_client_api.c @@ -399,7 +399,7 @@ iperf_connect(struct iperf_test *test) /* Create and connect the control channel */ if (test->ctrl_sck < 0) // Create the control channel using an ephemeral port - test->ctrl_sck = netdial(test->settings->domain, Ptcp, test->bind_address, test->bind_dev, 0, test->server_hostname, test->server_port, test->settings->connect_timeout); + test->ctrl_sck = netdial(test->settings->domain, Ptcp, test->bind_address, test->bind_dev, 0, test->server_hostname, test->server_port, test->settings->connect_timeout, 0); if (test->ctrl_sck < 0) { i_errno = IECONNECT; return -1; @@ -720,6 +720,11 @@ iperf_run_client(struct iperf_test * test) SLIST_FOREACH(sp, &test->streams, streams) { if (sp->sender) { int rc; +#if defined(SUPPORTED_MSG_ZEROCOPY) + if (sp->test->zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + wait_zerocopy_buffer_available(sp); /* Wait until last message is sent */ + } +#endif /* SUPPORTED_MSG_ZEROCOPY */ sp->done = 1; rc = pthread_cancel(sp->thr); if (rc != 0 && rc != ESRCH) { diff --git a/src/iperf_error.c b/src/iperf_error.c index 0fedf3110..fc3ac3352 100644 --- a/src/iperf_error.c +++ b/src/iperf_error.c @@ -210,7 +210,14 @@ iperf_strerror(int int_errno) snprintf(errstr, len, "TCP MSS too large (maximum = %d bytes)", MAX_MSS); break; case IENOSENDFILE: +#if defined(SUPPORTED_MSG_ZEROCOPY) + snprintf(errstr, len, "invalid zerocopy option value or this OS does not support sendfile"); +#else snprintf(errstr, len, "this OS does not support sendfile"); +#endif /* SUPPORTED_MSG_ZEROCOPY */ + break; + case IEDISKFILEZEROCOPY: + snprintf(errstr, len, "Sending disk file using MSG_ZEROCOPY is not supported"); break; case IEOMIT: snprintf(errstr, len, "bogus value for --omit"); diff --git a/src/iperf_locale.c b/src/iperf_locale.c index 9d94e0234..cbd0c125e 100644 --- a/src/iperf_locale.c +++ b/src/iperf_locale.c @@ -198,7 +198,12 @@ const char usage_longstr[] = "Usage: iperf3 [-s|-c host] [options]\n" #if defined(HAVE_FLOWLABEL) " -L, --flowlabel N set the IPv6 flow label (only supported on Linux)\n" #endif /* HAVE_FLOWLABEL */ - " -Z, --zerocopy use a 'zero copy' method of sending data\n" +#if defined(HAVE_MSG_ZEROCOPY) && defined(HAVE_POLL_H) + " -Z, --zerocopy[=z] for UDP use MSG_ZEROCOPY 'zero copy' method for sending data;\n" + " for TCP, use sendfile() uless '=z' is set for using MSG_ZEROCOPY\n" +#else + " -Z, --zerocopy use `sendfile()` for 'zero copy' send of TCP data\n" +#endif /* SUPPORTED_MSG_ZEROCOPY */ " -O, --omit N perform pre-test for N seconds and omit the pre-test statistics\n" " -T, --title str prefix every output line with this string\n" " --extra-data str data string to include in client and server JSON\n" diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index 7d512081c..573931275 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -420,6 +420,11 @@ cleanup_server(struct iperf_test *test) int i_errno_save = i_errno; SLIST_FOREACH(sp, &test->streams, streams) { int rc; +#if defined(SUPPORTED_MSG_ZEROCOPY) + if (sp->sender && sp->test->zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + wait_zerocopy_buffer_available(sp); /* Wait until last message is sent */ + } +#endif /* SUPPORTED_MSG_ZEROCOPY */ sp->done = 1; rc = pthread_cancel(sp->thr); if (rc != 0 && rc != ESRCH) { diff --git a/src/iperf_tcp.c b/src/iperf_tcp.c index e025515ab..97e46170c 100644 --- a/src/iperf_tcp.c +++ b/src/iperf_tcp.c @@ -88,6 +88,18 @@ iperf_tcp_send(struct iperf_stream *sp) if (!sp->pending_size) sp->pending_size = sp->settings->blksize; +#if defined(SUPPORTED_MSG_ZEROCOPY) + if (sp->test->zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + /* Wait until it is safe to rewite the sending buffer */ + r = wait_zerocopy_buffer_available(sp); + if (r < 0) { + if (sp->test->debug_level >= DEBUG_LEVEL_INFO) + printf("Waining for TCP MSG_ZEROCOPY buffer to become available failed, errno=%s\n", strerror(errno)); + return r; + } + r = Nsend_sp(sp, sp->buffer, sp->pending_size, Ptcp, MSG_ZEROCOPY); + } else +#endif /* SUPPORTED_MSG_ZEROCOPY */ if (sp->test->zerocopy) r = Nsendfile(sp->buffer_fd, sp->socket, sp->buffer, sp->pending_size); else @@ -120,12 +132,27 @@ iperf_tcp_accept(struct iperf_test * test) char cookie[COOKIE_SIZE] = {0}; socklen_t len; struct sockaddr_storage addr; +#if defined(SUPPORTED_MSG_ZEROCOPY) + int opt; +#endif /* SUPPORTED_MSG_ZEROCOPY */ len = sizeof(addr); if ((s = accept(test->listener, (struct sockaddr *) &addr, &len)) < 0) { i_errno = IESTREAMCONNECT; return -1; } + +#if defined(SUPPORTED_MSG_ZEROCOPY) + /* Setting should be done before the socket is conected */ + if (test->zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + opt = 1; + if (setsockopt(s, SOL_SOCKET, SO_ZEROCOPY, &opt, sizeof(opt)) < 0) { + i_errno = IESTREAMACCEPT; + return -1; + } + } +#endif /* SUPPORTED_MSG_ZEROCOPY */ + #if defined(HAVE_SO_MAX_PACING_RATE) /* If fq socket pacing is specified, enable it. */ @@ -380,7 +407,7 @@ iperf_tcp_connect(struct iperf_test *test) int saved_errno; int rcvbuf_actual, sndbuf_actual; - s = create_socket(test->settings->domain, SOCK_STREAM, test->bind_address, test->bind_dev, test->bind_port, test->server_hostname, test->server_port, &server_res); + s = create_socket(test->settings->domain, SOCK_STREAM, test->bind_address, test->bind_dev, test->bind_port, test->server_hostname, test->server_port, &server_res, test->zerocopy); if (s < 0) { i_errno = IESTREAMCONNECT; return -1; diff --git a/src/iperf_time.c b/src/iperf_time.c index a435dd30d..4afff1cdb 100644 --- a/src/iperf_time.c +++ b/src/iperf_time.c @@ -86,6 +86,16 @@ iperf_time_in_usecs(struct iperf_time *time) return time->secs * 1000000LL + time->usecs; } +uint64_t +iperf_time_now_in_usecs() +{ + struct iperf_time time; + + iperf_time_now(&time); + return iperf_time_in_usecs(&time); +} + + double iperf_time_in_secs(struct iperf_time *time) { diff --git a/src/iperf_time.h b/src/iperf_time.h index 588ee2624..34ef20080 100644 --- a/src/iperf_time.h +++ b/src/iperf_time.h @@ -36,6 +36,8 @@ struct iperf_time { int iperf_time_now(struct iperf_time *time1); +uint64_t iperf_time_now_in_usecs(); + void iperf_time_add_usecs(struct iperf_time *time1, uint64_t usecs); int iperf_time_compare(struct iperf_time *time1, struct iperf_time *time2); diff --git a/src/iperf_udp.c b/src/iperf_udp.c index a603236df..13e2fdde6 100644 --- a/src/iperf_udp.c +++ b/src/iperf_udp.c @@ -201,6 +201,21 @@ iperf_udp_send(struct iperf_stream *sp) int r; int size = sp->settings->blksize; struct iperf_time before; + int sock_opt = 0; + +#if defined(SUPPORTED_MSG_ZEROCOPY) + if (sp->test->zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + sock_opt = MSG_ZEROCOPY; + + /* Wait until it is safe to rewite the sending buffer */ + r = wait_zerocopy_buffer_available(sp); + if (r < 0) { + if (sp->test->debug_level >= DEBUG_LEVEL_INFO) + printf("Waining for UDP MSG_ZEROCOPY buffer to become available failed, errno=%s\n", strerror(errno)); + return r; + } + } +#endif /* SUPPORTED_MSG_ZEROCOPY */ iperf_time_now(&before); @@ -234,7 +249,12 @@ iperf_udp_send(struct iperf_stream *sp) } - r = Nwrite(sp->socket, sp->buffer, size, Pudp); +#if defined(SUPPORTED_MSG_ZEROCOPY) + if (sock_opt & MSG_ZEROCOPY) { + r = Nsend_sp(sp, sp->buffer, size, Pudp, sock_opt); + } else +#endif /* SUPPORTED_MSG_ZEROCOPY */ + r = Nsend(sp->socket, sp->buffer, size, Pudp, sock_opt); if (r <= 0) { --sp->packet_count; /* Don't count messages that no data was sent from them. @@ -242,6 +262,8 @@ iperf_udp_send(struct iperf_stream *sp) if (r < 0) { if (r == NET_SOFTERROR && sp->test->debug_level >= DEBUG_LEVEL_INFO) printf("UDP send failed on NET_SOFTERROR. errno=%s\n", strerror(errno)); + else + printf("UDP send failed. errno=%s\n", strerror(errno)); return r; } } @@ -373,6 +395,9 @@ iperf_udp_accept(struct iperf_test *test) socklen_t len; int sz, s; int rc; +#if defined(SUPPORTED_MSG_ZEROCOPY) + int opt; +#endif /* SUPPORTED_MSG_ZEROCOPY */ /* * Get the current outstanding socket. This socket will be used to handle @@ -380,6 +405,17 @@ iperf_udp_accept(struct iperf_test *test) */ s = test->prot_listener; +#if defined(SUPPORTED_MSG_ZEROCOPY) + /* Setting should be done before the socket is conected */ + if (test->zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + opt = 1; + if (setsockopt(s, SOL_SOCKET, SO_ZEROCOPY, &opt, sizeof(opt)) < 0) { + i_errno = IESTREAMACCEPT; + return -1; + } + } +#endif /* SUPPORTED_MSG_ZEROCOPY */ + /* * Grab the UDP packet sent by the client. From that we can extract the * client's address, and then use that information to bind the remote side @@ -446,6 +482,7 @@ iperf_udp_accept(struct iperf_test *test) /* * Create a new "listening" socket to replace the one we were using before. */ + FD_CLR(test->prot_listener, &test->read_set); // No control messages from old listener test->prot_listener = netannounce(test->settings->domain, Pudp, test->bind_address, test->bind_dev, test->server_port); if (test->prot_listener < 0) { i_errno = IESTREAMLISTEN; @@ -507,7 +544,7 @@ iperf_udp_connect(struct iperf_test *test) int i, max_len_wait_for_reply; /* Create and bind our local socket. */ - if ((s = netdial(test->settings->domain, Pudp, test->bind_address, test->bind_dev, test->bind_port, test->server_hostname, test->server_port, -1)) < 0) { + if ((s = netdial(test->settings->domain, Pudp, test->bind_address, test->bind_dev, test->bind_port, test->server_hostname, test->server_port, -1, test->zerocopy)) < 0) { i_errno = IESTREAMCONNECT; return -1; } diff --git a/src/net.c b/src/net.c index 632ae0319..d5c982d60 100644 --- a/src/net.c +++ b/src/net.c @@ -38,6 +38,7 @@ #include #include #include +#include #ifdef HAVE_SENDFILE #ifdef linux @@ -124,11 +125,14 @@ timeout_connect(int s, const struct sockaddr *name, socklen_t namelen, /* create a socket */ int -create_socket(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, struct addrinfo **server_res_out) +create_socket(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, struct addrinfo **server_res_out, int zerocopy) { struct addrinfo hints, *local_res = NULL, *server_res = NULL; int s, saved_errno; char portstr[6]; +#if defined(SUPPORTED_MSG_ZEROCOPY) + int opt; +#endif /* SUPPORTED_MSG_ZEROCOPY */ if (local) { memset(&hints, 0, sizeof(hints)); @@ -157,6 +161,22 @@ create_socket(int domain, int proto, const char *local, const char *bind_dev, in return -1; } +#if defined(SUPPORTED_MSG_ZEROCOPY) + /* Setting should be done before the socket is conected */ + if (zerocopy == ZEROCOPY_MSG_ZEROCOPY) { + opt = 1; + + if (setsockopt(s, SOL_SOCKET, SO_ZEROCOPY, &opt, sizeof(opt)) < 0) { + saved_errno = errno; + close(s); + freeaddrinfo(local_res); + freeaddrinfo(server_res); + errno = saved_errno; + return -1; + } + } +#endif /* SUPPORTED_MSG_ZEROCOPY */ + if (bind_dev) { #if defined(HAVE_SO_BINDTODEVICE) if (setsockopt(s, SOL_SOCKET, SO_BINDTODEVICE, @@ -234,12 +254,12 @@ create_socket(int domain, int proto, const char *local, const char *bind_dev, in /* make connection to server */ int -netdial(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, int timeout) +netdial(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, int timeout, int zerocopy) { struct addrinfo *server_res = NULL; int s, saved_errno; - s = create_socket(domain, proto, local, bind_dev, local_port, server, port, &server_res); + s = create_socket(domain, proto, local, bind_dev, local_port, server, port, &server_res, zerocopy); if (s < 0) { return -1; } @@ -454,18 +474,230 @@ Nread(int fd, char *buf, size_t count, int prot) } +#if defined(SUPPORTED_MSG_ZEROCOPY) + +#define ZC_MAX_ZEROCOPY_POLL_TIMEOUT 60000 // ms +#define ZC_MIN_ZEROCOPY_POLL_TIMEOUT 500 // ms +#define ZC_CFG_WAITTIME_US 500000 // us + +static int do_recv_completion(struct iperf_stream *sp) +{ + struct sock_extended_err *serr; + struct msghdr msg = {}; + struct cmsghdr *cm; + uint32_t hi, lo, range; + int ret, zerocopy; + char control[128]; + + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + ret = recvmsg(sp->socket, &msg, MSG_ERRQUEUE); + if (ret == -1) { + if (errno == EAGAIN) { + if (sp->test->debug_level >= DEBUG_LEVEL_DEBUG) + printf("notification recvmsg() failed with EAGAIN, completions=%lu, socket=%d, errno=%s;\n", sp->completions, sp->socket, strerror(errno)); + return 0; + } else { + // error(1, errno, "recvmsg notification"); + error(0, errno, "notification recvmsg() error, completions=%lu, socket=%d", sp->completions, sp->socket); + return -1; + } + } + if (msg.msg_flags & MSG_CTRUNC) { + //error(1, errno, "recvmsg notification: truncated"); + error(0, errno, "notification recvmsg() truncated, completions=%lu, socket=%d", sp->completions, sp->socket); + return -1; + } + + cm = CMSG_FIRSTHDR(&msg); + if (!cm) { + // error(1, 0, "cmsg: no cmsg"); + error(0, errno, "notification cmsg: no cmsg, completions=%lu, socket=%d", sp->completions, sp->socket); + return -1; + } + + if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) || + (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR) )) + // || (cm->cmsg_level == SOL_PACKET && cm->cmsg_type == PACKET_TX_TIMESTAMP))) + { + // error(1, 0, "serr: wrong type: %d.%d", cm->cmsg_level, cm->cmsg_type); + error(0, errno, "notification serr: wrong type: %d.%d, completions=%lu, socket=%d", cm->cmsg_level, cm->cmsg_type, sp->completions, sp->socket); + return -1; + } + + serr = (void *) CMSG_DATA(cm); + + if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { + // error(1, 0, "serr: wrong origin: %u", serr->ee_origin); + error(0, errno, "notification serr: wrong origin: %u, completions=%lu, socket=%d", serr->ee_origin, sp->completions, sp->socket); + return -1; + } + if (serr->ee_errno != 0) { + // error(1, 0, "serr: wrong error code: %u", serr->ee_errno); + error(0, errno, "notification serr: wrong error code: %u, completions=%lu, socket=%d", serr->ee_errno, sp->completions, sp->socket); + return -1; + } + + hi = serr->ee_data; + lo = serr->ee_info; + range = hi - lo + 1; + if (sp->test->debug_level >= DEBUG_LEVEL_DEBUG) + printf("notification lo=%u, hi=%u, range=%u, completions=%lu, socket=%d;\n", lo, hi, range, sp->completions, sp->socket); + + /* Detect notification gaps. These should not happen often, if at all. + * Gaps can occur due to drops, reordering and retransmissions. + */ + //if (lo != next_completion) fprintf(stderr, "gap: %u..%u does not append to %u\n", lo, hi, next_completion); + if (lo != sp->next_completion && sp->test->debug_level >= DEBUG_LEVEL_WARN) + printf("notification gap: %u..%u does not append to %u, completions=%lu, socket=%d;\n", lo, hi, sp->next_completion, sp->completions, sp->socket); + sp->next_completion = hi + 1; + + zerocopy = !(serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED); + if (sp->zerocopied == -1) + sp->zerocopied = zerocopy; + else if (sp->zerocopied != zerocopy) { + //fprintf(stderr, "serr: inconsistent\n"); + if (sp->test->debug_level >= DEBUG_LEVEL_WARN) + printf("notification serr: inconsistent, completions=%lu, socket=%d;\n", sp->completions, sp->socket); + sp->zerocopied = zerocopy; + } + + sp->completions += range; + return 1; +} + +/* Read all outstanding messages on the errqueue */ +static void do_recv_completions(struct iperf_stream *sp) +{ + while (do_recv_completion(sp) > 0 && sp->completions < sp->expected_completions) {} +} + + +static int do_poll(struct iperf_stream *sp, int events, int timeout) +{ + struct pollfd pfd; + int ret; + + pfd.events = events; + pfd.revents = 0; + pfd.fd = sp->socket; + + ret = poll(&pfd, 1, timeout); + if (ret == -1) { + // error(1, errno, "poll"); + error(0, errno, "polling notifications failed: pfd.revents=%x, events=%x, completions=%lu, socket=%d", pfd.revents, events, sp->completions, sp->socket); + ret = -1; + } else if (pfd.revents & POLLNVAL) { + ret = -1; + if (sp->test->debug_level >= DEBUG_LEVEL_INFO) + printf("poll notification returned POLLNVAL, socket=%d;\n", sp->socket); + } else { + ret = ret && (pfd.revents & events); + } + + return ret; +} + + +/* Wait for all remaining completions on the errqueue */ +static void do_recv_remaining_completions(struct iperf_stream *sp) +{ + int64_t tstop; + int ret; + + if (sp->completions < sp->expected_completions) { + tstop = iperf_time_now_in_usecs() + ZC_CFG_WAITTIME_US; + do { + + ret = do_poll(sp, POLLERR, ZC_MIN_ZEROCOPY_POLL_TIMEOUT); + if (ret == -1) { + if (sp->test->debug_level >= DEBUG_LEVEL_ERROR) + printf("poll notification failed, socket=%d, errno=%s;\n", sp->socket, strerror(errno)); + } else if (ret != 0) { + do_recv_completions(sp); + } + } while (ret != -1 && sp->completions < sp->expected_completions && iperf_time_now_in_usecs() < tstop); + } + + if (sp->completions != sp->expected_completions) { + if (sp->test->debug_level >= DEBUG_LEVEL_ERROR) + printf("failed waiting for missing notifications: %lu < %lu, making expected and current counters equal, socket=%d;\n", sp->completions, sp->expected_completions, sp->socket); + sp->completions = sp->expected_completions; + } +} + + +/** Wait until it is safe to rewite the sending buffer. + * Returns -1 on timeout or failure. + */ +int wait_zerocopy_buffer_available(struct iperf_stream *sp) { + int total_timeout = 0; + int ret = 1; + + if (sp->completions < sp->expected_completions) { + do { + ret = do_poll(sp, POLLOUT, ZC_MIN_ZEROCOPY_POLL_TIMEOUT); + if (ret == -1) { + if (sp->test->debug_level >= DEBUG_LEVEL_ERROR) + printf("poll notification failed, socket=%d, errno=%s;\n", sp->socket, strerror(errno)); + } else if (ret != 0) { + do_recv_completions(sp); + } + } while (ret == 0 && (total_timeout += ZC_MIN_ZEROCOPY_POLL_TIMEOUT) < ZC_MAX_ZEROCOPY_POLL_TIMEOUT); + + + if (ret == 0) { + if (sp->test->debug_level >= DEBUG_LEVEL_INFO) + printf("waiting for POLLOUT notification timed-out, socket=%d, errno=%s;\n", sp->socket, strerror(errno)); + } else if (ret != -1) { + do_recv_remaining_completions(sp); + } + } + + return ret; +} + /* - * N W R I T E + * N S E N D to SP */ +int +Nsend_sp(struct iperf_stream *sp, const char *buf, size_t count, int prot, int sock_opt) +{ + int ret; + + ret = Nsend(sp->socket, buf, count, prot, sock_opt); + if (sock_opt & MSG_ZEROCOPY) + sp->expected_completions++; + + return ret; +} + +#endif /* SUPPORTED_MSG_ZEROCOPY */ + +/* + * N W R I T E + */ int Nwrite(int fd, const char *buf, size_t count, int prot) +{ + return Nsend(fd, buf, count, prot, 0); +} + +/* + * N S E N D + */ +int +Nsend(int fd, const char *buf, size_t count, int prot, int sock_opt) { register ssize_t r; register size_t nleft = count; while (nleft > 0) { - r = write(fd, buf, nleft); + if (sock_opt) + r = send(fd, buf, nleft, sock_opt); + else + r = write(fd, buf, nleft); if (r < 0) { switch (errno) { case EINTR: diff --git a/src/net.h b/src/net.h index f0e1b4f98..51fb3f862 100644 --- a/src/net.h +++ b/src/net.h @@ -28,17 +28,23 @@ #define __NET_H int timeout_connect(int s, const struct sockaddr *name, socklen_t namelen, int timeout); -int create_socket(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, struct addrinfo **server_res_out); -int netdial(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, int timeout); +int create_socket(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, struct addrinfo **server_res_out, int zerocopy); +int netdial(int domain, int proto, const char *local, const char *bind_dev, int local_port, const char *server, int port, int timeout, int zerocopy); int netannounce(int domain, int proto, const char *local, const char *bind_dev, int port); int Nread(int fd, char *buf, size_t count, int prot); int Nwrite(int fd, const char *buf, size_t count, int prot) /* __attribute__((hot)) */; +int Nsend(int fd, const char *buf, size_t count, int prot, int sock_opt) /* __attribute__((hot)) */; int has_sendfile(void); int Nsendfile(int fromfd, int tofd, const char *buf, size_t count) /* __attribute__((hot)) */; int setnonblocking(int fd, int nonblocking); int getsockdomain(int sock); int parse_qos(const char *tos); +#if defined(SUPPORTED_MSG_ZEROCOPY) +int Nsend_sp(struct iperf_stream *sp, const char *buf, size_t count, int prot, int sock_opt); +int wait_zerocopy_buffer_available(struct iperf_stream *sp); +#endif /* SUPPORTED_MSG_ZEROCOPY */ + #define NET_SOFTERROR -1 #define NET_HARDERROR -2