Skip to content

Commit

Permalink
add multiple threads and use C11 threads
Browse files Browse the repository at this point in the history
  • Loading branch information
sreimers committed May 17, 2022
1 parent 42d5e69 commit 343b0b4
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 49 deletions.
27 changes: 15 additions & 12 deletions modules/turn/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ static void destructor(void *arg)
{
struct allocation *al = arg;

pthread_mutex_lock(&turndp()->mutex);
mtx_lock(&turndp()->mutex);
list_unlink(&al->le_map);
pthread_mutex_unlock(&turndp()->mutex);
mtx_unlock(&turndp()->mutex);

hash_flush(al->perms);
mem_deref(al->perms);
Expand All @@ -64,8 +64,11 @@ static void destructor(void *arg)
tmr_cancel(&al->tmr);
mem_deref(al->username);
mem_deref(al->cli_sock);

/* @TODO check fd deref cleanup on turn worker thread */
mem_deref(al->rel_us);
mem_deref(al->rsv_us);

turndp()->allocc_cur--;
}

Expand Down Expand Up @@ -94,17 +97,17 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg)
}
}

pthread_mutex_lock(&al->mutex);
mtx_lock(&al->mutex);
perm = perm_find(al->perms, src);
pthread_mutex_unlock(&al->mutex);
mtx_unlock(&al->mutex);
if (!perm) {
++al->dropc_rx;
return;
}

pthread_mutex_lock(&al->mutex);
mtx_lock(&al->mutex);
chan = chan_peer_find(al->chans, src);
pthread_mutex_unlock(&al->mutex);
mtx_unlock(&al->mutex);
if (chan) {
uint16_t len = mbuf_get_left(mb);
size_t start;
Expand Down Expand Up @@ -197,9 +200,9 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
udp_thread_detach(al->rel_us);
udp_thread_detach(al->rsv_us);

pthread_mutex_lock(&turndp()->mutex);
mtx_lock(&turndp()->mutex);
list_append(&turndp()->re_map, &al->le_map, al);
pthread_mutex_unlock(&turndp()->mutex);
mtx_unlock(&turndp()->mutex);

return (i == PORT_TRY_MAX) ? EADDRINUSE : err;
}
Expand Down Expand Up @@ -263,7 +266,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
goto reply;
}

