Skip to content

Commit

Permalink
Merge branch 'main' of github.com:jthomas43/libudx
Browse files Browse the repository at this point in the history
  • Loading branch information
jthomas43 committed Nov 6, 2023
2 parents a494959 + 58b5ae7 commit 28ad7b7
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 12 deletions.
1 change: 1 addition & 0 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ struct udx_packet_s {

udx_fifo_t *send_queue; // pointer to socket->send_queue
uint32_t fifo_gc; // index into socket->send_queue
udx_stream_t *stream; // pointer to the stream if stream packet

uint8_t transmits;
uint16_t size;
Expand Down
3 changes: 3 additions & 0 deletions src/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ addr_to_v6 (struct sockaddr_in *addr) {
memcpy(addr, &in, sizeof(in));
}

void
udx__ensure_latest_stream_ack (udx_packet_t *packet);

void
udx__trigger_send_callback (udx_packet_t *packet);
void
Expand Down
4 changes: 4 additions & 0 deletions src/io_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ udx__on_writable (udx_socket_t *socket) {
pkt->dest_len = sizeof(struct sockaddr_in6);
}

udx__ensure_latest_stream_ack(pkt);

batch[pkts] = pkt;
struct mmsghdr *p = &h[pkts];
memset(p, 0, sizeof(*p));
Expand Down Expand Up @@ -218,6 +220,8 @@ udx__on_writable (udx_socket_t *socket) {
pkt->dest_len = sizeof(struct sockaddr_in6);
}

udx__ensure_latest_stream_ack(pkt);

ssize_t size = udx__sendmsg(socket, pkt->bufs, pkt->bufs_len, (struct sockaddr *) &(pkt->dest), pkt->dest_len);

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);
Expand Down
2 changes: 2 additions & 0 deletions src/io_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ udx__on_writable (udx_socket_t *socket) {
pkt->dest_len = sizeof(struct sockaddr_in6);
}

udx__ensure_latest_stream_ack(pkt);

ssize_t size = udx__sendmsg(socket, pkt->bufs, pkt->bufs_len, (struct sockaddr *) &(pkt->dest), pkt->dest_len);

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);
Expand Down
48 changes: 36 additions & 12 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
#define UDX_CONG_INIT_CWND 3
#define UDX_CONG_MAX_CWND 65536

#define UDX_HIGH_WATERMARK 262144
#define UDX_HIGH_WATERMARK 262144

typedef struct {
uint32_t seq; // must be the first entry, so its compat with the cirbuf
Expand Down Expand Up @@ -116,6 +116,17 @@ addr_to_v4 (struct sockaddr_in6 *addr) {
memcpy(addr, &in, sizeof(in));
}

static inline uint32_t
max_payload (udx_stream_t *stream) {
assert(stream->mtu > (AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE));
return stream->mtu - (stream->remote_addr.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE);
}

static inline uint32_t
cwnd_in_bytes (udx_stream_t *stream) {
return stream->cwnd * max_payload(stream);
}

static void
on_uv_poll (uv_poll_t *handle, int status, int events);

Expand Down Expand Up @@ -200,6 +211,22 @@ on_udx_timer_close (uv_handle_t *handle) {
trigger_socket_close(socket);
}

void
udx__ensure_latest_stream_ack (udx_packet_t *pkt) {
if (pkt->stream == NULL) return; // not a stream

uint32_t *i = (uint32_t *) pkt->header;

i += 4;

uint32_t packet_ack = *i;
uint32_t actual_ack = udx__swap_uint32_if_be(pkt->stream->ack);

if (packet_ack != actual_ack) {
*i = actual_ack;
}
}

void
udx__close_handles (udx_socket_t *handle) {
if (handle->status & UDX_SOCKET_CLOSING_HANDLES) return;
Expand Down Expand Up @@ -397,11 +424,11 @@ clear_incoming_packets (udx_stream_t *stream) {
}

static void
on_bytes_acked (udx_stream_t* stream, udx_stream_write_t *w, size_t bytes) {
on_bytes_acked (udx_stream_t *stream, udx_stream_write_t *w, size_t bytes) {
w->bytes -= bytes;
stream->writes_queued_bytes -= bytes;

if (stream->hit_high_watermark && stream->writes_queued_bytes < UDX_HIGH_WATERMARK) {
if (stream->hit_high_watermark && stream->writes_queued_bytes < UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) {
stream->hit_high_watermark = false;
if (stream->on_drain != NULL) stream->on_drain(stream);
}
Expand Down Expand Up @@ -498,6 +525,7 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_
pkt->dest = stream->remote_addr;
pkt->dest_len = stream->remote_addr_len;
pkt->send_queue = NULL;
pkt->stream = stream;

pkt->bufs_len = 2;

Expand Down Expand Up @@ -599,12 +627,6 @@ send_state_packet (udx_stream_t *stream) {
return update_poll(stream->socket);
}

static inline uint32_t
max_payload (udx_stream_t *stream) {
assert(stream->mtu > (AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE));
return stream->mtu - (stream->remote_addr.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE);
}

static int
send_data_packet (udx_stream_t *stream, udx_packet_t *pkt) {
if (stream->inflight + pkt->size > stream->cwnd * max_payload(stream)) {
Expand Down Expand Up @@ -1148,6 +1170,7 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint8_
pkt->type = UDX_PACKET_STREAM_RELAY;
pkt->header[3] = data_offset;
pkt->seq = seq;
pkt->stream = NULL;

pkt->send_queue = &relay->socket->send_queue;
pkt->fifo_gc = udx__fifo_push(&relay->socket->send_queue, pkt);
Expand Down Expand Up @@ -1200,7 +1223,7 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
buf_len -= UDX_HEADER_SIZE;

size_t header_len = (data_offset > 0 && data_offset < buf_len) ? data_offset : buf_len;
bool is_limited = stream->inflight + 2 * max_payload(stream) < stream->cwnd * max_payload(stream);
bool is_limited = stream->writes_queued_bytes < UDX_HIGH_WATERMARK;

bool sacked = (type & UDX_HEADER_SACK) ? process_sacks(stream, buf, header_len) > 0 : false;

Expand Down Expand Up @@ -1579,6 +1602,7 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_
pkt->type = UDX_PACKET_SEND;
pkt->ttl = ttl;
pkt->ctx = req;
pkt->stream = NULL;

if (dest->sa_family == AF_INET) {
pkt->dest_len = sizeof(struct sockaddr_in);
Expand Down Expand Up @@ -2073,7 +2097,7 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t
int err = fill_window(stream);
if (err < 0) return err;

if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK) {
if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) {
stream->hit_high_watermark = true;
return 0;
}
Expand All @@ -2100,7 +2124,7 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu
int err = fill_window(stream);
if (err < 0) return err;

if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK) {
if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) {
stream->hit_high_watermark = true;
return 0;
}
Expand Down

0 comments on commit 28ad7b7

Please sign in to comment.