diff --git a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java index 0100dc443d9..795c90b4edd 100644 --- a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java +++ b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java @@ -40,7 +40,6 @@ import org.tron.p2p.connection.Channel; import org.tron.protos.Protocol; import org.tron.protos.Protocol.Inventory.InventoryType; -import org.tron.protos.Protocol.ReasonCode; @Slf4j(topic = "net") @Component @@ -207,7 +206,7 @@ private void processMessage(PeerConnection peer, byte[] data) { default: throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString()); } - updateLastActiveTime(peer, msg); + updateLastInteractiveTime(peer, msg); } catch (Exception e) { processException(peer, msg, e); } finally { @@ -223,7 +222,7 @@ private void processMessage(PeerConnection peer, byte[] data) { } } - private void updateLastActiveTime(PeerConnection peer, TronMessage msg) { + private void updateLastInteractiveTime(PeerConnection peer, TronMessage msg) { MessageTypes type = msg.getType(); boolean flag = false; @@ -240,7 +239,7 @@ private void updateLastActiveTime(PeerConnection peer, TronMessage msg) { break; } if (flag) { - peer.setLastActiveTime(System.currentTimeMillis()); + peer.setLastInteractiveTime(System.currentTimeMillis()); } } diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java index 5e303bd3d6f..e8783b25e95 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java @@ -40,7 +40,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) { peer.getAdvInvReceive().put(item, System.currentTimeMillis()); advService.addInv(item); if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) { - peer.setLastActiveTime(System.currentTimeMillis()); + peer.setLastInteractiveTime(System.currentTimeMillis()); } } } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 24f43e1f3fa..2e08e105bed 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -81,7 +81,7 @@ public class PeerConnection { @Getter @Setter - private volatile long lastActiveTime; + private volatile long lastInteractiveTime; @Getter @Setter @@ -163,7 +163,7 @@ public void setChannel(Channel channel) { this.isRelayPeer = true; } this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress()); - lastActiveTime = System.currentTimeMillis(); + lastInteractiveTime = System.currentTimeMillis(); } public void setBlockBothHave(BlockId blockId) { @@ -245,7 +245,7 @@ public String log() { remainNum, requested == null ? 0 : (now - requested.getValue()) / Constant.ONE_THOUSAND, - (now - lastActiveTime) / Constant.ONE_THOUSAND, + (now - lastInteractiveTime) / Constant.ONE_THOUSAND, syncBlockInProcess.size()); } diff --git a/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java index b0806457a4b..67c22616bed 100644 --- a/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java +++ b/framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java @@ -74,7 +74,7 @@ private void disconnectRandom() { if (peerSize >= CommonParameter.getInstance().getMaxConnections()) { long now = System.currentTimeMillis(); List peers = tronNetDelegate.getActivePeer().stream() - .filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold) + .filter(peer -> now - peer.getLastInteractiveTime() >= inactiveThreshold) .filter(peer -> !peer.getChannel().isTrustPeer()) .filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer()) .collect(Collectors.toList()); @@ -96,7 +96,7 @@ private void disconnectLan() { if (peerSize >= CommonParameter.getInstance().getMinConnections()) { long now = System.currentTimeMillis(); List peers = tronNetDelegate.getActivePeer().stream() - .filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold) + .filter(peer -> now - peer.getLastInteractiveTime() >= inactiveThreshold) .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs()) .filter(peer -> !peer.getChannel().isTrustPeer()) .collect(Collectors.toList()); @@ -111,10 +111,12 @@ private void disconnectIsolated2() { return; } logger.warn("Node is isolated, try to disconnect from peers"); - int peerSize = tronNetDelegate.getActivePeer().size(); //disconnect from the node whose lastActiveTime is smallest - if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) { + int activePeerSize = (int) tronNetDelegate.getActivePeer().stream() + .filter(peer -> peer.getChannel().isActive()) + .count(); + if (activePeerSize >= CommonParameter.getInstance().getMinActiveConnections()) { List peers = tronNetDelegate.getActivePeer().stream() .filter(peer -> !peer.getChannel().isTrustPeer()) .filter(peer -> peer.getChannel().isActive()) @@ -127,7 +129,7 @@ private void disconnectIsolated2() { //disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection, //so new peers can come in - peerSize = tronNetDelegate.getActivePeer().size(); + int peerSize = tronNetDelegate.getActivePeer().size(); int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent); if (peerSize > threshold) { int disconnectSize = peerSize - threshold; @@ -136,7 +138,7 @@ private void disconnectIsolated2() { .filter(peer -> !peer.getChannel().isActive()) .collect(Collectors.toList()); try { - peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo)); + peers.sort(Comparator.comparing(PeerConnection::getLastInteractiveTime, Long::compareTo)); } catch (Exception e) { logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage()); return; @@ -156,7 +158,7 @@ private Optional getEarliestPeer(List pees) { Optional one = Optional.empty(); try { one = pees.stream() - .min(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo)); + .min(Comparator.comparing(PeerConnection::getLastInteractiveTime, Long::compareTo)); } catch (Exception e) { logger.warn("Get earliest peer failed: {}", e.getMessage()); } @@ -182,7 +184,8 @@ private boolean isIsolateLand2() { private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode, DisconnectCause cause) { - int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000); + int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastInteractiveTime()) + / 1000); logger.info("Disconnect from peer {}, inactive seconds {}, cause: {}", peer.getInetSocketAddress(), inactiveSeconds, cause); peer.disconnect(reasonCode); diff --git a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java index 7a3dc30cb86..e0c816a537a 100644 --- a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java +++ b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java @@ -113,7 +113,7 @@ public void testProcessInventoryMessage() throws Exception { } @Test - public void testUpdateLastActiveTime() throws Exception { + public void testUpdateLastInteractiveTime() throws Exception { String[] a = new String[0]; Args.setParam(a, Constant.TESTNET_CONF); @@ -121,12 +121,12 @@ public void testUpdateLastActiveTime() throws Exception { P2pEventHandlerImpl p2pEventHandler = new P2pEventHandlerImpl(); Method method = p2pEventHandler.getClass() - .getDeclaredMethod("updateLastActiveTime", PeerConnection.class, TronMessage.class); + .getDeclaredMethod("updateLastInteractiveTime", PeerConnection.class, TronMessage.class); method.setAccessible(true); long t1 = System.currentTimeMillis(); FetchInvDataMessage message = new FetchInvDataMessage(new ArrayList<>(), InventoryType.BLOCK); method.invoke(p2pEventHandler, peer, message); - Assert.assertTrue(peer.getLastActiveTime() >= t1); + Assert.assertTrue(peer.getLastInteractiveTime() >= t1); } } diff --git a/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java index cc09fb45c28..2558e089b7e 100644 --- a/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java +++ b/framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java @@ -69,10 +69,10 @@ public void testDisconnectRandom() { Assert.assertEquals(maxConnection, PeerManager.getPeers().size()); PeerConnection p1 = PeerManager.getPeers().get(1); - p1.setLastActiveTime( + p1.setLastInteractiveTime( System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000); PeerConnection p2 = PeerManager.getPeers().get(10); - p2.setLastActiveTime( + p2.setLastInteractiveTime( System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000); ReflectUtils.invokeMethod(service, "disconnectRandom"); @@ -108,11 +108,11 @@ public void testDisconnectLan() { PeerConnection p1 = PeerManager.getPeers().get(1); InetSocketAddress address1 = p1.getChannel().getInetSocketAddress(); - p1.setLastActiveTime( + p1.setLastInteractiveTime( System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000); PeerConnection p2 = PeerManager.getPeers().get(2); InetSocketAddress address2 = p2.getChannel().getInetSocketAddress(); - p2.setLastActiveTime( + p2.setLastInteractiveTime( System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000); ReflectUtils.invokeMethod(service, "disconnectLan");