Skip to content

Commit

Permalink
[ISSUE apache#4796] Use ThreadPoolFactory to create thread pool in ev…
Browse files Browse the repository at this point in the history
…entmesh-common model, and Set thread names to facilitate troubleshooting.
  • Loading branch information
jevinjiang committed Mar 20, 2024
1 parent 0250a89 commit fde8830
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.common;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -57,4 +58,8 @@ public static ScheduledExecutorService createSingleScheduledExecutor(final Strin
public static ScheduledExecutorService createScheduledExecutor(int core, ThreadFactory threadFactory) {
return Executors.newScheduledThreadPool(core, threadFactory);
}

public static ExecutorService createSingleExecutor(final String threadName) {
return Executors.newSingleThreadExecutor(new EventMeshThreadFactory(threadName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand Down Expand Up @@ -56,7 +57,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -87,9 +87,11 @@ public class SourceWorker implements ConnectorWorker {

private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;

private final ExecutorService pollService = Executors.newSingleThreadExecutor();
private final ExecutorService pollService =
ThreadPoolFactory.createSingleExecutor("eventMesh-sourceWorker-pollService");

private final ExecutorService startService = Executors.newSingleThreadExecutor();
private final ExecutorService startService =
ThreadPoolFactory.createSingleExecutor("eventMesh-sourceWorker-startService");

private final BlockingQueue<ConnectRecord> queue;
private final EventMeshTCPClient<CloudEvent> eventMeshTCPClient;
Expand Down

0 comments on commit fde8830

Please sign in to comment.