Skip to content

Commit

Permalink
Merge pull request #5924 from 317787106/feature/test_isolated3
Browse files Browse the repository at this point in the history
feat(net): disconnect from inactive nodes if necessary
  • Loading branch information
lvs007 authored Jul 29, 2024
2 parents d6bbbe4 + 14fda8f commit 54b0395
Show file tree
Hide file tree
Showing 18 changed files with 448 additions and 2 deletions.
5 changes: 5 additions & 0 deletions chainbase/src/main/java/org/tron/core/ChainBaseManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public class ChainBaseManager {
@Setter
private long lowestBlockNum = -1; // except num = 0.

// System.currentTimeMillis() timestamp of the last time block state was saved: initialised in
// init() and meant to be refreshed through the setter each time a new block is applied.
// Consumers can compare it with the current time to tell whether the chain head has stalled.
@Getter
@Setter
private long latestSaveBlockTime;

// for test only
public List<ByteString> getWitnesses() {
return witnessScheduleStore.getActiveWitnesses();
Expand Down Expand Up @@ -381,6 +385,7 @@ private void init() {
this.lowestBlockNum = this.blockIndexStore.getLimitNumber(1, 1).stream()
.map(BlockId::getNum).findFirst().orElse(0L);
this.nodeType = getLowestBlockNum() > 1 ? NodeType.LITE : NodeType.FULL;
this.latestSaveBlockTime = System.currentTimeMillis();
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ public class CommonParameter {
public boolean isOpenFullTcpDisconnect;
// Peer inactivity threshold read from "node.inactiveThreshold"; the unit is seconds
// (the shipped configs use 600). Callers must convert to milliseconds themselves.
@Getter
@Setter
public int inactiveThreshold;
@Getter
@Setter
public boolean nodeDetectEnable;
@Getter
@Setter
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public class Constant {

public static final String NODE_IS_OPEN_FULL_TCP_DISCONNECT = "node.isOpenFullTcpDisconnect";

public static final String NODE_INACTIVE_THRESHOLD = "node.inactiveThreshold";

public static final String NODE_DETECT_ENABLE = "node.nodeDetectEnable";

public static final String NODE_MAX_TRANSACTION_PENDING_SIZE = "node.maxTransactionPendingSize";
Expand Down
4 changes: 4 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public static void clearParam() {
PARAMETER.receiveTcpMinDataLength = 2048;
PARAMETER.isOpenFullTcpDisconnect = false;
PARAMETER.nodeDetectEnable = false;
PARAMETER.inactiveThreshold = 600;
PARAMETER.supportConstant = false;
PARAMETER.debug = false;
PARAMETER.minTimeRatio = 0.0;
Expand Down Expand Up @@ -845,6 +846,9 @@ public static void setParam(final String[] args, final String confFileName) {
PARAMETER.nodeDetectEnable = config.hasPath(Constant.NODE_DETECT_ENABLE)
&& config.getBoolean(Constant.NODE_DETECT_ENABLE);

PARAMETER.inactiveThreshold = config.hasPath(Constant.NODE_INACTIVE_THRESHOLD)
? config.getInt(Constant.NODE_INACTIVE_THRESHOLD) : 600;

PARAMETER.maxTransactionPendingSize = config.hasPath(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE)
? config.getInt(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) : 2000;

Expand Down
1 change: 1 addition & 0 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ public void updateDynamicProperties(BlockCapsule block) {
(chainBaseManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber()
- chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum()
+ 1));
chainBaseManager.setLatestSaveBlockTime(System.currentTimeMillis());
Metrics.gaugeSet(MetricKeys.Gauge.HEADER_HEIGHT, block.getNum());
Metrics.gaugeSet(MetricKeys.Gauge.HEADER_TIME, block.getTimeStamp());
}
Expand Down
24 changes: 24 additions & 0 deletions framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.tron.core.net.message.PbftMessageFactory;
import org.tron.core.net.message.TronMessage;
import org.tron.core.net.message.TronMessageFactory;
import org.tron.core.net.message.adv.FetchInvDataMessage;
import org.tron.core.net.message.adv.InventoryMessage;
import org.tron.core.net.message.base.DisconnectMessage;
import org.tron.core.net.message.handshake.HelloMessage;
Expand All @@ -38,6 +39,7 @@
import org.tron.p2p.P2pEventHandler;
import org.tron.p2p.connection.Channel;
import org.tron.protos.Protocol;
import org.tron.protos.Protocol.Inventory.InventoryType;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
Expand Down Expand Up @@ -205,6 +207,7 @@ private void processMessage(PeerConnection peer, byte[] data) {
default:
throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
}
updateLastActiveTime(peer, msg);
} catch (Exception e) {
processException(peer, msg, e);
} finally {
Expand All @@ -220,6 +223,27 @@ private void processMessage(PeerConnection peer, byte[] data) {
}
}

/**
 * Refreshes the peer's last-active timestamp when the processed message is block related.
 *
 * <p>SYNC_BLOCK_CHAIN, BLOCK_CHAIN_INVENTORY and BLOCK always count; FETCH_INV_DATA counts only
 * when the requested inventory type is BLOCK. Every other message type leaves the timestamp
 * untouched.
 *
 * @param peer the peer the message was received from
 * @param msg  the message that has just been processed
 */
private void updateLastActiveTime(PeerConnection peer, TronMessage msg) {
  switch (msg.getType()) {
    case SYNC_BLOCK_CHAIN:
    case BLOCK_CHAIN_INVENTORY:
    case BLOCK:
      // always block related, fall out of the switch and record the activity
      break;
    case FETCH_INV_DATA:
      FetchInvDataMessage fetchMsg = (FetchInvDataMessage) msg;
      if (!fetchMsg.getInventoryType().equals(InventoryType.BLOCK)) {
        // fetching transactions does not count as block activity
        return;
      }
      break;
    default:
      return;
  }
  peer.setLastActiveTime(System.currentTimeMillis());
}

private void processException(PeerConnection peer, TronMessage msg, Exception ex) {
Protocol.ReasonCode code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.tron.core.net.peer.PeerStatusCheck;
import org.tron.core.net.service.adv.AdvService;
import org.tron.core.net.service.effective.EffectiveCheckService;
import org.tron.core.net.service.effective.ResilienceService;
import org.tron.core.net.service.fetchblock.FetchBlockService;
import org.tron.core.net.service.nodepersist.NodePersistService;
import org.tron.core.net.service.relay.RelayService;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class TronNetService {
@Autowired
private PeerStatusCheck peerStatusCheck;

@Autowired
private ResilienceService resilienceService;

@Autowired
private TransactionsMsgHandler transactionsMsgHandler;

Expand Down Expand Up @@ -88,6 +92,7 @@ public void start() {
advService.init();
syncService.init();
peerStatusCheck.init();
resilienceService.init();
transactionsMsgHandler.init();
fetchBlockService.init();
nodePersistService.init();
Expand All @@ -110,6 +115,7 @@ public void close() {
nodePersistService.close();
advService.close();
syncService.close();
resilienceService.close();
peerStatusCheck.close();
transactionsMsgHandler.close();
fetchBlockService.close();
Expand Down Expand Up @@ -177,7 +183,7 @@ private P2pConfig updateConfig(P2pConfig config) {
config.setMaxConnectionsWithSameIp(parameter.getMaxConnectionsWithSameIp());
config.setPort(parameter.getNodeListenPort());
config.setNetworkId(parameter.getNodeP2pVersion());
config.setDisconnectionPolicyEnable(parameter.isOpenFullTcpDisconnect());
config.setDisconnectionPolicyEnable(false);
config.setNodeDetectEnable(parameter.isNodeDetectEnable());
config.setDiscoverEnable(parameter.isNodeDiscoveryEnable());
if (StringUtils.isEmpty(config.getIp()) && hasIpv4Stack(NetUtil.getAllLocalAddress())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public void processMessage(PeerConnection peer, TronMessage msg) {
Item item = new Item(id, type);
peer.getAdvInvReceive().put(item, System.currentTimeMillis());
advService.addInv(item);
if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) {
peer.setLastActiveTime(System.currentTimeMillis());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
peer.disconnect(Protocol.ReasonCode.BAD_PROTOCOL);
return;
}

long remainNum = 0;

List<BlockId> summaryChainIds = syncBlockChainMessage.getBlockIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class PeerConnection {
@Setter
private ByteString address;

// System.currentTimeMillis() timestamp of this peer's most recent recorded activity; set in
// setChannel() and updated afterwards through the setter.
// NOTE(review): volatile presumably because readers and writers run on different threads
// (e.g. message handling vs. a periodic checker) — confirm against the callers.
@Getter
@Setter
private volatile long lastActiveTime;

@Getter
@Setter
private TronState tronState = TronState.INIT;
Expand Down Expand Up @@ -159,6 +163,7 @@ public void setChannel(Channel channel) {
this.isRelayPeer = true;
}
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
lastActiveTime = System.currentTimeMillis();
}

public void setBlockBothHave(BlockId blockId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package org.tron.core.net.service.effective;

import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.peer.PeerConnection;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
@Component
public class ResilienceService {

private static final long inactiveThreshold =
CommonParameter.getInstance().getInactiveThreshold() * 1000L;
public static final long blockNotChangeThreshold = 90 * 1000L;

//when node is isolated, retention percent peers will not be disconnected
public static final double retentionPercent = 0.8;
private static final int initialDelay = 300;
private static final String esName = "resilience-service";
private final ScheduledExecutorService executor = ExecutorServiceManager
.newSingleThreadScheduledExecutor(esName);

@Autowired
private TronNetDelegate tronNetDelegate;

@Autowired
private ChainBaseManager chainBaseManager;

public void init() {
if (Args.getInstance().isOpenFullTcpDisconnect) {
executor.scheduleWithFixedDelay(() -> {
try {
disconnectRandom();
} catch (Exception e) {
logger.error("DisconnectRandom node failed", e);
}
}, initialDelay, 60, TimeUnit.SECONDS);
} else {
logger.info("OpenFullTcpDisconnect is disabled");
}

executor.scheduleWithFixedDelay(() -> {
try {
disconnectLan();
} catch (Exception e) {
logger.error("DisconnectLan node failed", e);
}
}, initialDelay, 10, TimeUnit.SECONDS);

executor.scheduleWithFixedDelay(() -> {
try {
disconnectIsolated2();
} catch (Exception e) {
logger.error("DisconnectIsolated node failed", e);
}
}, initialDelay, 30, TimeUnit.SECONDS);
}

private void disconnectRandom() {
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
if (!peers.isEmpty()) {
int index = new Random().nextInt(peers.size());
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION);
}
}
}

private void disconnectLan() {
if (isLanNode()) {
// disconnect from the node that has keep inactive for more than inactiveThreshold
// and its lastActiveTime is smallest
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMinConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
}
}

private void disconnectIsolated2() {
if (isIsolateLand2()) {
logger.info("Node is isolated, try to disconnect from peers");
int peerSize = tronNetDelegate.getActivePeer().size();

//disconnect from the node whose lastActiveTime is smallest
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.getChannel().isActive())
.collect(Collectors.toList());

Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}

//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection,
//so new peers can come in
peerSize = tronNetDelegate.getActivePeer().size();
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent);
if (peerSize > threshold) {
int disconnectSize = peerSize - threshold;
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.getChannel().isActive())
.collect(Collectors.toList());
try {
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage());
return;
}

if (peers.size() > disconnectSize) {
peers = peers.subList(0, disconnectSize);
}
peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
}
}

private Optional<PeerConnection> getEarliestPeer(List<PeerConnection> pees) {
Optional<PeerConnection> one = Optional.empty();
try {
one = pees.stream()
.min(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Get earliest peer failed: {}", e.getMessage());
}
return one;
}

private boolean isLanNode() {
int peerSize = tronNetDelegate.getActivePeer().size();
int activePeerSize = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> peer.getChannel().isActive())
.count();
return peerSize > 0 && peerSize == activePeerSize;
}

private boolean isIsolateLand2() {
int advPeerCount = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.count();
long diff = System.currentTimeMillis() - chainBaseManager.getLatestSaveBlockTime();
return advPeerCount >= 1 && diff >= blockNotChangeThreshold;
}

private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode) {
int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000);
logger.info("Disconnect from peer {}, inactive seconds {}", peer.getInetSocketAddress(),
inactiveSeconds);
peer.disconnect(reasonCode);
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName);
}
}
1 change: 1 addition & 0 deletions framework/src/main/resources/config-localtest.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ node {

# check the peer data transfer, disconnect factor
isOpenFullTcpDisconnect = true
inactiveThreshold = 600 //seconds

p2p {
version = 333 # 11111: mainnet; 20180622: testnet
Expand Down
1 change: 1 addition & 0 deletions framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ node {
minParticipationRate = 15

isOpenFullTcpDisconnect = false
inactiveThreshold = 600 //seconds

p2p {
version = 11111 # 11111: mainnet; 20180622: testnet
Expand Down
Loading

0 comments on commit 54b0395

Please sign in to comment.