Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(all): tune single Thread into SingleThreadExecutor #5410

Merged
merged 5 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.tron.common.es;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "common")
public class ExecutorServiceManager {

public static ExecutorService newSingleThreadExecutor(String name) {
return newSingleThreadExecutor(name, false);
}

public static ExecutorService newSingleThreadExecutor(String name, boolean isDaemon) {
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
}


public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) {
return newSingleThreadScheduledExecutor(name, false);
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name,
boolean isDaemon) {
return Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
}

public static void shutdownAndAwaitTermination(ExecutorService pool, String name) {
if (pool == null) {
return;
}
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there are two waits of 60s, will the 60s time be too long?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60s is the maximum value to be waited until the pool stops, if it doesn't, the pool may have a problem executing the task and needs to be fixed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.warn("Pool {} did not terminate", name);
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
22 changes: 12 additions & 10 deletions consensus/src/main/java/org/tron/consensus/dpos/DposTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;

import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutorService;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.util.ObjectUtils;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.Sha256Hash;
Expand All @@ -34,16 +36,18 @@ public class DposTask {
@Setter
private DposService dposService;

private Thread produceThread;
private ExecutorService produceExecutor;

private final String name = "DPosMiner";

private volatile boolean isRunning = true;

public void init() {

if (!dposService.isEnable() || StringUtils.isEmpty(dposService.getMiners())) {
if (!dposService.isEnable() || ObjectUtils.isEmpty(dposService.getMiners())) {
return;
}

produceExecutor = ExecutorServiceManager.newSingleThreadExecutor(name);
Runnable runnable = () -> {
while (isRunning) {
try {
Expand All @@ -67,17 +71,15 @@ public void init() {
}
}
};
produceThread = new Thread(runnable, "DPosMiner");
produceThread.start();
produceExecutor.submit(runnable);
logger.info("DPoS task started.");
}

public void stop() {
logger.info("DPoS task shutdown...");
isRunning = false;
if (produceThread != null) {
produceThread.interrupt();
}
logger.info("DPoS task stopped.");
ExecutorServiceManager.shutdownAndAwaitTermination(produceExecutor, name);
logger.info("DPoS task shutdown complete");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the thread shutdown log be printed in the function shutdownAndAwaitTermination?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Great!

}

private State produceBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.backup.BackupManager;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.p2p.stats.TrafficStats;

Expand All @@ -29,20 +31,24 @@ public class BackupServer {

private volatile boolean shutdown = false;

private final String name = "BackupServer";
private ExecutorService executor;

@Autowired
public BackupServer(final BackupManager backupManager) {
this.backupManager = backupManager;
}

public void initServer() {
if (port > 0 && commonParameter.getBackupMembers().size() > 0) {
new Thread(() -> {
executor = ExecutorServiceManager.newSingleThreadExecutor(name);
executor.submit(() -> {
try {
start();
} catch (Exception e) {
logger.error("Start backup server failed, {}", e);
}
}, "BackupServer").start();
});
}
}

Expand Down Expand Up @@ -95,5 +101,7 @@ public void close() {
logger.warn("Closing backup server failed.", e);
}
}
ExecutorServiceManager.shutdownAndAwaitTermination(executor, name);
logger.info("Backup server closed.");
}
}
25 changes: 17 additions & 8 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.tron.api.GrpcAPI.TransactionInfoList;
import org.tron.common.args.GenesisBlock;
import org.tron.common.bloom.Bloom;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.logsfilter.EventPluginLoader;
import org.tron.common.logsfilter.FilterQuery;
import org.tron.common.logsfilter.capsule.BlockFilterCapsule;
Expand Down Expand Up @@ -253,6 +254,13 @@ public class Manager {
private AtomicInteger blockWaitLock = new AtomicInteger(0);
private Object transactionLock = new Object();

private ExecutorService rePushEs;
private static final String rePushEsName = "repush";
private ExecutorService triggerEs;
private static final String triggerEsName = "event-trigger";
private ExecutorService filterEs;
private static final String filterEsName = "filter";

/**
* Cycle thread to rePush Transactions
*/
Expand Down Expand Up @@ -429,14 +437,17 @@ public BlockingQueue<TransactionCapsule> getRePushTransactions() {

public void stopRePushThread() {
isRunRePushThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(rePushEs, rePushEsName);
}

public void stopRePushTriggerThread() {
isRunTriggerCapsuleProcessThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(triggerEs, triggerEsName);
}

public void stopFilterProcessThread() {
isRunFilterProcessThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(filterEs, filterEsName);
}

@PostConstruct
Expand Down Expand Up @@ -524,21 +535,19 @@ public void init() {
revokingStore.enable();
validateSignService = Executors
.newFixedThreadPool(Args.getInstance().getValidateSignThreadNum());
Thread rePushThread = new Thread(rePushLoop);
rePushThread.setDaemon(true);
rePushThread.start();
rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true);
rePushEs.submit(rePushLoop);
// add contract event listener for subscribing
if (Args.getInstance().isEventSubscribe()) {
startEventSubscribing();
Thread triggerCapsuleProcessThread = new Thread(triggerCapsuleProcessLoop);
triggerCapsuleProcessThread.setDaemon(true);
triggerCapsuleProcessThread.start();
triggerEs = ExecutorServiceManager.newSingleThreadExecutor(triggerEsName, true);
triggerEs.submit(triggerCapsuleProcessLoop);
}

// start json rpc filter process
if (CommonParameter.getInstance().isJsonRpcFilterEnabled()) {
Thread filterProcessThread = new Thread(filterProcessLoop);
filterProcessThread.start();
filterEs = ExecutorServiceManager.newSingleThreadExecutor(filterEsName, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isDaemon original default value is false

filterEs.submit(filterProcessLoop);
}

//initStoreFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.JsonUtil;
Expand All @@ -27,28 +27,24 @@ public class NodePersistService {
private final boolean isNodePersist = CommonParameter.getInstance().isNodeDiscoveryPersist();
@Autowired
private CommonStore commonStore;
private Timer nodePersistTaskTimer;

private ScheduledExecutorService nodePersistExecutor;

private final String name = "NodePersistTask";

public void init() {
if (isNodePersist) {
nodePersistTaskTimer = new Timer("NodePersistTaskTimer");
nodePersistTaskTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
dbWrite();
}
}, DB_COMMIT_RATE, DB_COMMIT_RATE);
nodePersistExecutor = ExecutorServiceManager.newSingleThreadScheduledExecutor(name);
nodePersistExecutor.scheduleAtFixedRate(this::dbWrite, DB_COMMIT_RATE, DB_COMMIT_RATE,
TimeUnit.MILLISECONDS);
}
}

public void close() {
if (Objects.isNull(nodePersistTaskTimer)) {
return;
}
try {
nodePersistTaskTimer.cancel();
} catch (Exception e) {
logger.error("Close nodePersistTaskTimer failed", e);
if (isNodePersist) {
logger.info("Node persist service shutdown...");
ExecutorServiceManager.shutdownAndAwaitTermination(nodePersistExecutor, name);
logger.info("Node persist service shutdown complete");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.tron.common.backup;

import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.tron.common.backup.socket.BackupServer;
import org.tron.common.parameter.CommonParameter;
import org.tron.core.Constant;
import org.tron.core.config.args.Args;


public class BackupServerTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private BackupServer backupServer;

@Before
public void setUp() throws Exception {
Args.setParam(new String[]{"-d", temporaryFolder.newFolder().toString()}, Constant.TEST_CONF);
CommonParameter.getInstance().setBackupPort(80);
List<String> members = new ArrayList<>();
members.add("127.0.0.2");
CommonParameter.getInstance().setBackupMembers(members);
BackupManager backupManager = new BackupManager();
backupManager.init();
backupServer = new BackupServer(backupManager);
}

@After
public void tearDown() {
backupServer.close();
Args.clearParam();
}

@Test
public void test() throws InterruptedException {
backupServer.initServer();
}
}