Skip to content

Commit

Permalink
tweak interface and add teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Dec 9, 2024
1 parent 5081f96 commit c14168c
Show file tree
Hide file tree
Showing 26 changed files with 124 additions and 85 deletions.
4 changes: 2 additions & 2 deletions examples/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ main (int argc, char **argv) {

uv_loop_init(&loop);

udx_init(&loop, &udx);
udx_init(&loop, &udx, NULL);

udx_socket_init(&udx, &sock);
udx_socket_init(&udx, &sock, NULL);

struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 18082, &addr);
Expand Down
4 changes: 2 additions & 2 deletions examples/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ main (int argc, char **argv) {
chunk.len = 16384;
chunk.base = calloc(1, chunk.len);

udx_init(&loop, &udx);
udx_init(&loop, &udx, NULL);

udx_socket_init(&udx, &sock);
udx_socket_init(&udx, &sock, NULL);

struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 18081, &addr);
Expand Down
4 changes: 2 additions & 2 deletions examples/udxperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,8 @@ main (int argc, char **argv) {
}

uv_loop_init(&loop);
udx_init(&loop, &udx);
udx_socket_init(&udx, &sock);
udx_init(&loop, &udx, NULL);
udx_socket_init(&udx, &sock, NULL);

if (is_server) {
server();
Expand Down
17 changes: 11 additions & 6 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ struct udx_s {
uv_loop_t *loop;

int refs;
bool teardown;

udx_idle_cb on_idle;

udx_socket_t *sockets;
Expand Down Expand Up @@ -405,13 +407,16 @@ struct udx_interface_event_s {
};

int
udx_init (uv_loop_t *loop, udx_t *udx);
udx_init (uv_loop_t *loop, udx_t *udx, udx_idle_cb on_idle);

int
udx_is_idle (udx_t *udx);

void
udx_idle (udx_t *udx, udx_idle_cb cb);
udx_teardown (udx_t *udx);

int
udx_socket_init (udx_t *udx, udx_socket_t *socket);
udx_socket_init (udx_t *udx, udx_socket_t *socket, udx_socket_close_cb cb);

int
udx_socket_get_send_buffer_size (udx_socket_t *socket, int *value);
Expand Down Expand Up @@ -462,7 +467,7 @@ int
udx_socket_recv_stop (udx_socket_t *socket);

int
udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb);
udx_socket_close (udx_socket_t *socket);

// only exposed here as a convenience / debug tool - the udx instance uses this automatically
int
Expand Down Expand Up @@ -532,7 +537,7 @@ int
udx_lookup (udx_t *udx, udx_lookup_t *req, const char *host, unsigned int flags, udx_lookup_cb cb);

int
udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle);
udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle, udx_interface_event_close_cb cb);

int
udx_interface_event_start (udx_interface_event_t *handle, udx_interface_event_cb cb, uint64_t frequency);
Expand All @@ -541,7 +546,7 @@ int
udx_interface_event_stop (udx_interface_event_t *handle);

int
udx_interface_event_close (udx_interface_event_t *handle, udx_interface_event_close_cb cb);
udx_interface_event_close (udx_interface_event_t *handle);

#ifdef __cplusplus
}
Expand Down
52 changes: 43 additions & 9 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,13 @@ close_stream (udx_stream_t *stream, int err) {
uv_close((uv_handle_t *) &stream->tlp_timer, finalize_maybe);
uv_close((uv_handle_t *) &stream->zwp_timer, finalize_maybe);

if (udx->teardown && udx->streams == NULL) {
udx_socket_t *socket;
udx__link_foreach(udx->sockets, socket) {
udx_socket_close(socket);
}
}

return 1;
}

Expand Down Expand Up @@ -1938,9 +1945,10 @@ on_uv_poll (uv_poll_t *handle, int status, int events) {
}

