diff --git a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java index e43d442534a..d148021f6c4 100644 --- a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java +++ b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java @@ -244,6 +244,10 @@ public class ChainBaseManager { @Setter private long lowestBlockNum = -1; // except num = 0. + @Getter + @Setter + private long latestSaveBlockTime; + // for test only public List getWitnesses() { return witnessScheduleStore.getActiveWitnesses(); @@ -381,6 +385,7 @@ private void init() { this.lowestBlockNum = this.blockIndexStore.getLimitNumber(1, 1).stream() .map(BlockId::getNum).findFirst().orElse(0L); this.nodeType = getLowestBlockNum() > 1 ? NodeType.LITE : NodeType.FULL; + this.latestSaveBlockTime = System.currentTimeMillis(); } public void shutdown() { diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 22159063333..62ed12d856c 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -333,6 +333,9 @@ public class CommonParameter { public boolean isOpenFullTcpDisconnect; @Getter @Setter + public int inactiveThreshold; + @Getter + @Setter public boolean nodeDetectEnable; @Getter @Setter diff --git a/common/src/main/java/org/tron/core/Constant.java b/common/src/main/java/org/tron/core/Constant.java index 0e634d3ef7d..da3b2b1becc 100644 --- a/common/src/main/java/org/tron/core/Constant.java +++ b/common/src/main/java/org/tron/core/Constant.java @@ -198,6 +198,8 @@ public class Constant { public static final String NODE_IS_OPEN_FULL_TCP_DISCONNECT = "node.isOpenFullTcpDisconnect"; + public static final String NODE_INACTIVE_THRESHOLD = "node.inactiveThreshold"; + public static final String NODE_DETECT_ENABLE = "node.nodeDetectEnable"; public static final String NODE_MAX_TRANSACTION_PENDING_SIZE = "node.maxTransactionPendingSize"; diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 422efefaed8..7b089530a41 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -173,6 +173,7 @@ public static void clearParam() { PARAMETER.receiveTcpMinDataLength = 2048; PARAMETER.isOpenFullTcpDisconnect = false; PARAMETER.nodeDetectEnable = false; + PARAMETER.inactiveThreshold = 600; PARAMETER.supportConstant = false; PARAMETER.debug = false; PARAMETER.minTimeRatio = 0.0; @@ -845,6 +846,9 @@ public static void setParam(final String[] args, final String confFileName) { PARAMETER.nodeDetectEnable = config.hasPath(Constant.NODE_DETECT_ENABLE) && config.getBoolean(Constant.NODE_DETECT_ENABLE); + PARAMETER.inactiveThreshold = config.hasPath(Constant.NODE_INACTIVE_THRESHOLD) + ? config.getInt(Constant.NODE_INACTIVE_THRESHOLD) : 600; + PARAMETER.maxTransactionPendingSize = config.hasPath(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) ? config.getInt(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) : 2000; diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index ef2f5c81124..66aeccdda39 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -1384,6 +1384,7 @@ public void updateDynamicProperties(BlockCapsule block) { (chainBaseManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber() - chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum() + 1)); + chainBaseManager.setLatestSaveBlockTime(System.currentTimeMillis()); Metrics.gaugeSet(MetricKeys.Gauge.HEADER_HEIGHT, block.getNum()); Metrics.gaugeSet(MetricKeys.Gauge.HEADER_TIME, block.getTimeStamp()); } diff --git a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java index 7518b1347a7..0100dc443d9 100644 --- a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java +++ b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java @@ -19,6 +19,7 @@ import org.tron.core.net.message.PbftMessageFactory; import org.tron.core.net.message.TronMessage; import org.tron.core.net.message.TronMessageFactory; +import org.tron.core.net.message.adv.FetchInvDataMessage; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.message.base.DisconnectMessage; import org.tron.core.net.message.handshake.HelloMessage; @@ -38,6 +39,7 @@ import org.tron.p2p.P2pEventHandler; import org.tron.p2p.connection.Channel; import org.tron.protos.Protocol; +import org.tron.protos.Protocol.Inventory.InventoryType; import org.tron.protos.Protocol.ReasonCode; @Slf4j(topic = "net") @@ -205,6 +207,7 @@ private void processMessage(PeerConnection peer, byte[] data) { default: throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString()); } + updateLastActiveTime(peer, msg); } catch (Exception e) { processException(peer, msg, e); } finally { @@ -220,6 +223,27 @@ private void processMessage(PeerConnection peer, byte[] data) { } } + private void updateLastActiveTime(PeerConnection peer, TronMessage msg) { + MessageTypes type = msg.getType(); + + boolean flag = false; + switch (type) { + case SYNC_BLOCK_CHAIN: + case BLOCK_CHAIN_INVENTORY: + case BLOCK: + flag = true; + break; + case FETCH_INV_DATA: + flag = ((FetchInvDataMessage) msg).getInventoryType().equals(InventoryType.BLOCK); + break; + default: + break; + } + if (flag) { + peer.setLastActiveTime(System.currentTimeMillis()); + } + } + private void processException(PeerConnection peer, TronMessage msg, Exception ex) { Protocol.ReasonCode code; diff --git a/framework/src/main/java/org/tron/core/net/TronNetService.java b/framework/src/main/java/org/tron/core/net/TronNetService.java index 03becf5d4e9..5b99f94c0db 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetService.java +++ b/framework/src/main/java/org/tron/core/net/TronNetService.java @@ -22,6 +22,7 @@ import org.tron.core.net.peer.PeerStatusCheck; import org.tron.core.net.service.adv.AdvService; import org.tron.core.net.service.effective.EffectiveCheckService; +import org.tron.core.net.service.effective.ResilienceService; import org.tron.core.net.service.fetchblock.FetchBlockService; import org.tron.core.net.service.nodepersist.NodePersistService; import org.tron.core.net.service.relay.RelayService; @@ -50,6 +51,9 @@ public class TronNetService { @Autowired private PeerStatusCheck peerStatusCheck; + @Autowired + private ResilienceService resilienceService; + @Autowired private TransactionsMsgHandler transactionsMsgHandler; @@ -88,6 +92,7 @@ public void start() { advService.init(); syncService.init(); peerStatusCheck.init(); + resilienceService.init(); transactionsMsgHandler.init(); fetchBlockService.init(); nodePersistService.init(); @@ -110,6 +115,7 @@ public void close() { nodePersistService.close(); advService.close(); syncService.close(); + resilienceService.close(); peerStatusCheck.close(); transactionsMsgHandler.close(); fetchBlockService.close(); @@ -177,7 +183,7 @@ private P2pConfig updateConfig(P2pConfig config) { config.setMaxConnectionsWithSameIp(parameter.getMaxConnectionsWithSameIp()); config.setPort(parameter.getNodeListenPort()); config.setNetworkId(parameter.getNodeP2pVersion()); - config.setDisconnectionPolicyEnable(parameter.isOpenFullTcpDisconnect()); + config.setDisconnectionPolicyEnable(false); config.setNodeDetectEnable(parameter.isNodeDetectEnable()); config.setDiscoverEnable(parameter.isNodeDiscoveryEnable()); if (StringUtils.isEmpty(config.getIp()) && hasIpv4Stack(NetUtil.getAllLocalAddress())) { diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java index a8ad8d0ec73..5e303bd3d6f 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java @@ -39,6 +39,9 @@ public void processMessage(PeerConnection peer, TronMessage msg) { Item item = new Item(id, type); peer.getAdvInvReceive().put(item, System.currentTimeMillis()); advService.addInv(item); + if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) { + peer.setLastActiveTime(System.currentTimeMillis()); + } } } diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java index 958ebfe5561..f575253c50c 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java @@ -33,7 +33,6 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep peer.disconnect(Protocol.ReasonCode.BAD_PROTOCOL); return; } - long remainNum = 0; List summaryChainIds = syncBlockChainMessage.getBlockIds(); diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 677ac8c9d5d..da97f74c241 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -79,6 +79,10 @@ public class PeerConnection { @Setter private ByteString address; + @Getter + @Setter + private volatile long lastActiveTime; + @Getter @Setter private TronState tronState = TronState.INIT; @@ -159,6 +163,7 @@ public void setChannel(Channel channel) { this.isRelayPeer = true; } this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); + lastActiveTime = System.currentTimeMillis(); } public void setBlockBothHave(BlockId blockId) { diff --git a/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java new file mode 100644 index 00000000000..fbbb2855934 --- /dev/null +++ b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java @@ -0,0 +1,186 @@ +package org.tron.core.net.service.effective; + +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; +import org.tron.common.parameter.CommonParameter; +import org.tron.core.ChainBaseManager; +import org.tron.core.config.args.Args; +import org.tron.core.net.TronNetDelegate; +import org.tron.core.net.peer.PeerConnection; +import org.tron.protos.Protocol.ReasonCode; + +@Slf4j(topic = "net") +@Component +public class ResilienceService { + + private static final long inactiveThreshold = + CommonParameter.getInstance().getInactiveThreshold() * 1000L; + public static final long blockNotChangeThreshold = 90 * 1000L; + + //when node is isolated, retention percent peers will not be disconnected + public static final double retentionPercent = 0.8; + private static final int initialDelay = 300; + private static final String esName = "resilience-service"; + private final ScheduledExecutorService executor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(esName); + + @Autowired + private TronNetDelegate tronNetDelegate; + + @Autowired + private ChainBaseManager chainBaseManager; + + public void init() { + if (Args.getInstance().isOpenFullTcpDisconnect) { + executor.scheduleWithFixedDelay(() -> { + try { + disconnectRandom(); + } catch (Exception e) { + logger.error("DisconnectRandom node failed", e); + } + }, initialDelay, 60, TimeUnit.SECONDS); + } else { + logger.info("OpenFullTcpDisconnect is disabled"); + } + + executor.scheduleWithFixedDelay(() -> { + try { + disconnectLan(); + } catch (Exception e) { + logger.error("DisconnectLan node failed", e); + } + }, initialDelay, 10, TimeUnit.SECONDS); + + executor.scheduleWithFixedDelay(() -> { + try { + disconnectIsolated2(); + } catch (Exception e) { + logger.error("DisconnectIsolated node failed", e); + } + }, initialDelay, 30, TimeUnit.SECONDS); + } + + private void disconnectRandom() { + int peerSize = tronNetDelegate.getActivePeer().size(); + if (peerSize >= CommonParameter.getInstance().getMaxConnections()) { + long now = System.currentTimeMillis(); + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.isDisconnect()) + .filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold) + .filter(peer -> !peer.getChannel().isTrustPeer()) + .collect(Collectors.toList()); + if (!peers.isEmpty()) { + int index = new Random().nextInt(peers.size()); + disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION); + } + } + } + + private void disconnectLan() { + if (isLanNode()) { + // disconnect from the node that has keep inactive for more than inactiveThreshold + // and its lastActiveTime is smallest + int peerSize = tronNetDelegate.getActivePeer().size(); + if (peerSize >= CommonParameter.getInstance().getMinConnections()) { + long now = System.currentTimeMillis(); + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.isDisconnect()) + .filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold) + .filter(peer -> !peer.getChannel().isTrustPeer()) + .collect(Collectors.toList()); + Optional one = getEarliestPeer(peers); + one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL)); + } + } + } + + private void disconnectIsolated2() { + if (isIsolateLand2()) { + logger.info("Node is isolated, try to disconnect from peers"); + int peerSize = tronNetDelegate.getActivePeer().size(); + + //disconnect from the node whose lastActiveTime is smallest + if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) { + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.isDisconnect()) + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(peer -> peer.getChannel().isActive()) + .collect(Collectors.toList()); + + Optional one = getEarliestPeer(peers); + one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL)); + } + + //disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection, + //so new peers can come in + peerSize = tronNetDelegate.getActivePeer().size(); + int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent); + if (peerSize > threshold) { + int disconnectSize = peerSize - threshold; + List peers = tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.isDisconnect()) + .filter(peer -> !peer.getChannel().isTrustPeer()) + .filter(peer -> !peer.getChannel().isActive()) + .collect(Collectors.toList()); + try { + peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo)); + } catch (Exception e) { + logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage()); + return; + } + + if (peers.size() > disconnectSize) { + peers = peers.subList(0, disconnectSize); + } + peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL)); + } + } + } + + private Optional getEarliestPeer(List pees) { + Optional one = Optional.empty(); + try { + one = pees.stream() + .min(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo)); + } catch (Exception e) { + logger.warn("Get earliest peer failed: {}", e.getMessage()); + } + return one; + } + + private boolean isLanNode() { + int peerSize = tronNetDelegate.getActivePeer().size(); + int activePeerSize = (int) tronNetDelegate.getActivePeer().stream() + .filter(peer -> peer.getChannel().isActive()) + .count(); + return peerSize > 0 && peerSize == activePeerSize; + } + + private boolean isIsolateLand2() { + int advPeerCount = (int) tronNetDelegate.getActivePeer().stream() + .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs()) + .count(); + long diff = System.currentTimeMillis() - chainBaseManager.getLatestSaveBlockTime(); + return advPeerCount >= 1 && diff >= blockNotChangeThreshold; + } + + private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode) { + int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000); + logger.info("Disconnect from peer {}, inactive seconds {}", peer.getInetSocketAddress(), + inactiveSeconds); + peer.disconnect(reasonCode); + } + + public void close() { + ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName); + } +} diff --git a/framework/src/main/resources/config-localtest.conf b/framework/src/main/resources/config-localtest.conf index f1ac104c9ed..50e7539c1d0 100644 --- a/framework/src/main/resources/config-localtest.conf +++ b/framework/src/main/resources/config-localtest.conf @@ -99,6 +99,7 @@ node { # check the peer data transfer ,disconnect factor isOpenFullTcpDisconnect = true + inactiveThreshold = 600 //seconds p2p { version = 333 # 11111: mainnet; 20180622: testnet diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index 78427c30f87..f9fc2dd673d 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -180,6 +180,7 @@ node { minParticipationRate = 15 isOpenFullTcpDisconnect = false + inactiveThreshold = 600 //seconds p2p { version = 11111 # 11111: mainnet; 20180622: testnet diff --git a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java index 0008ec315d5..7a3dc30cb86 100644 --- a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java +++ b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java @@ -12,10 +12,13 @@ import org.tron.common.utils.Sha256Hash; import org.tron.core.Constant; import org.tron.core.config.args.Args; +import org.tron.core.net.message.TronMessage; +import org.tron.core.net.message.adv.FetchInvDataMessage; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.peer.PeerConnection; import org.tron.core.net.service.statistics.PeerStatistics; import org.tron.protos.Protocol; +import org.tron.protos.Protocol.Inventory.InventoryType; public class P2pEventHandlerImplTest { @@ -108,4 +111,22 @@ public void testProcessInventoryMessage() throws Exception { Assert.assertEquals(300, count); } + + @Test + public void testUpdateLastActiveTime() throws Exception { + String[] a = new String[0]; + Args.setParam(a, Constant.TESTNET_CONF); + + PeerConnection peer = new PeerConnection(); + P2pEventHandlerImpl p2pEventHandler = new P2pEventHandlerImpl(); + + Method method = p2pEventHandler.getClass() + .getDeclaredMethod("updateLastActiveTime", PeerConnection.class, TronMessage.class); + method.setAccessible(true); + + long t1 = System.currentTimeMillis(); + FetchInvDataMessage message = new FetchInvDataMessage(new ArrayList<>(), InventoryType.BLOCK); + method.invoke(p2pEventHandler, peer, message); + Assert.assertTrue(peer.getLastActiveTime() >= t1); + } } diff --git a/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java new file mode 100644 index 00000000000..a8b8e04d3cb --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java @@ -0,0 +1,182 @@ +package org.tron.core.net.services; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; + +import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.tron.common.application.TronApplicationContext; +import org.tron.common.utils.ReflectUtils; +import org.tron.core.ChainBaseManager; +import org.tron.core.Constant; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.args.Args; +import org.tron.core.net.peer.PeerConnection; +import org.tron.core.net.peer.PeerManager; +import org.tron.core.net.service.effective.ResilienceService; +import org.tron.p2p.connection.Channel; + +public class ResilienceServiceTest { + + protected TronApplicationContext context; + private ResilienceService service; + private ChainBaseManager chainBaseManager; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void init() throws IOException { + Args.setParam(new String[] {"--output-directory", + temporaryFolder.newFolder().toString(), "--debug"}, Constant.TEST_CONF); + context = new TronApplicationContext(DefaultConfig.class); + chainBaseManager = context.getBean(ChainBaseManager.class); + service = context.getBean(ResilienceService.class); + } + + @Test + public void testDisconnectRandom() { + int maxConnection = 30; + Assert.assertEquals(maxConnection, Args.getInstance().getMaxConnections()); + clearPeers(); + Assert.assertEquals(0, PeerManager.getPeers().size()); + + for (int i = 0; i < maxConnection; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + + PeerManager.add(context, c1); + } + ReflectUtils.invokeMethod(service, "disconnectRandom"); + Assert.assertEquals(maxConnection, PeerManager.getPeers().size()); + + PeerConnection p1 = PeerManager.getPeers().get(1); + p1.setLastActiveTime( + System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000); + PeerConnection p2 = PeerManager.getPeers().get(10); + p2.setLastActiveTime( + System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000); + + ReflectUtils.invokeMethod(service, "disconnectRandom"); + Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size()); + } + + @Test + public void testDisconnectLan() { + int minConnection = 8; + Assert.assertEquals(minConnection, Args.getInstance().getMinConnections()); + clearPeers(); + Assert.assertEquals(0, PeerManager.getPeers().size()); + + for (int i = 0; i < 9; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + ReflectUtils.setFieldValue(c1, "isActive", true); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + + PeerManager.add(context, c1); + } + + Assert.assertEquals(9, PeerManager.getPeers().size()); + + boolean isLan = ReflectUtils.invokeMethod(service, "isLanNode"); + Assert.assertTrue(isLan); + + PeerConnection p1 = PeerManager.getPeers().get(1); + InetSocketAddress address1 = p1.getChannel().getInetSocketAddress(); + p1.setLastActiveTime( + System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000); + PeerConnection p2 = PeerManager.getPeers().get(2); + InetSocketAddress address2 = p2.getChannel().getInetSocketAddress(); + p2.setLastActiveTime( + System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000); + + ReflectUtils.invokeMethod(service, "disconnectLan"); + Assert.assertEquals(8, PeerManager.getPeers().size()); + Set addressSet = new HashSet<>(); + PeerManager.getPeers() + .forEach(p -> addressSet.add(p.getChannel().getInetSocketAddress())); + Assert.assertTrue(addressSet.contains(address1)); + Assert.assertFalse(addressSet.contains(address2)); + + ReflectUtils.invokeMethod(service, "disconnectLan"); + Assert.assertEquals(7, PeerManager.getPeers().size()); + addressSet.clear(); + PeerManager.getPeers() + .forEach(p -> addressSet.add(p.getChannel().getInetSocketAddress())); + Assert.assertFalse(addressSet.contains(address1)); + + ReflectUtils.invokeMethod(service, "disconnectLan"); + Assert.assertEquals(7, PeerManager.getPeers().size()); + } + + @Test + public void testDisconnectIsolated2() { + int maxConnection = 30; + Assert.assertEquals(maxConnection, Args.getInstance().getMaxConnections()); + clearPeers(); + Assert.assertEquals(0, PeerManager.getPeers().size()); + + int addSize = (int) (maxConnection * ResilienceService.retentionPercent) + 2; //26 + for (int i = 0; i < addSize; i++) { + InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001); + Channel c1 = spy(Channel.class); + ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); + ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); + // 1 ~ 3 is active, 4 ~ 26 is not active + ReflectUtils.setFieldValue(c1, "isActive", i <= 2); + ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class)); + Mockito.doNothing().when(c1).send((byte[]) any()); + + PeerManager.add(context, c1); + } + PeerManager.getPeers().get(10).setNeedSyncFromUs(false); + PeerManager.getPeers().get(10).setNeedSyncFromPeer(false); + chainBaseManager.setLatestSaveBlockTime( + System.currentTimeMillis() - ResilienceService.blockNotChangeThreshold - 100L); + boolean isIsolated = ReflectUtils.invokeMethod(service, "isIsolateLand2"); + Assert.assertTrue(isIsolated); + + ReflectUtils.invokeMethod(service, "disconnectIsolated2"); + int activeNodeSize = (int) PeerManager.getPeers().stream() + .filter(p -> p.getChannel().isActive()) + .count(); + int passiveSize = (int) PeerManager.getPeers().stream() + .filter(p -> !p.getChannel().isActive()) + .count(); + Assert.assertEquals(2, activeNodeSize); + Assert.assertEquals((int) (maxConnection * ResilienceService.retentionPercent), + activeNodeSize + passiveSize); + Assert.assertEquals((int) (maxConnection * ResilienceService.retentionPercent), + PeerManager.getPeers().size()); + } + + private void clearPeers() { + for (PeerConnection p : PeerManager.getPeers()) { + PeerManager.remove(p.getChannel()); + } + } + + @After + public void destroy() { + Args.clearParam(); + context.destroy(); + } +} \ No newline at end of file diff --git a/framework/src/test/resources/args-test.conf b/framework/src/test/resources/args-test.conf index 91913dfe32e..cf5d0b8d718 100644 --- a/framework/src/test/resources/args-test.conf +++ b/framework/src/test/resources/args-test.conf @@ -92,6 +92,7 @@ node { maxConnections = 30 minConnections = 8 minActiveConnections = 3 + inactiveThreshold = 600 //seconds p2p { version = 43 # 43: testnet; 101: debug diff --git a/framework/src/test/resources/config-localtest.conf b/framework/src/test/resources/config-localtest.conf index d7f573fe90e..1d7ae09af7c 100644 --- a/framework/src/test/resources/config-localtest.conf +++ b/framework/src/test/resources/config-localtest.conf @@ -96,6 +96,7 @@ node { # check the peer data transfer ,disconnect factor isOpenFullTcpDisconnect = true + inactiveThreshold = 600 //seconds p2p { version = 333 # 11111: mainnet; 20180622: testnet diff --git a/framework/src/test/resources/config-test.conf b/framework/src/test/resources/config-test.conf index db24bb2a8a0..62337f02fc5 100644 --- a/framework/src/test/resources/config-test.conf +++ b/framework/src/test/resources/config-test.conf @@ -100,6 +100,7 @@ node { # nodeId = e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c # } ] + inactiveThreshold = 600 //seconds p2p { version = 43 # 43: testnet; 101: debug