Skip to content

Commit

Permalink
[Enhancement] add penalty time when node is added in blocklist
Browse files Browse the repository at this point in the history
* Add a penalty time when a node is added to the blocklist, so that the
  node is not removed from the list too quickly

Fixes StarRocks#54111

Signed-off-by: Kevin Xiaohua Cai <[email protected]>
  • Loading branch information
kevincai committed Dec 23, 2024
1 parent 33da390 commit 9b70dad
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 18 deletions.
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3279,6 +3279,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static long black_host_connect_failures_within_time = 5;

// Minimum time, in milliseconds, that a node stays in the host blocklist after it is added.
// HostBlacklist.refresh() keeps an entry in the list until its disconnect time plus this value
// has passed, so a node cannot enter and leave the blocklist in quick succession.
@ConfField(mutable = true, comment = "The minimal time in milliseconds for the node to stay in the blocklist")
public static long black_host_penalty_min_ms = 500; // 500ms

@ConfField(mutable = false)
public static int jdbc_connection_pool_size = 8;

Expand Down
19 changes: 15 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/qe/HostBlacklist.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,24 @@ public void refresh() {

for (Map.Entry<Long, DisconnectEvent> entry : hostBlacklist.entrySet()) {
Long nodeId = entry.getKey();
// 1. If the node is null, means that the node has been removed.
// 2. check the all ports of the node
ComputeNode node = clusterInfoService.getBackendOrComputeNode(nodeId);
if (node == null) {
// Unknown node, the node must be removed from the system
offlineNode.add(nodeId);
} else if (clusterInfoService.checkNodeAvailable(node) &&
entry.getValue().type == DisconnectEvent.TYPE_AUTO) {
continue;
}

LocalDateTime penaltyEndTime =
entry.getValue().disconnectTime.plusNanos(Config.black_host_penalty_min_ms * 1000_000);
if (penaltyEndTime.isAfter(LocalDateTime.now())) {
// penaltyEndTime > now()
// It is not long enough to stay in the blocklist, keep it in the blocklist
// Avoid the node enter and exit the blocklist too quick.
continue;
}

// Check all the ports, determine if the BE node is recovered
if (clusterInfoService.checkNodeAvailable(node) && entry.getValue().type == DisconnectEvent.TYPE_AUTO) {
String host = node.getHost();
List<Integer> ports = Lists.newArrayList();
Collections.addAll(ports, node.getBePort(), node.getBrpcPort(), node.getHttpPort());
Expand Down
96 changes: 82 additions & 14 deletions fe/fe-core/src/test/java/com/starrocks/qe/SimpleSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,20 @@ public void testEmptyComputeNodeList() {
@Test
public void testUpdateBlacklist(@Mocked SystemInfoService systemInfoService, @Mocked NetUtils netUtils) {
Config.heartbeat_timeout_second = 1;
// the node is allowed to be removed from blocklist immediately.
long defaultBlockPenaltyTime = Config.black_host_penalty_min_ms;
Config.black_host_penalty_min_ms = 0;

Backend backend1 = new Backend(10001L, "host10002", 10002);
backend1.setAlive(true);
backend1.setBrpcPort(10002);
backend1.setHttpPort(10012);

ComputeNode computeNode1 = new ComputeNode(10003, "host10003", 10003);
computeNode1.setAlive(false);
computeNode1.setBrpcPort(10003);
computeNode1.setHttpPort(10013);

SimpleScheduler.addToBlocklist(10001L);
SimpleScheduler.addToBlocklist(10002L);
SimpleScheduler.addToBlocklist(10003L);
new Expectations() {
{
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
Expand All @@ -351,12 +361,6 @@ public void testUpdateBlacklist(@Mocked SystemInfoService systemInfoService, @Mo
result = null;
times = 1;

// backend 10002 will be removed
Backend backend1 = new Backend();
backend1.setAlive(true);
backend1.setHost("host10002");
backend1.setBrpcPort(10002);
backend1.setHttpPort(10012);
systemInfoService.getBackendOrComputeNode(10002L);
result = backend1;
times = 1;
Expand All @@ -370,11 +374,6 @@ public void testUpdateBlacklist(@Mocked SystemInfoService systemInfoService, @Mo
times = 1;

// backend 10003, which is not available, will not be removed
ComputeNode computeNode1 = new ComputeNode();
computeNode1.setAlive(false);
computeNode1.setHost("host10003");
computeNode1.setBrpcPort(10003);
computeNode1.setHttpPort(10013);
systemInfoService.getBackendOrComputeNode(10003L);
result = computeNode1;
times = 2;
Expand All @@ -384,6 +383,10 @@ public void testUpdateBlacklist(@Mocked SystemInfoService systemInfoService, @Mo
times = 2;
}
};
SimpleScheduler.addToBlocklist(10001L);
SimpleScheduler.addToBlocklist(10002L);
SimpleScheduler.addToBlocklist(10003L);

SimpleScheduler.getHostBlacklist().refresh();

Assert.assertFalse(SimpleScheduler.isInBlocklist(10001L));
Expand All @@ -393,6 +396,8 @@ public void testUpdateBlacklist(@Mocked SystemInfoService systemInfoService, @Mo
//Having retried for Config.heartbeat_timeout_second + 1 times, backend 10003 will be removed.
SimpleScheduler.getHostBlacklist().refresh();
Assert.assertTrue(SimpleScheduler.isInBlocklist(10003L));

Config.black_host_penalty_min_ms = defaultBlockPenaltyTime;
}

@Test
Expand Down Expand Up @@ -575,4 +580,67 @@ public void testManualAdd(@Mocked SystemInfoService systemInfoService) throws In
blacklist.refresh();
Assert.assertTrue(blacklist.contains(10003L));
}

/**
 * Verifies that a node added to the blocklist stays there for at least
 * {@code Config.black_host_penalty_min_ms} milliseconds, even though the node is
 * reported as available and all of its ports are reachable, and that it is removed
 * by {@code refresh()} once the penalty time has elapsed.
 *
 * @throws InterruptedException if the test thread is interrupted while sleeping
 */
@Test
public void testPenaltyTimeInBlockList(@Mocked SystemInfoService systemInfoService, @Mocked NetUtils netUtils)
        throws InterruptedException {
    Config.black_host_history_sec = 5; // 5s
    HostBlacklist blacklist = new HostBlacklist();
    blacklist.disableAutoUpdate();

    // stay at least 2 seconds before removed from the list
    long originPenaltyTime = Config.black_host_penalty_min_ms;
    Config.black_host_penalty_min_ms = 2000;
    try {
        long nodeId = 2000L;
        ComputeNode node = new ComputeNode(nodeId, "computeNode", 1111);
        node.setBrpcPort(0);
        node.updateOnce(1, 2, 3);
        List<Integer> ports = new ArrayList<>();
        Collections.addAll(ports, node.getBrpcPort(), node.getBePort(), node.getHttpPort(), node.getBeRpcPort());

        new Expectations() {
            {
                globalStateMgr.getNodeMgr().getClusterInfo();
                result = systemInfoService;

                systemInfoService.getBackendOrComputeNode(nodeId);
                result = node;

                systemInfoService.checkNodeAvailable(node);
                result = true;

                NetUtils.checkAccessibleForAllPorts(anyString, (List<Integer>) any);
                result = true;
            }
        };

        // The node's disconnect time is recorded somewhere between ts1 and ts2.
        long ts1 = System.currentTimeMillis();
        blacklist.add(node.getId());
        long ts2 = System.currentTimeMillis();

        long checkedCount = 0;
        // Use the earliest possible add time (ts1) so every check happens inside the penalty window.
        while (ts1 + Config.black_host_penalty_min_ms > System.currentTimeMillis()) {
            Assert.assertNotNull(systemInfoService.getBackendOrComputeNode(nodeId));
            Assert.assertTrue(systemInfoService.checkNodeAvailable(node));
            Assert.assertTrue(NetUtils.checkAccessibleForAllPorts(node.getHost(), ports));
            blacklist.refresh();
            // still in the blocklist
            Assert.assertTrue(blacklist.contains(node.getId()));
            ++checkedCount;
            // check every 200ms
            Thread.sleep(200);
        }
        Assert.assertTrue(checkedCount > 0);

        // Wait for whatever is left of the penalty window measured from the latest possible
        // add time (ts2): remaining = (ts2 + penalty) - now.
        long remainMs = ts2 + Config.black_host_penalty_min_ms - System.currentTimeMillis();
        if (remainMs > 0) {
            Thread.sleep(remainMs);
        }
        // must be expired in the blocklist
        Assert.assertTrue(ts2 + Config.black_host_penalty_min_ms <= System.currentTimeMillis());
        blacklist.refresh();
        Assert.assertFalse(blacklist.contains(node.getId()));
    } finally {
        // Always restore the global config so a failure here does not leak into other tests.
        Config.black_host_penalty_min_ms = originPenaltyTime;
    }
}
}

0 comments on commit 9b70dad

Please sign in to comment.