Skip to content

Commit

Permalink
improve drain and fix is_limited
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Nov 6, 2023
1 parent e29303d commit cce93e8
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ addr_to_v4 (struct sockaddr_in6 *addr) {
memcpy(addr, &in, sizeof(in));
}

static inline uint32_t
max_payload (udx_stream_t *stream) {
assert(stream->mtu > (AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE));
return stream->mtu - (stream->remote_addr.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE);
}

static inline uint32_t
cwnd_in_bytes (udx_stream_t *stream) {
return stream->cwnd * max_payload(stream);
}

static void
on_uv_poll (uv_poll_t *handle, int status, int events);

Expand Down Expand Up @@ -401,7 +412,7 @@ on_bytes_acked (udx_stream_t* stream, udx_stream_write_t *w, size_t bytes) {
w->bytes -= bytes;
stream->writes_queued_bytes -= bytes;

if (stream->hit_high_watermark && stream->writes_queued_bytes < UDX_HIGH_WATERMARK) {
if (stream->hit_high_watermark && stream->writes_queued_bytes < UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) {
stream->hit_high_watermark = false;
if (stream->on_drain != NULL) stream->on_drain(stream);
}
Expand Down Expand Up @@ -599,12 +610,6 @@ send_state_packet (udx_stream_t *stream) {
return update_poll(stream->socket);
}

static inline uint32_t
max_payload (udx_stream_t *stream) {
assert(stream->mtu > (AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE));
return stream->mtu - (stream->remote_addr.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE);
}

static int
send_data_packet (udx_stream_t *stream, udx_packet_t *pkt) {
if (stream->inflight + pkt->size > stream->cwnd * max_payload(stream)) {
Expand Down Expand Up @@ -1200,7 +1205,7 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
buf_len -= UDX_HEADER_SIZE;

size_t header_len = (data_offset > 0 && data_offset < buf_len) ? data_offset : buf_len;
bool is_limited = stream->inflight + 2 * max_payload(stream) < stream->cwnd * max_payload(stream);
bool is_limited = stream->writes_queued_bytes < UDX_HIGH_WATERMARK;

bool sacked = (type & UDX_HEADER_SACK) ? process_sacks(stream, buf, header_len) > 0 : false;

Expand Down Expand Up @@ -2073,7 +2078,7 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t
int err = fill_window(stream);
if (err < 0) return err;

if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK) {
if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) {
stream->hit_high_watermark = true;
return 0;
}
Expand All @@ -2100,7 +2105,7 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu
int err = fill_window(stream);
if (err < 0) return err;

if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK) {
if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) {
stream->hit_high_watermark = true;
return 0;
}
Expand Down

0 comments on commit cce93e8

Please sign in to comment.