Skip to content

Commit

Permalink
End test immediatelly when all number of blocks/bytes sent/received
Browse files Browse the repository at this point in the history
  • Loading branch information
davidBar-On committed Sep 22, 2024
1 parent 0ea9cb6 commit 8c939c8
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ enum debug_level {
struct iperf_test
{
pthread_mutex_t print_mutex;
pthread_mutex_t pipe_mutex;

char role; /* 'c' lient or 's' erver */
enum iperf_mode mode;
Expand Down Expand Up @@ -326,6 +327,9 @@ struct iperf_test

int ctrl_sck_mss; /* MSS for the control channel */

int pipe_end_of_test_fds[2]; /* fot select() terminating when -n/-k flags are set */
int pipe_end_of_test_created;

#if defined(HAVE_SSL)
char *server_authorized_users;
EVP_PKEY *server_rsa_private_key;
Expand Down
154 changes: 146 additions & 8 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1862,6 +1862,69 @@ void iperf_close_logfile(struct iperf_test *test)
}
}


/*
* Open the self pipe.
*/
void iperf_open_pipe_end_of_test(struct iperf_test *test)
{
int flags;
int failures = 0;

if (pipe(test->pipe_end_of_test_fds) == -1) {
warning("Failed to create pipe to end test immediatelly when sending bytes/blocks in completed");
return;
}
/* pipe was created */
test->pipe_end_of_test_created = 1;
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Created end of test pipe - read_fd=%d, write_fd=%d.\n",
test->pipe_end_of_test_fds[0], test->pipe_end_of_test_fds[1]);
}
/* Make read end of pipe nonblocking */
flags = fcntl(test->pipe_end_of_test_fds[0], F_GETFL);
if (flags == -1) {
warning("Failed to get read side of pipe flags for setting O_NONBLOCK");
failures++;
} else {
flags |= O_NONBLOCK;
if (fcntl(test->pipe_end_of_test_fds[0], F_SETFL, flags) == -1) {
warning("Failed to set read side of pipe flags to O_NONBLOCK");
failures++;
}
}
/* Make write end of pipe nonblocking */
if (failures == 0) {
flags = fcntl(test->pipe_end_of_test_fds[1], F_GETFL);
if (flags == -1) {
warning("Failed to get write side of pipe flags for setting O_NONBLOCK");
failures++;
} else {
flags |= O_NONBLOCK;
if (fcntl(test->pipe_end_of_test_fds[1], F_SETFL, flags) == -1) {
warning("Failed to set write side of pipe flags to O_NONBLOCK");
failures++;
}
}
}

if (failures > 0) {
iperf_close_pipe_end_of_test(test);
}

return;
}

void iperf_close_pipe_end_of_test(struct iperf_test *test)
{
if (test->pipe_end_of_test_created == 1) {
close(test->pipe_end_of_test_fds[0]);
close(test->pipe_end_of_test_fds[1]);
test->pipe_end_of_test_created = 0;
}
}


