Skip to content

Commit

Permalink
Use new [ap_]atomics for proxy_worker_shared->busy/lbstatus.
Browse files Browse the repository at this point in the history
  • Loading branch information
ylavic committed Nov 29, 2023
1 parent 69999ad commit ef70720
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 191 deletions.
40 changes: 22 additions & 18 deletions modules/proxy/balancers/mod_lbmethod_bybusyness.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "ap_mpm.h"
#include "apr_version.h"
#include "ap_hooks.h"
#include "ap_atomic.h"

module AP_MODULE_DECLARE_DATA lbmethod_bybusyness_module;

Expand All @@ -30,23 +31,26 @@ static APR_OPTIONAL_FN_TYPE(proxy_balancer_get_best_worker)
static int is_best_bybusyness(proxy_worker *current, proxy_worker *prev_best, void *baton)
{
int *total_factor = (int *)baton;
apr_size_t current_busy = ap_proxy_get_busy_count(current);
apr_size_t prev_best_busy = 0;
int lbfactor = current->s->lbfactor, lbstatus;
apr_size_t current_busy, prev_best_busy = 0;

current->s->lbstatus += current->s->lbfactor;
*total_factor += current->s->lbfactor;
*total_factor += lbfactor;
lbstatus = ap_atomic_int_add_sat(&current->s->lbstatus, lbfactor);
if (lbstatus > APR_INT32_MAX - lbfactor) {
lbstatus = APR_INT32_MAX;
}
else {
lbstatus += lbfactor;
}

current_busy = ap_atomic_size_get(&current->s->busy);
if (prev_best)
prev_best_busy = ap_proxy_get_busy_count(prev_best);


return (
!prev_best
|| (current_busy < prev_best_busy)
|| (
(current_busy == prev_best_busy)
&& (current->s->lbstatus > prev_best->s->lbstatus)
)
);
prev_best_busy = ap_atomic_size_get(&prev_best->s->busy);

return (!prev_best
|| (current_busy < prev_best_busy)
|| (current_busy == prev_best_busy
&& lbstatus > ap_atomic_int_get(&prev_best->s->lbstatus)));
}

static proxy_worker *find_best_bybusyness(proxy_balancer *balancer,
Expand All @@ -58,7 +62,7 @@ static proxy_worker *find_best_bybusyness(proxy_balancer *balancer,
&total_factor);

if (worker) {
worker->s->lbstatus -= total_factor;
ap_atomic_int_sub_sat(&worker->s->lbstatus, total_factor);
}

return worker;
Expand All @@ -71,8 +75,8 @@ static apr_status_t reset(proxy_balancer *balancer, server_rec *s)
proxy_worker **worker;
worker = (proxy_worker **)balancer->workers->elts;
for (i = 0; i < balancer->workers->nelts; i++, worker++) {
(*worker)->s->lbstatus = 0;
ap_proxy_set_busy_count(*worker, 0);
ap_atomic_int_set(&(*worker)->s->lbstatus, 0);
ap_atomic_size_set(&(*worker)->s->busy, 0);
}
return APR_SUCCESS;
}
Expand Down
23 changes: 17 additions & 6 deletions modules/proxy/balancers/mod_lbmethod_byrequests.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ap_mpm.h"
#include "apr_version.h"
#include "ap_hooks.h"
#include "ap_atomic.h"

module AP_MODULE_DECLARE_DATA lbmethod_byrequests_module;

Expand All @@ -28,11 +29,19 @@ static APR_OPTIONAL_FN_TYPE(proxy_balancer_get_best_worker)
static int is_best_byrequests(proxy_worker *current, proxy_worker *prev_best, void *baton)
{
int *total_factor = (int *)baton;
int lbfactor = current->s->lbfactor, lbstatus;

current->s->lbstatus += current->s->lbfactor;
*total_factor += current->s->lbfactor;
*total_factor += lbfactor;
lbstatus = ap_atomic_int_add_sat(&current->s->lbstatus, lbfactor);
if (lbstatus > APR_INT32_MAX - lbfactor) {
lbstatus = APR_INT32_MAX;
}
else {
lbstatus += lbfactor;
}

return (!prev_best || (current->s->lbstatus > prev_best->s->lbstatus));
return (!prev_best
|| lbstatus > ap_atomic_int_get(&prev_best->s->lbstatus));
}

