diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index 03bd813..daa59fc 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -52,6 +52,10 @@ static void destructor(void *arg) { struct allocation *al = arg; + mtx_lock(&turndp()->mutex); + list_unlink(&al->le_map); + mtx_unlock(&turndp()->mutex); + hash_flush(al->perms); mem_deref(al->perms); mem_deref(al->chans); @@ -60,8 +64,11 @@ static void destructor(void *arg) tmr_cancel(&al->tmr); mem_deref(al->username); mem_deref(al->cli_sock); + + /* @TODO check fd deref cleanup on turn worker thread */ mem_deref(al->rel_us); mem_deref(al->rsv_us); + turndp()->allocc_cur--; } @@ -90,13 +97,17 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg) } } + mtx_lock(&al->mutex); perm = perm_find(al->perms, src); + mtx_unlock(&al->mutex); if (!perm) { ++al->dropc_rx; return; } + mtx_lock(&al->mutex); chan = chan_peer_find(al->chans, src); + mtx_unlock(&al->mutex); if (chan) { uint16_t len = mbuf_get_left(mb); size_t start; @@ -185,6 +196,14 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, break; } + /* Release fd for new thread and re_map*/ + udp_thread_detach(al->rel_us); + udp_thread_detach(al->rsv_us); + + mtx_lock(&turndp()->mutex); + list_append(&turndp()->re_map, &al->le_map, al); + mtx_unlock(&turndp()->mutex); + return (i == PORT_TRY_MAX) ? EADDRINUSE : err; } @@ -247,7 +266,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, goto reply; } - restund_debug("turn: allocation already exists (%J)\n", src); + restund_warning("turn: allocation already exists (%J)\n", src); ++turnd->reply.scode_437; rerr = stun_ereply(proto, sock, src, 0, msg, 437, "Allocation TID Mismatch", @@ -351,6 +370,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, al->cli_addr = *src; al->srv_addr = *dst; al->proto = proto; + mtx_init(&al->mutex, mtx_plain); sa_init(&al->rsv_addr, AF_UNSPEC); turndp()->allocc_tot++; turndp()->allocc_cur++; @@ -466,7 +486,9 @@ void refresh_request(struct turnd *turnd, struct allocation *al, lifetime = lifetime ? MAX(lifetime, TURN_DEFAULT_LIFETIME) : 0; lifetime = MIN(lifetime, turnd->lifetime_max); + mtx_lock(&al->mutex); tmr_start(&al->tmr, lifetime * 1000, timeout, al); + mtx_unlock(&al->mutex); restund_debug("turn: allocation %p refresh (%us)\n", al, lifetime); diff --git a/modules/turn/chan.c b/modules/turn/chan.c index b8850a2..20c3b00 100644 --- a/modules/turn/chan.c +++ b/modules/turn/chan.c @@ -27,7 +27,7 @@ struct chan { struct le he_numb; struct le he_peer; struct sa peer; - const struct allocation *al; + struct allocation *al; time_t expires; uint16_t numb; }; @@ -50,8 +50,10 @@ static void destructor(void *arg) restund_debug("turn: allocation %p channel 0x%x %J destroyed\n", chan->al, chan->numb, &chan->peer); + mtx_lock(&chan->al->mutex); hash_unlink(&chan->he_numb); hash_unlink(&chan->he_peer); + mtx_unlock(&chan->al->mutex); } @@ -185,17 +187,19 @@ void chan_status(const struct chanlist *cl, struct mbuf *mb) static struct chan *chan_create(struct chanlist *cl, uint16_t numb, const struct sa *peer, - const struct allocation *al) + struct allocation *al) { struct chan *chan; - if (!cl || !peer) + if (!cl || !peer || !al) return NULL; chan = mem_zalloc(sizeof(*chan), destructor); if (!chan) return NULL; + mtx_lock(&al->mutex); + hash_append(cl->ht_numb, numb, &chan->he_numb, chan); hash_append(cl->ht_peer, sa_hash(peer, SA_ALL), &chan->he_peer, chan); @@ -204,6 +208,8 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb, chan->al = al; chan->expires = time(NULL) + CHAN_LIFETIME; + mtx_unlock(&al->mutex); + restund_debug("turn: allocation %p channel 0x%x %J created\n", chan->al, chan->numb, &chan->peer); @@ -216,7 +222,9 @@ static void chan_refresh(struct chan *chan) if (!chan) return; + mtx_lock(&chan->al->mutex); chan->expires = time(NULL) + CHAN_LIFETIME; + mtx_unlock(&chan->al->mutex); restund_debug("turn: allocation %p channel 0x%x %J refreshed\n", chan->al, chan->numb, &chan->peer); diff --git a/modules/turn/perm.c b/modules/turn/perm.c index fb2e72f..2ebf55e 100644 --- a/modules/turn/perm.c +++ b/modules/turn/perm.c @@ -19,7 +19,7 @@ struct perm { struct le he; struct sa peer; struct restund_trafstat ts; - const struct allocation *al; + struct allocation *al; time_t expires; time_t start; bool new; @@ -38,7 +38,9 @@ static void destructor(void *arg) struct perm *perm = arg; int err; + mtx_lock(&perm->al->mutex); hash_unlink(&perm->he); + mtx_unlock(&perm->al->mutex); restund_debug("turn: allocation %p permission %j destroyed " "(%llu/%llu %llu/%llu)\n", @@ -90,7 +92,7 @@ struct perm *perm_find(const struct hash *ht, const struct sa *peer) struct perm *perm_create(struct hash *ht, const struct sa *peer, - const struct allocation *al) + struct allocation *al) { const time_t now = time(NULL); struct perm *perm; @@ -102,6 +104,8 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer, if (!perm) return NULL; + mtx_lock(&al->mutex); + hash_append(ht, sa_hash(peer, SA_ADDR), &perm->he, perm); perm->peer = *peer; @@ -109,6 +113,8 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer, perm->expires = now + PERM_LIFETIME; perm->start = now; + mtx_unlock(&al->mutex); + restund_debug("turn: allocation %p permission %j created\n", al, peer); return perm; diff --git a/modules/turn/turn.c b/modules/turn/turn.c index ae7fb46..0e0d9b1 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -11,7 +11,8 @@ enum { - ALLOC_DEFAULT_BSIZE = 512, + ALLOC_DEFAULT_BSIZE = 1024, + TURN_THREADS = 4 }; @@ -23,7 +24,8 @@ struct tuple { static struct turnd turnd; - +static struct tmr timers[TURN_THREADS]; +static thrd_t tid[TURN_THREADS]; struct turnd *turndp(void) { @@ -335,6 +337,60 @@ static struct restund_cmdsub cmd_turnreply = { }; +static void tmr_handler(void *arg) +{ + struct tmr *tmr = arg; + struct le *le; + + mtx_lock(&turndp()->mutex); + if (!turndp()->run) + re_cancel(); + + /* Reassign one allocation by time */ + LIST_FOREACH(&turndp()->re_map, le) + { + struct allocation *al = le->data; + mtx_lock(&al->mutex); + udp_thread_attach(al->rel_us); + udp_thread_attach(al->rsv_us); + mtx_unlock(&al->mutex); + } + list_clear(&turndp()->re_map); + + mtx_unlock(&turndp()->mutex); + + tmr_start(tmr, 10, tmr_handler, tmr); +} + + +static int thread_handler(void *arg) +{ + struct tmr *tmr = arg; + int err; + + err = re_thread_init(); + if (err) { + restund_error("turn: re_thread_init failed %m\n", err); + return 0; + } + + fd_setsize(-1); + + tmr_start(tmr, 10, tmr_handler, tmr); + + err = re_main(NULL); + if (err) + restund_error("turn: re_main failed %m\n", err); + + tmr_cancel(tmr); + + tmr_debug(); + re_thread_close(); + + return 0; +} + + static int module_init(void) { uint32_t x, bsize = ALLOC_DEFAULT_BSIZE; @@ -406,6 +462,24 @@ static int module_init(void) goto out; } + list_init(&turnd.re_map); + + turnd.run = true; + err = mtx_init(&turnd.mutex, mtx_plain); + if (err) { + restund_error("turn: mtx_init err: %d\n", err); + goto out; + } + + for (int i = 0; i < TURN_THREADS; i++) { + err = thrd_create(&tid[i], thread_handler, + &timers[i]); + if (err) { + restund_error("turn: thrd_create err: %m\n", err); + goto out; + } + } + restund_debug("turn: lifetime=%u ext=%j ext6=%j bsz=%u\n", turnd.lifetime_max, &turnd.rel_addr, &turnd.rel_addr6, bsize); @@ -417,6 +491,14 @@ static int module_init(void) static int module_close(void) { + mtx_lock(&turnd.mutex); + turnd.run = false; + mtx_unlock(&turnd.mutex); + + for (int i = 0; i < TURN_THREADS; i++) { + thrd_join(tid[i], NULL); + } + hash_flush(turnd.ht_alloc); turnd.ht_alloc = mem_deref(turnd.ht_alloc); restund_cmd_unsubscribe(&cmd_turnreply); diff --git a/modules/turn/turn.h b/modules/turn/turn.h index a7f882d..dc09a9d 100644 --- a/modules/turn/turn.h +++ b/modules/turn/turn.h @@ -4,6 +4,8 @@ * Copyright (C) 2010 Creytiv.com */ +#include + struct turnd { struct sa rel_addr; struct sa rel_addr6; @@ -17,6 +19,9 @@ struct turnd { uint32_t allocc_cur; uint32_t lifetime_max; uint32_t udp_sockbuf_size; + mtx_t mutex; + bool run; + struct list re_map; struct { uint64_t scode_400; @@ -35,6 +40,8 @@ struct chanlist; struct allocation { struct le he; + struct le le_map; + mtx_t mutex; struct tmr tmr; uint8_t tid[STUN_TID_SIZE]; struct sa cli_addr; @@ -73,7 +80,7 @@ struct perm; struct perm *perm_find(const struct hash *ht, const struct sa *addr); struct perm *perm_create(struct hash *ht, const struct sa *peer, - const struct allocation *al); + struct allocation *al); void perm_refresh(struct perm *perm); void perm_tx_stat(struct perm *perm, size_t bytc); void perm_rx_stat(struct perm *perm, size_t bytc);