Skip to content

Commit

Permalink
turn: add multithreading support
Browse files Browse the repository at this point in the history
  • Loading branch information
sreimers committed Sep 7, 2023
1 parent 0d055bd commit 6c521e2
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 9 deletions.
24 changes: 23 additions & 1 deletion modules/turn/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ static void destructor(void *arg)
{
struct allocation *al = arg;

mtx_lock(&turndp()->mutex);
list_unlink(&al->le_map);
mtx_unlock(&turndp()->mutex);

hash_flush(al->perms);
mem_deref(al->perms);
mem_deref(al->chans);
Expand All @@ -60,8 +64,11 @@ static void destructor(void *arg)
tmr_cancel(&al->tmr);
mem_deref(al->username);
mem_deref(al->cli_sock);

/* @TODO check fd deref cleanup on turn worker thread */
mem_deref(al->rel_us);
mem_deref(al->rsv_us);

turndp()->allocc_cur--;
}

Expand Down Expand Up @@ -90,13 +97,17 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg)
}
}

mtx_lock(&al->mutex);
perm = perm_find(al->perms, src);
mtx_unlock(&al->mutex);
if (!perm) {
++al->dropc_rx;
return;
}

mtx_lock(&al->mutex);
chan = chan_peer_find(al->chans, src);
mtx_unlock(&al->mutex);
if (chan) {
uint16_t len = mbuf_get_left(mb);
size_t start;
Expand Down Expand Up @@ -185,6 +196,14 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
break;
}

/* Release fd for new thread and re_map*/
udp_thread_detach(al->rel_us);
udp_thread_detach(al->rsv_us);

mtx_lock(&turndp()->mutex);
list_append(&turndp()->re_map, &al->le_map, al);
mtx_unlock(&turndp()->mutex);

return (i == PORT_TRY_MAX) ? EADDRINUSE : err;
}

Expand Down Expand Up @@ -247,7 +266,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
goto reply;
}

