diff --git a/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java new file mode 100644 index 00000000000..f1e60fdcfbc --- /dev/null +++ b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java @@ -0,0 +1,55 @@ +package org.tron.common.es; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j(topic = "common") +public class ExecutorServiceManager { + + public static ExecutorService newSingleThreadExecutor(String name) { + return newSingleThreadExecutor(name, false); + } + + public static ExecutorService newSingleThreadExecutor(String name, boolean isDaemon) { + return Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build()); + } + + + public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) { + return newSingleThreadScheduledExecutor(name, false); + } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name, + boolean isDaemon) { + return Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build()); + } + + public static void shutdownAndAwaitTermination(ExecutorService pool, String name) { + if (pool == null) { + return; + } + logger.info("Pool {} shutdown...", name); + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) { + logger.warn("Pool {} did not terminate", name); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + logger.info("Pool {} shutdown done", name); + } +} diff --git a/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java b/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java index dcfa85ca5f3..537fe49ae65 100644 --- a/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java +++ b/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java @@ -3,12 +3,14 @@ import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL; import com.google.protobuf.ByteString; +import java.util.concurrent.ExecutorService; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.joda.time.DateTime; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; +import org.springframework.util.ObjectUtils; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.ByteArray; import org.tron.common.utils.Sha256Hash; @@ -34,16 +36,18 @@ public class DposTask { @Setter private DposService dposService; - private Thread produceThread; + private ExecutorService produceExecutor; + + private final String name = "DPosMiner"; private volatile boolean isRunning = true; public void init() { - if (!dposService.isEnable() || StringUtils.isEmpty(dposService.getMiners())) { + if (!dposService.isEnable() || ObjectUtils.isEmpty(dposService.getMiners())) { return; } - + produceExecutor = ExecutorServiceManager.newSingleThreadExecutor(name); Runnable runnable = () -> { while (isRunning) { try { @@ -67,17 +71,13 @@ public void init() { } } }; - produceThread = new Thread(runnable, "DPosMiner"); - produceThread.start(); + produceExecutor.submit(runnable); logger.info("DPoS task started."); } public void stop() { isRunning = false; - if (produceThread != null) { - produceThread.interrupt(); - } - logger.info("DPoS task stopped."); + ExecutorServiceManager.shutdownAndAwaitTermination(produceExecutor, name); } private State produceBlock() { diff --git a/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java b/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java index e3b1de31736..fa2c0947852 100644 --- a/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java +++ b/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java @@ -7,11 +7,13 @@ import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.tron.common.backup.BackupManager; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.p2p.stats.TrafficStats; @@ -29,6 +31,9 @@ public class BackupServer { private volatile boolean shutdown = false; + private final String name = "BackupServer"; + private ExecutorService executor; + @Autowired public BackupServer(final BackupManager backupManager) { this.backupManager = backupManager; @@ -36,13 +41,14 @@ public BackupServer(final BackupManager backupManager) { public void initServer() { if (port > 0 && commonParameter.getBackupMembers().size() > 0) { - new Thread(() -> { + executor = ExecutorServiceManager.newSingleThreadExecutor(name); + executor.submit(() -> { try { start(); } catch (Exception e) { logger.error("Start backup server failed, {}", e); } - }, "BackupServer").start(); + }); } } @@ -88,6 +94,7 @@ public void initChannel(NioDatagramChannel ch) public void close() { logger.info("Closing backup server..."); shutdown = true; + ExecutorServiceManager.shutdownAndAwaitTermination(executor, name); if (channel != null) { try { channel.close().await(10, TimeUnit.SECONDS); @@ -95,5 +102,6 @@ public void close() { logger.warn("Closing backup server failed.", e); } } + logger.info("Backup server closed."); } } diff --git a/framework/src/main/java/org/tron/core/config/TronLogShutdownHook.java b/framework/src/main/java/org/tron/core/config/TronLogShutdownHook.java index 880aa7e3090..f873b88ca44 100644 --- a/framework/src/main/java/org/tron/core/config/TronLogShutdownHook.java +++ b/framework/src/main/java/org/tron/core/config/TronLogShutdownHook.java @@ -16,9 +16,9 @@ public class TronLogShutdownHook extends ShutdownHookBase { private static final Duration CHECK_SHUTDOWN_DELAY = Duration.buildByMilliseconds(100); /** - * The check times before shutdown. default is 50 + * The check times before shutdown. default is 60000/100 = 600 times. */ - private Integer check_times = 50; + private final long check_times = 60 * 1000 / CHECK_SHUTDOWN_DELAY.getMilliseconds(); public TronLogShutdownHook() { } diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index 3928f657e44..3a18dce32a9 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -49,6 +49,7 @@ import org.tron.api.GrpcAPI.TransactionInfoList; import org.tron.common.args.GenesisBlock; import org.tron.common.bloom.Bloom; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.logsfilter.EventPluginLoader; import org.tron.common.logsfilter.FilterQuery; import org.tron.common.logsfilter.capsule.BlockFilterCapsule; @@ -253,6 +254,13 @@ public class Manager { private AtomicInteger blockWaitLock = new AtomicInteger(0); private Object transactionLock = new Object(); + private ExecutorService rePushEs; + private static final String rePushEsName = "repush"; + private ExecutorService triggerEs; + private static final String triggerEsName = "event-trigger"; + private ExecutorService filterEs; + private static final String filterEsName = "filter"; + /** * Cycle thread to rePush Transactions */ @@ -429,14 +437,17 @@ public BlockingQueue getRePushTransactions() { public void stopRePushThread() { isRunRePushThread = false; + ExecutorServiceManager.shutdownAndAwaitTermination(rePushEs, rePushEsName); } public void stopRePushTriggerThread() { isRunTriggerCapsuleProcessThread = false; + ExecutorServiceManager.shutdownAndAwaitTermination(triggerEs, triggerEsName); } public void stopFilterProcessThread() { isRunFilterProcessThread = false; + ExecutorServiceManager.shutdownAndAwaitTermination(filterEs, filterEsName); } @PostConstruct @@ -524,21 +535,19 @@ public void init() { revokingStore.enable(); validateSignService = Executors .newFixedThreadPool(Args.getInstance().getValidateSignThreadNum()); - Thread rePushThread = new Thread(rePushLoop); - rePushThread.setDaemon(true); - rePushThread.start(); + rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true); + rePushEs.submit(rePushLoop); // add contract event listener for subscribing if (Args.getInstance().isEventSubscribe()) { startEventSubscribing(); - Thread triggerCapsuleProcessThread = new Thread(triggerCapsuleProcessLoop); - triggerCapsuleProcessThread.setDaemon(true); - triggerCapsuleProcessThread.start(); + triggerEs = ExecutorServiceManager.newSingleThreadExecutor(triggerEsName, true); + triggerEs.submit(triggerCapsuleProcessLoop); } // start json rpc filter process if (CommonParameter.getInstance().isJsonRpcFilterEnabled()) { - Thread filterProcessThread = new Thread(filterProcessLoop); - filterProcessThread.start(); + filterEs = ExecutorServiceManager.newSingleThreadExecutor(filterEsName); + filterEs.submit(filterProcessLoop); } //initStoreFactory diff --git a/framework/src/main/java/org/tron/core/net/service/nodepersist/NodePersistService.java b/framework/src/main/java/org/tron/core/net/service/nodepersist/NodePersistService.java index 457fe7c55cb..2445a7d64e5 100644 --- a/framework/src/main/java/org/tron/core/net/service/nodepersist/NodePersistService.java +++ b/framework/src/main/java/org/tron/core/net/service/nodepersist/NodePersistService.java @@ -4,12 +4,12 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.Objects; -import java.util.Timer; -import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.ByteArray; import org.tron.common.utils.JsonUtil; @@ -27,28 +27,22 @@ public class NodePersistService { private final boolean isNodePersist = CommonParameter.getInstance().isNodeDiscoveryPersist(); @Autowired private CommonStore commonStore; - private Timer nodePersistTaskTimer; + + private ScheduledExecutorService nodePersistExecutor; + + private final String name = "NodePersistTask"; public void init() { if (isNodePersist) { - nodePersistTaskTimer = new Timer("NodePersistTaskTimer"); - nodePersistTaskTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - dbWrite(); - } - }, DB_COMMIT_RATE, DB_COMMIT_RATE); + nodePersistExecutor = ExecutorServiceManager.newSingleThreadScheduledExecutor(name); + nodePersistExecutor.scheduleAtFixedRate(this::dbWrite, DB_COMMIT_RATE, DB_COMMIT_RATE, + TimeUnit.MILLISECONDS); } } public void close() { - if (Objects.isNull(nodePersistTaskTimer)) { - return; - } - try { - nodePersistTaskTimer.cancel(); - } catch (Exception e) { - logger.error("Close nodePersistTaskTimer failed", e); + if (isNodePersist) { + ExecutorServiceManager.shutdownAndAwaitTermination(nodePersistExecutor, name); } } diff --git a/framework/src/test/java/org/tron/common/backup/BackupServerTest.java b/framework/src/test/java/org/tron/common/backup/BackupServerTest.java new file mode 100644 index 00000000000..34b17ec186f --- /dev/null +++ b/framework/src/test/java/org/tron/common/backup/BackupServerTest.java @@ -0,0 +1,44 @@ +package org.tron.common.backup; + +import java.util.ArrayList; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.tron.common.backup.socket.BackupServer; +import org.tron.common.parameter.CommonParameter; +import org.tron.core.Constant; +import org.tron.core.config.args.Args; + + +public class BackupServerTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private BackupServer backupServer; + + @Before + public void setUp() throws Exception { + Args.setParam(new String[]{"-d", temporaryFolder.newFolder().toString()}, Constant.TEST_CONF); + CommonParameter.getInstance().setBackupPort(80); + List members = new ArrayList<>(); + members.add("127.0.0.2"); + CommonParameter.getInstance().setBackupMembers(members); + BackupManager backupManager = new BackupManager(); + backupManager.init(); + backupServer = new BackupServer(backupManager); + } + + @After + public void tearDown() { + backupServer.close(); + Args.clearParam(); + } + + @Test + public void test() throws InterruptedException { + backupServer.initServer(); + } +}