Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

turn: check fd deref cleanup on turn worker thread #8

Open
wants to merge 11 commits into
base: turn_thread
Choose a base branch
from
58 changes: 35 additions & 23 deletions modules/turn/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ static void destructor(void *arg)
{
struct allocation *al = arg;

mtx_lock(&turndp()->mutex);
list_unlink(&al->le_map);
mtx_unlock(&turndp()->mutex);

hash_flush(al->perms);
mem_deref(al->perms);
mem_deref(al->chans);
Expand All @@ -65,9 +61,11 @@ static void destructor(void *arg)
mem_deref(al->username);
mem_deref(al->cli_sock);

/* @TODO check fd deref cleanup on turn worker thread */
mem_deref(al->rel_us);
mem_deref(al->rsv_us);
udp_handler_set(al->uks->rel_us, NULL, NULL);
mtx_lock(&turndp()->mutex);
list_unlink(&al->uks->le);
list_append(&turndp()->rm_map, &al->uks->le, al->uks);
mtx_unlock(&turndp()->mutex);

turndp()->allocc_cur--;
}
Expand All @@ -89,6 +87,9 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg)
struct chan *chan;
int err;

if (!al)
return;

if (al->proto == IPPROTO_TCP) {

if (tcp_conn_txqsz(al->cli_sock) > TCP_MAX_TXQSZ) {
Expand Down Expand Up @@ -162,13 +163,13 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,

for (i=0; i<PORT_TRY_MAX; i++) {

err = udp_listen(&al->rel_us, rel_addr, udp_recv, al);
err = udp_listen(&al->uks->rel_us, rel_addr, udp_recv, al);
if (err)
break;

err = udp_local_get(al->rel_us, &al->rel_addr);
err = udp_local_get(al->uks->rel_us, &al->rel_addr);
if (err) {
al->rel_us = mem_deref(al->rel_us);
al->uks->rel_us = mem_deref(al->uks->rel_us);
break;
}

Expand All @@ -178,7 +179,7 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
restund_debug("turn: try#%u: %J\n", i, &al->rel_addr);

if (sa_port(&al->rel_addr) & 0x1) {
al->rel_us = mem_deref(al->rel_us);
al->uks->rel_us = mem_deref(al->uks->rel_us);
continue;
}

Expand All @@ -188,20 +189,20 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
al->rsv_addr = al->rel_addr;
sa_set_port(&al->rsv_addr, sa_port(&al->rel_addr) + 1);

err = udp_listen(&al->rsv_us, &al->rsv_addr, NULL, NULL);
err = udp_listen(&al->uks->rsv_us, &al->rsv_addr, NULL, NULL);
if (err) {
al->rel_us = mem_deref(al->rel_us);
al->uks->rel_us = mem_deref(al->uks->rel_us);
continue;
}
break;
}

/* Release fd for new thread and re_map*/
udp_thread_detach(al->rel_us);
udp_thread_detach(al->rsv_us);
udp_thread_detach(al->uks->rel_us);
udp_thread_detach(al->uks->rsv_us);

mtx_lock(&turndp()->mutex);
list_append(&turndp()->re_map, &al->le_map, al);
list_append(&turndp()->re_map, &al->uks->le, al);
mtx_unlock(&turndp()->mutex);

return (i == PORT_TRY_MAX) ? EADDRINUSE : err;
Expand Down Expand Up @@ -233,9 +234,9 @@ static int rsvt_listen(const struct hash *ht, struct allocation *al,
if (!alr)
return ENOENT;

al->rel_us = alr->rsv_us;
udp_handler_set(al->rel_us, udp_recv, al);
alr->rsv_us = NULL;
al->uks->rel_us = alr->uks->rsv_us;
udp_handler_set(al->uks->rel_us, udp_recv, al);
alr->uks->rsv_us = NULL;
al->rel_addr = alr->rsv_addr;
sa_init(&alr->rsv_addr, AF_UNSPEC);

Expand Down Expand Up @@ -360,6 +361,17 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
STUN_ATTR_SOFTWARE, restund_software);
goto out;
}

al->uks = mem_zalloc(sizeof(struct udp_socks), NULL);
if (!al->uks) {
restund_warning("turn: no memory for allocation udp socks\n");
++turnd->reply.scode_500;
rerr = stun_ereply(proto, sock, src, 0, msg,
500, "Server Error",
ctx->key, ctx->keylen, ctx->fp, 1,
STUN_ATTR_SOFTWARE, restund_software);
goto out;
}

hash_append(turnd->ht_alloc, sa_hash(src, SA_ALL), &al->he, al);
tmr_start(&al->tmr, lifetime * 1000, timeout, al);
Expand Down Expand Up @@ -416,9 +428,9 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
goto out;
}

udp_rxbuf_presz_set(al->rel_us, 4);
udp_rxbuf_presz_set(al->uks->rel_us, 4);
if (turndp()->udp_sockbuf_size > 0)
(void)udp_sockbuf_set(al->rel_us, turndp()->udp_sockbuf_size);
(void)udp_sockbuf_set(al->uks->rel_us, turndp()->udp_sockbuf_size);

