Skip to content

Commit

Permalink
Simplify send path (#206)
Browse files Browse the repository at this point in the history
* introduce linked queue

* rack_detect_loss exit early when pkt->time_sent > rack_time_sent

* fifo removed

* merge _rx/_tx suffix branch

* remove sendmmsg / batching. simplify sending

---------

Co-authored-by: James Thomas <jthomas>
  • Loading branch information
jthomas43 authored Sep 27, 2024
1 parent 92777aa commit 15bb51d
Show file tree
Hide file tree
Showing 14 changed files with 736 additions and 955 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ target_sources(
src/cirbuf.c
src/endian.h
src/endian.c
src/fifo.h
src/fifo.c
src/queue.c
src/queue.h
src/io.h
src/udx.c
)
Expand Down
14 changes: 7 additions & 7 deletions examples/udxperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ print_interval (udxperf_client_t *client, uint64_t bytes, uint64_t start, uint64
byte_snprintf(bps_buf, sizeof bps_buf, bytes / time_sec, 'a');
bps_buf[19] = '\0';

printf("[%3d] %6.4f-%6.4f sec %s %s/sec", stream->local_id, (start - client->start_time) / 1000.0, (end - client->start_time) / 1000.0, bytes_buf, bps_buf, stream->cwnd);
printf("[%3d] %6.4f-%6.4f sec %s %s/sec", stream->local_id, (start - client->start_time) / 1000.0, (end - client->start_time) / 1000.0, bytes_buf, bps_buf);
if (is_client && extra_wanted) {
printf(" cwnd=%d ssthresh=%d fast_recovery_count=%d rto_count=%d rtx_count=%d", stream->cwnd, stream->ssthresh, stream->fast_recovery_count, stream->rto_count, stream->retransmit_count);
}
Expand All @@ -298,7 +298,7 @@ server_on_read (udx_stream_t *stream, ssize_t read_len, const uv_buf_t *buf) {
c->end_time = uv_now(&loop);

if (read_len == UV_EOF) {
print_interval(c, stream->bytes_in, c->start_time, c->end_time);
print_interval(c, stream->bytes_rx, c->start_time, c->end_time);
c->ended = true;
if (interval_ms) {
uv_timer_stop(&c->report_timer);
Expand All @@ -317,11 +317,11 @@ server_report_interval (uv_timer_t *timer) {
uint64_t now = uv_now(&loop);

if (c->time_last_report) {
print_interval(c, stream->bytes_in - c->bytes_last_report, c->time_last_report, now);
print_interval(c, stream->bytes_rx - c->bytes_last_report, c->time_last_report, now);
}

c->time_last_report = now;
c->bytes_last_report = stream->bytes_in;
c->bytes_last_report = stream->bytes_rx;
}

static void
Expand Down Expand Up @@ -421,7 +421,7 @@ finish_timer_cb (uv_timer_t *timer) {

udx_stream_write_end(req, stream, NULL, 0, write_end_cb);

print_interval(c, stream->bytes_out, c->start_time, c->end_time);
print_interval(c, stream->bytes_tx, c->start_time, c->end_time);

if (interval_ms) {
uv_timer_stop(&c->report_timer);
Expand All @@ -437,11 +437,11 @@ client_report_interval (uv_timer_t *timer) {
uint64_t now = uv_now(&loop);

if (c->time_last_report) {
print_interval(c, stream->bytes_out - c->bytes_last_report, c->time_last_report, now);
print_interval(c, stream->bytes_tx - c->bytes_last_report, c->time_last_report, now);
}

c->time_last_report = now;
c->bytes_last_report = stream->bytes_out;
c->bytes_last_report = stream->bytes_tx;
}

static void
Expand Down
96 changes: 44 additions & 52 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,25 @@ extern "C" {
#define UDX_SOCKET_CLOSING 0b0100
#define UDX_SOCKET_CLOSING_HANDLES 0b1000

#define UDX_STREAM_CONNECTED 0b00000000001
#define UDX_STREAM_RECEIVING 0b00000000010
#define UDX_STREAM_READING 0b00000000100
#define UDX_STREAM_ENDING 0b00000001000
#define UDX_STREAM_ENDING_REMOTE 0b00000010000
#define UDX_STREAM_ENDED 0b00000100000
#define UDX_STREAM_ENDED_REMOTE 0b00001000000
#define UDX_STREAM_DESTROYING 0b00010000000
#define UDX_STREAM_DESTROYED 0b00100000000
#define UDX_STREAM_DESTROYED_REMOTE 0b01000000000
#define UDX_STREAM_CLOSED 0b10000000000

#define UDX_PACKET_TYPE_STREAM_RELAY 0b00000
#define UDX_PACKET_TYPE_STREAM_STATE 0b00001
#define UDX_PACKET_TYPE_STREAM_WRITE 0b00010
#define UDX_PACKET_TYPE_STREAM_SEND 0b00100
#define UDX_PACKET_TYPE_STREAM_DESTROY 0b01000
#define UDX_PACKET_TYPE_SOCKET_SEND 0b10000
#define UDX_STREAM_CONNECTED 0b000000001
#define UDX_STREAM_RECEIVING 0b000000010
#define UDX_STREAM_READING 0b000000100
#define UDX_STREAM_ENDING 0b000001000
#define UDX_STREAM_ENDING_REMOTE 0b000010000
#define UDX_STREAM_ENDED 0b000100000
#define UDX_STREAM_ENDED_REMOTE 0b001000000
#define UDX_STREAM_DESTROYING 0b010000000
#define UDX_STREAM_CLOSED 0b100000000

#define UDX_HEADER_DATA 0b00001
#define UDX_HEADER_END 0b00010
#define UDX_HEADER_SACK 0b00100
#define UDX_HEADER_MESSAGE 0b01000
#define UDX_HEADER_DESTROY 0b10000

#define UDX_STREAM_WRITE_WANT_DATA 0b0001
#define UDX_STREAM_WRITE_WANT_STATE 0b0010
#define UDX_STREAM_WRITE_WANT_TLP 0b0100
#define UDX_STREAM_WRITE_WANT_DESTROY 0b1000
#define UDX_STREAM_WRITE_WANT_STATE 0b001
#define UDX_STREAM_WRITE_WANT_TLP 0b010
#define UDX_STREAM_WRITE_WANT_DESTROY 0b100

typedef struct {
uint32_t seq;
Expand All @@ -73,17 +63,10 @@ typedef struct {
udx_cirbuf_val_t **values;
} udx_cirbuf_t;

typedef struct {
uint32_t btm;
uint32_t len;
uint32_t max_len;
uint32_t mask;
void **values;
} udx_fifo_t;

typedef struct udx_s udx_t;
typedef struct udx_socket_s udx_socket_t;
typedef struct udx_stream_s udx_stream_t;
typedef struct udx_queue_node_s udx_queue_node_t;
typedef struct udx_packet_s udx_packet_t;

typedef struct udx_socket_send_s udx_socket_send_t;
Expand Down Expand Up @@ -131,18 +114,28 @@ struct udx_s {

udx_cirbuf_t streams_by_id;

uint64_t bytes_in;
uint64_t bytes_out;
uint64_t bytes_rx;
uint64_t bytes_tx;

uint64_t packets_rx;
uint64_t packets_tx;
};

uint64_t packets_in;
uint64_t packets_out;
// Link element of a doubly linked queue. It carries only the two neighbour
// pointers and no payload; queued structs embed it as a member named `queue`
// (udx_packet_t and udx_stream_write_buf_s both do).
struct udx_queue_node_s {
udx_queue_node_t *next; // node that follows this one
udx_queue_node_t *prev; // node that precedes this one
};

// Queue header: one embedded node plus a length counter.
// NOTE(review): `node` looks like the list's head/sentinel -- callers pass
// &queue.node to udx__queue_foreach -- confirm against queue.h / queue.c.
typedef struct udx_queue_s {
udx_queue_node_t node; // anchor node whose address is handed to the iteration helpers
uint32_t len; // presumably the number of linked entries (callers test `len > 0`) -- confirm in queue.c
} udx_queue_t;

struct udx_socket_s {
uv_udp_t handle;
uv_poll_t io_poll;

udx_fifo_t send_queue;
udx_queue_t send_queue;

udx_t *udx;
udx_cirbuf_t *streams_by_id; // for convenience
Expand All @@ -159,11 +152,11 @@ struct udx_socket_s {
udx_socket_recv_cb on_recv;
udx_socket_close_cb on_close;

uint64_t bytes_in;
uint64_t bytes_out;
uint64_t bytes_rx;
uint64_t bytes_tx;

uint64_t packets_in;
uint64_t packets_out;
uint64_t packets_rx;
uint64_t packets_tx;
};

typedef struct udx_cong_s {
Expand Down Expand Up @@ -250,7 +243,6 @@ struct udx_stream_s {
uint32_t rack_next_seq;
uint32_t rack_fack;

uint32_t pkts_inflight; // packets inflight to the other peer
uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)?

// tlp
Expand All @@ -276,25 +268,28 @@ struct udx_stream_s {
// congestion state
udx_cong_t cong;

udx_fifo_t write_queue; // udx_stream_write_t
udx_queue_t write_queue;

udx_cirbuf_t outgoing;
udx_cirbuf_t incoming;

udx_fifo_t retransmit_queue; // udx_packet_t
udx_queue_t retransmit_queue; // udx_packet_t
udx_queue_t inflight_queue; // udx_packet_t

udx_fifo_t unordered;
// udx_queue_t unordered;
udx_queue_t unordered_queue;

uint64_t bytes_in;
uint64_t bytes_out;
uint64_t bytes_rx;
uint64_t bytes_tx;

uint64_t packets_in;
uint64_t packets_out;
uint64_t packets_rx;
uint64_t packets_tx;
};

struct udx_packet_s {
uint32_t seq; // must be the first entry, so its compat with the cirbuf
udx_queue_node_t queue;

int type;
int ttl;

bool lost;
Expand All @@ -305,13 +300,9 @@ struct udx_packet_s {
uint16_t size;
uint64_t time_sent;

void *ctx; // stream_send_t | socket_send_t | stream_t

struct sockaddr_storage dest;
int dest_len;

uint32_t fifo_gc; // for removing from inflight / retransmit queue

// just alloc it in place here, easier to manage
char header[UDX_HEADER_SIZE];
unsigned short nbufs;
Expand All @@ -334,6 +325,7 @@ struct udx_socket_send_s {
struct udx_stream_write_buf_s {
// immutable original buf
uv_buf_t buf;
udx_queue_node_t queue;

// 1. remove from write_queue when bytes_inflight + bytes_acked == buf.len
// 2. free when bytes_acked == buf.len
Expand Down
28 changes: 27 additions & 1 deletion src/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#define DEBUG 0
#endif

#include "queue.h"

#ifdef DEBUG_STATS
#include "../include/udx.h"
#include <uv.h>
Expand Down Expand Up @@ -34,7 +36,7 @@ debug_print_cwnd_stats (udx_stream_t *stream) {
static void
debug_print_outgoing (udx_stream_t *stream) {
if (DEBUG) {
for (uint32_t s = stream->remote_acked; s < stream->seq; s++) {
for (uint32_t s = stream->remote_acked; s != stream->seq; s++) {
udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, s);
if (pkt == NULL) {
debug_printf("-");
Expand All @@ -48,6 +50,30 @@ debug_print_outgoing (udx_stream_t *stream) {
}
}
debug_printf("\n");
uint64_t now = uv_hrtime() / 1000000;
udx_queue_node_t *q;
if (stream->inflight_queue.len > 0) {
debug_printf("inflight q =");
udx__queue_foreach(q, &stream->inflight_queue.node) {
// for (udx_queue_node_t *q = stream->inflight_queue.node.next; q != &stream->inflight_queue.node; q = q->next) {
udx_packet_t *pkt = udx__queue_data(q, udx_packet_t, queue);
assert(!pkt->lost);
debug_printf("%u %lums ", pkt->seq, now - pkt->time_sent);
}
debug_printf("\n");
}
if (stream->retransmit_queue.len > 0) {
debug_printf("retransmit q =");
udx__queue_foreach(q, &stream->retransmit_queue.node) {
udx_packet_t *pkt = udx__queue_data(q, udx_packet_t, queue);
assert(pkt->lost);
debug_printf("%u %lums ", pkt->seq, now - pkt->time_sent);
}
debug_printf("\n");
}
}
}
*/
Expand Down
89 changes: 0 additions & 89 deletions src/fifo.c

This file was deleted.

Loading

0 comments on commit 15bb51d

Please sign in to comment.