Skip to content

Commit

Permalink
feat: async health check task
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Sep 25, 2024
1 parent ae02979 commit 8b498bb
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions ingester-protocol/src/main/java/io/greptime/RouterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -66,24 +67,36 @@ public boolean init(RouterOptions opts) {

long refreshPeriod = this.opts.getRefreshPeriodSeconds();
if (refreshPeriod > 0) {
AtomicLong order = new AtomicLong(0);
this.refresher = REFRESHER_POOL.getObject();
this.refresher.scheduleWithFixedDelay(
() -> {
try {
Map<Endpoint, Boolean> health = this.checkHealth().get();
long thisOrder = order.incrementAndGet();
checkHealth().whenComplete((r, t) -> {
if (t != null) {
LOG.warn("Failed to check health", t);
return;
}

// I don't want to worry about the overflow issue with long anymore,
// because assuming one increment per second, it will take 292 years
// to overflow. I think that's sufficient.
if (thisOrder < order.get()) {
LOG.warn("Skip outdated health check result, order: {}", thisOrder);
return;
}

List<Endpoint> activities = new ArrayList<>();
List<Endpoint> inactivities = new ArrayList<>();
for (Map.Entry<Endpoint, Boolean> entry : health.entrySet()) {
for (Map.Entry<Endpoint, Boolean> entry : r.entrySet()) {
if (entry.getValue()) {
activities.add(entry.getKey());
} else {
inactivities.add(entry.getKey());
}
}
this.router.onRefresh(activities, inactivities);
} catch (Throwable t) {
LOG.warn("Failed to check health", t);
}
});
},
Util.randomInitialDelay(180),
refreshPeriod,
Expand Down Expand Up @@ -279,9 +292,7 @@ public CompletableFuture<Endpoint> routeFor(Void request) {

@Override
public void onRefresh(List<Endpoint> activities, List<Endpoint> inactivities) {
if (inactivities != null && !inactivities.isEmpty()) {
LOG.warn("Some endpoints are inactive: {}", inactivities);
}
LOG.info("Router cache refreshed, activities: {}, inactivities: {}", activities, inactivities);
this.endpointsRef.set(new Endpoints(activities, inactivities));
}
}
Expand Down

0 comments on commit 8b498bb

Please sign in to comment.