From 8f7d7e324a1845487d5cd83c3000420f26b72073 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=85=E6=A2=A6?= <1101766085@qq.com> Date: Sun, 18 Feb 2024 17:22:27 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=B7=BB=E5=8A=A0=E7=A7=81?= =?UTF-8?q?=E6=9C=8D=E7=89=88=E5=AE=A2=E6=88=B7=E7=AB=AF=E5=85=A8=E5=B1=80?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E5=8A=9F=E8=83=BD=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/mqtt/client/MqttClientGlobalTest.java | 47 ++++++++++++ .../client/DefaultMqttClientProcessor.java | 49 +++++++++++-- .../IMqttClientGlobalMessageListener.java | 39 ++++++++++ .../mqtt/core/client/MqttClientCreator.java | 51 +++++++++++-- .../iot/mqtt/codec/MqttTopicSubscription.java | 4 ++ .../iot/mqtt/core/util/ThreadUtil.java | 72 ------------------- .../spring/client/MqttClientTemplate.java | 9 ++- .../config/MqttClientConfiguration.java | 15 ++-- .../client/config/MqttClientProperties.java | 7 ++ 9 files changed, 207 insertions(+), 86 deletions(-) create mode 100644 example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientGlobalTest.java create mode 100644 mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientGlobalMessageListener.java delete mode 100644 mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientGlobalTest.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientGlobalTest.java new file mode 100644 index 00000000..4b88881d --- /dev/null +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/MqttClientGlobalTest.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.client; + +import net.dreamlu.iot.mqtt.core.client.MqttClient; +import org.tio.utils.buffer.ByteBufferUtil; + +/** + * 客户端全局订阅测试 + * + * @author L.cm + */ +public class MqttClientGlobalTest { + + public static void main(String[] args) { + // 初始化 mqtt 客户端 + MqttClient.create() + .ip("127.0.0.1") + .port(1883) + .username("admin") + .password("123456") + // 全局订阅的 topic + .globalSubscribe("/test", "/test/123") + // 全局监听,也会监听到服务端 http api 订阅的数据 + .globalMessageListener((context, topic, message, payload) -> { + System.out.println("topic:\t" + topic); + System.out.println("payload:\t" + ByteBufferUtil.toString(payload)); + }) +// .debug() + .connectSync(); + } + +} diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java index e30070d5..34bea250 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -39,17 +40,19 @@ */ public class DefaultMqttClientProcessor implements IMqttClientProcessor { private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class); - private final int reSubscribeBatchSize; + private final MqttClientCreator mqttClientCreator; private final IMqttClientSession clientSession; private final IMqttClientConnectListener connectListener; + private final IMqttClientGlobalMessageListener globalMessageListener; private final IMqttClientMessageIdGenerator messageIdGenerator; private final TimerTaskService taskService; private final ExecutorService executor; public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) { - this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize(); + this.mqttClientCreator = mqttClientCreator; this.clientSession = mqttClientCreator.getClientSession(); this.connectListener = mqttClientCreator.getConnectListener(); + this.globalMessageListener = mqttClientCreator.getGlobalMessageListener(); this.messageIdGenerator = mqttClientCreator.getMessageIdGenerator(); this.taskService = mqttClientCreator.getTaskService(); this.executor = mqttClientCreator.getMqttExecutor(); @@ -118,6 +121,11 @@ private void publishConnectEvent(ChannelContext context) { * @param context ChannelContext */ private void reSendSubscription(ChannelContext context) { + // 0. 全局订阅 + Set globalSubscribe = mqttClientCreator.getGlobalSubscribe(); + if (globalSubscribe != null && !globalSubscribe.isEmpty()) { + globalReSendSubscription(context, globalSubscribe); + } List reSubscriptionList = clientSession.getAndCleanSubscription(); // 1. 判断是否为空 if (reSubscriptionList.isEmpty()) { @@ -125,6 +133,8 @@ private void reSendSubscription(ChannelContext context) { } // 2. 订阅的数量 int subscribedSize = reSubscriptionList.size(); + // 重新订阅批次大小 + int reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize(); if (subscribedSize <= reSubscribeBatchSize) { reSendSubscription(context, reSubscriptionList); } else { @@ -135,6 +145,22 @@ private void reSendSubscription(ChannelContext context) { } } + /** + * 全局订阅,不需要存储 session + * + * @param context ChannelContext + * @param globalReSubscriptionList globalReSubscriptionList + */ + private void globalReSendSubscription(ChannelContext context, Set globalReSubscriptionList) { + int messageId = messageIdGenerator.getId(); + MqttSubscribeMessage message = MqttMessageBuilders.subscribe() + .addSubscriptions(globalReSubscriptionList) + .messageId(messageId) + .build(); + boolean result = Tio.send(context, message); + logger.info("MQTT globalReSubscriptionList:{} messageId:{} resubscribing result:{}", globalReSubscriptionList, messageId, result); + } + /** * 批量重新订阅 * @@ -328,11 +354,26 @@ public void processPubComp(MqttMessage message) { * @param message MqttPublishMessage */ private void invokeListenerForPublish(ChannelContext context, String topicName, MqttPublishMessage message) { + final byte[] payload = message.payload(); + // 全局消息监听器 + if (globalMessageListener != null) { + executor.submit(() -> { + try { + globalMessageListener.onMessage(context, topicName, message, payload); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + }); + } + // topic 订阅监听 List subscriptionList = clientSession.getMatchedSubscription(topicName); if (subscriptionList.isEmpty()) { - logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName); + if (globalMessageListener == null || mqttClientCreator.isDebug()) { + logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName); + } else { + logger.debug("Mqtt message to accept topic:{} subscriptionList is empty.", topicName); + } } else { - final byte[] payload = message.payload(); subscriptionList.forEach(subscription -> { IMqttClientMessageListener listener = subscription.getListener(); executor.submit(() -> { diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientGlobalMessageListener.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientGlobalMessageListener.java new file mode 100644 index 00000000..d4a0443a --- /dev/null +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/IMqttClientGlobalMessageListener.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.core.client; + +import net.dreamlu.iot.mqtt.codec.MqttPublishMessage; +import org.tio.core.ChannelContext; + +/** + * mqtt 全局消息处理 + * + * @author L.cm + */ +public interface IMqttClientGlobalMessageListener { + + /** + * 监听到消息 + * + * @param context ChannelContext + * @param topic topic + * @param message MqttPublishMessage + * @param payload payload + */ + void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload); + +} diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java index c3a0d33f..ae1f0678 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientCreator.java @@ -16,10 +16,7 @@ package net.dreamlu.iot.mqtt.core.client; -import net.dreamlu.iot.mqtt.codec.MqttConnectReasonCode; -import net.dreamlu.iot.mqtt.codec.MqttConstant; -import net.dreamlu.iot.mqtt.codec.MqttProperties; -import net.dreamlu.iot.mqtt.codec.MqttVersion; +import net.dreamlu.iot.mqtt.codec.*; import org.tio.client.ReconnConf; import org.tio.client.TioClient; import org.tio.client.TioClientConfig; @@ -36,8 +33,10 @@ import org.tio.utils.timer.TimerTaskService; import java.io.InputStream; +import java.util.*; import java.util.concurrent.*; import java.util.function.Consumer; +import java.util.stream.Collectors; /** * mqtt 客户端构造器 @@ -145,6 +144,14 @@ public final class MqttClientCreator { * 连接监听器 */ private IMqttClientConnectListener connectListener; + /** + * 全局订阅 + */ + private Set globalSubscribe; + /** + * 全局消息监听器 + */ + private IMqttClientGlobalMessageListener globalMessageListener; /** * 客户端 session */ @@ -274,6 +281,14 @@ public IMqttClientConnectListener getConnectListener() { return connectListener; } + public Set getGlobalSubscribe() { + return globalSubscribe; + } + + public IMqttClientGlobalMessageListener getGlobalMessageListener() { + return globalMessageListener; + } + public IMqttClientSession getClientSession() { return clientSession; } @@ -447,6 +462,34 @@ public MqttClientCreator connectListener(IMqttClientConnectListener connectListe return this; } + public MqttClientCreator globalSubscribe(String... topics) { + Objects.requireNonNull(topics, "globalSubscribe topics is null."); + List subscriptionList = Arrays.stream(topics) + .map(MqttTopicSubscription::new) + .collect(Collectors.toList()); + return globalSubscribe(subscriptionList); + } + + public MqttClientCreator globalSubscribe(MqttTopicSubscription... topics) { + Objects.requireNonNull(topics, "globalSubscribe topics is null."); + return globalSubscribe(Arrays.asList(topics)); + } + + public MqttClientCreator globalSubscribe(List topicList) { + Objects.requireNonNull(topicList, "globalSubscribe topicList is null."); + if (this.globalSubscribe == null) { + this.globalSubscribe = new HashSet<>(topicList); + } else { + this.globalSubscribe.addAll(topicList); + } + return this; + } + + public MqttClientCreator globalMessageListener(IMqttClientGlobalMessageListener globalMessageListener) { + this.globalMessageListener = globalMessageListener; + return this; + } + public MqttClientCreator clientSession(IMqttClientSession clientSession) { this.clientSession = clientSession; return this; diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttTopicSubscription.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttTopicSubscription.java index ebea7f28..dd67af05 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttTopicSubscription.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttTopicSubscription.java @@ -27,6 +27,10 @@ public final class MqttTopicSubscription { private final String topicFilter; private final MqttSubscriptionOption option; + public MqttTopicSubscription(String topicFilter) { + this(topicFilter, MqttQoS.AT_MOST_ONCE); + } + public MqttTopicSubscription(String topicFilter, MqttQoS qualityOfService) { this.topicFilter = topicFilter; this.option = MqttSubscriptionOption.onlyFromQos(qualityOfService); diff --git a/mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java b/mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java deleted file mode 100644 index 45786258..00000000 --- a/mica-mqtt-common/src/main/java/net/dreamlu/iot/mqtt/core/util/ThreadUtil.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.core.util; - -import org.tio.utils.thread.ThreadUtils; -import org.tio.utils.thread.pool.SynThreadPoolExecutor; - -import java.util.concurrent.ExecutorService; - -/** - * mqtt 线程工具类,已弃用,使用 org.tio.utils.thread.ThreadUtils - * - * @author L.cm - */ -@Deprecated -public final class ThreadUtil { - - /** - * 挂起当前线程,建议使用 ThreadUtils#sleep - * - * @param millis 挂起的毫秒数 - * @return 被中断返回false,否则true - */ - public static boolean sleep(long millis) { - return ThreadUtils.sleep(millis); - } - - /** - * 获取 tio group 线程池,建议使用 ThreadUtils#getGroupExecutor - * - * @param groupPoolSize group 线程大小 - * @return ThreadPoolExecutor - */ - public static ExecutorService getGroupExecutor(int groupPoolSize) { - return ThreadUtils.getGroupExecutor(groupPoolSize); - } - - /** - * 获取 getTioExecutor 线程池,建议使用 ThreadUtils#getTioExecutor - * - * @param tioPoolSize tio 线程池大小 - * @return SynThreadPoolExecutor - */ - public static SynThreadPoolExecutor getTioExecutor(int tioPoolSize) { - return ThreadUtils.getTioExecutor(tioPoolSize); - } - - /** - * 获取 mqtt 业务线程池,建议使用 ThreadUtils#getBizExecutor - * - * @param poolSize 业务线程池大小 - * @return ThreadPoolExecutor - */ - public static ExecutorService getMqttExecutor(int poolSize) { - return ThreadUtils.getBizExecutor(poolSize); - } - -} diff --git a/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientTemplate.java b/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientTemplate.java index 992cea07..77eb8b1d 100644 --- a/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientTemplate.java +++ b/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientTemplate.java @@ -38,20 +38,23 @@ public class MqttClientTemplate implements SmartInitializingSingleton, Disposabl public static final String DEFAULT_CLIENT_TEMPLATE_BEAN = "mqttClientTemplate"; private final MqttClientCreator clientCreator; private final ObjectProvider clientConnectListenerObjectProvider; + private final ObjectProvider globalMessageListenerObjectProvider; private final ObjectProvider customizersObjectProvider; private final List tempSubscriptionList; private MqttClient client; public MqttClientTemplate(MqttClientCreator clientCreator, ObjectProvider clientConnectListenerObjectProvider) { - this(clientCreator, clientConnectListenerObjectProvider, null); + this(clientCreator, clientConnectListenerObjectProvider, null, null); } public MqttClientTemplate(MqttClientCreator clientCreator, ObjectProvider clientConnectListenerObjectProvider, + ObjectProvider globalMessageListenerObjectProvider, ObjectProvider customizersObjectProvider) { this.clientCreator = clientCreator; this.clientConnectListenerObjectProvider = clientConnectListenerObjectProvider; + this.globalMessageListenerObjectProvider = globalMessageListenerObjectProvider; this.customizersObjectProvider = customizersObjectProvider; this.tempSubscriptionList = new ArrayList<>(); } @@ -286,6 +289,10 @@ void addSubscriptionList(String[] topicFilters, MqttQoS qos, IMqttClientMessageL public void afterSingletonsInstantiated() { // 配置客户端连接监听器 clientConnectListenerObjectProvider.ifAvailable(clientCreator::connectListener); + // 全局监听器 + if (globalMessageListenerObjectProvider != null) { + globalMessageListenerObjectProvider.ifAvailable(clientCreator::globalMessageListener); + } // 自定义处理 if (customizersObjectProvider != null) { customizersObjectProvider.ifAvailable(customizer -> customizer.customize(clientCreator)); diff --git a/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientConfiguration.java b/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientConfiguration.java index ba258264..02af9828 100644 --- a/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientConfiguration.java +++ b/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientConfiguration.java @@ -16,10 +16,8 @@ package net.dreamlu.iot.mqtt.spring.client.config; -import net.dreamlu.iot.mqtt.core.client.IMqttClientConnectListener; -import net.dreamlu.iot.mqtt.core.client.IMqttClientSession; -import net.dreamlu.iot.mqtt.core.client.MqttClient; -import net.dreamlu.iot.mqtt.core.client.MqttClientCreator; +import net.dreamlu.iot.mqtt.codec.MqttTopicSubscription; +import net.dreamlu.iot.mqtt.core.client.*; import net.dreamlu.iot.mqtt.spring.client.MqttClientCustomizer; import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribeDetector; import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate; @@ -35,6 +33,7 @@ import org.springframework.util.StringUtils; import java.nio.charset.StandardCharsets; +import java.util.List; /** * mqtt client 配置 @@ -103,6 +102,11 @@ public MqttClientCreator mqttClientCreator(MqttClientProperties properties, } }); } + // 全局订阅 + List globalSubscribe = properties.getGlobalSubscribe(); + if (globalSubscribe != null && !globalSubscribe.isEmpty()) { + clientCreator.globalSubscribe(globalSubscribe); + } // 客户端 session clientSessionObjectProvider.ifAvailable(clientCreator::clientSession); return clientCreator; @@ -111,8 +115,9 @@ public MqttClientCreator mqttClientCreator(MqttClientProperties properties, @Bean(MqttClientTemplate.DEFAULT_CLIENT_TEMPLATE_BEAN) public MqttClientTemplate mqttClientTemplate(MqttClientCreator mqttClientCreator, ObjectProvider clientConnectListenerObjectProvider, + ObjectProvider globalMessageListenerObjectProvider, ObjectProvider customizers) { - return new MqttClientTemplate(mqttClientCreator, clientConnectListenerObjectProvider, customizers); + return new MqttClientTemplate(mqttClientCreator, clientConnectListenerObjectProvider, globalMessageListenerObjectProvider, customizers); } @Bean diff --git a/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientProperties.java b/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientProperties.java index 4821739d..ece0b171 100644 --- a/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientProperties.java +++ b/starter/mica-mqtt-client-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/config/MqttClientProperties.java @@ -20,11 +20,14 @@ import lombok.Setter; import net.dreamlu.iot.mqtt.codec.MqttConstant; import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.codec.MqttTopicSubscription; import net.dreamlu.iot.mqtt.codec.MqttVersion; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.util.unit.DataSize; import org.tio.utils.buffer.ByteBufferAllocator; +import java.util.List; + /** * MqttClient 配置 * @@ -123,6 +126,10 @@ public class MqttClientProperties { * 遗嘱消息 */ private WillMessage willMessage; + /** + * 全局订阅 + */ + private List globalSubscribe; /** * 是否开启监控,默认:false 不开启,节省内存 */