Skip to content

Commit

Permalink
defer adding padding to probe packets until send_packet() to simplify…
Browse files Browse the repository at this point in the history
… code (#227)
  • Loading branch information
jthomas43 authored Dec 3, 2024
1 parent 906ab07 commit a9f5af9
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 51 deletions.
4 changes: 0 additions & 4 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,6 @@ struct udx_packet_s {
// just alloc it in place here, easier to manage
char header[UDX_HEADER_SIZE];
unsigned short nbufs;

// inefficient - only relevant for stream_t packets
unsigned short nwbufs;
udx_stream_write_buf_t **wbufs;
};

struct udx_socket_send_s {
Expand Down
86 changes: 39 additions & 47 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {
}
}

static udx_stream_write_buf_t **
wbufs_offset (udx_packet_t *pkt) {
return (udx_stream_write_buf_t **) (((uv_buf_t *) (pkt + 1)) + pkt->nbufs);
}

static void
clear_outgoing_packets (udx_stream_t *stream) {
// todo: skip the math, and just
Expand All @@ -433,14 +438,12 @@ clear_outgoing_packets (udx_stream_t *stream) {

assert(pkt->nbufs >= 2);

int diff = pkt->nbufs - pkt->nwbufs;
assert(diff == 1 || diff == 2); // either header buf, or header + padding buff

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
udx_stream_write_buf_t **wbufs = wbufs_offset(pkt);

for (int i = 0; i < pkt->nwbufs; i++) {
size_t pkt_len = bufs[i + diff].len;
udx_stream_write_buf_t *wbuf = pkt->wbufs[i];
for (int i = 1; i < pkt->nbufs; i++) {
size_t pkt_len = bufs[i].len;
udx_stream_write_buf_t *wbuf = wbufs[i - 1];
on_bytes_acked(wbuf, pkt_len, true);

// todo: move into on_bytes_acked itself
Expand Down Expand Up @@ -517,10 +520,6 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_
pkt->nbufs = 1 + nuserbufs;
bufs[0] = uv_buf_init((char *) &(pkt->header), UDX_HEADER_SIZE);

// for now, set when stream writes data
pkt->wbufs = NULL;
pkt->nwbufs = 0;

for (int i = 0; i < nuserbufs; i++) {
bufs[i + 1] = userbufs[i];
pkt->size += userbufs[i].len;
Expand All @@ -541,16 +540,6 @@ mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {
return 0;
}
// debug_printf("mtu: probeify rid=%u seq=%u size=%u wanted=%d padding=%d\n", udx__swap_uint32_if_be(((unsigned int *) pkt->header)[1]), pkt->seq, pkt->size + header_size, wanted_size, padding_size);
static char probe_data[256] = {0};

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
for (int i = pkt->nbufs; i > 1; i--) {
bufs[i] = bufs[i - 1];
}
pkt->nbufs++;

bufs[1].len = padding_size;
bufs[1].base = probe_data;

pkt->header[3] = padding_size;
pkt->is_mtu_probe = true;
Expand All @@ -563,17 +552,6 @@ mtu_unprobeify_packet (udx_packet_t *pkt, udx_stream_t *stream) {
assert(pkt->is_mtu_probe);

pkt->header[3] = 0;

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);

// [header][padding][2][3] 4 = nbufs

for (int i = 2; i < pkt->nbufs; i++) {
bufs[i - 1] = bufs[i];
}

pkt->nbufs--;

pkt->is_mtu_probe = false;

debug_printf("mtu: probe failed rid=%u %d/%d", stream->remote_id, stream->mtu_probe_count, UDX_MTU_MAX_PROBES);
Expand Down Expand Up @@ -1063,13 +1041,12 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {
stream->reordering_seen = true;
}

int diff = pkt->nbufs - pkt->nwbufs;

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
udx_stream_write_buf_t **wbufs = wbufs_offset(pkt);

for (int i = 0; i < pkt->nwbufs; i++) {
size_t pkt_len = bufs[i + diff].len;
udx_stream_write_buf_t *wbuf = pkt->wbufs[i];
for (int i = 1; i < pkt->nbufs; i++) {
size_t pkt_len = bufs[i].len;
udx_stream_write_buf_t *wbuf = wbufs[i - 1];

on_bytes_acked(wbuf, pkt_len, false);

Expand Down Expand Up @@ -1484,7 +1461,25 @@ send_packet (udx_socket_t *socket, udx_packet_t *pkt) {
pkt->dest_len = sizeof(struct sockaddr_in6);
}

ssize_t rc = udx__sendmsg(socket, (uv_buf_t *) (pkt + 1), pkt->nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len);
uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
int nbufs = pkt->nbufs;
uv_buf_t _bufs[UDX_MAX_COMBINED_WRITES + 2];

if (pkt->is_mtu_probe) {
int padding_size = pkt->header[3];
static char probe_data[256] = {0};
_bufs[0] = bufs[0];
_bufs[1].base = probe_data;
_bufs[1].len = padding_size;

for (int i = 1; i < pkt->nbufs; i++) {
_bufs[1 + i] = bufs[i];
}
bufs = _bufs;
nbufs = nbufs + 1;
}

ssize_t rc = udx__sendmsg(socket, bufs, nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len);

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);

Expand Down Expand Up @@ -1754,17 +1749,12 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
}

assert(header_flag & UDX_HEADER_DATA_OR_END);
int nbufs = 2 + nwbufs; // extra for 1.header 2.padding
int nbufs = 1 + nwbufs; // extra buf for header

udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + sizeof(uv_buf_t) * nbufs + sizeof(void *) * nwbufs);

init_stream_packet(pkt, header_flag, stream, bufs, nwbufs);
pkt->wbufs = (udx_stream_write_buf_t **) (((uv_buf_t *) (pkt + 1)) + nbufs);
pkt->nwbufs = nwbufs;

for (int i = 0; i < nwbufs; i++) {
pkt->wbufs[i] = wbufs[i];
}
memcpy(wbufs_offset(pkt), wbufs, sizeof(wbufs[0]) * nwbufs);

pkt->ttl = 0;

Expand All @@ -1773,13 +1763,14 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
ssize_t rc = send_packet(socket, pkt);

if (rc == UV_EAGAIN) {
for (int i = 0; i < pkt->nwbufs; i++) {
udx_stream_write_buf_t *wbuf = pkt->wbufs[i];
int i = nwbufs;
while (nwbufs--) {
udx_stream_write_buf_t *wbuf = wbufs[i];
if (wbuf->bytes_acked + wbuf->bytes_inflight == wbuf->buf.len) {
udx__queue_head(&stream->write_queue, &wbuf->queue);
}

wbuf->bytes_inflight -= bufs[(pkt->is_mtu_probe ? 2 : 1) + i].len;
wbuf->bytes_inflight -= bufs[i + 1].len;
}

free(pkt);
Expand Down Expand Up @@ -2132,6 +2123,7 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_
memcpy(&(pkt->dest), dest, pkt->dest_len);
pkt->lost = false;
pkt->retransmitted = false;
pkt->is_mtu_probe = false;
pkt->transmits = 0;
pkt->rto_timeouts = 0;
pkt->nbufs = 1;
Expand Down

0 comments on commit a9f5af9

Please sign in to comment.