int
iperf_set_send_state(struct iperf_test *test, signed char state)
{
Expand Down Expand Up @@ -1997,6 +2060,7 @@ iperf_send_mt(struct iperf_stream *sp)
register struct iperf_test *test = sp->test;
struct iperf_time now;
int no_throttle_check;
int rc;

/* Can we do multisend mode? */
if (test->settings->burst != 0)
Expand All @@ -2014,13 +2078,31 @@ iperf_send_mt(struct iperf_stream *sp)
iperf_time_now(&now);
streams_active = 0;
{
if (sp->green_light && sp->sender) {
if (sp->green_light) {
// XXX If we hit one of these ending conditions maybe
// want to stop even trying to send something?
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
if ((test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) ||
(test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks))
{
/* Waking up main thread when all data was sent */
if (test->pipe_end_of_test_created == 1) {
/* If write fail on EINTR, etc. retry will be done in next send */
if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_lock pipe_mutex for already sent");
}
rc = write(test->pipe_end_of_test_fds[1], "x", 1);
if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_unlock pipe_mutex for already sent");
}
if (rc <= 0) {
warning("Failed to write to pipe to end test when all blocks or bytes already sent");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All blocks or bytes already sent. Wrote to pipe to wakeup main thread.\n");
}
}
break;
}
if ((r = sp->snd(sp)) < 0) {
if (r == NET_SOFTERROR)
break;
Expand All @@ -2031,6 +2113,28 @@ iperf_send_mt(struct iperf_stream *sp)
test->bytes_sent += r;
if (!sp->pending_size)
++test->blocks_sent;
/* Waking up main thread when all data was sent */
if ((test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) ||
(test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks))
{
if (test->pipe_end_of_test_created == 1) {
/* If write fail on EINTR, etc. retry will be done in next send */
if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_lock pipe_mutex for all sent");
}
rc = write(test->pipe_end_of_test_fds[1], "x", 1);
if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_unlock pipe_mutex for all sent");
}
if (rc <= 0) {
warning("Failed to write to pipe to end test when all blocks or bytes sent");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All blocks or bytes sent. Wrote to pipe to wakeup main thread.\n");
}
}
break;
}
if (no_throttle_check)
iperf_check_throttle(sp, &now);
}
Expand All @@ -2053,7 +2157,7 @@ iperf_send_mt(struct iperf_stream *sp)
int
iperf_recv_mt(struct iperf_stream *sp)
{
int r;
int r, rc;
struct iperf_test *test = sp->test;

if ((r = sp->rcv(sp)) < 0) {
Expand All @@ -2067,6 +2171,26 @@ iperf_recv_mt(struct iperf_stream *sp)
if (r > 0) {
test->bytes_received += r;
++test->blocks_received;
/* Waking up main thread when all data was received */
if (test->pipe_end_of_test_created == 1 &&
((test->settings->bytes != 0 && test->bytes_received >= test->settings->bytes) ||
(test->settings->blocks != 0 && test->blocks_received >= test->settings->blocks)))
{
/* If write fail on EINTR, etc. retry will be done in next read */
if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) {
perror("iperf_recv_mt: pthread_mutex_lock pipe_mutex for all received");
}
rc = write(test->pipe_end_of_test_fds[1], "x", 1);
if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) {
perror("iperf_recv_mt: pthread_mutex_unlock pipe_mutex for all received");
}
if (rc <= 0) {
warning("Failed to write to pipe to end test when all blocks or bytes received");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All blocks or bytes received. Wrote to pipe to wakeup main thread.\n");
}
}
}

return 0;
Expand Down Expand Up @@ -2951,9 +3075,13 @@ iperf_new_test()
}

if (pthread_mutex_init(&(test->print_mutex), &mutexattr) != 0) {
perror("iperf_new_test: pthread_mutex_init");
perror("iperf_new_test: pthread_mutex_init print_mutex");
}
pthread_mutexattr_destroy(&mutexattr);

if (pthread_mutex_init(&(test->pipe_mutex), &mutexattr) != 0) {
perror("iperf_new_test: pthread_mutex_init pipe_mutex");
}
pthread_mutexattr_destroy(&mutexattr);

test->settings = (struct iperf_settings *) malloc(sizeof(struct iperf_settings));
Expand Down Expand Up @@ -3054,6 +3182,7 @@ iperf_defaults(struct iperf_test *testp)
testp->settings->rcv_timeout.secs = DEFAULT_NO_MSG_RCVD_TIMEOUT / SEC_TO_mS;
testp->settings->rcv_timeout.usecs = (DEFAULT_NO_MSG_RCVD_TIMEOUT % SEC_TO_mS) * mS_TO_US;
testp->zerocopy = 0;
testp->pipe_end_of_test_created = 0;

memset(testp->cookie, 0, COOKIE_SIZE);

Expand Down Expand Up @@ -3130,6 +3259,7 @@ iperf_defaults(struct iperf_test *testp)
void
iperf_free_test(struct iperf_test *test)
{
int rc;
struct protocol *prot;
struct iperf_stream *sp;

Expand Down Expand Up @@ -3210,11 +3340,10 @@ iperf_free_test(struct iperf_test *test)
}

