Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(log): optimize thread's name #57

Merged
merged 7 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.cache.CacheBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.connection.Channel;
import org.tron.p2p.connection.business.MessageProcess;
Expand All @@ -27,9 +28,10 @@ public class NodeDetectService implements MessageProcess {

@Getter
private static final Cache<InetAddress, Long> badNodesCache = CacheBuilder
.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.HOURS).build();
.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.HOURS).build();

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("nodeDetectService").build());

private final long NODE_DETECT_THRESHOLD = 5 * 60 * 1000;

Expand Down Expand Up @@ -89,7 +91,7 @@ public void work() {

n = Math.min(n, nodeStats.size());

for(int i = 0; i < n; i++) {
for (int i = 0; i < n; i++) {
detect(nodeStats.get(i));
}
}
Expand All @@ -108,7 +110,7 @@ private void loadNodes() {
int size = nodeStatMap.size();
int count = 0;
List<Node> nodes = NodeManager.getConnectableNodes();
for (Node node: nodes) {
for (Node node : nodes) {
InetSocketAddress socketAddress = node.getPreferInetSocketAddress();
if (socketAddress != null
&& !nodeStatMap.containsKey(socketAddress)
Expand All @@ -131,15 +133,15 @@ private void detect(NodeStat stat) {
peerClient.connectAsync(stat.getNode(), true);
} catch (Exception e) {
log.warn("Detect node {} failed, {}",
stat.getNode().getPreferInetSocketAddress(), e.getMessage());
stat.getNode().getPreferInetSocketAddress(), e.getMessage());
nodeStatMap.remove(stat.getSocketAddress());
}
}

public synchronized void processMessage(Channel channel, Message message) {
StatusMessage statusMessage = (StatusMessage)message;
StatusMessage statusMessage = (StatusMessage) message;

if(!channel.isActive()) {
if (!channel.isActive()) {
channel.setDiscoveryMode(true);
channel.send(new StatusMessage());
channel.getCtx().close();
Expand All @@ -153,8 +155,8 @@ public synchronized void processMessage(Channel channel, Message message) {
}

long cost = System.currentTimeMillis() - nodeStat.getLastDetectTime();
if(cost > NODE_DETECT_TIMEOUT
|| statusMessage.getRemainConnections() == 0) {
if (cost > NODE_DETECT_TIMEOUT
|| statusMessage.getRemainConnections() == 0) {
badNodesCache.put(socketAddress.getAddress(), cost);
nodeStatMap.remove(socketAddress);
}
Expand All @@ -167,7 +169,7 @@ public synchronized void processMessage(Channel channel, Message message) {

public void notifyDisconnect(Channel channel) {

if(!channel.isActive()) {
if (!channel.isActive()) {
return;
}

Expand All @@ -181,7 +183,7 @@ public void notifyDisconnect(Channel channel) {
return;
}

if(nodeStat.getLastDetectTime() != nodeStat.getLastSuccessDetectTime()) {
if (nodeStat.getLastDetectTime() != nodeStat.getLastSuccessDetectTime()) {
badNodesCache.put(socketAddress.getAddress(), System.currentTimeMillis());
nodeStatMap.remove(socketAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.connection.Channel;
import org.tron.p2p.connection.ChannelManager;
import org.tron.p2p.connection.business.MessageProcess;
Expand All @@ -17,8 +18,8 @@
@Slf4j(topic = "net")
public class KeepAliveService implements MessageProcess {

private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "KeepAlive"));
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("keepAlive").build());

public void init() {
executor.scheduleWithFixedDelay(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.bouncycastle.util.encoders.Hex;
import org.tron.p2p.P2pConfig;
import org.tron.p2p.P2pEventHandler;
Expand Down Expand Up @@ -47,8 +48,10 @@ public class ConnPoolService extends P2pEventHandler {
private final AtomicInteger activePeersCount = new AtomicInteger(0);
@Getter
private final AtomicInteger connectingPeersCount = new AtomicInteger(0);
private final ScheduledExecutorService poolLoopExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService disconnectExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService poolLoopExecutor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("connPool").build());
private final ScheduledExecutorService disconnectExecutor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("randomDisconnect").build());

public P2pConfig p2pConfig = Parameter.p2pConfig;
private PeerClient peerClient;
Expand Down Expand Up @@ -116,7 +119,8 @@ private void connect(boolean isFilterActiveNodes) {
Parameter.p2pConfig.getIpv6(), Parameter.p2pConfig.getPort()));

p2pConfig.getActiveNodes().forEach(address -> {
if (!isFilterActiveNodes && !inetInUse.contains(address) && !addressInUse.contains(address.getAddress())) {
if (!isFilterActiveNodes && !inetInUse.contains(address) && !addressInUse.contains(
address.getAddress())) {
addressInUse.add(address.getAddress());
inetInUse.add(address);
Node node = new Node(address); //use a random NodeId for config activeNodes
Expand Down
19 changes: 6 additions & 13 deletions src/main/java/org/tron/p2p/connection/socket/PeerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.bouncycastle.util.encoders.Hex;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.connection.ChannelManager;
Expand All @@ -23,14 +22,8 @@ public class PeerClient {
private EventLoopGroup workerGroup;

public void init() {
workerGroup = new NioEventLoopGroup(0, new ThreadFactory() {
private final AtomicInteger cnt = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PeerClient-" + cnt.getAndIncrement());
}
});
workerGroup = new NioEventLoopGroup(0,
new BasicThreadFactory.Builder().namingPattern("peerClient-%d").build());
}

public void close() {
Expand All @@ -49,9 +42,9 @@ public void connect(String host, int port, String remoteId) {

public ChannelFuture connect(Node node, ChannelFutureListener future) {
ChannelFuture channelFuture = connectAsync(
node.getPreferInetSocketAddress().getAddress().getHostAddress(),
node.getPort(),
node.getId() == null ? Hex.toHexString(NetUtil.getNodeId()) : node.getHexId(), false, false);
node.getPreferInetSocketAddress().getAddress().getHostAddress(), node.getPort(),
node.getId() == null ? Hex.toHexString(NetUtil.getNodeId()) : node.getHexId(), false,
false);
if (future != null) {
channelFuture.addListener(future);
}
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/org/tron/p2p/connection/socket/PeerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;

@Slf4j(topic = "net")
public class PeerServer {

private ChannelFuture channelFuture;
private boolean listening;

Expand All @@ -36,9 +38,11 @@ public void close() {
}

public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup bossGroup = new NioEventLoopGroup(1,
new BasicThreadFactory.Builder().namingPattern("peerBoss").build());
//if threads = 0, it is number of core * 2
EventLoopGroup workerGroup = new NioEventLoopGroup(Parameter.TCP_NETTY_WORK_THREAD_NUM);
EventLoopGroup workerGroup = new NioEventLoopGroup(Parameter.TCP_NETTY_WORK_THREAD_NUM,
new BasicThreadFactory.Builder().namingPattern("peerWorker-%d").build());
P2pChannelInitializer p2pChannelInitializer = new P2pChannelInitializer("", false, true);
try {
ServerBootstrap b = new ServerBootstrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.discover.Node;
import org.tron.p2p.discover.protocol.kad.table.KademliaOptions;
import org.tron.p2p.utils.NetUtil;

@Slf4j(topic = "net")
public class DiscoverTask {

private ScheduledExecutorService discoverer = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService discoverer = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("discoverTask").build());

private KadService kadService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.discover.DiscoverService;
import org.tron.p2p.discover.Node;
Expand Down Expand Up @@ -55,7 +56,8 @@ public void init() {
for (InetSocketAddress address : Parameter.p2pConfig.getActiveNodes()) {
bootNodes.add(new Node(address));
}
this.pongTimer = Executors.newSingleThreadScheduledExecutor();
this.pongTimer = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("pongTimer").build());
this.homeNode = new Node(Parameter.p2pConfig.getNodeID(), Parameter.p2pConfig.getIp(),
Parameter.p2pConfig.getIpv6(), Parameter.p2pConfig.getPort());
this.table = new NodeTable(homeNode);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package org.tron.p2p.discover.socket;

import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.stats.TrafficStats;

@Slf4j(topic = "net")
public class DiscoverServer {

private Channel channel;
private EventHandler eventHandler;

Expand Down Expand Up @@ -46,7 +48,8 @@ public void close() {
}

private void start() throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup(Parameter.UDP_NETTY_WORK_THREAD_NUM);
NioEventLoopGroup group = new NioEventLoopGroup(Parameter.UDP_NETTY_WORK_THREAD_NUM,
new BasicThreadFactory.Builder().namingPattern("discoverServer").build());
try {
while (!shutdown) {
Bootstrap b = new Bootstrap();
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/tron/p2p/dns/sync/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.dns.lookup.LookUpTxt;
import org.tron.p2p.dns.tree.Algorithm;
Expand All @@ -39,7 +41,8 @@ public class Client {
private final Map<String, Tree> trees = new ConcurrentHashMap<>();
private final Map<String, ClientTree> clientTrees = new HashMap<>();

private final ScheduledExecutorService syncer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService syncer = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("dnsSyncer").build());

public Client() {
this.cache = CacheBuilder.newBuilder()
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tron/p2p/dns/update/PublishService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.discover.Node;
import org.tron.p2p.discover.NodeManager;
Expand All @@ -23,7 +24,8 @@ public class PublishService {

private static final long publishDelay = 1 * 60 * 60;

private ScheduledExecutorService publisher = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService publisher = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder().namingPattern("publishService").build());
private Publish publish;

public void init() {
Expand Down