diff --git a/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/Mqtt5ClientTest.java b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/Mqtt5ClientTest.java new file mode 100644 index 00000000..15a8d856 --- /dev/null +++ b/example/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/client/Mqtt5ClientTest.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.client; + +import net.dreamlu.iot.mqtt.codec.MqttPublishMessage; +import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.codec.MqttVersion; +import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener; +import net.dreamlu.iot.mqtt.core.client.MqttClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tio.core.ChannelContext; + +import java.nio.charset.StandardCharsets; +import java.util.Timer; +import java.util.TimerTask; + +/** + * 客户端测试 + * + * @author L.cm + */ +public class Mqtt5ClientTest { + private static final Logger logger = LoggerFactory.getLogger(Mqtt5ClientTest.class); + + public static void main(String[] args) { + // 初始化 mqtt 客户端 + MqttClient client = MqttClient.create() + .ip("127.0.0.1") + .port(1883) + .username("mica") + .password("mica") + .version(MqttVersion.MQTT_5) + .cleanSession(false) + .sessionExpiryIntervalSecs(7200) + .connectListener(new MqttClientConnectListener()) + .willMessage(builder -> { + builder.topic("/test/offline") + .messageText("down") + .retain(false) + .qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息 + }) + // 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。 + .connectSync(); + + client.subQos0("/test/123", new IMqttClientMessageListener() { + @Override + public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) { + // 订阅成功之后触发,可在此处做一些业务逻辑 + logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS); + } + + @Override + public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) { + logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8)); + } + }); + + client.publish("/test/client", "mica最牛皮1".getBytes(StandardCharsets.UTF_8)); + client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8)); + client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8)); + + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); + } + }, 1000, 2000); + } +} diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttEncoder.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttEncoder.java index 2f816b51..8306f61d 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttEncoder.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttEncoder.java @@ -19,6 +19,7 @@ import org.tio.core.ChannelContext; import org.tio.utils.buffer.ByteBufferAllocator; import org.tio.utils.buffer.ByteBufferUtil; +import org.tio.utils.hutool.FastByteBuffer; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -138,7 +139,7 @@ private static ByteBuffer encodeConnectMessage(ChannelContext ctx, final byte[] willPropertiesBytes; if (variableHeader.isWillFlag()) { willPropertiesBytes = encodePropertiesIfNeeded(mqttVersion, payload.willProperties()); - payloadBufferSize += propertiesBytes.length; + payloadBufferSize += willPropertiesBytes.length; } else { willPropertiesBytes = ByteBufferUtil.EMPTY_BYTES; } @@ -505,7 +506,7 @@ private static byte[] encodePropertiesIfNeeded(MqttVersion mqttVersion, } private static byte[] encodeProperties(MqttProperties mqttProperties) { - WriteBuffer writeBuffer = new WriteBuffer(128); + FastByteBuffer writeBuffer = new FastByteBuffer(128); for (MqttProperties.MqttProperty property : mqttProperties.listAll()) { MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(property.propertyId); switch (propertyType) { @@ -528,7 +529,7 @@ private static byte[] encodeProperties(MqttProperties mqttProperties) { writeBuffer.writeVarLengthInt(property.propertyId); final short twoBytesInPropValue = ((MqttProperties.IntegerProperty) property).value.shortValue(); - writeBuffer.writeShort(twoBytesInPropValue); + writeBuffer.writeShortBE(twoBytesInPropValue); break; case PUBLICATION_EXPIRY_INTERVAL: case SESSION_EXPIRY_INTERVAL: @@ -536,7 +537,7 @@ private static byte[] encodeProperties(MqttProperties mqttProperties) { case MAXIMUM_PACKET_SIZE: writeBuffer.writeVarLengthInt(property.propertyId); final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value; - writeBuffer.writeInt(fourBytesIntPropValue); + writeBuffer.writeIntBE(fourBytesIntPropValue); break; case SUBSCRIPTION_IDENTIFIER: writeBuffer.writeVarLengthInt(property.propertyId); @@ -566,7 +567,7 @@ private static byte[] encodeProperties(MqttProperties mqttProperties) { case AUTHENTICATION_DATA: writeBuffer.writeVarLengthInt(property.propertyId); final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value; - writeBuffer.writeShort((short) binaryPropValue.length); + writeBuffer.writeShortBE((short) binaryPropValue.length); writeBuffer.writeBytes(binaryPropValue, 0, binaryPropValue.length); break; default: @@ -614,12 +615,12 @@ private static int getVariableLengthInt(int num) { return count; } - private static void writeEagerUTF8String(WriteBuffer buf, String s) { + private static void writeEagerUTF8String(FastByteBuffer buf, String s) { if (s == null) { - buf.writeShort((short) 0); + buf.writeShortBE((short) 0); } else { byte[] bytes = s.getBytes(StandardCharsets.UTF_8); - buf.writeShort((short) bytes.length); + buf.writeShortBE((short) bytes.length); buf.writeBytes(bytes); } } diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/WriteBuffer.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/WriteBuffer.java deleted file mode 100644 index 6fd49d84..00000000 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/WriteBuffer.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.codec; - -import org.tio.utils.hutool.FastByteBuffer; - -import java.nio.ByteBuffer; - -/** - * 写出的 buffer - * - * @author L.cm - */ -public class WriteBuffer { - private final FastByteBuffer buffer; - - public WriteBuffer() { - this(new FastByteBuffer()); - } - - public WriteBuffer(int size) { - this(new FastByteBuffer(size)); - } - - public WriteBuffer(FastByteBuffer buffer) { - this.buffer = buffer; - } - - /** - * 输出一个byte类型的数据 - * - * @param b 待输出数值 - */ - public void writeByte(byte b) { - buffer.append(b); - } - - /** - * 输出一个bytes类型的数据 - * - * @param bytes 待输出数值 - */ - public void writeBytes(byte[] bytes) { - buffer.append(bytes); - } - - /** - * 输出一个bytes类型的数据 - * - * @param bytes 待输出数值 - * @param off off - * @param len len - */ - public void writeBytes(byte[] bytes, int off, int len) { - buffer.append(bytes, off, len); - } - - /** - * 输出一个short类型的数据 - * - * @param v short数值 - */ - public void writeShort(short v) { - byte[] bytes = new byte[2]; - bytes[0] = (byte) ((v >>> 8) & 0xFF); - bytes[1] = (byte) (v & 0xFF); - buffer.append(bytes); - } - - /** - * 输出int数值,占用4个字节 - * - * @param v int数值 - */ - public void writeInt(int v) { - byte[] bytes = new byte[4]; - bytes[0] = (byte) ((v >>> 24) & 0xFF); - bytes[1] = (byte) ((v >>> 16) & 0xFF); - bytes[2] = (byte) ((v >>> 8) & 0xFF); - bytes[3] = (byte) (v & 0xFF); - buffer.append(bytes); - } - - /** - * 输出long数值,占用8个字节 - * - * @param v long数值 - */ - public void writeLong(long v) { - byte[] bytes = new byte[8]; - bytes[0] = (byte) ((v >>> 56) & 0xFF); - bytes[1] = (byte) ((v >>> 48) & 0xFF); - bytes[2] = (byte) ((v >>> 40) & 0xFF); - bytes[3] = (byte) ((v >>> 32) & 0xFF); - bytes[4] = (byte) ((v >>> 24) & 0xFF); - bytes[5] = (byte) ((v >>> 16) & 0xFF); - bytes[6] = (byte) ((v >>> 8) & 0xFF); - bytes[7] = (byte) (v & 0xFF); - buffer.append(bytes); - } - - /** - * 写可变长度整数 - * - * @param num num - */ - public void writeVarLengthInt(int num) { - do { - int digit = num % 128; - num /= 128; - if (num > 0) { - digit |= 0x80; - } - writeByte((byte) digit); - } while (num > 0); - } - - /** - * 转换成数组 - * - * @return byte array - */ - public byte[] toArray() { - return buffer.toArray(); - } - - /** - * 转换成数组 - * - * @return ByteBuffer - */ - public ByteBuffer toBuffer() { - return ByteBuffer.wrap(this.toArray()); - } - - /** - * 重置 - */ - public void reset() { - buffer.reset(); - } - - /** - * buffer 大小 - * - * @return size - */ - public int size() { - return buffer.size(); - } - -} diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/http/websocket/MqttWsMsgHandler.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/http/websocket/MqttWsMsgHandler.java index b9e3aa5a..60196935 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/http/websocket/MqttWsMsgHandler.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/http/websocket/MqttWsMsgHandler.java @@ -17,7 +17,6 @@ package net.dreamlu.iot.mqtt.core.server.http.websocket; import net.dreamlu.iot.mqtt.codec.MqttMessage; -import net.dreamlu.iot.mqtt.codec.WriteBuffer; import net.dreamlu.iot.mqtt.core.server.MqttMessageInterceptors; import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; import org.slf4j.Logger; @@ -30,6 +29,7 @@ import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; import org.tio.utils.buffer.ByteBufferUtil; +import org.tio.utils.hutool.FastByteBuffer; import org.tio.websocket.common.WsRequest; import org.tio.websocket.common.WsResponse; import org.tio.websocket.server.handler.IWsMsgHandler; @@ -95,7 +95,7 @@ public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, Ch @Override public void onAfterHandshaked(HttpRequest request, HttpResponse response, ChannelContext context) { // 在连接中添加 WriteBuffer 用来处理半包消息 - context.computeIfAbsent(MQTT_WS_MSG_BODY_KEY, key -> new WriteBuffer()); + context.computeIfAbsent(MQTT_WS_MSG_BODY_KEY, key -> new FastByteBuffer()); } /** @@ -103,7 +103,7 @@ public void onAfterHandshaked(HttpRequest request, HttpResponse response, Channe */ @Override public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext context) throws Exception { - WriteBuffer wsBody = context.get(MQTT_WS_MSG_BODY_KEY); + FastByteBuffer wsBody = context.get(MQTT_WS_MSG_BODY_KEY); ByteBuffer buffer = getMqttBody(wsBody, bytes); if (buffer == null) { return null; @@ -173,7 +173,7 @@ public Object onText(WsRequest wsRequest, String text, ChannelContext context) { * @param bytes 消息类容 * @return ByteBuffer */ - private static synchronized ByteBuffer getMqttBody(WriteBuffer wsBody, byte[] bytes) { + private static synchronized ByteBuffer getMqttBody(FastByteBuffer wsBody, byte[] bytes) { wsBody.writeBytes(bytes); int length = wsBody.size(); if (length < 2) {