restund_debug("turn: allocation %p created %s/%J/%J - %J (%us)\n",
al, stun_transp_name(al->proto), &al->cli_addr,
Expand All @@ -427,7 +439,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
alx = al;

reply:
if (alx->rsv_us) {
if (alx->uks->rsv_us) {
rsv = (uint64_t)sa_hash(src, SA_ALL) << 32;
rsv |= (uint64_t)sa_stunaf(&alx->rsv_addr) << 24;
rsv += sa_port(&alx->rsv_addr);
Expand All @@ -449,7 +461,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
STUN_ATTR_XOR_RELAY_ADDR,
public ? &public_addr : &alx->rel_addr,
STUN_ATTR_LIFETIME, &lifetime,
STUN_ATTR_RSV_TOKEN, alx->rsv_us ? &rsv : NULL,
STUN_ATTR_RSV_TOKEN, alx->uks->rsv_us ? &rsv : NULL,
STUN_ATTR_XOR_MAPPED_ADDR, src,
STUN_ATTR_SOFTWARE, restund_software);
out:
Expand Down
70 changes: 61 additions & 9 deletions modules/turn/turn.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static bool indication_handler(struct restund_msgctx *ctx, int proto,
if (restund_addr_is_blocked(psa))
err = EPERM;
else
err = udp_send(al->rel_us, psa, &data->v.data);
err = udp_send(al->uks->rel_us, psa, &data->v.data);
if (err)
turnd.errc_tx++;
else {
Expand Down Expand Up @@ -243,7 +243,7 @@ static bool raw_handler(int proto, const struct sa *src,
if (restund_addr_is_blocked(psa))
err = EPERM;
else
err = udp_send(al->rel_us, psa, mb);
err = udp_send(al->uks->rel_us, psa, mb);
if (err)
turnd.errc_tx++;
else {
Expand Down Expand Up @@ -344,19 +344,70 @@ static void tmr_handler(void *arg)

mtx_lock(&turndp()->mutex);
if (!turndp()->run)
{
re_cancel();
goto out;
}

struct list *re_map = &turndp()->re_map;
struct list *rm_map = &turndp()->rm_map;
if (!list_isempty(re_map) && (!list_isempty(rm_map)))
{
goto out;
}

thrd_t thrd = thrd_current();

/* Reassign one allocation by time */
LIST_FOREACH(&turndp()->re_map, le)
LIST_FOREACH(re_map, le)
{
struct allocation *al = le->data;
struct allocation *al = list_ledata(le);
mtx_lock(&al->mutex);
udp_thread_attach(al->rel_us);
udp_thread_attach(al->rsv_us);
udp_thread_attach(al->uks->rel_us);
udp_thread_attach(al->uks->rsv_us);
al->uks->thrd = thrd;

mtx_unlock(&al->mutex);
}
list_clear(&turndp()->re_map);
list_clear(re_map);

le = list_head(rm_map);
while (le)
{
struct udp_socks *uks = list_ledata(le);
le = le->next;

uint64_t jif = tmr_jiffies_usec();
if (0 == turndp()->ts)
{
turndp()->ts = jif;
}

if ((jif - turndp()->ts) > 1000000) // 1s
{
restund_error("no processing for a long time, check thread has exited.");

mem_deref(uks->rel_us);
mem_deref(uks->rsv_us);
list_unlink(&uks->le);

mem_deref(uks);
turndp()->ts = 0;
}

if (thrd_equal(uks->thrd, thrd)) {
udp_thread_detach(uks->rel_us);
udp_thread_detach(uks->rsv_us);
jobo-zt marked this conversation as resolved.
Show resolved Hide resolved
mem_deref(uks->rel_us);
mem_deref(uks->rsv_us);

list_unlink(&uks->le);
mem_deref(uks);
turndp()->ts = 0;
}
}

out:
mtx_unlock(&turndp()->mutex);

tmr_start(tmr, 10, tmr_handler, tmr);
Expand Down Expand Up @@ -463,7 +514,9 @@ static int module_init(void)
}

list_init(&turnd.re_map);
list_init(&turnd.rm_map);

turnd.ts = 0;
turnd.run = true;
err = mtx_init(&turnd.mutex, mtx_plain);
if (err) {
Expand All @@ -472,8 +525,7 @@ static int module_init(void)
}

for (int i = 0; i < TURN_THREADS; i++) {
err = thrd_create(&tid[i], thread_handler,
&timers[i]);
err = thrd_create(&tid[i], thread_handler, &timers[i]);
if (err) {
restund_error("turn: thrd_create err: %m\n", err);
goto out;
Expand Down
13 changes: 10 additions & 3 deletions modules/turn/turn.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ struct turnd {
mtx_t mutex;
bool run;
struct list re_map;
struct list rm_map;
uint64_t ts;

struct {
uint64_t scode_400;
Expand All @@ -36,9 +38,15 @@ struct turnd {

struct chanlist;

struct udp_socks{
struct le le;
thrd_t thrd;
struct udp_sock *rel_us;
struct udp_sock *rsv_us;
};

struct allocation {
struct le he;
struct le le_map;
mtx_t mutex;
struct tmr tmr;
uint8_t tid[STUN_TID_SIZE];
Expand All @@ -47,8 +55,7 @@ struct allocation {
struct sa rel_addr;
struct sa rsv_addr;
void *cli_sock;
struct udp_sock *rel_us;
struct udp_sock *rsv_us;
struct udp_socks *uks;
char *username;
struct hash *perms;
struct chanlist *chans;
Expand Down