diff --git a/include/udx.h b/include/udx.h index 9ab71d5f..fd59b9d8 100644 --- a/include/udx.h +++ b/include/udx.h @@ -102,17 +102,17 @@ typedef struct udx_lookup_s udx_lookup_t; typedef struct udx_interface_event_s udx_interface_event_t; typedef void (*udx_socket_send_cb)(udx_socket_send_t *req, int status); -typedef void (*udx_socket_recv_cb)(udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from); -typedef void (*udx_socket_close_cb)(udx_socket_t *handle); +typedef void (*udx_socket_recv_cb)(udx_socket_t *socket, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from); +typedef void (*udx_socket_close_cb)(udx_socket_t *socket); -typedef int (*udx_stream_firewall_cb)(udx_stream_t *handle, udx_socket_t *socket, const struct sockaddr *from); -typedef void (*udx_stream_read_cb)(udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf); -typedef void (*udx_stream_drain_cb)(udx_stream_t *handle); -typedef void (*udx_stream_remote_changed_cb)(udx_stream_t *handle); +typedef int (*udx_stream_firewall_cb)(udx_stream_t *stream, udx_socket_t *socket, const struct sockaddr *from); +typedef void (*udx_stream_read_cb)(udx_stream_t *stream, ssize_t read_len, const uv_buf_t *buf); +typedef void (*udx_stream_drain_cb)(udx_stream_t *stream); +typedef void (*udx_stream_remote_changed_cb)(udx_stream_t *stream); typedef void (*udx_stream_ack_cb)(udx_stream_write_t *req, int status, int unordered); typedef void (*udx_stream_send_cb)(udx_stream_send_t *req, int status); -typedef void (*udx_stream_recv_cb)(udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf); -typedef void (*udx_stream_close_cb)(udx_stream_t *handle, int status); +typedef void (*udx_stream_recv_cb)(udx_stream_t *stream, ssize_t read_len, const uv_buf_t *buf); +typedef void (*udx_stream_close_cb)(udx_stream_t *stream, int status); typedef void (*udx_lookup_cb)(udx_lookup_t *handle, int status, const struct sockaddr *addr, int addr_len); @@ -135,7 +135,7 @@ struct udx_s { }; struct udx_socket_s { - uv_udp_t socket; + uv_udp_t handle; uv_poll_t io_poll; udx_fifo_t send_queue; @@ -175,7 +175,7 @@ struct udx_stream_s { int set_id; int status; int out_of_order; - int recovery; + int recovery; // number of packets to send before recovery finished int deferred_ack; bool hit_high_watermark; @@ -235,7 +235,7 @@ struct udx_stream_s { uint32_t pkts_inflight; // packets inflight to the other peer uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)? uint32_t retransmits_waiting; // how many retransmits are waiting to be sent? if 0, then inflight iteration is faster - uint32_t seq_flushed; // highest seq that has been flushed + uint32_t seq_flushed; // highest seq that has been flushed. usually seq_flushed == seq // timestamps... uint64_t rto_timeout; @@ -287,7 +287,7 @@ struct udx_packet_s { struct udx_socket_send_s { udx_packet_t pkt; - udx_socket_t *handle; + udx_socket_t *socket; udx_socket_send_cb on_send; @@ -299,7 +299,7 @@ struct udx_stream_write_s { uv_buf_t buf; bool is_write_end; - udx_stream_t *handle; + udx_stream_t *stream; udx_stream_ack_cb on_ack; void *data; @@ -307,7 +307,7 @@ struct udx_stream_write_s { struct udx_stream_send_s { udx_packet_t pkt; - udx_stream_t *handle; + udx_stream_t *stream; udx_stream_send_cb on_send; @@ -338,114 +338,114 @@ struct udx_interface_event_s { }; int -udx_init (uv_loop_t *loop, udx_t *handle); +udx_init (uv_loop_t *loop, udx_t *udx); int -udx_socket_init (udx_t *handle, udx_socket_t *socket); +udx_socket_init (udx_t *udx, udx_socket_t *socket); int -udx_socket_get_send_buffer_size (udx_socket_t *handle, int *value); +udx_socket_get_send_buffer_size (udx_socket_t *socket, int *value); int -udx_socket_set_send_buffer_size (udx_socket_t *handle, int value); +udx_socket_set_send_buffer_size (udx_socket_t *socket, int value); int -udx_socket_get_recv_buffer_size (udx_socket_t *handle, int *value); +udx_socket_get_recv_buffer_size (udx_socket_t *socket, int *value); int -udx_socket_set_recv_buffer_size (udx_socket_t *handle, int value); +udx_socket_set_recv_buffer_size (udx_socket_t *socket, int value); int -udx_socket_get_ttl (udx_socket_t *handle, int *ttl); +udx_socket_get_ttl (udx_socket_t *socket, int *ttl); int -udx_socket_set_ttl (udx_socket_t *handle, int ttl); +udx_socket_set_ttl (udx_socket_t *socket, int ttl); int -udx_socket_bind (udx_socket_t *handle, const struct sockaddr *addr, unsigned int flags); +udx_socket_bind (udx_socket_t *socket, const struct sockaddr *addr, unsigned int flags); int -udx_socket_getsockname (udx_socket_t *handle, struct sockaddr *name, int *name_len); +udx_socket_getsockname (udx_socket_t *socket, struct sockaddr *name, int *name_len); int -udx_socket_send (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, udx_socket_send_cb cb); +udx_socket_send (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, udx_socket_send_cb cb); int -udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, int ttl, udx_socket_send_cb cb); +udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, int ttl, udx_socket_send_cb cb); int -udx_socket_recv_start (udx_socket_t *handle, udx_socket_recv_cb cb); +udx_socket_recv_start (udx_socket_t *socket, udx_socket_recv_cb cb); int -udx_socket_recv_stop (udx_socket_t *handle); +udx_socket_recv_stop (udx_socket_t *socket); int -udx_socket_close (udx_socket_t *handle, udx_socket_close_cb cb); +udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb); // only exposed here as a convenience / debug tool - the udx instance uses this automatically int -udx_check_timeouts (udx_t *handle); +udx_check_timeouts (udx_t *udx); int -udx_stream_init (udx_t *udx, udx_stream_t *handle, uint32_t local_id, udx_stream_close_cb close_cb); +udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream_close_cb close_cb); int -udx_stream_get_mtu (udx_stream_t *handle, uint16_t *mtu); +udx_stream_get_mtu (udx_stream_t *stream, uint16_t *mtu); int -udx_stream_get_seq (udx_stream_t *handle, uint32_t *seq); +udx_stream_get_seq (udx_stream_t *stream, uint32_t *seq); int -udx_stream_set_seq (udx_stream_t *handle, uint32_t seq); +udx_stream_set_seq (udx_stream_t *stream, uint32_t seq); int -udx_stream_get_ack (udx_stream_t *handle, uint32_t *ack); +udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack); int -udx_stream_set_ack (udx_stream_t *handle, uint32_t ack); +udx_stream_set_ack (udx_stream_t *stream, uint32_t ack); int -udx_stream_connect (udx_stream_t *handle, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr); +udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr); int udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb remote_changed_cb); int -udx_stream_relay_to (udx_stream_t *handle, udx_stream_t *destination); +udx_stream_relay_to (udx_stream_t *stream, udx_stream_t *destination); int -udx_stream_firewall (udx_stream_t *handle, udx_stream_firewall_cb firewall_cb); +udx_stream_firewall (udx_stream_t *stream, udx_stream_firewall_cb firewall_cb); int -udx_stream_recv_start (udx_stream_t *handle, udx_stream_recv_cb cb); +udx_stream_recv_start (udx_stream_t *stream, udx_stream_recv_cb cb); int -udx_stream_recv_stop (udx_stream_t *handle); +udx_stream_recv_stop (udx_stream_t *stream); int -udx_stream_read_start (udx_stream_t *handle, udx_stream_read_cb cb); +udx_stream_read_start (udx_stream_t *stream, udx_stream_read_cb cb); int -udx_stream_read_stop (udx_stream_t *handle); +udx_stream_read_stop (udx_stream_t *stream); // only exposed here as a convenience / debug tool - the udx instance uses this automatically int -udx_stream_check_timeouts (udx_stream_t *handle); +udx_stream_check_timeouts (udx_stream_t *stream); int -udx_stream_send (udx_stream_send_t *req, udx_stream_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb); +udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb); int -udx_stream_write_resume (udx_stream_t *handle, udx_stream_drain_cb drain_cb); +udx_stream_write_resume (udx_stream_t *stream, udx_stream_drain_cb drain_cb); int -udx_stream_write (udx_stream_write_t *req, udx_stream_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb); +udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb); int -udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb); +udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb); int -udx_stream_destroy (udx_stream_t *handle); +udx_stream_destroy (udx_stream_t *stream); int udx_lookup (uv_loop_t *loop, udx_lookup_t *req, const char *host, unsigned int flags, udx_lookup_cb cb); diff --git a/src/debug.h b/src/debug.h index be63ab73..70bdcca1 100644 --- a/src/debug.h +++ b/src/debug.h @@ -33,11 +33,6 @@ debug_print_cwnd_stats (udx_stream_t *stream) { static void debug_print_outgoing (udx_stream_t *stream) { if (DEBUG) { - uint32_t i = stream->seq_flushed - stream->remote_acked; - uint32_t j = stream->seq - stream->seq_flushed; - - debug_printf("%-*s%-*s%s\n", i, "RA", j, "SF", "Seq"); - for (uint32_t s = stream->remote_acked; s < stream->seq; s++) { udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, s); if (pkt == NULL) { @@ -45,15 +40,15 @@ debug_print_outgoing (udx_stream_t *stream) { continue; } - if (pkt->type == UDX_PACKET_INFLIGHT) { + if (pkt->status == UDX_PACKET_INFLIGHT) { debug_printf("I"); continue; } - if (pkt->type == UDX_PACKET_SENDING) { + if (pkt->status == UDX_PACKET_SENDING) { debug_printf("S"); continue; } - if (pkt->type == UDX_PACKET_WAITING) { + if (pkt->status == UDX_PACKET_WAITING) { debug_printf("W"); continue; } diff --git a/src/io_win.c b/src/io_win.c index 0a6081dc..fd210407 100644 --- a/src/io_win.c +++ b/src/io_win.c @@ -38,11 +38,11 @@ udx__get_link_mtu (const struct sockaddr *addr) { } ssize_t -udx__sendmsg (udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, struct sockaddr *addr, int addr_len) { +udx__sendmsg (udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, struct sockaddr *addr, int addr_len) { DWORD bytes, flags = 0; int result = WSASendTo( - handle->socket.socket, + socket->handle.socket, (WSABUF *) bufs, bufs_len, &bytes, @@ -61,11 +61,11 @@ udx__sendmsg (udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len } ssize_t -udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int addr_len) { +udx__recvmsg (udx_socket_t *socket, uv_buf_t *buf, struct sockaddr *addr, int addr_len) { DWORD bytes, flags = 0; int result = WSARecvFrom( - handle->socket.socket, + socket->handle.socket, (WSABUF *) buf, 1, &bytes, diff --git a/src/udx.c b/src/udx.c index 1f99f66c..1976606f 100644 --- a/src/udx.c +++ b/src/udx.c @@ -47,7 +47,7 @@ #define UDX_CONG_INIT_CWND 3 #define UDX_CONG_MAX_CWND 65536 -#define UDX_HIGH_WATERMARK 262144 +#define UDX_HIGH_WATERMARK 262144 typedef struct { uint32_t seq; // must be the first entry, so its compat with the cirbuf @@ -201,29 +201,29 @@ on_udx_timer_close (uv_handle_t *handle) { } void -udx__close_handles (udx_socket_t *handle) { - if (handle->status & UDX_SOCKET_CLOSING_HANDLES) return; - handle->status |= UDX_SOCKET_CLOSING_HANDLES; +udx__close_handles (udx_socket_t *socket) { + if (socket->status & UDX_SOCKET_CLOSING_HANDLES) return; + socket->status |= UDX_SOCKET_CLOSING_HANDLES; - if (handle->status & UDX_SOCKET_BOUND) { - handle->pending_closes++; - uv_poll_stop(&(handle->io_poll)); - uv_close((uv_handle_t *) &(handle->io_poll), on_uv_close); + if (socket->status & UDX_SOCKET_BOUND) { + socket->pending_closes++; + uv_poll_stop(&(socket->io_poll)); + uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close); } - handle->pending_closes += 2; // one below and one in trigger_socket_close - uv_close((uv_handle_t *) &(handle->socket), on_uv_close); + socket->pending_closes += 2; // one below and one in trigger_socket_close + uv_close((uv_handle_t *) &(socket->handle), on_uv_close); - udx_t *udx = handle->udx; + udx_t *udx = socket->udx; udx->sockets--; if (udx->sockets > 0 || udx->timer_closed_by) { - trigger_socket_close(handle); + trigger_socket_close(socket); return; } - udx->timer_closed_by = handle; + udx->timer_closed_by = socket; uv_timer_stop(&(udx->timer)); uv_close((uv_handle_t *) &(udx->timer), on_udx_timer_close); @@ -397,7 +397,7 @@ clear_incoming_packets (udx_stream_t *stream) { } static void -on_bytes_acked (udx_stream_t* stream, udx_stream_write_t *w, size_t bytes) { +on_bytes_acked (udx_stream_t *stream, udx_stream_write_t *w, size_t bytes) { w->bytes -= bytes; stream->writes_queued_bytes -= bytes; @@ -707,6 +707,7 @@ fill_window (udx_stream_t *stream) { pkt->ctx = w; stream->seq++; + stream->seq_flushed = stream->seq; udx__cirbuf_set(&stream->outgoing, (udx_cirbuf_val_t *) pkt); @@ -721,9 +722,6 @@ fill_window (udx_stream_t *stream) { } */ - assert(seq_compare(stream->seq_flushed, pkt->seq) <= 0); - stream->seq_flushed = pkt->seq + 1; - pkt->send_queue = &stream->socket->send_queue; pkt->fifo_gc = udx__fifo_push(&stream->socket->send_queue, pkt); @@ -861,8 +859,6 @@ rack_detect_loss (udx_stream_t *stream) { stream->pkts_inflight--; stream->retransmits_waiting++; - debug_printf("rack to on seq=%d\n", seq); - if (seq_was_probe(stream, seq)) { mtu_probes_lost++; if (seq == stream->mtu_probe_seq[stream->mtu_probe_count - 1] && stream->mtu_state == UDX_MTU_STATE_SEARCH) { @@ -897,7 +893,7 @@ rack_detect_loss (udx_stream_t *stream) { // only reduce congestion window if more than just the mtu probe was lost reduce_cwnd(stream, false); - debug_printf("fast recovery: started, recovery=%u inflight=%zu cwnd=%u acked=%u, seq=%u srtt=%u\n", stream->recovery, stream->inflight, stream->cwnd, stream->remote_acked, stream->seq_flushed, stream->srtt); + debug_printf("fast recovery: start=[%u:%u] (%u pkts) inflight=%zu cwnd=%u srtt=%u\n", stream->remote_acked, stream->seq_flushed, stream->recovery, stream->inflight, stream->cwnd, stream->srtt); } fill_window(stream); @@ -1070,6 +1066,7 @@ process_sacks (udx_stream_t *stream, char *buf, size_t buf_len) { uint32_t start = udx__swap_uint32_if_be(*(sacks++)); uint32_t end = udx__swap_uint32_if_be(*(sacks++)); int32_t len = seq_diff(end, start); + debug_printf("sack received [%u:%u] len=%d\n", start, end, len); for (int32_t j = 0; j < len; j++) { int a = ack_packet(stream, start + j, 1); @@ -1344,7 +1341,7 @@ udx__trigger_send_callback (udx_packet_t *pkt) { if (pkt->type == UDX_PACKET_STREAM_SEND) { udx_stream_send_t *req = pkt->ctx; - udx_stream_t *stream = req->handle; + udx_stream_t *stream = req->stream; remove_next(&(stream->unordered)); if (req->on_send != NULL) { @@ -1434,17 +1431,17 @@ udx_check_timeouts (udx_t *handle) { } int -udx_socket_init (udx_t *udx, udx_socket_t *handle) { +udx_socket_init (udx_t *udx, udx_socket_t *socket) { ref_inc(udx); - handle->family = 0; - handle->status = 0; - handle->events = 0; - handle->pending_closes = 0; - handle->ttl = UDX_DEFAULT_TTL; + socket->family = 0; + socket->status = 0; + socket->events = 0; + socket->pending_closes = 0; + socket->ttl = UDX_DEFAULT_TTL; - handle->udx = udx; - handle->streams_by_id = &(udx->streams_by_id); + socket->udx = udx; + socket->streams_by_id = &(udx->streams_by_id); udx->sockets++; @@ -1455,122 +1452,122 @@ udx_socket_init (udx_t *udx, udx_socket_t *handle) { udx_start_timer(udx); } - handle->on_recv = NULL; - handle->on_close = NULL; + socket->on_recv = NULL; + socket->on_close = NULL; - udx__fifo_init(&(handle->send_queue), 16); + udx__fifo_init(&(socket->send_queue), 16); - uv_udp_t *socket = &(handle->socket); + uv_udp_t *handle = &(socket->handle); // Asserting all the errors here as it massively simplifies error handling. // In practice these will never fail. - int err = uv_udp_init(udx->loop, socket); + int err = uv_udp_init(udx->loop, handle); assert(err == 0); - socket->data = handle; + handle->data = socket; return err; } int -udx_socket_get_send_buffer_size (udx_socket_t *handle, int *value) { +udx_socket_get_send_buffer_size (udx_socket_t *socket, int *value) { *value = 0; - return uv_send_buffer_size((uv_handle_t *) &(handle->socket), value); + return uv_send_buffer_size((uv_handle_t *) &(socket->handle), value); } int -udx_socket_set_send_buffer_size (udx_socket_t *handle, int value) { +udx_socket_set_send_buffer_size (udx_socket_t *socket, int value) { if (value < 1) return UV_EINVAL; - return uv_send_buffer_size((uv_handle_t *) &(handle->socket), &value); + return uv_send_buffer_size((uv_handle_t *) &(socket->handle), &value); } int -udx_socket_get_recv_buffer_size (udx_socket_t *handle, int *value) { +udx_socket_get_recv_buffer_size (udx_socket_t *socket, int *value) { *value = 0; - return uv_recv_buffer_size((uv_handle_t *) &(handle->socket), value); + return uv_recv_buffer_size((uv_handle_t *) &(socket->handle), value); } int -udx_socket_set_recv_buffer_size (udx_socket_t *handle, int value) { +udx_socket_set_recv_buffer_size (udx_socket_t *socket, int value) { if (value < 1) return UV_EINVAL; - return uv_recv_buffer_size((uv_handle_t *) &(handle->socket), &value); + return uv_recv_buffer_size((uv_handle_t *) &(socket->handle), &value); } int -udx_socket_get_ttl (udx_socket_t *handle, int *ttl) { - *ttl = handle->ttl; +udx_socket_get_ttl (udx_socket_t *socket, int *ttl) { + *ttl = socket->ttl; return 0; } int -udx_socket_set_ttl (udx_socket_t *handle, int ttl) { +udx_socket_set_ttl (udx_socket_t *socket, int ttl) { if (ttl < 1 || ttl > 255) return UV_EINVAL; - handle->ttl = ttl; - return uv_udp_set_ttl((uv_udp_t *) &(handle->socket), ttl); + socket->ttl = ttl; + return uv_udp_set_ttl((uv_udp_t *) &(socket->handle), ttl); } int -udx_socket_bind (udx_socket_t *handle, const struct sockaddr *addr, unsigned int flags) { - uv_udp_t *socket = &(handle->socket); - uv_poll_t *poll = &(handle->io_poll); +udx_socket_bind (udx_socket_t *socket, const struct sockaddr *addr, unsigned int flags) { + uv_udp_t *handle = &(socket->handle); + uv_poll_t *poll = &(socket->io_poll); uv_os_fd_t fd; if (addr->sa_family == AF_INET) { - handle->family = 4; + socket->family = 4; } else if (addr->sa_family == AF_INET6) { - handle->family = 6; + socket->family = 6; } else { return UV_EINVAL; } // This might actually fail in practice, so - int err = uv_udp_bind(socket, addr, flags); + int err = uv_udp_bind(handle, addr, flags); if (err) return err; // Asserting all the errors here as it massively simplifies error handling // and in practice non of these will fail, as all our handles are valid and alive. - err = uv_udp_set_ttl(socket, handle->ttl); + err = uv_udp_set_ttl(handle, socket->ttl); assert(err == 0); int send_buffer_size = UDX_DEFAULT_BUFFER_SIZE; - err = uv_send_buffer_size((uv_handle_t *) socket, &send_buffer_size); + err = uv_send_buffer_size((uv_handle_t *) handle, &send_buffer_size); assert(err == 0); int recv_buffer_size = UDX_DEFAULT_BUFFER_SIZE; - err = uv_recv_buffer_size((uv_handle_t *) socket, &recv_buffer_size); + err = uv_recv_buffer_size((uv_handle_t *) handle, &recv_buffer_size); assert(err == 0); - err = uv_fileno((const uv_handle_t *) socket, &fd); + err = uv_fileno((const uv_handle_t *) handle, &fd); assert(err == 0); - err = uv_poll_init_socket(handle->udx->loop, poll, fd); + err = uv_poll_init_socket(socket->udx->loop, poll, fd); assert(err == 0); - handle->status |= UDX_SOCKET_BOUND; - poll->data = handle; + socket->status |= UDX_SOCKET_BOUND; + poll->data = socket; - return update_poll(handle); + return update_poll(socket); } int -udx_socket_getsockname (udx_socket_t *handle, struct sockaddr *name, int *name_len) { - return uv_udp_getsockname(&(handle->socket), name, name_len); +udx_socket_getsockname (udx_socket_t *socket, struct sockaddr *name, int *name_len) { + return uv_udp_getsockname(&(socket->handle), name, name_len); } int -udx_socket_send (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *dest, udx_socket_send_cb cb) { - return udx_socket_send_ttl(req, handle, bufs, bufs_len, dest, 0, cb); +udx_socket_send (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *dest, udx_socket_send_cb cb) { + return udx_socket_send_ttl(req, socket, bufs, bufs_len, dest, 0, cb); } int -udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *dest, int ttl, udx_socket_send_cb cb) { +udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *dest, int ttl, udx_socket_send_cb cb) { if (ttl < 0 /* 0 is "default" */ || ttl > 255) return UV_EINVAL; assert(bufs_len == 1); - req->handle = handle; + req->socket = socket; req->on_send = cb; udx_packet_t *pkt = &(req->pkt); @@ -1597,44 +1594,44 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_ pkt->bufs[0] = bufs[0]; - pkt->send_queue = &handle->send_queue; - pkt->fifo_gc = udx__fifo_push(&(handle->send_queue), pkt); - return update_poll(handle); + pkt->send_queue = &socket->send_queue; + pkt->fifo_gc = udx__fifo_push(&socket->send_queue, pkt); + return update_poll(socket); } int -udx_socket_recv_start (udx_socket_t *handle, udx_socket_recv_cb cb) { - if (handle->status & UDX_SOCKET_RECEIVING) return UV_EALREADY; +udx_socket_recv_start (udx_socket_t *socket, udx_socket_recv_cb cb) { + if (socket->status & UDX_SOCKET_RECEIVING) return UV_EALREADY; - handle->on_recv = cb; - handle->status |= UDX_SOCKET_RECEIVING; + socket->on_recv = cb; + socket->status |= UDX_SOCKET_RECEIVING; - return update_poll(handle); + return update_poll(socket); } int -udx_socket_recv_stop (udx_socket_t *handle) { - if ((handle->status & UDX_SOCKET_RECEIVING) == 0) return 0; +udx_socket_recv_stop (udx_socket_t *socket) { + if ((socket->status & UDX_SOCKET_RECEIVING) == 0) return 0; - handle->on_recv = NULL; - handle->status ^= UDX_SOCKET_RECEIVING; + socket->on_recv = NULL; + socket->status ^= UDX_SOCKET_RECEIVING; - return update_poll(handle); + return update_poll(socket); } int -udx_socket_close (udx_socket_t *handle, udx_socket_close_cb cb) { - // if (handle->streams_len > 0) return UV_EBUSY; +udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) { + // if (socket->streams_len > 0) return UV_EBUSY; - handle->status |= UDX_SOCKET_CLOSING; + socket->status |= UDX_SOCKET_CLOSING; - handle->on_close = cb; + socket->on_close = cb; // allow stream packets to flush, but cancel anything else - int queued = handle->send_queue.len; + int queued = socket->send_queue.len; while (queued--) { - udx_packet_t *pkt = udx__fifo_shift(&(handle->send_queue)); + udx_packet_t *pkt = udx__fifo_shift(&(socket->send_queue)); if (pkt == NULL) break; if (pkt->type == UDX_PACKET_SEND) { @@ -1650,264 +1647,255 @@ udx_socket_close (udx_socket_t *handle, udx_socket_close_cb cb) { // stream packet, allow them to flush, by requeueing them // flips the order but these are all state packets so whatevs - pkt->send_queue = &handle->send_queue; - pkt->fifo_gc = udx__fifo_push(&handle->send_queue, pkt); + pkt->send_queue = &socket->send_queue; + pkt->fifo_gc = udx__fifo_push(&socket->send_queue, pkt); } - if (handle->send_queue.len == 0) { - udx__close_handles(handle); + if (socket->send_queue.len == 0) { + udx__close_handles(socket); } return 0; } int -udx_stream_init (udx_t *udx, udx_stream_t *handle, uint32_t local_id, udx_stream_close_cb close_cb) { +udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream_close_cb close_cb) { ref_inc(udx); - handle->local_id = local_id; - handle->remote_id = 0; - handle->set_id = 0; - handle->status = 0; - handle->out_of_order = 0; - handle->recovery = 0; - handle->socket = NULL; - handle->relayed = false; - handle->relay_to = NULL; - handle->udx = udx; - - handle->reordering_seen = false; - handle->retransmitting = 0; - - handle->hit_high_watermark = false; - handle->writes_queued_bytes = 0; - - handle->remote_changing = false; - handle->on_remote_changed = NULL; - handle->seq_on_remote_changed = 0; - - handle->mtu = UDX_MTU_BASE; - handle->mtu_state = UDX_MTU_STATE_BASE; - handle->mtu_probe_wanted = false; - handle->mtu_probe_count = 0; - handle->mtu_probe_size = UDX_MTU_BASE; // starts with first ack, counts as a confirmation of base - handle->mtu_max = UDX_MTU_MAX; // revised in connect() - - handle->seq = 0; - handle->ack = 0; - handle->remote_acked = 0; - - handle->srtt = 0; - handle->rttvar = 0; - handle->rto = 1000; - handle->rto_timeout = get_milliseconds() + handle->rto; - handle->rack_timeout = 0; - - handle->rack_rtt_min = 0; - handle->rack_rtt = 0; - handle->rack_time_sent = 0; - handle->rack_next_seq = 0; - handle->rack_fack = 0; - - handle->deferred_ack = 0; - - handle->pkts_waiting = 0; - handle->pkts_inflight = 0; - handle->pkts_buffered = 0; - handle->retransmits_waiting = 0; - handle->seq_flushed = 0; - - handle->sacks = 0; - handle->inflight = 0; - handle->ssthresh = 255; - handle->cwnd = UDX_CONG_INIT_CWND; - handle->cwnd_cnt = 0; - handle->rwnd = 0; - - handle->on_firewall = NULL; - handle->on_read = NULL; - handle->on_recv = NULL; - handle->on_drain = NULL; - handle->on_close = close_cb; + stream->local_id = local_id; + stream->remote_id = 0; + stream->set_id = 0; + stream->status = 0; + stream->out_of_order = 0; + stream->recovery = 0; + stream->socket = NULL; + stream->relayed = false; + stream->relay_to = NULL; + stream->udx = udx; + + stream->reordering_seen = false; + stream->retransmitting = 0; + + stream->hit_high_watermark = false; + stream->writes_queued_bytes = 0; + + stream->remote_changing = false; + stream->on_remote_changed = NULL; + stream->seq_on_remote_changed = 0; + + stream->mtu = UDX_MTU_BASE; + stream->mtu_state = UDX_MTU_STATE_BASE; + stream->mtu_probe_wanted = false; + stream->mtu_probe_count = 0; + stream->mtu_probe_size = UDX_MTU_BASE; // starts with first ack, counts as a confirmation of base + stream->mtu_max = UDX_MTU_MAX; // revised in connect() + + stream->seq = 0; + stream->ack = 0; + stream->remote_acked = 0; + + stream->srtt = 0; + stream->rttvar = 0; + stream->rto = 1000; + stream->rto_timeout = get_milliseconds() + stream->rto; + stream->rack_timeout = 0; + + stream->rack_rtt_min = 0; + stream->rack_rtt = 0; + stream->rack_time_sent = 0; + stream->rack_next_seq = 0; + stream->rack_fack = 0; + + stream->deferred_ack = 0; + + stream->pkts_waiting = 0; + stream->pkts_inflight = 0; + stream->pkts_buffered = 0; + stream->retransmits_waiting = 0; + stream->seq_flushed = 0; + + stream->sacks = 0; + stream->inflight = 0; + stream->ssthresh = 255; + stream->cwnd = UDX_CONG_INIT_CWND; + stream->cwnd_cnt = 0; + stream->rwnd = 0; + + stream->on_firewall = NULL; + stream->on_read = NULL; + stream->on_recv = NULL; + stream->on_drain = NULL; + stream->on_close = close_cb; // Clear congestion state - memset(&(handle->cong), 0, sizeof(udx_cong_t)); + memset(&(stream->cong), 0, sizeof(udx_cong_t)); - udx__cirbuf_init(&(handle->relaying_streams), 2); + udx__cirbuf_init(&(stream->relaying_streams), 2); // Init stream write/read buffers - udx__cirbuf_init(&(handle->outgoing), 16); - udx__cirbuf_init(&(handle->incoming), 16); - udx__fifo_init(&(handle->unordered), 1); + udx__cirbuf_init(&(stream->outgoing), 16); + udx__cirbuf_init(&(stream->incoming), 16); + udx__fifo_init(&(stream->unordered), 1); - udx__fifo_init(&handle->write_queue, 1); + udx__fifo_init(&stream->write_queue, 1); - handle->set_id = udx->streams_len++; + stream->set_id = udx->streams_len++; if (udx->streams_len == udx->streams_max_len) { udx->streams_max_len *= 2; udx->streams = realloc(udx->streams, udx->streams_max_len * sizeof(udx_stream_t *)); } - udx->streams[handle->set_id] = handle; + udx->streams[stream->set_id] = stream; // Add the socket to the active set - udx__cirbuf_set(&(udx->streams_by_id), (udx_cirbuf_val_t *) handle); + udx__cirbuf_set(&(udx->streams_by_id), (udx_cirbuf_val_t *) stream); return 0; } int -udx_stream_get_mtu (udx_stream_t *handle, uint16_t *mtu) { - *mtu = handle->mtu; +udx_stream_get_mtu (udx_stream_t *stream, uint16_t *mtu) { + *mtu = stream->mtu; return 0; } int -udx_stream_get_seq (udx_stream_t *handle, uint32_t *seq) { - *seq = handle->seq; +udx_stream_get_seq (udx_stream_t *stream, uint32_t *seq) { + *seq = stream->seq; return 0; } int -udx_stream_set_seq (udx_stream_t *handle, uint32_t seq) { - handle->seq = handle->seq_flushed = seq; +udx_stream_set_seq (udx_stream_t *stream, uint32_t seq) { + stream->seq = stream->seq_flushed = seq; return 0; } int -udx_stream_get_ack (udx_stream_t *handle, uint32_t *ack) { - *ack = handle->ack; +udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack) { + *ack = stream->ack; return 0; } int -udx_stream_set_ack (udx_stream_t *handle, uint32_t ack) { - handle->ack = ack; +udx_stream_set_ack (udx_stream_t *stream, uint32_t ack) { + stream->ack = ack; return 0; } int -udx_stream_firewall (udx_stream_t *handle, udx_stream_firewall_cb cb) { - handle->on_firewall = cb; +udx_stream_firewall (udx_stream_t *stream, udx_stream_firewall_cb cb) { + stream->on_firewall = cb; return 0; } int -udx_stream_recv_start (udx_stream_t *handle, udx_stream_recv_cb cb) { - if (handle->status & UDX_STREAM_RECEIVING) return UV_EALREADY; +udx_stream_recv_start (udx_stream_t *stream, udx_stream_recv_cb cb) { + if (stream->status & UDX_STREAM_RECEIVING) return UV_EALREADY; - handle->on_recv = cb; - handle->status |= UDX_STREAM_RECEIVING; + stream->on_recv = cb; + stream->status |= UDX_STREAM_RECEIVING; - return handle->socket == NULL ? 0 : update_poll(handle->socket); + return stream->socket == NULL ? 0 : update_poll(stream->socket); } int -udx_stream_recv_stop (udx_stream_t *handle) { - if ((handle->status & UDX_STREAM_RECEIVING) == 0) return 0; +udx_stream_recv_stop (udx_stream_t *stream) { + if ((stream->status & UDX_STREAM_RECEIVING) == 0) return 0; - handle->on_recv = NULL; - handle->status ^= UDX_STREAM_RECEIVING; + stream->on_recv = NULL; + stream->status ^= UDX_STREAM_RECEIVING; - return handle->socket == NULL ? 0 : update_poll(handle->socket); + return stream->socket == NULL ? 0 : update_poll(stream->socket); } int -udx_stream_read_start (udx_stream_t *handle, udx_stream_read_cb cb) { - if (handle->status & UDX_STREAM_READING) return UV_EALREADY; +udx_stream_read_start (udx_stream_t *stream, udx_stream_read_cb cb) { + if (stream->status & UDX_STREAM_READING) return UV_EALREADY; - handle->on_read = cb; - handle->status |= UDX_STREAM_READING; + stream->on_read = cb; + stream->status |= UDX_STREAM_READING; - return handle->socket == NULL ? 0 : update_poll(handle->socket); + return stream->socket == NULL ? 0 : update_poll(stream->socket); } int -udx_stream_read_stop (udx_stream_t *handle) { - if ((handle->status & UDX_STREAM_READING) == 0) return 0; +udx_stream_read_stop (udx_stream_t *stream) { + if ((stream->status & UDX_STREAM_READING) == 0) return 0; - handle->on_read = NULL; - handle->status ^= UDX_STREAM_READING; + stream->on_read = NULL; + stream->status ^= UDX_STREAM_READING; - return handle->socket == NULL ? 0 : update_poll(handle->socket); + return stream->socket == NULL ? 0 : update_poll(stream->socket); } static void -check_deferred_ack (udx_stream_t *handle) { - if (handle->deferred_ack == 0) return; - if (--(handle->deferred_ack) > 0) return; - send_state_packet(handle); +check_deferred_ack (udx_stream_t *stream) { + if (stream->deferred_ack == 0) return; + if (--(stream->deferred_ack) > 0) return; + send_state_packet(stream); } int -udx_stream_check_timeouts (udx_stream_t *handle) { - if ((handle->status & UDX_STREAM_CONNECTED) == 0) { +udx_stream_check_timeouts (udx_stream_t *stream) { + if ((stream->status & UDX_STREAM_CONNECTED) == 0) { return 0; } - if (handle->remote_acked == handle->seq && handle->write_queue.len == 0) { - check_deferred_ack(handle); + if (stream->remote_acked == stream->seq && stream->write_queue.len == 0) { + check_deferred_ack(stream); return 0; } - const uint64_t now = handle->inflight ? get_milliseconds() : 0; + const uint64_t now = stream->inflight ? get_milliseconds() : 0; - if (handle->rack_timeout > 0 && now >= handle->rack_timeout) { - rack_detect_loss(handle); + if (stream->rack_timeout > 0 && now >= stream->rack_timeout) { + rack_detect_loss(stream); } - if (now > handle->rto_timeout) { + if (now > stream->rto_timeout) { // Bail out of fast recovery mode if we are in it - handle->recovery = 0; + stream->recovery = 0; // Make sure to clear all new packets that are in the queue - unqueue_first_transmits(handle); + unqueue_first_transmits(stream); // Reduce the congestion window (full reset) - reduce_cwnd(handle, true); + reduce_cwnd(stream, true); // Ensure it backs off until data is acked... - handle->rto_timeout = now + 2 * handle->rto; + stream->rto_timeout = now + 2 * stream->rto; // Consider all packet losts - seems to be the simple consensus across different stream impls // which we like cause it is nice and simple to implement. - for (uint32_t seq = handle->remote_acked; seq != handle->seq_flushed; seq++) { - udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&(handle->outgoing), seq); + for (uint32_t seq = stream->remote_acked; seq != stream->seq_flushed; seq++) { + udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&(stream->outgoing), seq); if (pkt == NULL || pkt->status != UDX_PACKET_INFLIGHT || pkt->is_retransmit) continue; if (pkt->transmits >= UDX_MAX_TRANSMITS) { - handle->status |= UDX_STREAM_DESTROYED; - close_maybe(handle, UV_ETIMEDOUT); + stream->status |= UDX_STREAM_DESTROYED; + close_maybe(stream, UV_ETIMEDOUT); return 1; } pkt->status = UDX_PACKET_WAITING; pkt->is_retransmit = UDX_SLOW_RETRANSMIT; - handle->inflight -= pkt->size; - handle->pkts_waiting++; - handle->pkts_inflight--; - handle->retransmits_waiting++; + stream->inflight -= pkt->size; + stream->pkts_waiting++; + stream->pkts_inflight--; + stream->retransmits_waiting++; } - // todo: handle possibility of downward MTU change - // this would require re-sending in-flight packets that were too big to send. - // resizing is easier if sequence numbers are based on bytes - - // handle->mtu = UDX_MTU_BASE; - // handle->mtu_state = UDX_MTU_STATE_ERROR; - // handle->mtu_probe_count = 0; - // handle->mtu_probe_size = UDX_MTU_BASE; - - debug_printf("timeout! pkt loss detected - inflight=%zu ssthresh=%u cwnd=%u acked=%u seq=%u rtt=%u\n", handle->inflight, handle->ssthresh, handle->cwnd, handle->remote_acked, handle->seq_flushed, handle->srtt); + debug_printf("rto timeout! pkt loss detected - inflight=%zu ssthresh=%u cwnd=%u acked=%u seq=%u rtt=%u\n", stream->inflight, stream->ssthresh, stream->cwnd, stream->remote_acked, stream->seq_flushed, stream->srtt); } - check_deferred_ack(handle); + check_deferred_ack(stream); - int err = fill_window(handle); + int err = fill_window(stream); return err < 0 ? err : 0; } @@ -1965,23 +1953,23 @@ udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t r } int -udx_stream_connect (udx_stream_t *handle, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr) { - if (handle->status & UDX_STREAM_CONNECTED) { +udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr) { + if (stream->status & UDX_STREAM_CONNECTED) { return UV_EISCONN; } - handle->status |= UDX_STREAM_CONNECTED; + stream->status |= UDX_STREAM_CONNECTED; - handle->remote_id = remote_id; - handle->socket = socket; + stream->remote_id = remote_id; + stream->socket = socket; if (remote_addr->sa_family == AF_INET) { - handle->remote_addr_len = sizeof(struct sockaddr_in); + stream->remote_addr_len = sizeof(struct sockaddr_in); if (((struct sockaddr_in *) remote_addr)->sin_port == 0) { return UV_EINVAL; } } else if (remote_addr->sa_family == AF_INET6) { - handle->remote_addr_len = sizeof(struct sockaddr_in6); + stream->remote_addr_len = sizeof(struct sockaddr_in6); if (((struct sockaddr_in6 *) remote_addr)->sin6_port == 0) { return UV_EINVAL; } @@ -1989,11 +1977,11 @@ udx_stream_connect (udx_stream_t *handle, udx_socket_t *socket, uint32_t remote_ return UV_EINVAL; } - memcpy(&(handle->remote_addr), remote_addr, handle->remote_addr_len); + memcpy(&(stream->remote_addr), remote_addr, stream->remote_addr_len); - if (socket->family == 6 && handle->remote_addr.ss_family == AF_INET) { - addr_to_v6((struct sockaddr_in *) &(handle->remote_addr)); - handle->remote_addr_len = sizeof(struct sockaddr_in6); + if (socket->family == 6 && stream->remote_addr.ss_family == AF_INET) { + addr_to_v6((struct sockaddr_in *) &(stream->remote_addr)); + stream->remote_addr_len = sizeof(struct sockaddr_in6); } int mtu = udx__get_link_mtu(remote_addr); @@ -2002,34 +1990,34 @@ udx_stream_connect (udx_stream_t *handle, udx_socket_t *socket, uint32_t remote_ mtu = UDX_MTU_MAX; } - handle->mtu_max = mtu; + stream->mtu_max = mtu; - return update_poll(handle->socket); + return update_poll(stream->socket); } int -udx_stream_relay_to (udx_stream_t *handle, udx_stream_t *destination) { - if (handle->relayed || (destination->status & UDX_STREAM_CLOSED) != 0) return UV_EINVAL; +udx_stream_relay_to (udx_stream_t *stream, udx_stream_t *destination) { + if (stream->relayed || (destination->status & UDX_STREAM_CLOSED) != 0) return UV_EINVAL; - handle->relayed = true; - handle->relay_to = destination; + stream->relayed = true; + stream->relay_to = destination; - udx__cirbuf_set(&(destination->relaying_streams), (udx_cirbuf_val_t *) handle); + udx__cirbuf_set(&(destination->relaying_streams), (udx_cirbuf_val_t *) stream); return 0; } int -udx_stream_send (udx_stream_send_t *req, udx_stream_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb) { +udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb) { assert(bufs_len == 1); - req->handle = handle; + req->stream = stream; req->on_send = cb; - udx_socket_t *socket = handle->socket; + udx_socket_t *socket = stream->socket; udx_packet_t *pkt = &(req->pkt); - init_stream_packet(pkt, UDX_HEADER_MESSAGE, handle, &bufs[0]); + init_stream_packet(pkt, UDX_HEADER_MESSAGE, stream, &bufs[0]); pkt->status = UDX_PACKET_SENDING; pkt->type = UDX_PACKET_STREAM_SEND; @@ -2040,14 +2028,14 @@ udx_stream_send (udx_stream_send_t *req, udx_stream_t *handle, const uv_buf_t bu pkt->send_queue = &socket->send_queue; pkt->fifo_gc = udx__fifo_push(&socket->send_queue, pkt); - udx__fifo_push(&(handle->unordered), pkt); + udx__fifo_push(&(stream->unordered), pkt); return update_poll(socket); } int -udx_stream_write_resume (udx_stream_t *handle, udx_stream_drain_cb drain_cb) { - handle->on_drain = drain_cb; +udx_stream_write_resume (udx_stream_t *stream, udx_stream_drain_cb drain_cb) { + stream->on_drain = drain_cb; return 0; } @@ -2055,7 +2043,7 @@ int udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb) { assert(bufs_len == 1); - req->handle = stream; + req->stream = stream; req->on_ack = ack_cb; // if this is the first inflight packet, we should "restart" rto timer @@ -2087,7 +2075,7 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu stream->status |= UDX_STREAM_ENDING; - req->handle = stream; + req->stream = stream; req->on_ack = ack_cb; req->bytes = bufs[0].len; @@ -2109,24 +2097,24 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu } int -udx_stream_destroy (udx_stream_t *handle) { - if ((handle->status & UDX_STREAM_CONNECTED) == 0) { - handle->status |= UDX_STREAM_DESTROYED; - close_maybe(handle, 0); +udx_stream_destroy (udx_stream_t *stream) { + if ((stream->status & UDX_STREAM_CONNECTED) == 0) { + stream->status |= UDX_STREAM_DESTROYED; + close_maybe(stream, 0); return 0; } - handle->status |= UDX_STREAM_DESTROYING; + stream->status |= UDX_STREAM_DESTROYING; // clear the outgoing packets immediately as we don't want anything to leave to the network // while the destroy packet is being flushed. we could also destroy incoming packets here, but // that creates some reentry trickiness incase this was called from on_read. // todo: can we delete this line now that the queue in front of the destroy packet is shorter? - clear_outgoing_packets(handle); + clear_outgoing_packets(stream); - if (handle->relayed) { - handle->status |= UDX_STREAM_DESTROYED; - close_maybe(handle, 0); + if (stream->relayed) { + stream->status |= UDX_STREAM_DESTROYED; + close_maybe(stream, 0); return 0; } @@ -2134,20 +2122,20 @@ udx_stream_destroy (udx_stream_t *handle) { uv_buf_t buf = uv_buf_init(NULL, 0); - init_stream_packet(pkt, UDX_HEADER_DESTROY, handle, &buf); + init_stream_packet(pkt, UDX_HEADER_DESTROY, stream, &buf); pkt->status = UDX_PACKET_SENDING; pkt->type = UDX_PACKET_STREAM_DESTROY; pkt->ttl = 0; - pkt->ctx = handle; + pkt->ctx = stream; - handle->seq++; + stream->seq++; - pkt->send_queue = &handle->socket->send_queue; - pkt->fifo_gc = udx__fifo_push(&(handle->socket->send_queue), pkt); - udx__fifo_push(&(handle->unordered), pkt); + pkt->send_queue = &stream->socket->send_queue; + pkt->fifo_gc = udx__fifo_push(&(stream->socket->send_queue), pkt); + udx__fifo_push(&(stream->unordered), pkt); - int err = update_poll(handle->socket); + int err = update_poll(stream->socket); return err < 0 ? err : 1; } diff --git a/test/stream-write-read-perf.c b/test/stream-write-read-perf.c index 051bc2e5..bcacbdeb 100644 --- a/test/stream-write-read-perf.c +++ b/test/stream-write-read-perf.c @@ -38,7 +38,7 @@ uint64_t write_hash = HASH_INIT; void on_ack (udx_stream_write_t *r, int status, int unordered) { printf("write acked, status=%d %s\n", status, status == UV_ECANCELED ? "(UV_ECANCELED)" : ""); - udx_stream_destroy(r->handle); + udx_stream_destroy(r->stream); udx_stream_destroy(&astream); }