diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index 94eedb6..a569513 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -66,14 +67,28 @@ public boolean init(RouterOptions opts) { long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { + AtomicLong order = new AtomicLong(0); this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( () -> { - try { - Map health = this.checkHealth().get(); + long thisOrder = order.incrementAndGet(); + checkHealth().whenComplete((r, t) -> { + if (t != null) { + LOG.warn("Failed to check health", t); + return; + } + + // I don't want to worry about the overflow issue with long anymore, + // because assuming one increment per second, it will take 292 years + // to overflow. I think that's sufficient. + if (thisOrder < order.get()) { + LOG.warn("Skip outdated health check result, order: {}", thisOrder); + return; + } + List activities = new ArrayList<>(); List inactivities = new ArrayList<>(); - for (Map.Entry entry : health.entrySet()) { + for (Map.Entry entry : r.entrySet()) { if (entry.getValue()) { activities.add(entry.getKey()); } else { @@ -81,9 +96,7 @@ public boolean init(RouterOptions opts) { } } this.router.onRefresh(activities, inactivities); - } catch (Throwable t) { - LOG.warn("Failed to check health", t); - } + }); }, Util.randomInitialDelay(180), refreshPeriod, @@ -279,9 +292,7 @@ public CompletableFuture routeFor(Void request) { @Override public void onRefresh(List activities, List inactivities) { - if (inactivities != null && !inactivities.isEmpty()) { - LOG.warn("Some endpoints are inactive: {}", inactivities); - } + LOG.info("Router cache refreshed, activities: {}, inactivities: {}", activities, inactivities); this.endpointsRef.set(new Endpoints(activities, inactivities)); } }