/* Destroy print mutex. iperf_printf() doesn't work after this point */
int rc;
rc = pthread_mutex_destroy(&(test->print_mutex));
if (rc != 0) {
errno = rc;
perror("iperf_free_test: pthread_mutex_destroy");
perror("iperf_free_test: pthread_mutex_destroy print_mutex");
}

if (test->logfile) {
Expand All @@ -3223,6 +3352,14 @@ iperf_free_test(struct iperf_test *test)
iperf_close_logfile(test);
}

iperf_close_pipe_end_of_test(test);
/* Destroy end of test pipe mutex. */
rc = pthread_mutex_destroy(&(test->pipe_mutex));
if (rc != 0) {
errno = rc;
perror("iperf_free_test: pthread_mutex_destroy pipe_mutex");
}

if (test->server_output_text) {
free(test->server_output_text);
test->server_output_text = NULL;
Expand Down Expand Up @@ -3351,6 +3488,7 @@ iperf_reset_test(struct iperf_test *test)
test->settings->tos = 0;
test->settings->dont_fragment = 0;
test->zerocopy = 0;
test->pipe_end_of_test_created = 0;

#if defined(HAVE_SSL)
if (test->settings->authtoken) {
Expand Down
2 changes: 2 additions & 0 deletions src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ int iperf_create_send_timers(struct iperf_test *);
int iperf_parse_arguments(struct iperf_test *, int, char **);
int iperf_open_logfile(struct iperf_test *);
void iperf_close_logfile(struct iperf_test *);
void iperf_open_pipe_end_of_test(struct iperf_test *);
void iperf_close_pipe_end_of_test(struct iperf_test *);
void iperf_reset_test(struct iperf_test *);
void iperf_reset_stats(struct iperf_test * test);

Expand Down
29 changes: 28 additions & 1 deletion src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ iperf_run_client(struct iperf_test * test)
int64_t timeout_us;
int64_t rcv_timeout_us;
int i_errno_save;
char chr;

if (NULL == test)
{
Expand Down Expand Up @@ -684,12 +685,25 @@ iperf_run_client(struct iperf_test * test)
}

if (result > 0) {
if (FD_ISSET(test->ctrl_sck, &read_set)) {
if (FD_ISSET(test->ctrl_sck, &read_set)) { /* Control message */
if (iperf_handle_message_client(test) < 0) {
goto cleanup_and_fail;
}
FD_CLR(test->ctrl_sck, &read_set);
}

/* Wakedup by sent all of data */
if (test->pipe_end_of_test_created && FD_ISSET(test->pipe_end_of_test_fds[0], &read_set)) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Main thread woke up by pipe input that all blocks or bytes already sent/received.\n");
}
for (;;) { /* Consume bytes from pipe */
if (read(test->pipe_end_of_test_fds[0], &chr, 1) <= 0) {
break;
}
}
FD_CLR(test->pipe_end_of_test_fds[0], &read_set);
}
}

if (test->state == TEST_RUNNING) {
Expand All @@ -698,6 +712,19 @@ iperf_run_client(struct iperf_test * test)
if (startup) {
startup = 0;

/* Create self pipe to allow select() end when number of bytes/block were sent */
if (test->settings->bytes > 0 || test->settings->blocks > 0) {
iperf_open_pipe_end_of_test(test);
if (test->pipe_end_of_test_created == 0) {
warning("Failed to create pipe to end test immediatelly when sending bytes/blocks in completed");
} else { /* pipe was created */
/* Add the pipe to the select() read fds */
FD_SET(test->pipe_end_of_test_fds[0], &test->read_set);
test->max_fd = (test->pipe_end_of_test_fds[0] > test->max_fd) ?
test->pipe_end_of_test_fds[0] : test->max_fd;
}
}

/* Create and spin up threads */
pthread_attr_t attr;
if (pthread_attr_init(&attr) != 0) {
Expand Down

0 comments on commit 8c939c8

Please sign in to comment.