/*
Expand Down Expand Up @@ -84,10 +93,12 @@ static proxy_worker *find_best_byrequests(proxy_balancer *balancer,
request_rec *r)
{
int total_factor = 0;
proxy_worker *worker = ap_proxy_balancer_get_best_worker_fn(balancer, r, is_best_byrequests, &total_factor);
proxy_worker *worker = ap_proxy_balancer_get_best_worker_fn(balancer, r,
is_best_byrequests,
&total_factor);

if (worker) {
worker->s->lbstatus -= total_factor;
ap_atomic_int_sub_sat(&worker->s->lbstatus, total_factor);
}

return worker;
Expand All @@ -100,7 +111,7 @@ static apr_status_t reset(proxy_balancer *balancer, server_rec *s)
proxy_worker **worker;
worker = (proxy_worker **)balancer->workers->elts;
for (i = 0; i < balancer->workers->nelts; i++, worker++) {
(*worker)->s->lbstatus = 0;
ap_atomic_int_set(&(*worker)->s->lbstatus, 0);
}
return APR_SUCCESS;
}
Expand Down
5 changes: 3 additions & 2 deletions modules/proxy/balancers/mod_lbmethod_bytraffic.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ap_mpm.h"
#include "apr_version.h"
#include "ap_hooks.h"
#include "ap_atomic.h"

module AP_MODULE_DECLARE_DATA lbmethod_bytraffic_module;

Expand Down Expand Up @@ -73,8 +74,8 @@ static apr_status_t reset(proxy_balancer *balancer, server_rec *s)
proxy_worker **worker;
worker = (proxy_worker **)balancer->workers->elts;
for (i = 0; i < balancer->workers->nelts; i++, worker++) {
(*worker)->s->lbstatus = 0;
(*worker)->s->busy = 0;
ap_atomic_int_set(&(*worker)->s->lbstatus, 0);
ap_atomic_size_set(&(*worker)->s->busy, 0);
(*worker)->s->transferred = 0;
(*worker)->s->read = 0;
}
Expand Down
29 changes: 18 additions & 11 deletions modules/proxy/mod_proxy_balancer.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ap_mpm.h"
#include "apr_version.h"
#include "ap_hooks.h"
#include "ap_atomic.h"
#include "apr_date.h"
#include "apr_escape.h"
#include "mod_watchdog.h"
Expand Down Expand Up @@ -485,6 +486,13 @@ static void force_recovery(proxy_balancer *balancer, server_rec *s)
}
}

static apr_status_t proxy_decrement_busy_count(void *arg)
{
proxy_worker *worker = arg;
ap_atomic_size_sub_sat(&worker->s->busy, 1);
return APR_SUCCESS;
}

static int proxy_balancer_pre_request(proxy_worker **worker,
proxy_balancer **balancer,
request_rec *r,
Expand Down Expand Up @@ -545,12 +553,12 @@ static int proxy_balancer_pre_request(proxy_worker **worker,
* not in error state or not disabled.
*/
if (PROXY_WORKER_IS_USABLE(*workers)) {
(*workers)->s->lbstatus += (*workers)->s->lbfactor;
ap_atomic_int_add_sat(&(*workers)->s->lbstatus, (*workers)->s->lbfactor);
total_factor += (*workers)->s->lbfactor;
}
workers++;
}
runtime->s->lbstatus -= total_factor;
ap_atomic_int_sub_sat(&runtime->s->lbstatus, total_factor);
}
runtime->s->elected++;

Expand Down Expand Up @@ -623,8 +631,8 @@ static int proxy_balancer_pre_request(proxy_worker **worker,
*worker = runtime;
}

ap_proxy_increment_busy_count(*worker);
apr_pool_cleanup_register(r->pool, *worker, ap_proxy_decrement_busy_count,
ap_atomic_size_add_sat(&(*worker)->s->busy, 1);
apr_pool_cleanup_register(r->pool, *worker, proxy_decrement_busy_count,
apr_pool_cleanup_null);

/* Add balancer/worker info to env. */
Expand Down Expand Up @@ -730,12 +738,11 @@ static void recalc_factors(proxy_balancer *balancer)
* load factor will always be 100
*/
if (balancer->workers->nelts == 1) {
(*workers)->s->lbstatus = (*workers)->s->lbfactor = 100;
return;
workers[0]->s->lbfactor = 100;
}
for (i = 0; i < balancer->workers->nelts; i++) {
/* Update the status entries */
workers[i]->s->lbstatus = workers[i]->s->lbfactor;
ap_atomic_int_set(&workers[i]->s->lbstatus, workers[i]->s->lbfactor);
}
}