restund_debug("turn: allocation already exists (%J)\n", src);
restund_warning("turn: allocation already exists (%J)\n", src);
++turnd->reply.scode_437;
rerr = stun_ereply(proto, sock, src, 0, msg,
437, "Allocation TID Mismatch",
Expand Down Expand Up @@ -351,6 +370,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
al->cli_addr = *src;
al->srv_addr = *dst;
al->proto = proto;
mtx_init(&al->mutex, mtx_plain);
sa_init(&al->rsv_addr, AF_UNSPEC);
turndp()->allocc_tot++;
turndp()->allocc_cur++;
Expand Down Expand Up @@ -466,7 +486,9 @@ void refresh_request(struct turnd *turnd, struct allocation *al,
lifetime = lifetime ? MAX(lifetime, TURN_DEFAULT_LIFETIME) : 0;
lifetime = MIN(lifetime, turnd->lifetime_max);

mtx_lock(&al->mutex);
tmr_start(&al->tmr, lifetime * 1000, timeout, al);
mtx_unlock(&al->mutex);

restund_debug("turn: allocation %p refresh (%us)\n", al, lifetime);

Expand Down
14 changes: 11 additions & 3 deletions modules/turn/chan.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct chan {
struct le he_numb;
struct le he_peer;
struct sa peer;
const struct allocation *al;
struct allocation *al;
time_t expires;
uint16_t numb;
};
Expand All @@ -50,8 +50,10 @@ static void destructor(void *arg)
restund_debug("turn: allocation %p channel 0x%x %J destroyed\n",
chan->al, chan->numb, &chan->peer);

mtx_lock(&chan->al->mutex);
hash_unlink(&chan->he_numb);
hash_unlink(&chan->he_peer);
mtx_unlock(&chan->al->mutex);
}


Expand Down Expand Up @@ -185,17 +187,19 @@ void chan_status(const struct chanlist *cl, struct mbuf *mb)

static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
const struct sa *peer,
const struct allocation *al)
struct allocation *al)
{
struct chan *chan;

if (!cl || !peer)
if (!cl || !peer || !al)
return NULL;

chan = mem_zalloc(sizeof(*chan), destructor);
if (!chan)
return NULL;

mtx_lock(&al->mutex);

hash_append(cl->ht_numb, numb, &chan->he_numb, chan);
hash_append(cl->ht_peer, sa_hash(peer, SA_ALL), &chan->he_peer, chan);

Expand All @@ -204,6 +208,8 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
chan->al = al;
chan->expires = time(NULL) + CHAN_LIFETIME;

mtx_unlock(&al->mutex);

restund_debug("turn: allocation %p channel 0x%x %J created\n",
chan->al, chan->numb, &chan->peer);

Expand All @@ -216,7 +222,9 @@ static void chan_refresh(struct chan *chan)
if (!chan)
return;

mtx_lock(&chan->al->mutex);
chan->expires = time(NULL) + CHAN_LIFETIME;
mtx_unlock(&chan->al->mutex);

restund_debug("turn: allocation %p channel 0x%x %J refreshed\n",
chan->al, chan->numb, &chan->peer);
Expand Down
10 changes: 8 additions & 2 deletions modules/turn/perm.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct perm {
struct le he;
struct sa peer;
struct restund_trafstat ts;
const struct allocation *al;
struct allocation *al;
time_t expires;
time_t start;
bool new;
Expand All @@ -38,7 +38,9 @@ static void destructor(void *arg)
struct perm *perm = arg;
int err;

mtx_lock(&perm->al->mutex);
hash_unlink(&perm->he);
mtx_unlock(&perm->al->mutex);

restund_debug("turn: allocation %p permission %j destroyed "
"(%llu/%llu %llu/%llu)\n",
Expand Down Expand Up @@ -90,7 +92,7 @@ struct perm *perm_find(const struct hash *ht, const struct sa *peer)


struct perm *perm_create(struct hash *ht, const struct sa *peer,
const struct allocation *al)
struct allocation *al)
{
const time_t now = time(NULL);
struct perm *perm;
Expand All @@ -102,13 +104,17 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer,
if (!perm)
return NULL;

mtx_lock(&al->mutex);

hash_append(ht, sa_hash(peer, SA_ADDR), &perm->he, perm);

perm->peer = *peer;
perm->al = al;
perm->expires = now + PERM_LIFETIME;
perm->start = now;

mtx_unlock(&al->mutex);

restund_debug("turn: allocation %p permission %j created\n", al, peer);

return perm;
Expand Down
86 changes: 84 additions & 2 deletions modules/turn/turn.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@


enum {
ALLOC_DEFAULT_BSIZE = 512,
ALLOC_DEFAULT_BSIZE = 1024,
TURN_THREADS = 4
};


Expand All @@ -23,7 +24,8 @@ struct tuple {


static struct turnd turnd;

static struct tmr timers[TURN_THREADS];
static thrd_t tid[TURN_THREADS];

struct turnd *turndp(void)
{
Expand Down Expand Up @@ -335,6 +337,60 @@ static struct restund_cmdsub cmd_turnreply = {
};


static void tmr_handler(void *arg)
{
struct tmr *tmr = arg;
struct le *le;

mtx_lock(&turndp()->mutex);
if (!turndp()->run)
re_cancel();

/* Reassign one allocation by time */
LIST_FOREACH(&turndp()->re_map, le)
{
struct allocation *al = le->data;
mtx_lock(&al->mutex);
udp_thread_attach(al->rel_us);
udp_thread_attach(al->rsv_us);
mtx_unlock(&al->mutex);
}
list_clear(&turndp()->re_map);

mtx_unlock(&turndp()->mutex);

tmr_start(tmr, 10, tmr_handler, tmr);
}


static int thread_handler(void *arg)
{
struct tmr *tmr = arg;
int err;

err = re_thread_init();
if (err) {
restund_error("turn: re_thread_init failed %m\n", err);
return 0;
}

fd_setsize(-1);

tmr_start(tmr, 10, tmr_handler, tmr);

err = re_main(NULL);
if (err)
restund_error("turn: re_main failed %m\n", err);

tmr_cancel(tmr);

tmr_debug();
re_thread_close();

return 0;
}


static int module_init(void)
{
uint32_t x, bsize = ALLOC_DEFAULT_BSIZE;
Expand Down Expand Up @@ -406,6 +462,24 @@ static int module_init(void)
goto out;
}

list_init(&turnd.re_map);

turnd.run = true;
err = mtx_init(&turnd.mutex, mtx_plain);
if (err) {
restund_error("turn: mtx_init err: %d\n", err);
goto out;
}

for (int i = 0; i < TURN_THREADS; i++) {
err = thrd_create(&tid[i], thread_handler,
&timers[i]);
if (err) {
restund_error("turn: thrd_create err: %m\n", err);
goto out;
}
}

restund_debug("turn: lifetime=%u ext=%j ext6=%j bsz=%u\n",
turnd.lifetime_max, &turnd.rel_addr, &turnd.rel_addr6,
bsize);
Expand All @@ -417,6 +491,14 @@ static int module_init(void)

static int module_close(void)
{
mtx_lock(&turnd.mutex);
turnd.run = false;
mtx_unlock(&turnd.mutex);

for (int i = 0; i < TURN_THREADS; i++) {
thrd_join(tid[i], NULL);
}

hash_flush(turnd.ht_alloc);
turnd.ht_alloc = mem_deref(turnd.ht_alloc);
restund_cmd_unsubscribe(&cmd_turnreply);
Expand Down
7 changes: 6 additions & 1 deletion modules/turn/turn.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ struct turnd {
uint32_t allocc_cur;
uint32_t lifetime_max;
uint32_t udp_sockbuf_size;
mtx_t mutex;
bool run;
struct list re_map;

struct {
uint64_t scode_400;
Expand All @@ -35,6 +38,8 @@ struct chanlist;

struct allocation {
struct le he;
struct le le_map;
mtx_t mutex;
struct tmr tmr;
uint8_t tid[STUN_TID_SIZE];
struct sa cli_addr;
Expand Down Expand Up @@ -73,7 +78,7 @@ struct perm;

struct perm *perm_find(const struct hash *ht, const struct sa *addr);
struct perm *perm_create(struct hash *ht, const struct sa *peer,
const struct allocation *al);
struct allocation *al);
void perm_refresh(struct perm *perm);
void perm_tx_stat(struct perm *perm, size_t bytc);
void perm_rx_stat(struct perm *perm, size_t bytc);
Expand Down

0 comments on commit 6c521e2

Please sign in to comment.