Skip to content

Commit

Permalink
don't disconnect from high peer if its number <=1
Browse files Browse the repository at this point in the history
  • Loading branch information
317787106 committed Sep 4, 2024
1 parent 6c3a526 commit 59e92c8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ResilienceService {
//when node is isolated, retention percent peers will not be disconnected
public static final double retentionPercent = 0.8;
private static final int initialDelay = 300;
public static final int broadcastPeerSize = 3;
public static final int minBroadcastPeerSize = 3;
private static final String esName = "resilience-service";
private final ScheduledExecutorService executor = ExecutorServiceManager
.newSingleThreadScheduledExecutor(esName);
Expand Down Expand Up @@ -82,25 +82,35 @@ private void disconnectRandom() {
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.collect(Collectors.toList());

if (peers.size() >= broadcastPeerSize) {
if (peers.size() >= minBroadcastPeerSize) {
long now = System.currentTimeMillis();
Map<Object, Integer> weights = new HashMap<>();
peers.forEach(peer -> weights.put(peer,
(int) Math.ceil((double) (now - peer.getLastInteractiveTime()) / 500)));
peers.forEach(peer -> {
int weight = (int) Math.ceil((double) (now - peer.getLastInteractiveTime()) / 500);
weights.put(peer, Math.max(weight, 1));
});
WeightedRandom weightedRandom = new WeightedRandom(weights);
PeerConnection one;
try {
one = (PeerConnection) weightedRandom.next();
} catch (Exception e) {
logger.warn("Get random peer failed: {}", e.getMessage());
return;
}
PeerConnection one = (PeerConnection) weightedRandom.next();
disconnectFromPeer(one, ReasonCode.RANDOM_ELIMINATION, DisconnectCause.RANDOM_ELIMINATION);
} else {
return;
}

int needSyncFromPeerCount = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(PeerConnection::isNeedSyncFromPeer)
.count();
if (needSyncFromPeerCount >= 2) {
peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.isNeedSyncFromUs() || peer.isNeedSyncFromPeer())
.collect(Collectors.toList());
} else {
peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(PeerConnection::isNeedSyncFromUs)
.collect(Collectors.toList());
}
if (!peers.isEmpty()) {
int index = new Random().nextInt(peers.size());
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
DisconnectCause.RANDOM_ELIMINATION);
Expand Down Expand Up @@ -220,7 +230,7 @@ private enum DisconnectCause {
ISOLATE2_PASSIVE,
}

class WeightedRandom {
static class WeightedRandom {

private final Map<Object, Integer> weights;
private final Random random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,36 +62,36 @@ public void testDisconnectRandom() {
PeerManager.add(context, c1);
}
for (PeerConnection peer : PeerManager.getPeers()
.subList(0, ResilienceService.broadcastPeerSize)) {
.subList(0, ResilienceService.minBroadcastPeerSize)) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
peer.setLastInteractiveTime(System.currentTimeMillis() - 1000);
}
for (PeerConnection peer : PeerManager.getPeers()
.subList(ResilienceService.broadcastPeerSize, maxConnection + 1)) {
.subList(ResilienceService.minBroadcastPeerSize, maxConnection + 1)) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(true);
}
int size1 = (int) PeerManager.getPeers().stream()
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.count();
Assert.assertEquals(ResilienceService.broadcastPeerSize, size1);
Assert.assertEquals(ResilienceService.minBroadcastPeerSize, size1);
Assert.assertEquals(maxConnection + 1, PeerManager.getPeers().size());

//disconnect from broadcasting peer
ReflectUtils.invokeMethod(service, "disconnectRandom");
size1 = (int) PeerManager.getPeers().stream()
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.count();
Assert.assertEquals(ResilienceService.broadcastPeerSize - 1, size1);
Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1);
Assert.assertEquals(maxConnection, PeerManager.getPeers().size());

//disconnect from syncing peer
ReflectUtils.invokeMethod(service, "disconnectRandom");
size1 = (int) PeerManager.getPeers().stream()
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.count();
Assert.assertEquals(ResilienceService.broadcastPeerSize - 1, size1);
Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1);
Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size());
}

Expand Down

0 comments on commit 59e92c8

Please sign in to comment.