int
udx_init (uv_loop_t *loop, udx_t *udx) {
udx_init (uv_loop_t *loop, udx_t *udx, udx_idle_cb on_idle) {
udx->refs = 0;
udx->on_idle = NULL;
udx->teardown = false;
udx->on_idle = on_idle;

udx->sockets = NULL;
udx->streams = NULL;
Expand All @@ -1963,11 +1971,39 @@ udx_idle (udx_t *udx, udx_idle_cb cb) {
}

int
udx_socket_init (udx_t *udx, udx_socket_t *socket) {
udx_is_idle (udx_t *udx) {
return udx->refs == 0;
}

void
udx_teardown (udx_t *udx) {
udx->teardown = true;

if (udx->streams == NULL) {
udx_socket_t *socket;
udx__link_foreach(udx->sockets, socket) {
udx_socket_close(socket);
}
}

udx_stream_t *stream;
udx__link_foreach(udx->streams, stream) {
udx_stream_destroy(stream);
}

udx_interface_event_t *listener;
udx__link_foreach(udx->listeners, listener) {
udx_interface_event_close(listener);
}
}

int
udx_socket_init (udx_t *udx, udx_socket_t *socket, udx_socket_close_cb cb) {
udx->refs++;

udx__link_add(udx->sockets, socket);

socket->on_close = cb;
socket->family = 0;
socket->status = 0;
socket->events = 0;
Expand Down Expand Up @@ -2190,13 +2226,11 @@ udx_socket_recv_stop (udx_socket_t *socket) {
}

int
udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) {
udx_socket_close (udx_socket_t *socket) {
if (check_for_streams(socket)) return UV_EBUSY;

socket->status |= UDX_SOCKET_CLOSED;

socket->on_close = cb;

while (socket->send_queue.len > 0) {
udx_packet_t *pkt = udx__queue_data(udx__queue_shift(&socket->send_queue), udx_packet_t, queue);
assert(pkt != NULL);
Expand Down Expand Up @@ -2803,10 +2837,11 @@ on_interface_event_close (uv_handle_t *handle) {
}

int
udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle) {
udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle, udx_interface_event_close_cb cb) {
handle->udx = udx;
handle->loop = udx->loop;
handle->sorted = false;
handle->on_close = cb;

int err = uv_interface_addresses(&(handle->addrs), &(handle->addrs_len));
if (err < 0) return err;
Expand Down Expand Up @@ -2839,9 +2874,8 @@ udx_interface_event_stop (udx_interface_event_t *handle) {
}

int
udx_interface_event_close (udx_interface_event_t *handle, udx_interface_event_close_cb cb) {
udx_interface_event_close (udx_interface_event_t *handle) {
handle->on_event = NULL;
handle->on_close = cb;

uv_free_interface_addresses(handle->addrs, handle->addrs_len);

Expand Down
2 changes: 1 addition & 1 deletion test/lookup-invalid.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ main () {
int e;

uv_loop_init(&loop);
udx_init(&loop, &udx);
udx_init(&loop, &udx, NULL);

e = udx_lookup(&udx, &req, "example.invalid.", 0, on_lookup);
assert(e == 0);
Expand Down
2 changes: 1 addition & 1 deletion test/lookup-ipv6.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ main () {
int e;

uv_loop_init(&loop);
udx_init(&loop, &udx);
udx_init(&loop, &udx, NULL);

e = udx_lookup(&udx, &req, "localhost", UDX_LOOKUP_FAMILY_IPV6, on_lookup);
assert(e == 0);
Expand Down
2 changes: 1 addition & 1 deletion test/lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ main () {
int e;

uv_loop_init(&loop);
udx_init(&loop, &udx);
udx_init(&loop, &udx, NULL);

e = udx_lookup(&udx, &req, "localhost", UDX_LOOKUP_FAMILY_IPV4, on_lookup);
assert(e == 0);
Expand Down
6 changes: 3 additions & 3 deletions test/socket-send-recv-dualstack.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &asock);
e = udx_socket_init(&udx, &asock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &bsock);
e = udx_socket_init(&udx, &bsock, NULL);
assert(e == 0);

struct sockaddr_in baddr;
Expand Down
6 changes: 3 additions & 3 deletions test/socket-send-recv-ipv6.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &asock);
e = udx_socket_init(&udx, &asock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &bsock);
e = udx_socket_init(&udx, &bsock, NULL);
assert(e == 0);

struct sockaddr_in6 baddr;
Expand Down
6 changes: 3 additions & 3 deletions test/socket-send-recv-multicast.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &asock);
e = udx_socket_init(&udx, &asock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &bsock);
e = udx_socket_init(&udx, &bsock, NULL);
assert(e == 0);

struct sockaddr_in baddr;
Expand Down
6 changes: 3 additions & 3 deletions test/socket-send-recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &asock);
e = udx_socket_init(&udx, &asock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &bsock);
e = udx_socket_init(&udx, &bsock, NULL);
assert(e == 0);

struct sockaddr_in baddr;
Expand Down
10 changes: 5 additions & 5 deletions test/stream-change-remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &asock);
e = udx_socket_init(&udx, &asock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &bsock);
e = udx_socket_init(&udx, &bsock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &csock);
e = udx_socket_init(&udx, &csock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &dsock);
e = udx_socket_init(&udx, &dsock, NULL);
assert(e == 0);

uv_ip4_addr("127.0.0.1", 8081, &aaddr);
Expand Down
2 changes: 1 addition & 1 deletion test/stream-destroy-before-connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ main () {
uv_loop_init(&loop);

udx_t udx;
e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

udx_stream_t stream;
Expand Down
6 changes: 3 additions & 3 deletions test/stream-destroy.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void
on_close (udx_stream_t *handle, int status) {
assert(status == 0);

int e = udx_socket_close(&sock, NULL);
int e = udx_socket_close(&sock);

assert(e == 0);

Expand All @@ -26,10 +26,10 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &sock);
e = udx_socket_init(&udx, &sock, NULL);
assert(e == 0);

struct sockaddr_in addr;
Expand Down
6 changes: 3 additions & 3 deletions test/stream-multiple.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

uv_buf_t buf = uv_buf_init(malloc(NBYTES_TO_SEND), NBYTES_TO_SEND);
Expand All @@ -81,15 +81,15 @@ main () {
int receiver_id = NSTREAMS + i;

receiver[i].read_hash = HASH_INIT;
e = udx_socket_init(&udx, &sender[i].usock);
e = udx_socket_init(&udx, &sender[i].usock, NULL);
assert(e == 0);
uv_ip4_addr("127.0.0.1", 8000 + i, &sender[i].addr);
e = udx_socket_bind(&sender[i].usock, (struct sockaddr *) &sender[i].addr, 0);
assert(e == 0);
sender[i].write = malloc(udx_stream_write_sizeof(1));
e = udx_stream_init(&udx, &sender[i].stream, sender_id, NULL, NULL);

udx_socket_init(&udx, &receiver[i].usock);
udx_socket_init(&udx, &receiver[i].usock, NULL);
uv_ip4_addr("127.0.0.1", 8100 + i, &receiver[i].addr);
e = udx_socket_bind(&receiver[i].usock, (struct sockaddr *) &receiver[i].addr, 0);
assert(e == 0);
Expand Down
4 changes: 2 additions & 2 deletions test/stream-preconnect-same-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ main () {

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &sock);
e = udx_socket_init(&udx, &sock, NULL);
assert(e == 0);

uv_ip4_addr("127.0.0.1", 8081, &addr);
Expand Down
Loading

0 comments on commit c14168c

Please sign in to comment.