diff --git a/src/main/java/org/tron/p2p/connection/business/detect/NodeDetectService.java b/src/main/java/org/tron/p2p/connection/business/detect/NodeDetectService.java index ca25f0c..5917cf9 100644 --- a/src/main/java/org/tron/p2p/connection/business/detect/NodeDetectService.java +++ b/src/main/java/org/tron/p2p/connection/business/detect/NodeDetectService.java @@ -4,6 +4,7 @@ import com.google.common.cache.CacheBuilder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.base.Parameter; import org.tron.p2p.connection.Channel; import org.tron.p2p.connection.business.MessageProcess; @@ -27,9 +28,10 @@ public class NodeDetectService implements MessageProcess { @Getter private static final Cache badNodesCache = CacheBuilder - .newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.HOURS).build(); + .newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.HOURS).build(); - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("nodeDetectService").build()); private final long NODE_DETECT_THRESHOLD = 5 * 60 * 1000; @@ -89,7 +91,7 @@ public void work() { n = Math.min(n, nodeStats.size()); - for(int i = 0; i < n; i++) { + for (int i = 0; i < n; i++) { detect(nodeStats.get(i)); } } @@ -108,7 +110,7 @@ private void loadNodes() { int size = nodeStatMap.size(); int count = 0; List nodes = NodeManager.getConnectableNodes(); - for (Node node: nodes) { + for (Node node : nodes) { InetSocketAddress socketAddress = node.getPreferInetSocketAddress(); if (socketAddress != null && !nodeStatMap.containsKey(socketAddress) @@ -131,15 +133,15 @@ private void detect(NodeStat stat) { peerClient.connectAsync(stat.getNode(), true); } catch (Exception e) { log.warn("Detect node {} failed, {}", - stat.getNode().getPreferInetSocketAddress(), e.getMessage()); + stat.getNode().getPreferInetSocketAddress(), e.getMessage()); nodeStatMap.remove(stat.getSocketAddress()); } } public synchronized void processMessage(Channel channel, Message message) { - StatusMessage statusMessage = (StatusMessage)message; + StatusMessage statusMessage = (StatusMessage) message; - if(!channel.isActive()) { + if (!channel.isActive()) { channel.setDiscoveryMode(true); channel.send(new StatusMessage()); channel.getCtx().close(); @@ -153,8 +155,8 @@ public synchronized void processMessage(Channel channel, Message message) { } long cost = System.currentTimeMillis() - nodeStat.getLastDetectTime(); - if(cost > NODE_DETECT_TIMEOUT - || statusMessage.getRemainConnections() == 0) { + if (cost > NODE_DETECT_TIMEOUT + || statusMessage.getRemainConnections() == 0) { badNodesCache.put(socketAddress.getAddress(), cost); nodeStatMap.remove(socketAddress); } @@ -167,7 +169,7 @@ public synchronized void processMessage(Channel channel, Message message) { public void notifyDisconnect(Channel channel) { - if(!channel.isActive()) { + if (!channel.isActive()) { return; } @@ -181,7 +183,7 @@ public void notifyDisconnect(Channel channel) { return; } - if(nodeStat.getLastDetectTime() != nodeStat.getLastSuccessDetectTime()) { + if (nodeStat.getLastDetectTime() != nodeStat.getLastSuccessDetectTime()) { badNodesCache.put(socketAddress.getAddress(), System.currentTimeMillis()); nodeStatMap.remove(socketAddress); } diff --git a/src/main/java/org/tron/p2p/connection/business/keepalive/KeepAliveService.java b/src/main/java/org/tron/p2p/connection/business/keepalive/KeepAliveService.java index 30b0bf5..f1403f3 100644 --- a/src/main/java/org/tron/p2p/connection/business/keepalive/KeepAliveService.java +++ b/src/main/java/org/tron/p2p/connection/business/keepalive/KeepAliveService.java @@ -7,6 +7,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.connection.Channel; import org.tron.p2p.connection.ChannelManager; import org.tron.p2p.connection.business.MessageProcess; @@ -17,8 +18,8 @@ @Slf4j(topic = "net") public class KeepAliveService implements MessageProcess { - private final ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "KeepAlive")); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("keepAlive").build()); public void init() { executor.scheduleWithFixedDelay(() -> { diff --git a/src/main/java/org/tron/p2p/connection/business/pool/ConnPoolService.java b/src/main/java/org/tron/p2p/connection/business/pool/ConnPoolService.java index 7b8f956..06a91c0 100644 --- a/src/main/java/org/tron/p2p/connection/business/pool/ConnPoolService.java +++ b/src/main/java/org/tron/p2p/connection/business/pool/ConnPoolService.java @@ -20,6 +20,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.bouncycastle.util.encoders.Hex; import org.tron.p2p.P2pConfig; import org.tron.p2p.P2pEventHandler; @@ -47,8 +48,10 @@ public class ConnPoolService extends P2pEventHandler { private final AtomicInteger activePeersCount = new AtomicInteger(0); @Getter private final AtomicInteger connectingPeersCount = new AtomicInteger(0); - private final ScheduledExecutorService poolLoopExecutor = Executors.newSingleThreadScheduledExecutor(); - private final ScheduledExecutorService disconnectExecutor = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService poolLoopExecutor = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("connPool").build()); + private final ScheduledExecutorService disconnectExecutor = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("randomDisconnect").build()); public P2pConfig p2pConfig = Parameter.p2pConfig; private PeerClient peerClient; @@ -116,7 +119,8 @@ private void connect(boolean isFilterActiveNodes) { Parameter.p2pConfig.getIpv6(), Parameter.p2pConfig.getPort())); p2pConfig.getActiveNodes().forEach(address -> { - if (!isFilterActiveNodes && !inetInUse.contains(address) && !addressInUse.contains(address.getAddress())) { + if (!isFilterActiveNodes && !inetInUse.contains(address) && !addressInUse.contains( + address.getAddress())) { addressInUse.add(address.getAddress()); inetInUse.add(address); Node node = new Node(address); //use a random NodeId for config activeNodes diff --git a/src/main/java/org/tron/p2p/connection/socket/PeerClient.java b/src/main/java/org/tron/p2p/connection/socket/PeerClient.java index 8eb120f..fb3fd00 100644 --- a/src/main/java/org/tron/p2p/connection/socket/PeerClient.java +++ b/src/main/java/org/tron/p2p/connection/socket/PeerClient.java @@ -8,9 +8,8 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.bouncycastle.util.encoders.Hex; import org.tron.p2p.base.Parameter; import org.tron.p2p.connection.ChannelManager; @@ -23,14 +22,8 @@ public class PeerClient { private EventLoopGroup workerGroup; public void init() { - workerGroup = new NioEventLoopGroup(0, new ThreadFactory() { - private final AtomicInteger cnt = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "PeerClient-" + cnt.getAndIncrement()); - } - }); + workerGroup = new NioEventLoopGroup(0, + new BasicThreadFactory.Builder().namingPattern("peerClient-%d").build()); } public void close() { @@ -49,9 +42,9 @@ public void connect(String host, int port, String remoteId) { public ChannelFuture connect(Node node, ChannelFutureListener future) { ChannelFuture channelFuture = connectAsync( - node.getPreferInetSocketAddress().getAddress().getHostAddress(), - node.getPort(), - node.getId() == null ? Hex.toHexString(NetUtil.getNodeId()) : node.getHexId(), false, false); + node.getPreferInetSocketAddress().getAddress().getHostAddress(), node.getPort(), + node.getId() == null ? Hex.toHexString(NetUtil.getNodeId()) : node.getHexId(), false, + false); if (future != null) { channelFuture.addListener(future); } diff --git a/src/main/java/org/tron/p2p/connection/socket/PeerServer.java b/src/main/java/org/tron/p2p/connection/socket/PeerServer.java index 054cb84..d3b1b9d 100644 --- a/src/main/java/org/tron/p2p/connection/socket/PeerServer.java +++ b/src/main/java/org/tron/p2p/connection/socket/PeerServer.java @@ -10,10 +10,12 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.base.Parameter; @Slf4j(topic = "net") public class PeerServer { + private ChannelFuture channelFuture; private boolean listening; @@ -36,9 +38,11 @@ public void close() { } public void start(int port) { - EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup bossGroup = new NioEventLoopGroup(1, + new BasicThreadFactory.Builder().namingPattern("peerBoss").build()); //if threads = 0, it is number of core * 2 - EventLoopGroup workerGroup = new NioEventLoopGroup(Parameter.TCP_NETTY_WORK_THREAD_NUM); + EventLoopGroup workerGroup = new NioEventLoopGroup(Parameter.TCP_NETTY_WORK_THREAD_NUM, + new BasicThreadFactory.Builder().namingPattern("peerWorker-%d").build()); P2pChannelInitializer p2pChannelInitializer = new P2pChannelInitializer("", false, true); try { ServerBootstrap b = new ServerBootstrap(); diff --git a/src/main/java/org/tron/p2p/discover/protocol/kad/DiscoverTask.java b/src/main/java/org/tron/p2p/discover/protocol/kad/DiscoverTask.java index bb232b1..d633b78 100644 --- a/src/main/java/org/tron/p2p/discover/protocol/kad/DiscoverTask.java +++ b/src/main/java/org/tron/p2p/discover/protocol/kad/DiscoverTask.java @@ -6,6 +6,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.discover.Node; import org.tron.p2p.discover.protocol.kad.table.KademliaOptions; import org.tron.p2p.utils.NetUtil; @@ -13,7 +14,8 @@ @Slf4j(topic = "net") public class DiscoverTask { - private ScheduledExecutorService discoverer = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService discoverer = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("discoverTask").build()); private KadService kadService; diff --git a/src/main/java/org/tron/p2p/discover/protocol/kad/KadService.java b/src/main/java/org/tron/p2p/discover/protocol/kad/KadService.java index 1d2a430..7f2051e 100644 --- a/src/main/java/org/tron/p2p/discover/protocol/kad/KadService.java +++ b/src/main/java/org/tron/p2p/discover/protocol/kad/KadService.java @@ -14,6 +14,7 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.base.Parameter; import org.tron.p2p.discover.DiscoverService; import org.tron.p2p.discover.Node; @@ -55,7 +56,8 @@ public void init() { for (InetSocketAddress address : Parameter.p2pConfig.getActiveNodes()) { bootNodes.add(new Node(address)); } - this.pongTimer = Executors.newSingleThreadScheduledExecutor(); + this.pongTimer = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("pongTimer").build()); this.homeNode = new Node(Parameter.p2pConfig.getNodeID(), Parameter.p2pConfig.getIp(), Parameter.p2pConfig.getIpv6(), Parameter.p2pConfig.getPort()); this.table = new NodeTable(homeNode); diff --git a/src/main/java/org/tron/p2p/discover/socket/DiscoverServer.java b/src/main/java/org/tron/p2p/discover/socket/DiscoverServer.java index 6f52688..7b8ca97 100644 --- a/src/main/java/org/tron/p2p/discover/socket/DiscoverServer.java +++ b/src/main/java/org/tron/p2p/discover/socket/DiscoverServer.java @@ -1,6 +1,5 @@ package org.tron.p2p.discover.socket; -import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -8,12 +7,15 @@ import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.base.Parameter; import org.tron.p2p.stats.TrafficStats; @Slf4j(topic = "net") public class DiscoverServer { + private Channel channel; private EventHandler eventHandler; @@ -46,7 +48,8 @@ public void close() { } private void start() throws Exception { - NioEventLoopGroup group = new NioEventLoopGroup(Parameter.UDP_NETTY_WORK_THREAD_NUM); + NioEventLoopGroup group = new NioEventLoopGroup(Parameter.UDP_NETTY_WORK_THREAD_NUM, + new BasicThreadFactory.Builder().namingPattern("discoverServer").build()); try { while (!shutdown) { Bootstrap b = new Bootstrap(); diff --git a/src/main/java/org/tron/p2p/dns/sync/Client.java b/src/main/java/org/tron/p2p/dns/sync/Client.java index bd76a89..064e0da 100644 --- a/src/main/java/org/tron/p2p/dns/sync/Client.java +++ b/src/main/java/org/tron/p2p/dns/sync/Client.java @@ -10,9 +10,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.base.Parameter; import org.tron.p2p.dns.lookup.LookUpTxt; import org.tron.p2p.dns.tree.Algorithm; @@ -39,7 +41,8 @@ public class Client { private final Map trees = new ConcurrentHashMap<>(); private final Map clientTrees = new HashMap<>(); - private final ScheduledExecutorService syncer = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService syncer = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("dnsSyncer").build()); public Client() { this.cache = CacheBuilder.newBuilder() diff --git a/src/main/java/org/tron/p2p/dns/update/PublishService.java b/src/main/java/org/tron/p2p/dns/update/PublishService.java index d156df9..7a94543 100644 --- a/src/main/java/org/tron/p2p/dns/update/PublishService.java +++ b/src/main/java/org/tron/p2p/dns/update/PublishService.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.tron.p2p.base.Parameter; import org.tron.p2p.discover.Node; import org.tron.p2p.discover.NodeManager; @@ -23,7 +24,8 @@ public class PublishService { private static final long publishDelay = 1 * 60 * 60; - private ScheduledExecutorService publisher = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService publisher = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("publishService").build()); private Publish publish; public void init() {