Expand Down Expand Up @@ -1542,7 +1549,7 @@ static void balancer_display_page(request_rec *r, proxy_server_conf *conf,
worker->s->retries);
ap_rprintf(r,
" <httpd:lbstatus>%d</httpd:lbstatus>\n",
worker->s->lbstatus);
ap_atomic_int_get(&worker->s->lbstatus));
ap_rprintf(r,
" <httpd:loadfactor>%.2f</httpd:loadfactor>\n",
(float)(worker->s->lbfactor)/100.0);
Expand All @@ -1563,7 +1570,7 @@ static void balancer_display_page(request_rec *r, proxy_server_conf *conf,
"</httpd:redirect>\n", NULL);
ap_rprintf(r,
" <httpd:busy>%" APR_SIZE_T_FMT "</httpd:busy>\n",
ap_proxy_get_busy_count(worker));
ap_atomic_size_get(&worker->s->busy));
ap_rprintf(r, " <httpd:lbset>%d</httpd:lbset>\n",
worker->s->lbset);
/* End proxy_worker_stat */
Expand Down Expand Up @@ -1736,8 +1743,8 @@ static void balancer_display_page(request_rec *r, proxy_server_conf *conf,
ap_rvputs(r, ap_proxy_parse_wstatus(r->pool, worker), NULL);
ap_rputs("</td>", r);
ap_rprintf(r, "<td>%" APR_SIZE_T_FMT "</td>", worker->s->elected);
ap_rprintf(r, "<td>%" APR_SIZE_T_FMT "</td>", ap_proxy_get_busy_count(worker));
ap_rprintf(r, "<td>%d</td><td>", worker->s->lbstatus);
ap_rprintf(r, "<td>%" APR_SIZE_T_FMT "</td>", ap_atomic_size_get(&worker->s->busy));
ap_rprintf(r, "<td>%d</td><td>", ap_atomic_int_get(&worker->s->lbstatus));
ap_rputs(apr_strfsize(worker->s->transferred, fbuf), r);
ap_rputs("</td><td>", r);
ap_rputs(apr_strfsize(worker->s->read, fbuf), r);
Expand Down
123 changes: 4 additions & 119 deletions modules/proxy/proxy_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "apr_strings.h"
#include "apr_hash.h"
#include "apr_atomic.h"
#include "ap_atomic.h"
#include "http_core.h"
#include "proxy_util.h"
#include "ajp.h"
Expand Down Expand Up @@ -1488,7 +1489,9 @@ static proxy_worker *proxy_balancer_get_best_worker(proxy_balancer *balancer,
if (best_worker) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server, APLOGNO(10123)
"proxy: %s selected worker \"%s\" : busy %" APR_SIZE_T_FMT " : lbstatus %d",
balancer->lbmethod->name, best_worker->s->name, best_worker->s->busy, best_worker->s->lbstatus);
balancer->lbmethod->name, best_worker->s->name,
ap_atomic_size_get(&best_worker->s->busy),
ap_atomic_int_get(&best_worker->s->lbstatus));
}

return best_worker;
Expand Down Expand Up @@ -5392,124 +5395,6 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
return APR_SUCCESS;
}

