From 5a959e0240d940ccf7b5b645d8cbf72e28e01503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=85=E6=A2=A6?= <1101766085@qq.com> Date: Wed, 21 Feb 2024 17:38:06 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E5=AE=8C=E5=96=84=20IMqttMessageI?= =?UTF-8?q?nterceptor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/server/MqttMessageInterceptors.java | 32 +++++++++++++++++-- .../core/server/MqttServerAioListener.java | 11 +++++-- .../interceptor/IMqttMessageInterceptor.java | 27 ++++++++++++++-- 3 files changed, 64 insertions(+), 6 deletions(-) diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttMessageInterceptors.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttMessageInterceptors.java index 0c20cb8d..3cdb7f35 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttMessageInterceptors.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttMessageInterceptors.java @@ -26,7 +26,6 @@ /** * mqtt 消息拦截器集合 * - * @since 1.3.9 * @author L.cm */ public class MqttMessageInterceptors { @@ -45,6 +44,20 @@ void add(IMqttMessageInterceptor interceptor) { this.interceptors.add(interceptor); } + /** + * 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected + * + * @param context ChannelContext + * @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败 + * @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接 + * @throws Exception Exception + */ + public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) throws Exception { + for (IMqttMessageInterceptor interceptor : interceptors) { + interceptor.onAfterConnected(context, isConnected, isReconnect); + } + } + /** * 接收到TCP层传过来的数据后 * @@ -64,8 +77,9 @@ public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) thro * @param context ChannelContext * @param message MqttMessage * @param packetSize packetSize + * @throws Exception Exception */ - public void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) { + public void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) throws Exception { for (IMqttMessageInterceptor interceptor : interceptors) { interceptor.onAfterDecoded(context, message, packetSize); } @@ -85,4 +99,18 @@ public void onAfterHandled(ChannelContext context, MqttMessage message, long cos } } + /** + * 处理一个消息包后 + * + * @param context ChannelContext + * @param message MqttMessage + * @param isSentSuccess 是否发送成功 + * @throws Exception Exception + */ + public void onAfterSent(ChannelContext context, MqttMessage message, boolean isSentSuccess) throws Exception { + for (IMqttMessageInterceptor interceptor : interceptors) { + interceptor.onAfterSent(context, message, isSentSuccess); + } + } + } diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java index e9eaf64b..fa59654a 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java @@ -63,6 +63,11 @@ public boolean onHeartbeatTimeout(ChannelContext context, long interval, int hea return false; } + @Override + public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) throws Exception { + messageInterceptors.onAfterConnected(context, isConnected, isReconnect); + } + @Override public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { // 标记认证为 false @@ -139,11 +144,13 @@ private void notify(ChannelContext context, String clientId, String username, St } @Override - public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) { + public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) throws Exception { // 1. http 请求处理 boolean isHttpRequest = context.get(MqttConst.IS_HTTP) != null; if (isHttpRequest) { MqttHttpHelper.close(context, packet); + } else if (packet instanceof MqttMessage) { + messageInterceptors.onAfterSent(context, (MqttMessage) packet, isSentSuccess); } } @@ -153,7 +160,7 @@ public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) thro } @Override - public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) { + public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) throws Exception { if (packet instanceof MqttMessage) { messageInterceptors.onAfterDecoded(context, (MqttMessage) packet, packetSize); } diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/interceptor/IMqttMessageInterceptor.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/interceptor/IMqttMessageInterceptor.java index 9a0ba408..f3a9533d 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/interceptor/IMqttMessageInterceptor.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/interceptor/IMqttMessageInterceptor.java @@ -22,11 +22,22 @@ /** * mqtt 消息拦截器 * - * @since 1.3.9 * @author L.cm */ public interface IMqttMessageInterceptor { + /** + * 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected + * + * @param context ChannelContext + * @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败 + * @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接 + * @throws Exception Exception + */ + default void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) throws Exception { + + } + /** * 接收到TCP层传过来的数据后 * @@ -44,8 +55,9 @@ default void onAfterReceivedBytes(ChannelContext context, int receivedBytes) thr * @param context ChannelContext * @param message MqttMessage * @param packetSize packetSize + * @throws Exception Exception */ - default void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) { + default void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) throws Exception { } @@ -61,4 +73,15 @@ default void onAfterHandled(ChannelContext context, MqttMessage message, long co } + /** + * 处理一个消息包后 + * + * @param context ChannelContext + * @param message MqttMessage + * @param isSentSuccess 是否发送成功 + * @throws Exception Exception + */ + default void onAfterSent(ChannelContext context, MqttMessage message, boolean isSentSuccess) throws Exception { + + } }