From cce93e898a9792ca0d31f95b1935a7fa2513950a Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 6 Nov 2023 01:06:28 +0100 Subject: [PATCH] improve drain and fix is_limited --- src/udx.c | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/udx.c b/src/udx.c index 1f99f66c..8c05e308 100644 --- a/src/udx.c +++ b/src/udx.c @@ -116,6 +116,17 @@ addr_to_v4 (struct sockaddr_in6 *addr) { memcpy(addr, &in, sizeof(in)); } +static inline uint32_t +max_payload (udx_stream_t *stream) { + assert(stream->mtu > (AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE)); + return stream->mtu - (stream->remote_addr.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE); +} + +static inline uint32_t +cwnd_in_bytes (udx_stream_t *stream) { + return stream->cwnd * max_payload(stream); +} + static void on_uv_poll (uv_poll_t *handle, int status, int events); @@ -401,7 +412,7 @@ on_bytes_acked (udx_stream_t* stream, udx_stream_write_t *w, size_t bytes) { w->bytes -= bytes; stream->writes_queued_bytes -= bytes; - if (stream->hit_high_watermark && stream->writes_queued_bytes < UDX_HIGH_WATERMARK) { + if (stream->hit_high_watermark && stream->writes_queued_bytes < UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) { stream->hit_high_watermark = false; if (stream->on_drain != NULL) stream->on_drain(stream); } @@ -599,12 +610,6 @@ send_state_packet (udx_stream_t *stream) { return update_poll(stream->socket); } -static inline uint32_t -max_payload (udx_stream_t *stream) { - assert(stream->mtu > (AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE)); - return stream->mtu - (stream->remote_addr.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE); -} - static int send_data_packet (udx_stream_t *stream, udx_packet_t *pkt) { if (stream->inflight + pkt->size > stream->cwnd * max_payload(stream)) { @@ -1200,7 +1205,7 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd buf_len -= UDX_HEADER_SIZE; size_t header_len = (data_offset > 0 && data_offset < buf_len) ? data_offset : buf_len; - bool is_limited = stream->inflight + 2 * max_payload(stream) < stream->cwnd * max_payload(stream); + bool is_limited = stream->writes_queued_bytes < UDX_HIGH_WATERMARK; bool sacked = (type & UDX_HEADER_SACK) ? process_sacks(stream, buf, header_len) > 0 : false; @@ -2073,7 +2078,7 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t int err = fill_window(stream); if (err < 0) return err; - if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK) { + if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) { stream->hit_high_watermark = true; return 0; } @@ -2100,7 +2105,7 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu int err = fill_window(stream); if (err < 0) return err; - if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK) { + if (stream->writes_queued_bytes > UDX_HIGH_WATERMARK + cwnd_in_bytes(stream)) { stream->hit_high_watermark = true; return 0; }