Skip to content

Commit

Permalink
Merge pull request #57 from tronprotocol/feature/optimize_log
Browse files Browse the repository at this point in the history
feat(log): optimize thread's name
  • Loading branch information
317787106 authored Aug 30, 2023
2 parents f6fd9f2 + dada357 commit a5e6d04
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.cache.CacheBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.connection.Channel;
import org.tron.p2p.connection.business.MessageProcess;
Expand All @@ -27,9 +28,10 @@ public class NodeDetectService implements MessageProcess {

@Getter
private static final Cache<InetAddress, Long> badNodesCache = CacheBuilder
.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.HOURS).build();
.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.HOURS).build();

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("nodeDetectService").build());

private final long NODE_DETECT_THRESHOLD = 5 * 60 * 1000;

Expand Down Expand Up @@ -89,7 +91,7 @@ public void work() {

n = Math.min(n, nodeStats.size());

for(int i = 0; i < n; i++) {
for (int i = 0; i < n; i++) {
detect(nodeStats.get(i));
}
}
Expand All @@ -108,7 +110,7 @@ private void loadNodes() {
int size = nodeStatMap.size();
int count = 0;
List<Node> nodes = NodeManager.getConnectableNodes();
for (Node node: nodes) {
for (Node node : nodes) {
InetSocketAddress socketAddress = node.getPreferInetSocketAddress();
if (socketAddress != null
&& !nodeStatMap.containsKey(socketAddress)
Expand All @@ -131,15 +133,15 @@ private void detect(NodeStat stat) {
peerClient.connectAsync(stat.getNode(), true);
} catch (Exception e) {
log.warn("Detect node {} failed, {}",
stat.getNode().getPreferInetSocketAddress(), e.getMessage());
stat.getNode().getPreferInetSocketAddress(), e.getMessage());
nodeStatMap.remove(stat.getSocketAddress());
}
}

public synchronized void processMessage(Channel channel, Message message) {
StatusMessage statusMessage = (StatusMessage)message;
StatusMessage statusMessage = (StatusMessage) message;

if(!channel.isActive()) {
if (!channel.isActive()) {
channel.setDiscoveryMode(true);
channel.send(new StatusMessage());
channel.getCtx().close();
Expand All @@ -153,8 +155,8 @@ public synchronized void processMessage(Channel channel, Message message) {
}

long cost = System.currentTimeMillis() - nodeStat.getLastDetectTime();
if(cost > NODE_DETECT_TIMEOUT
|| statusMessage.getRemainConnections() == 0) {
if (cost > NODE_DETECT_TIMEOUT
|| statusMessage.getRemainConnections() == 0) {
badNodesCache.put(socketAddress.getAddress(), cost);
nodeStatMap.remove(socketAddress);
}
Expand All @@ -167,7 +169,7 @@ public synchronized void processMessage(Channel channel, Message message) {

public void notifyDisconnect(Channel channel) {

if(!channel.isActive()) {
if (!channel.isActive()) {
return;
}

Expand All @@ -181,7 +183,7 @@ public void notifyDisconnect(Channel channel) {
return;
}

if(nodeStat.getLastDetectTime() != nodeStat.getLastSuccessDetectTime()) {
if (nodeStat.getLastDetectTime() != nodeStat.getLastSuccessDetectTime()) {
badNodesCache.put(socketAddress.getAddress(), System.currentTimeMillis());
nodeStatMap.remove(socketAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.connection.Channel;
import org.tron.p2p.connection.ChannelManager;
import org.tron.p2p.connection.business.MessageProcess;
Expand All @@ -17,8 +18,8 @@
@Slf4j(topic = "net")
public class KeepAliveService implements MessageProcess {

private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "KeepAlive"));
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("keepAlive").build());

public void init() {
executor.scheduleWithFixedDelay(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.bouncycastle.util.encoders.Hex;
import org.tron.p2p.P2pConfig;
import org.tron.p2p.P2pEventHandler;
Expand Down Expand Up @@ -47,8 +48,10 @@ public class ConnPoolService extends P2pEventHandler {
private final AtomicInteger activePeersCount = new AtomicInteger(0);
@Getter
private final AtomicInteger connectingPeersCount = new AtomicInteger(0);
private final ScheduledExecutorService poolLoopExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService disconnectExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService poolLoopExecutor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("connPool").build());
private final ScheduledExecutorService disconnectExecutor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("randomDisconnect").build());

public P2pConfig p2pConfig = Parameter.p2pConfig;
private PeerClient peerClient;
Expand Down Expand Up @@ -116,7 +119,8 @@ private void connect(boolean isFilterActiveNodes) {
Parameter.p2pConfig.getIpv6(), Parameter.p2pConfig.getPort()));

p2pConfig.getActiveNodes().forEach(address -> {
if (!isFilterActiveNodes && !inetInUse.contains(address) && !addressInUse.contains(address.getAddress())) {
if (!isFilterActiveNodes && !inetInUse.contains(address) && !addressInUse.contains(
address.getAddress())) {
addressInUse.add(address.getAddress());
inetInUse.add(address);
Node node = new Node(address); //use a random NodeId for config activeNodes
Expand Down
19 changes: 6 additions & 13 deletions src/main/java/org/tron/p2p/connection/socket/PeerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.bouncycastle.util.encoders.Hex;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.connection.ChannelManager;
Expand All @@ -23,14 +22,8 @@ public class PeerClient {
private EventLoopGroup workerGroup;

public void init() {
workerGroup = new NioEventLoopGroup(0, new ThreadFactory() {
private final AtomicInteger cnt = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PeerClient-" + cnt.getAndIncrement());
}
});
workerGroup = new NioEventLoopGroup(0,
new BasicThreadFactory.Builder().namingPattern("peerClient-%d").build());
}

public void close() {
Expand All @@ -49,9 +42,9 @@ public void connect(String host, int port, String remoteId) {

public ChannelFuture connect(Node node, ChannelFutureListener future) {
ChannelFuture channelFuture = connectAsync(
node.getPreferInetSocketAddress().getAddress().getHostAddress(),
node.getPort(),
node.getId() == null ? Hex.toHexString(NetUtil.getNodeId()) : node.getHexId(), false, false);
node.getPreferInetSocketAddress().getAddress().getHostAddress(), node.getPort(),
node.getId() == null ? Hex.toHexString(NetUtil.getNodeId()) : node.getHexId(), false,
false);
if (future != null) {
channelFuture.addListener(future);
}
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/org/tron/p2p/connection/socket/PeerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;

@Slf4j(topic = "net")
public class PeerServer {

private ChannelFuture channelFuture;
private boolean listening;

Expand All @@ -36,9 +38,11 @@ public void close() {
}

public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup bossGroup = new NioEventLoopGroup(1,
new BasicThreadFactory.Builder().namingPattern("peerBoss").build());
//if threads = 0, it is number of core * 2
EventLoopGroup workerGroup = new NioEventLoopGroup(Parameter.TCP_NETTY_WORK_THREAD_NUM);
EventLoopGroup workerGroup = new NioEventLoopGroup(Parameter.TCP_NETTY_WORK_THREAD_NUM,
new BasicThreadFactory.Builder().namingPattern("peerWorker-%d").build());
P2pChannelInitializer p2pChannelInitializer = new P2pChannelInitializer("", false, true);
try {
ServerBootstrap b = new ServerBootstrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.discover.Node;
import org.tron.p2p.discover.protocol.kad.table.KademliaOptions;
import org.tron.p2p.utils.NetUtil;

@Slf4j(topic = "net")
public class DiscoverTask {

private ScheduledExecutorService discoverer = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService discoverer = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("discoverTask").build());

private KadService kadService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.discover.DiscoverService;
import org.tron.p2p.discover.Node;
Expand Down Expand Up @@ -55,7 +56,8 @@ public void init() {
for (InetSocketAddress address : Parameter.p2pConfig.getActiveNodes()) {
bootNodes.add(new Node(address));
}
this.pongTimer = Executors.newSingleThreadScheduledExecutor();
this.pongTimer = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("pongTimer").build());
this.homeNode = new Node(Parameter.p2pConfig.getNodeID(), Parameter.p2pConfig.getIp(),
Parameter.p2pConfig.getIpv6(), Parameter.p2pConfig.getPort());
this.table = new NodeTable(homeNode);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package org.tron.p2p.discover.socket;

import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.stats.TrafficStats;

@Slf4j(topic = "net")
public class DiscoverServer {

private Channel channel;
private EventHandler eventHandler;

Expand Down Expand Up @@ -46,7 +48,8 @@ public void close() {
}

private void start() throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup(Parameter.UDP_NETTY_WORK_THREAD_NUM);
NioEventLoopGroup group = new NioEventLoopGroup(Parameter.UDP_NETTY_WORK_THREAD_NUM,
new BasicThreadFactory.Builder().namingPattern("discoverServer").build());
try {
while (!shutdown) {
Bootstrap b = new Bootstrap();
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/tron/p2p/dns/sync/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.dns.lookup.LookUpTxt;
import org.tron.p2p.dns.tree.Algorithm;
Expand All @@ -39,7 +41,8 @@ public class Client {
private final Map<String, Tree> trees = new ConcurrentHashMap<>();
private final Map<String, ClientTree> clientTrees = new HashMap<>();

private final ScheduledExecutorService syncer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService syncer = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("dnsSyncer").build());

public Client() {
this.cache = CacheBuilder.newBuilder()
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tron/p2p/dns/update/PublishService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.discover.Node;
import org.tron.p2p.discover.NodeManager;
Expand All @@ -23,7 +24,8 @@ public class PublishService {

private static final long publishDelay = 1 * 60 * 60;

private ScheduledExecutorService publisher = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService publisher = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("publishService").build());
private Publish publish;

public void init() {
Expand Down

0 comments on commit a5e6d04

Please sign in to comment.