Skip to content

Commit

Permalink
[MINOR] new Runnable() can be replaced with lambda (#685)
Browse files Browse the repository at this point in the history
* gradle

* Anonymous new Runnable() can be replaced with lambda

* Anonymous new Runnable() can be replaced with lambda

* add logger print exception
  • Loading branch information
li-xiao-shuang authored Dec 31, 2021
1 parent 0b3a968 commit 42decd4
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,11 @@ public void sendResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse resp
}
}

ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
httpLogger.warn("send response to [{}] fail, will close this channel",
RemotingHelper.parseChannelRemoteAddr(f.channel()));
f.channel().close();
}
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
httpLogger.warn("send response to [{}] fail, will close this channel",
RemotingHelper.parseChannelRemoteAddr(f.channel()));
f.channel().close();
}
});
}
Expand All @@ -195,7 +192,6 @@ public void start() throws Exception {
} catch (Exception e1) {
httpServerLogger.error("HTTPServer shutdown Err!", e);
}
return;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public EventMeshTCPServer(EventMeshServer eventMeshServer,
this.registry = registry;
}

private void startServer() throws Exception {
private void startServer() {
Runnable r = () -> {
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelInitializer channelInitializer = new ChannelInitializer() {
Expand Down Expand Up @@ -174,6 +174,7 @@ public void initChannel(Channel ch) throws Exception {
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(channelInitializer);

try {
int port = eventMeshTCPConfiguration.eventMeshTcpServerPort;
ChannelFuture f = bootstrap.bind(port).sync();
Expand All @@ -186,7 +187,6 @@ public void initChannel(Channel ch) throws Exception {
} catch (Exception e1) {
logger.error("EventMeshTCPServer RemotingServer shutdown Err!", e);
}
return;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class SSLContextFactory {


public static SSLContext getSslContext() {
SSLContext sslContext = null;
SSLContext sslContext;
try {
protocol = System.getProperty("ssl.server.protocol", "TLSv1.1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,100 +73,99 @@ public void init() throws Exception {

public void start() throws Exception {
logger.info("consumerManager started......");
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
logger.info("clientInfo check start.....");
synchronized (eventMeshHTTPServer.localClientInfoMapping) {
Map<String, List<Client>> clientInfoMap =
eventMeshHTTPServer.localClientInfoMapping;
if (clientInfoMap.size() > 0) {
for (String key : clientInfoMap.keySet()) {
String consumerGroup = key.split("@")[0];
String topic = key.split("@")[1];
List<Client> clientList = clientInfoMap.get(key);
Iterator<Client> clientIterator = clientList.iterator();
boolean isChange = false;
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
//The time difference is greater than 3 heartbeat cycles
if (System.currentTimeMillis() - client.lastUpTime.getTime()
> DEFAULT_UPDATE_TIME) {
logger.warn(
"client {} lastUpdate time {} over three heartbeat cycles",
JsonUtils.serialize(client), client.lastUpTime);
clientIterator.remove();
isChange = true;
}

scheduledExecutorService.scheduleAtFixedRate(() -> {
logger.info("clientInfo check start.....");
synchronized (eventMeshHTTPServer.localClientInfoMapping) {
Map<String, List<Client>> clientInfoMap =
eventMeshHTTPServer.localClientInfoMapping;
if (clientInfoMap.size() > 0) {
for (String key : clientInfoMap.keySet()) {
String consumerGroup = key.split("@")[0];
String topic = key.split("@")[1];
List<Client> clientList = clientInfoMap.get(key);
Iterator<Client> clientIterator = clientList.iterator();
boolean isChange = false;
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
//The time difference is greater than 3 heartbeat cycles
if (System.currentTimeMillis() - client.lastUpTime.getTime()
> DEFAULT_UPDATE_TIME) {
logger.warn(
"client {} lastUpdate time {} over three heartbeat cycles",
JsonUtils.serialize(client), client.lastUpTime);
clientIterator.remove();
isChange = true;
}
if (isChange) {
if (clientList.size() > 0) {
//change url
logger.info("consumerGroup {} client info changing",
consumerGroup);
Map<String, List<String>> idcUrls = new HashMap<>();
Set<String> clientUrls = new HashSet<>();
for (Client client : clientList) {
clientUrls.add(client.url);
if (idcUrls.containsKey(client.idc)) {
idcUrls.get(client.idc)
.add(StringUtils.deleteWhitespace(client.url));
} else {
List<String> urls = new ArrayList<>();
urls.add(client.url);
idcUrls.put(client.idc, urls);
}
}
if (isChange) {
if (clientList.size() > 0) {
//change url
logger.info("consumerGroup {} client info changing",
consumerGroup);
Map<String, List<String>> idcUrls = new HashMap<>();
Set<String> clientUrls = new HashSet<>();
for (Client client : clientList) {
clientUrls.add(client.url);
if (idcUrls.containsKey(client.idc)) {
idcUrls.get(client.idc)
.add(StringUtils.deleteWhitespace(client.url));
} else {
List<String> urls = new ArrayList<>();
urls.add(client.url);
idcUrls.put(client.idc, urls);
}
synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
ConsumerGroupConf consumerGroupConf =
eventMeshHTTPServer.localConsumerGroupMapping
.get(consumerGroup);
Map<String, ConsumerGroupTopicConf> map =
consumerGroupConf.getConsumerGroupTopicConf();
for (String topicKey : map.keySet()) {
if (StringUtils.equals(topic, topicKey)) {
ConsumerGroupTopicConf latestTopicConf =
new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(topic);
latestTopicConf.setSubscriptionItem(
map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);

latestTopicConf.setIdcUrls(idcUrls);

map.put(topic, latestTopicConf);
}
}
}
synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
ConsumerGroupConf consumerGroupConf =
eventMeshHTTPServer.localConsumerGroupMapping
.put(consumerGroup, consumerGroupConf);
logger.info(
"consumerGroup {} client info changed, "
+ "consumerGroupConf {}", consumerGroup,
JsonUtils.serialize(consumerGroupConf));
try {
notifyConsumerManager(consumerGroup, consumerGroupConf);
} catch (Exception e) {
e.printStackTrace();
.get(consumerGroup);
Map<String, ConsumerGroupTopicConf> map =
consumerGroupConf.getConsumerGroupTopicConf();
for (String topicKey : map.keySet()) {
if (StringUtils.equals(topic, topicKey)) {
ConsumerGroupTopicConf latestTopicConf =
new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(topic);
latestTopicConf.setSubscriptionItem(
map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);

latestTopicConf.setIdcUrls(idcUrls);

map.put(topic, latestTopicConf);
}
}
eventMeshHTTPServer.localConsumerGroupMapping
.put(consumerGroup, consumerGroupConf);
logger.info(
"consumerGroup {} client info changed, "
+ "consumerGroupConf {}", consumerGroup,
JsonUtils.serialize(consumerGroupConf));

} else {
logger.info("consumerGroup {} client info removed",
consumerGroup);
//remove
try {
notifyConsumerManager(consumerGroup, null);
notifyConsumerManager(consumerGroup, consumerGroupConf);
} catch (Exception e) {
e.printStackTrace();
logger.error("notifyConsumerManager error",e);
}
}

eventMeshHTTPServer.localConsumerGroupMapping.keySet()
.removeIf(s -> StringUtils.equals(consumerGroup, s));
} else {
logger.info("consumerGroup {} client info removed",
consumerGroup);
//remove
try {
notifyConsumerManager(consumerGroup, null);
} catch (Exception e) {
logger.error("notifyConsumerManager error",e);
}
}

eventMeshHTTPServer.localConsumerGroupMapping.keySet()
.removeIf(s -> StringUtils.equals(consumerGroup, s));
}
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public void init() {
pool = new ThreadPoolExecutor(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerRetryThreadNum,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerRetryThreadNum,
60000,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerRetryBlockQSize),
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerRetryBlockQSize),
new ThreadFactory() {
private AtomicInteger count = new AtomicInteger();

Expand All @@ -74,27 +74,24 @@ public Thread newThread(Runnable r) {
},
new ThreadPoolExecutor.AbortPolicy());

dispatcher = new Thread(new Runnable() {
@Override
public void run() {
try {
DelayRetryable retryObj = null;
while (!Thread.currentThread().isInterrupted() && (retryObj = failed.take()) != null) {
final DelayRetryable delayRetryable = retryObj;
pool.execute(() -> {
try {
delayRetryable.retry();
if (retryLogger.isDebugEnabled()) {
retryLogger.debug("retryObj : {}", delayRetryable);
}
} catch (Exception e) {
retryLogger.error("http-retry-dispatcher error!", e);
dispatcher = new Thread(() -> {
try {
DelayRetryable retryObj;
while (!Thread.currentThread().isInterrupted() && (retryObj = failed.take()) != null) {
final DelayRetryable delayRetryable = retryObj;
pool.execute(() -> {
try {
delayRetryable.retry();
if (retryLogger.isDebugEnabled()) {
retryLogger.debug("retryObj : {}", delayRetryable);
}
});
}
} catch (Exception e) {
retryLogger.error("http-retry-dispatcher error!", e);
} catch (Exception e) {
retryLogger.error("http-retry-dispatcher error!", e);
}
});
}
} catch (Exception e) {
retryLogger.error("http-retry-dispatcher error!", e);
}
}, "http-retry-dispatcher");
dispatcher.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,14 @@ public void pushRetry(RetryContext retryContext) {
}

public void init() {
dispatcher = new Thread(new Runnable() {
@Override
public void run() {
try {
RetryContext retryContext = null;
while ((retryContext = retrys.take()) != null) {
final RetryContext finalRetryContext = retryContext;
pool.execute(() -> {
finalRetryContext.retry();
});
}
} catch (Exception e) {
logger.error("retry-dispatcher error!", e);
dispatcher = new Thread(() -> {
try {
RetryContext retryContext;
while ((retryContext = retrys.take()) != null) {
pool.execute(retryContext::retry);
}
} catch (Exception e) {
logger.error("retry-dispatcher error!", e);
}
}, "retry-dispatcher");
dispatcher.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,25 @@ public void init() throws Exception {

public void start() throws Exception {
openTelemetryHTTPMetricsExporter.start();
metricsSchedule.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
summaryMetrics.snapshotHTTPTPS();
summaryMetrics.snapshotSendBatchMsgTPS();
summaryMetrics.snapshotSendMsgTPS();
summaryMetrics.snapshotPushMsgTPS();
} catch (Exception ex) {
logger.warn("eventMesh snapshot tps metrics err", ex);
}
metricsSchedule.scheduleAtFixedRate(() -> {
try {
summaryMetrics.snapshotHTTPTPS();
summaryMetrics.snapshotSendBatchMsgTPS();
summaryMetrics.snapshotSendMsgTPS();
summaryMetrics.snapshotPushMsgTPS();
} catch (Exception ex) {
logger.warn("eventMesh snapshot tps metrics err", ex);
}
}, 0, 1000, TimeUnit.MILLISECONDS);

metricsSchedule.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
logPrintServerMetrics();
} catch (Exception ex) {
logger.warn("eventMesh print metrics err", ex);
}

metricsSchedule.scheduleAtFixedRate(() -> {
try {
logPrintServerMetrics();
} catch (Exception ex) {
logger.warn("eventMesh print metrics err", ex);
}
}, 1000, SummaryMetrics.STATIC_PERIOD, TimeUnit.MILLISECONDS);

logger.info("HTTPMetricsServer started......");
}

Expand Down
Loading

0 comments on commit 42decd4

Please sign in to comment.