restund_debug("turn: allocation already exists (%J)\n", src);
restund_warning("turn: allocation already exists (%J)\n", src);
++turnd->reply.scode_437;
rerr = stun_ereply(proto, sock, src, 0, msg,
437, "Allocation TID Mismatch",
Expand Down Expand Up @@ -367,7 +370,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
al->cli_addr = *src;
al->srv_addr = *dst;
al->proto = proto;
pthread_mutex_init(&al->mutex, NULL);
mtx_init(&al->mutex, mtx_plain);
sa_init(&al->rsv_addr, AF_UNSPEC);
turndp()->allocc_tot++;
turndp()->allocc_cur++;
Expand Down Expand Up @@ -483,9 +486,9 @@ void refresh_request(struct turnd *turnd, struct allocation *al,
lifetime = lifetime ? MAX(lifetime, TURN_DEFAULT_LIFETIME) : 0;
lifetime = MIN(lifetime, turnd->lifetime_max);

pthread_mutex_lock(&al->mutex);
mtx_lock(&al->mutex);
tmr_start(&al->tmr, lifetime * 1000, timeout, al);
pthread_mutex_unlock(&al->mutex);
mtx_unlock(&al->mutex);

restund_debug("turn: allocation %p refresh (%us)\n", al, lifetime);

Expand Down
12 changes: 6 additions & 6 deletions modules/turn/chan.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ static void destructor(void *arg)
restund_debug("turn: allocation %p channel 0x%x %J destroyed\n",
chan->al, chan->numb, &chan->peer);

pthread_mutex_lock(&chan->al->mutex);
mtx_lock(&chan->al->mutex);
hash_unlink(&chan->he_numb);
hash_unlink(&chan->he_peer);
pthread_mutex_unlock(&chan->al->mutex);
mtx_unlock(&chan->al->mutex);
}


Expand Down Expand Up @@ -198,7 +198,7 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
if (!chan)
return NULL;

pthread_mutex_lock(&al->mutex);
mtx_lock(&al->mutex);

hash_append(cl->ht_numb, numb, &chan->he_numb, chan);
hash_append(cl->ht_peer, sa_hash(peer, SA_ALL), &chan->he_peer, chan);
Expand All @@ -208,7 +208,7 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
chan->al = al;
chan->expires = time(NULL) + CHAN_LIFETIME;

pthread_mutex_unlock(&al->mutex);
mtx_unlock(&al->mutex);

restund_debug("turn: allocation %p channel 0x%x %J created\n",
chan->al, chan->numb, &chan->peer);
Expand All @@ -222,9 +222,9 @@ static void chan_refresh(struct chan *chan)
if (!chan)
return;

pthread_mutex_lock(&chan->al->mutex);
mtx_lock(&chan->al->mutex);
chan->expires = time(NULL) + CHAN_LIFETIME;
pthread_mutex_unlock(&chan->al->mutex);
mtx_unlock(&chan->al->mutex);

restund_debug("turn: allocation %p channel 0x%x %J refreshed\n",
chan->al, chan->numb, &chan->peer);
Expand Down
8 changes: 4 additions & 4 deletions modules/turn/perm.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ static void destructor(void *arg)
struct perm *perm = arg;
int err;

pthread_mutex_lock(&perm->al->mutex);
mtx_lock(&perm->al->mutex);
hash_unlink(&perm->he);
pthread_mutex_unlock(&perm->al->mutex);
mtx_unlock(&perm->al->mutex);

restund_debug("turn: allocation %p permission %j destroyed "
"(%llu/%llu %llu/%llu)\n",
Expand Down Expand Up @@ -104,7 +104,7 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer,
if (!perm)
return NULL;

pthread_mutex_lock(&al->mutex);
mtx_lock(&al->mutex);

hash_append(ht, sa_hash(peer, SA_ADDR), &perm->he, perm);

Expand All @@ -113,7 +113,7 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer,
perm->expires = now + PERM_LIFETIME;
perm->start = now;

pthread_mutex_unlock(&al->mutex);
mtx_unlock(&al->mutex);

restund_debug("turn: allocation %p permission %j created\n", al, peer);

Expand Down
64 changes: 40 additions & 24 deletions modules/turn/turn.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@


enum {
ALLOC_DEFAULT_BSIZE = 512,
ALLOC_DEFAULT_BSIZE = 1024,
TURN_THREADS = 4
};


Expand All @@ -23,8 +24,8 @@ struct tuple {


static struct turnd turnd;
static struct tmr tmr;

static struct tmr timers[TURN_THREADS];
static thrd_t tid[TURN_THREADS];

struct turnd *turndp(void)
{
Expand Down Expand Up @@ -338,53 +339,53 @@ static struct restund_cmdsub cmd_turnreply = {

static void tmr_handler(void *arg)
{
struct turnd *td = arg;
struct tmr *tmr = arg;
struct le *le;

pthread_mutex_lock(&td->mutex);
if (!td->run)
mtx_lock(&turndp()->mutex);
if (!turndp()->run)
re_cancel();

LIST_FOREACH(&td->re_map, le)
/* Reassign one allocation by time */
LIST_FOREACH(&turndp()->re_map, le)
{
struct allocation *al = le->data;
pthread_mutex_lock(&al->mutex);
mtx_lock(&al->mutex);
udp_thread_attach(al->rel_us);
udp_thread_attach(al->rsv_us);
pthread_mutex_unlock(&al->mutex);
mtx_unlock(&al->mutex);
}
list_clear(&td->re_map);
pthread_mutex_unlock(&td->mutex);
list_clear(&turndp()->re_map);

mtx_unlock(&turndp()->mutex);

tmr_start(&tmr, 10, tmr_handler, td);
tmr_start(tmr, 10, tmr_handler, tmr);
}


static void *thread_handler(void *arg)
static int thread_handler(void *arg)
{
struct turnd *td = arg;
struct tmr *tmr = arg;
int err;

err = re_thread_init();
if (err) {
restund_error("turn: re_thread_init failed %m\n", err);
return NULL;
return 0;
}

tmr_start(&tmr, 10, tmr_handler, td);
tmr_start(tmr, 10, tmr_handler, tmr);

err = re_main(NULL);
if (err)
restund_error("turn: re_main failed %m\n", err);

restund_warning("EXIT turn thread\n");
tmr_cancel(&tmr);
tmr_cancel(tmr);

tmr_debug();
re_thread_close();

return NULL;
return 0;
}


Expand Down Expand Up @@ -462,8 +463,20 @@ static int module_init(void)
list_init(&turnd.re_map);

turnd.run = true;
pthread_mutex_init(&turnd.mutex, NULL);
err = pthread_create(&turnd.tid, NULL, thread_handler, &turnd);
err = mtx_init(&turnd.mutex, mtx_plain);
if (err) {
restund_error("turn: mtx_init err: %d\n", err);
goto out;
}

for (int i = 0; i < TURN_THREADS; i++) {
err = thrd_create(&tid[i], thread_handler,
&timers[i]);
if (err) {
restund_error("turn: thrd_create err: %m\n", err);
goto out;
}
}

restund_debug("turn: lifetime=%u ext=%j ext6=%j bsz=%u\n",
turnd.lifetime_max, &turnd.rel_addr, &turnd.rel_addr6,
Expand All @@ -476,10 +489,13 @@ static int module_init(void)

static int module_close(void)
{
pthread_mutex_lock(&turnd.mutex);
mtx_lock(&turnd.mutex);
turnd.run = false;
pthread_mutex_unlock(&turnd.mutex);
pthread_join(turnd.tid, NULL);
mtx_unlock(&turnd.mutex);

for (int i = 0; i < TURN_THREADS; i++) {
thrd_join(tid[i], NULL);
}

hash_flush(turnd.ht_alloc);
turnd.ht_alloc = mem_deref(turnd.ht_alloc);
Expand Down
5 changes: 2 additions & 3 deletions modules/turn/turn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ struct turnd {
uint32_t allocc_cur;
uint32_t lifetime_max;
uint32_t udp_sockbuf_size;
pthread_t tid;
pthread_mutex_t mutex;
mtx_t mutex;
bool run;
struct list re_map;

Expand All @@ -42,7 +41,7 @@ struct chanlist;
struct allocation {
struct le he;
struct le le_map;
pthread_mutex_t mutex;
mtx_t mutex;
struct tmr tmr;
uint8_t tid[STUN_TID_SIZE];
struct sa cli_addr;
Expand Down

0 comments on commit 343b0b4

Please sign in to comment.