PROXY_DECLARE(apr_status_t) ap_proxy_decrement_busy_count(void *worker_)
{
apr_size_t val;
proxy_worker *worker = worker_;

#if APR_SIZEOF_VOIDP == 4
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint32_t));
val = apr_atomic_read32(&worker->s->busy);
while (val > 0) {
apr_size_t old = val;
val = apr_atomic_cas32(&worker->s->busy, val - 1, old);
if (val == old) {
break;
}
}
#elif APR_VERSION_AT_LEAST(1,7,4) /* APR 64bit atomics not safe before 1.7.4 */
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint64_t));
val = apr_atomic_read64(&worker->s->busy);
while (val > 0) {
apr_size_t old = val;
val = apr_atomic_cas64(&worker->s->busy, val - 1, old);
if (val == old) {
break;
}
}
#else /* Use atomics for (64bit) pointers */
void *volatile *busy_p = (void *)&worker->s->busy;
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(void*));
AP_DEBUG_ASSERT((apr_uintptr_t)busy_p % sizeof(void*) == 0);
val = (apr_uintptr_t)apr_atomic_casptr((void *)busy_p, NULL, NULL);
while (val > 0) {
apr_size_t old = val;
val = (apr_uintptr_t)apr_atomic_casptr((void *)busy_p,
(void *)(apr_uintptr_t)(val - 1),
(void *)(apr_uintptr_t)old);
if (val == old) {
break;
}
}
#endif
return APR_SUCCESS;
}

PROXY_DECLARE(void) ap_proxy_increment_busy_count(proxy_worker *worker)
{
apr_size_t val;
#if APR_SIZEOF_VOIDP == 4
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint32_t));
val = apr_atomic_read32(&worker->s->busy);
while (val < APR_INT32_MAX) {
apr_size_t old = val;
val = apr_atomic_cas32(&worker->s->busy, val + 1, old);
if (val == old) {
break;
}
}
#elif APR_VERSION_AT_LEAST(1,7,4) /* APR 64bit atomics not safe before 1.7.4 */
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint64_t));
val = apr_atomic_read64(&worker->s->busy);
while (val < APR_INT64_MAX) {
apr_size_t old = val;
val = apr_atomic_cas64(&worker->s->busy, val + 1, old);
if (val == old) {
break;
}
}
#else /* Use atomics for (64bit) pointers */
void *volatile *busy_p = (void *)&worker->s->busy;
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(void*));
AP_DEBUG_ASSERT((apr_uintptr_t)busy_p % sizeof(void*) == 0);
val = (apr_uintptr_t)apr_atomic_casptr((void *)busy_p, NULL, NULL);
while (val < APR_INT64_MAX) {
apr_size_t old = val;
val = (apr_uintptr_t)apr_atomic_casptr((void *)busy_p,
(void *)(apr_uintptr_t)(val + 1),
(void *)(apr_uintptr_t)old);
if (val == old) {
break;
}
}
#endif
}

PROXY_DECLARE(apr_size_t) ap_proxy_get_busy_count(proxy_worker *worker)
{
apr_size_t val;
#if APR_SIZEOF_VOIDP == 4
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint32_t));
val = apr_atomic_read32(&worker->s->busy);
#elif APR_VERSION_AT_LEAST(1,7,4) /* APR 64bit atomics not safe before 1.7.4 */
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint64_t));
val = apr_atomic_read64(&worker->s->busy);
#else /* Use atomics for (64bit) pointers */
void *volatile *busy_p = (void *)&worker->s->busy;
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(void*));
AP_DEBUG_ASSERT((apr_uintptr_t)busy_p % sizeof(void*) == 0);
val = (apr_uintptr_t)apr_atomic_casptr((void *)busy_p, NULL, NULL);
#endif

return val;
}

PROXY_DECLARE(void) ap_proxy_set_busy_count(proxy_worker *worker, apr_size_t to)
{
#if APR_SIZEOF_VOIDP == 4
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint32_t));
apr_atomic_set32(&worker->s->busy, to);
#elif APR_VERSION_AT_LEAST(1,7,4) /* APR 64bit atomics not safe before 1.7.4 */
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(apr_uint64_t));
apr_atomic_set64(&worker->s->busy, to);
#else /* Use atomics for (64bit) pointers */
void *volatile *busy_p = (void *)&worker->s->busy;
AP_DEBUG_ASSERT(sizeof(apr_size_t) == sizeof(void*));
AP_DEBUG_ASSERT((apr_uintptr_t)busy_p % sizeof(void*) == 0);
apr_atomic_xchgptr((void *)busy_p, (void *)(apr_uintptr_t)to);
#endif
}

static void add_pollset(apr_pollset_t *pollset, apr_pollfd_t *pfd,
apr_int16_t events)
{
Expand Down
Loading

0 comments on commit ef70720

Please sign in to comment.