From 356ab224935161fd25b17fe0205378627751c03a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=85=E6=A2=A6?= <1101766085@qq.com> Date: Wed, 22 May 2024 17:33:36 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E5=90=8C=E6=AD=A5=E7=A7=81?= =?UTF-8?q?=E6=9C=8D=E9=83=A8=E5=88=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/mqtt/aliyun/MqttClientTest.java | 16 +- .../net/dreamlu/iot/mqtt/broker/DeviceC.java | 12 +- .../iot/mqtt/client/Mqtt5ClientTest.java | 12 +- .../iot/mqtt/client/MqttClientSyncTest.java | 2 + .../iot/mqtt/client/MqttClientTest.java | 12 +- .../iot/mqtt/huawei/MqttClientTest.java | 14 +- .../iot/mqtt/core/client/MqttClient.java | 151 +++++++++++------- .../core/client/MqttClientHeartbeatTask.java | 122 ++++++++++++++ 8 files changed, 239 insertions(+), 102 deletions(-) create mode 100644 mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientHeartbeatTask.java diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java index 4a7d0bbb..c3d909a3 100644 --- a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java @@ -19,8 +19,6 @@ import net.dreamlu.iot.mqtt.core.client.MqttClient; import java.nio.charset.StandardCharsets; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ThreadLocalRandom; /** @@ -57,15 +55,11 @@ public static void main(String[] args) { System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8)); }); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - int LightSwitch = ThreadLocalRandom.current().nextBoolean() ? 0 : 1; - String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":" + LightSwitch + "}}"; - client.publish("/sys/" + productKey + "/" + deviceName + "/thing/event/property/post", content.getBytes(StandardCharsets.UTF_8)); - } - }, 3000, 3000); + client.schedule(() -> { + int LightSwitch = ThreadLocalRandom.current().nextBoolean() ? 0 : 1; + String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":" + LightSwitch + "}}"; + client.publish("/sys/" + productKey + "/" + deviceName + "/thing/event/property/post", content.getBytes(StandardCharsets.UTF_8)); + }, 3000); } } diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/broker/DeviceC.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/broker/DeviceC.java index 730f0db4..a2cea40e 100644 --- a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/broker/DeviceC.java +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/broker/DeviceC.java @@ -19,8 +19,6 @@ import net.dreamlu.iot.mqtt.core.client.MqttClient; import java.nio.charset.StandardCharsets; -import java.util.Timer; -import java.util.TimerTask; /** * 设备 C,每 5 秒上报一个数据 @@ -38,13 +36,9 @@ public static void main(String[] args) { .password("123456") .connectSync(); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - client.publish("/a/door/open", "open".getBytes(StandardCharsets.UTF_8)); - } - }, 5000, 5000); + client.schedule(() -> { + client.publish("/a/door/open", "open".getBytes(StandardCharsets.UTF_8)); + }, 5000); } } diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/Mqtt5ClientTest.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/Mqtt5ClientTest.java index 15a8d856..cd4d58f6 100644 --- a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/Mqtt5ClientTest.java +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/Mqtt5ClientTest.java @@ -26,8 +26,6 @@ import org.tio.core.ChannelContext; import java.nio.charset.StandardCharsets; -import java.util.Timer; -import java.util.TimerTask; /** * 客户端测试 @@ -74,12 +72,8 @@ public void onMessage(ChannelContext context, String topic, MqttPublishMessage m client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8)); client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8)); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); - } - }, 1000, 2000); + client.schedule(() -> { + client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); + }, 1000); } } diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientSyncTest.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientSyncTest.java index f47b6cb1..47d62938 100644 --- a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientSyncTest.java +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientSyncTest.java @@ -49,5 +49,7 @@ public static void main(String[] args) { // 连接上之后发送消息,注意:连接时出现异常等就不会发出 client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); + // 2.3.0 开始支持,可停止 +// client.stop(); } } diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java index 2984e9eb..38f8e2f5 100644 --- a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientTest.java @@ -25,8 +25,6 @@ import org.tio.core.ChannelContext; import java.nio.charset.StandardCharsets; -import java.util.Timer; -import java.util.TimerTask; /** * 客户端测试 @@ -76,12 +74,8 @@ public void onMessage(ChannelContext context, String topic, MqttPublishMessage m client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8)); client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8)); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); - } - }, 1000, 2000); + client.schedule(() -> { + client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); + }, 2000); } } diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/huawei/MqttClientTest.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/huawei/MqttClientTest.java index 1343e044..c5780eb2 100644 --- a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/huawei/MqttClientTest.java +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/huawei/MqttClientTest.java @@ -19,8 +19,6 @@ import net.dreamlu.iot.mqtt.core.client.MqttClient; import java.nio.charset.StandardCharsets; -import java.util.Timer; -import java.util.TimerTask; /** * 客户端测试 @@ -54,7 +52,7 @@ public static void main(String[] args) { .connectSync(); // 订阅命令下发topic - String cmdRequestTopic = "$oc/devices/" + deviceId + "/sys/commands/#"; + String cmdRequestTopic = "$oc/devices/" + deviceId + "/sys/commands/#"; client.subQos0(cmdRequestTopic, (context, topic, message, payload) -> { System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8)); @@ -64,13 +62,9 @@ public static void main(String[] args) { String reportTopic = "$oc/devices/" + deviceId + "/sys/properties/report"; String jsonMsg = "{\"services\":[{\"service_id\":\"Temperature\", \"properties\":{\"value\":57}},{\"service_id\":\"Battery\",\"properties\":{\"level\":88}}]}"; - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - client.publish(reportTopic, jsonMsg.getBytes(StandardCharsets.UTF_8)); - } - }, 3000, 3000); + client.schedule(() -> { + client.publish(reportTopic, jsonMsg.getBytes(StandardCharsets.UTF_8)); + }, 3000); } } diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java index 158377de..6c66573a 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java @@ -31,9 +31,12 @@ import org.tio.core.Tio; import org.tio.core.intf.Packet; import org.tio.utils.thread.ThreadUtils; +import org.tio.utils.timer.TimerTask; import org.tio.utils.timer.TimerTaskService; import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -54,6 +57,7 @@ public final class MqttClient { private final TioClientConfig clientTioConfig; private final IMqttClientSession clientSession; private final TimerTaskService taskService; + private final ExecutorService mqttExecutor; private final IMqttClientMessageIdGenerator messageIdGenerator; private ClientChannelContext context; @@ -66,6 +70,7 @@ public static MqttClientCreator create() { this.config = config; this.clientTioConfig = tioClient.getTioClientConfig(); this.taskService = config.getTaskService(); + this.mqttExecutor = config.getMqttExecutor(); this.clientSession = config.getClientSession(); this.messageIdGenerator = config.getMessageIdGenerator(); startHeartbeatTask(); @@ -372,6 +377,81 @@ public boolean publish(String topic, byte[] payload, MqttQoS qos, Consumer new org.tio.utils.timer.TimerTask(delay) { + @Override + public void run() { + try { + // 1. 再次添加 任务 + systemTimer.add(this); + // 2. 执行任务 + if (executor == null) { + command.run(); + } else { + executor.execute(command); + } + } catch (Exception e) { + logger.error("Mqtt client schedule error", e); + } + } + })); + } + + /** + * 添加定时任务 + * + * @param command runnable + * @param delay delay + * @return TimerTask + */ + public TimerTask scheduleOnce(Runnable command, long delay) { + return scheduleOnce(command, delay, null); + } + + /** + * 添加定时任务 + * + * @param command runnable + * @param delay delay + * @param executor 用于自定义线程池,处理耗时业务 + * @return TimerTask + */ + public TimerTask scheduleOnce(Runnable command, long delay, Executor executor) { + return config.getTaskService().addTask((systemTimer -> new TimerTask(delay) { + @Override + public void run() { + try { + if (executor == null) { + command.run(); + } else { + executor.execute(command); + } + } catch (Exception e) { + logger.error("Mqtt client schedule once error", e); + } + } + })); + } + /** * 异步连接 * @@ -456,11 +536,7 @@ public boolean reconnect(Node serverNode) { * @return 是否需要重新订阅 */ public static boolean isNeedReSub(ChannelContext context) { - if (context.containsKey(MQTT_NEED_RE_SUB)) { - context.remove(MQTT_NEED_RE_SUB); - return true; - } - return false; + return context.getAndRemove(MQTT_NEED_RE_SUB) != null; } /** @@ -492,6 +568,19 @@ public boolean stop() { this.disconnect(); // 3. 停止 tio boolean result = tioClient.stop(); + // 4. 停止工作线程 + try { + mqttExecutor.shutdown(); + } catch (Exception e1) { + logger.error(e1.getMessage(), e1); + } + try { + // 等待线程池中的任务结束,客户端等待 6 秒基本上足够了 + result &= mqttExecutor.awaitTermination(6, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error(e.getMessage(), e); + } logger.info("MqttClient stop result:{}", result); // 4. 清理 session this.clientSession.clean(); @@ -568,58 +657,12 @@ public boolean isDisconnected() { * mqtt 定时任务:发心跳 */ private void startHeartbeatTask() { - // 先判断用户是否开启心跳检测 - final long heartbeatTimeout = TimeUnit.SECONDS.toMillis(config.getKeepAliveSecs()); - if (heartbeatTimeout <= 0) { + final int keepAliveSecs = config.getKeepAliveSecs(); + if (keepAliveSecs <= 0) { logger.warn("用户取消了 mica-mqtt 的心跳定时发送功能,请用户自己去完成心跳机制"); return; } - final ClientGroupStat clientGroupStat = (ClientGroupStat) clientTioConfig.groupStat; - final TioClientHandler clientHandler = clientTioConfig.getTioClientHandler(); - final String id = clientTioConfig.getId(); - new Thread(() -> { - while (!clientTioConfig.isStopped()) { - try { - Set set = clientTioConfig.connecteds; - long currTime = System.currentTimeMillis(); - for (ChannelContext entry : set) { - ClientChannelContext channelContext = (ClientChannelContext) entry; - if (channelContext.isClosed() || channelContext.isRemoved()) { - continue; - } - long interval = currTime - channelContext.stat.latestTimeOfSentPacket; - if (interval >= heartbeatTimeout) { - Packet packet = clientHandler.heartbeatPacket(channelContext); - if (packet != null) { - Boolean result = Tio.send(channelContext, packet); - if (clientTioConfig.debug && logger.isInfoEnabled()) { - logger.info("{} 发送心跳包 result:{}", channelContext, result); - } - } - } - } - // 打印连接信息 - if (clientTioConfig.debug && logger.isInfoEnabled()) { - if (clientTioConfig.statOn) { - logger.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.sum(), - clientGroupStat.receivedPackets.sum(), clientGroupStat.receivedBytes.sum(), clientGroupStat.handledPackets.sum(), - clientGroupStat.sentPackets.sum(), clientGroupStat.sentBytes.sum()); - } else { - logger.info("[{}]: curr:{}, closed:{}", id, set.size(), clientGroupStat.closed.sum()); - } - } - } catch (Throwable e) { - logger.error("", e); - } finally { - try { - Thread.sleep(heartbeatTimeout / 3); - } catch (Throwable e) { - Thread.currentThread().interrupt(); - logger.error(e.getMessage(), e); - } - } - } - }, "mqtt-heartbeat" + id).start(); + taskService.addTask(systemTimer -> new MqttClientHeartbeatTask(systemTimer, clientTioConfig, keepAliveSecs)); } } diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientHeartbeatTask.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientHeartbeatTask.java new file mode 100644 index 00000000..55dfa15e --- /dev/null +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientHeartbeatTask.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.core.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tio.client.ClientChannelContext; +import org.tio.client.ClientGroupStat; +import org.tio.client.TioClientConfig; +import org.tio.client.intf.TioClientHandler; +import org.tio.core.ChannelContext; +import org.tio.core.Tio; +import org.tio.core.intf.Packet; +import org.tio.utils.timer.Timer; +import org.tio.utils.timer.TimerTask; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * mqtt 客户端心跳任务 + * + * @author L.cm + */ +public class MqttClientHeartbeatTask extends TimerTask { + private static final Logger logger = LoggerFactory.getLogger(MqttClientHeartbeatTask.class); + private final Timer timer; + private final TioClientConfig clientTioConfig; + private final ClientGroupStat clientGroupStat; + private final TioClientHandler tioHandler; + private final String id; + /** + * 心跳超时时间,采用 keepAliveSecs - delayMs(定时的时间),经量保证 60s 发一次心跳 + */ + private final long intervalTimeout; + + public MqttClientHeartbeatTask(Timer timer, TioClientConfig clientTioConfig, int keepAliveSecs) { + super(TimeUnit.SECONDS.toMillis(keepAliveSecs / 3)); + this.timer = timer; + this.clientTioConfig = clientTioConfig; + this.clientGroupStat = (ClientGroupStat) clientTioConfig.groupStat; + this.tioHandler = clientTioConfig.getTioClientHandler(); + this.id = clientTioConfig.getId(); + this.intervalTimeout = TimeUnit.SECONDS.toMillis(keepAliveSecs) - delayMs; + } + + @Override + public void run() { + // 1. 添加 task,保持后续执行 + timer.add(this); + // 2. 已经停止,跳过 + if (clientTioConfig.isStopped()) { + return; + } + Set set = clientTioConfig.connecteds; + long currTime = System.currentTimeMillis(); + long decodeQueueSizeAll = 0; + long handlerQueueSizeAll = 0; + long sendQueueSizeAll = 0; + try { + for (ChannelContext entry : set) { + ClientChannelContext channelContext = (ClientChannelContext) entry; + if (channelContext.isClosed() || channelContext.isRemoved()) { + continue; + } + long interval = currTime - channelContext.stat.latestTimeOfSentPacket; + if (interval > intervalTimeout) { + Packet packet = tioHandler.heartbeatPacket(channelContext); + if (packet != null) { + boolean result = Tio.send(channelContext, packet); + if (clientTioConfig.debug && logger.isInfoEnabled()) { + logger.info("{} 发送心跳包 result:{}", channelContext, result); + } + } + } + // 客户端队列数据详情 + int decodeQueueSize = channelContext.getDecodeQueueSize(); + if (decodeQueueSize > 0) { + decodeQueueSizeAll += decodeQueueSize; + } + int handlerQueueSize = channelContext.getHandlerQueueSize(); + if (handlerQueueSize > 0) { + handlerQueueSizeAll += handlerQueueSize; + } + int sendQueueSize = channelContext.getSendQueueSize(); + if (sendQueueSize > 0) { + sendQueueSizeAll += sendQueueSize; + } + } + // 打印连接信息 + if (clientTioConfig.debug && logger.isInfoEnabled()) { + if (clientTioConfig.statOn) { + logger.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b), QueueSize[decode:{},handler:{},send:{}]", + id, set.size(), clientGroupStat.closed.sum(), + clientGroupStat.receivedPackets.sum(), clientGroupStat.receivedBytes.sum(), clientGroupStat.handledPackets.sum(), + clientGroupStat.sentPackets.sum(), clientGroupStat.sentBytes.sum(), + decodeQueueSizeAll, handlerQueueSizeAll, sendQueueSizeAll); + } else { + logger.info("[{}]: curr:{}, closed:{}, QueueSize[decode:{},handler:{},send:{}]", + id, set.size(), clientGroupStat.closed.sum(), + decodeQueueSizeAll, handlerQueueSizeAll, sendQueueSizeAll); + } + } + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + } +}