Skip to content

Commit

Permalink
add client/server example (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh authored Nov 11, 2023
1 parent 97a897d commit cfe183c
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,5 @@ install(FILES include/udx.h DESTINATION include)
if(PROJECT_IS_TOP_LEVEL)
enable_testing()
add_subdirectory(test)
add_subdirectory(examples)
endif()
14 changes: 14 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
add_executable(server server.c)
add_executable(client client.c)

target_link_libraries(
server
PRIVATE
udx_static
)

target_link_libraries(
client
PRIVATE
udx_static
)
90 changes: 90 additions & 0 deletions examples/client.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include <stdio.h>
#include <uv.h>
#include <stdlib.h>

#include "../include/udx.h"
#ifdef _WIN32
#include <process.h>
#else
#include <unistd.h>
#endif

static uv_loop_t loop;
static udx_t udx;

static udx_socket_t sock;
static udx_socket_send_t req;

static udx_stream_t stream;
static struct sockaddr_in dest_addr;

static size_t bytes_recv = 0;
static uint64_t started = 0;

static uint32_t client_id = 1;
static uint32_t server_id = 2;

static uv_timer_t timer;

static uint64_t
get_milliseconds () {
return uv_hrtime() / 1000000;
}

static void
on_uv_interval (uv_timer_t *handle) {
printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started);
}

static void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
if (started == 0) {
started = get_milliseconds();
uv_timer_init(&loop, &timer);
uv_timer_start(&timer, on_uv_interval, 5000, 5000);
}

if (read_len < 0) {
printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started);
printf("stream is done!\n");
exit(0);
}

bytes_recv += read_len;
}

static void
on_send (udx_socket_send_t *r, int status) {
udx_stream_init(&udx, &stream, client_id, NULL);
udx_stream_connect(&stream, &sock, server_id, (struct sockaddr *) &dest_addr);
udx_stream_read_start(&stream, on_read);
}

int
main (int argc, char **argv) {
if (argc < 2) return 1;

uv_ip4_addr(argv[1], 18081, &dest_addr);

uv_loop_init(&loop);

udx_init(&loop, &udx);

udx_socket_init(&udx, &sock);

struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 18082, &addr);

udx_socket_bind(&sock, (struct sockaddr *) &addr, 0);

client_id = (uint32_t) getpid();
server_id = client_id + 1;

uint32_t ids[2] = { client_id, server_id };

uv_buf_t buf = uv_buf_init((char *) ids, 8);
udx_socket_send(&req, &sock, &buf, 1, (struct sockaddr *) &dest_addr, on_send);

uv_run(&loop, UV_RUN_DEFAULT);
return 0;
}
131 changes: 131 additions & 0 deletions examples/server.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#include <stdio.h>
#include <stdlib.h>

#include "../include/udx.h"

#define PUMP_BYTES (1024 * 1024 * 1024)

static uv_loop_t loop;
static udx_t udx;

static udx_socket_t sock;
static udx_stream_t stream;

static bool stream_is_active = false;
static bool stream_is_queued = false;
static uint32_t client_id = 0;
static uint32_t server_id = 0;
static size_t bytes_sent = 0;
static struct sockaddr_in dest_addr;

static uv_buf_t chunk;
static uv_buf_t empty = { .base = NULL, .len = 0 };

static bool printed_warning = false;

static void
pump_stream();

static void
on_close (udx_stream_t *stream, int status) {
printf("stream closed with status %i\n", status);

stream_is_active = false;
bytes_sent = 0;

if (stream_is_queued) {
stream_is_queued = false;
pump_stream();
}
}

static void
on_ack (udx_stream_write_t *req, int status, int unordered) {
free(req);
}

static void
on_ack_end (udx_stream_write_t *req, int status, int unordered) {
udx_stream_destroy(req->handle);
free(req);
}

static void
pump_writes () {
while (bytes_sent < PUMP_BYTES) {
udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
bytes_sent += chunk.len;

if (udx_stream_write(req, &stream, &chunk, 1, on_ack)) continue;

udx_stream_write_resume(&stream, pump_writes);
return;
}

udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
udx_stream_write_end(req, &stream, &empty, 1, on_ack_end);
}

static void
pump_stream () {
stream_is_active = true;

char dst_ip[20];
uv_ip4_name(&dest_addr, dst_ip, 20);

printf("pumping %d bytes to stream to %s...\n", PUMP_BYTES, dst_ip);

udx_stream_init(&udx, &stream, server_id, on_close);
udx_stream_connect(&stream, &sock, client_id, (struct sockaddr *) &dest_addr);

pump_writes();
}

static void
on_recv (udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from) {
if (read_len != 8) {
if (!printed_warning) {
printed_warning = true;
printf("warning: unknown packet received (%zd bytes)\n", read_len);
}
return;
}

printf("client requested streams...\n");

uint32_t *ids = (uint32_t *) buf->base;

client_id = *(ids++);
server_id = *(ids++);
dest_addr = *((struct sockaddr_in *) from);

if (stream_is_active) {
stream_is_queued = true;
udx_stream_destroy(&stream);
return;
}

pump_stream();
}

int
main (int argc, char **argv) {
uv_loop_init(&loop);

chunk.len = 16384;
chunk.base = calloc(1, chunk.len);

udx_init(&loop, &udx);

udx_socket_init(&udx, &sock);

struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 18081, &addr);

udx_socket_bind(&sock, (struct sockaddr *) &addr, 0);

udx_socket_recv_start(&sock, on_recv);

uv_run(&loop, UV_RUN_DEFAULT);
return 0;
}

0 comments on commit cfe183c

Please sign in to comment.