Skip to content

Commit

Permalink
🐛 修复 mqtt-client mqtt5 连接
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunMengLu committed Apr 4, 2024
1 parent b90c952 commit 7136bc0
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 178 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.iot.mqtt.client;

import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;

import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;

/**
* 客户端测试
*
* @author L.cm
*/
public class Mqtt5ClientTest {
private static final Logger logger = LoggerFactory.getLogger(Mqtt5ClientTest.class);

public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("mica")
.password("mica")
.version(MqttVersion.MQTT_5)
.cleanSession(false)
.sessionExpiryIntervalSecs(7200)
.connectListener(new MqttClientConnectListener())
.willMessage(builder -> {
builder.topic("/test/offline")
.messageText("down")
.retain(false)
.qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息
})
// 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。
.connectSync();

client.subQos0("/test/123", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS);
}

@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});

client.publish("/test/client", "mica最牛皮1".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8));

Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
}
}, 1000, 2000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.tio.core.ChannelContext;
import org.tio.utils.buffer.ByteBufferAllocator;
import org.tio.utils.buffer.ByteBufferUtil;
import org.tio.utils.hutool.FastByteBuffer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -138,7 +139,7 @@ private static ByteBuffer encodeConnectMessage(ChannelContext ctx,
final byte[] willPropertiesBytes;
if (variableHeader.isWillFlag()) {
willPropertiesBytes = encodePropertiesIfNeeded(mqttVersion, payload.willProperties());
payloadBufferSize += propertiesBytes.length;
payloadBufferSize += willPropertiesBytes.length;
} else {
willPropertiesBytes = ByteBufferUtil.EMPTY_BYTES;
}
Expand Down Expand Up @@ -505,7 +506,7 @@ private static byte[] encodePropertiesIfNeeded(MqttVersion mqttVersion,
}

private static byte[] encodeProperties(MqttProperties mqttProperties) {
WriteBuffer writeBuffer = new WriteBuffer(128);
FastByteBuffer writeBuffer = new FastByteBuffer(128);
for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(property.propertyId);
switch (propertyType) {
Expand All @@ -528,15 +529,15 @@ private static byte[] encodeProperties(MqttProperties mqttProperties) {
writeBuffer.writeVarLengthInt(property.propertyId);
final short twoBytesInPropValue =
((MqttProperties.IntegerProperty) property).value.shortValue();
writeBuffer.writeShort(twoBytesInPropValue);
writeBuffer.writeShortBE(twoBytesInPropValue);
break;
case PUBLICATION_EXPIRY_INTERVAL:
case SESSION_EXPIRY_INTERVAL:
case WILL_DELAY_INTERVAL:
case MAXIMUM_PACKET_SIZE:
writeBuffer.writeVarLengthInt(property.propertyId);
final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
writeBuffer.writeInt(fourBytesIntPropValue);
writeBuffer.writeIntBE(fourBytesIntPropValue);
break;
case SUBSCRIPTION_IDENTIFIER:
writeBuffer.writeVarLengthInt(property.propertyId);
Expand Down Expand Up @@ -566,7 +567,7 @@ private static byte[] encodeProperties(MqttProperties mqttProperties) {
case AUTHENTICATION_DATA:
writeBuffer.writeVarLengthInt(property.propertyId);
final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
writeBuffer.writeShort((short) binaryPropValue.length);
writeBuffer.writeShortBE((short) binaryPropValue.length);
writeBuffer.writeBytes(binaryPropValue, 0, binaryPropValue.length);
break;
default:
Expand Down Expand Up @@ -614,12 +615,12 @@ private static int getVariableLengthInt(int num) {
return count;
}

private static void writeEagerUTF8String(WriteBuffer buf, String s) {
private static void writeEagerUTF8String(FastByteBuffer buf, String s) {
if (s == null) {
buf.writeShort((short) 0);
buf.writeShortBE((short) 0);
} else {
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
buf.writeShort((short) bytes.length);
buf.writeShortBE((short) bytes.length);
buf.writeBytes(bytes);
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package net.dreamlu.iot.mqtt.core.server.http.websocket;

import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.WriteBuffer;
import net.dreamlu.iot.mqtt.core.server.MqttMessageInterceptors;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import org.slf4j.Logger;
Expand All @@ -30,6 +29,7 @@
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.utils.buffer.ByteBufferUtil;
import org.tio.utils.hutool.FastByteBuffer;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;
Expand Down Expand Up @@ -95,15 +95,15 @@ public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, Ch
@Override
public void onAfterHandshaked(HttpRequest request, HttpResponse response, ChannelContext context) {
// 在连接中添加 WriteBuffer 用来处理半包消息
context.computeIfAbsent(MQTT_WS_MSG_BODY_KEY, key -> new WriteBuffer());
context.computeIfAbsent(MQTT_WS_MSG_BODY_KEY, key -> new FastByteBuffer());
}

/**
* 字节消息(binaryType = arraybuffer)过来后会走这个方法
*/
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext context) throws Exception {
WriteBuffer wsBody = context.get(MQTT_WS_MSG_BODY_KEY);
FastByteBuffer wsBody = context.get(MQTT_WS_MSG_BODY_KEY);
ByteBuffer buffer = getMqttBody(wsBody, bytes);
if (buffer == null) {
return null;
Expand Down Expand Up @@ -173,7 +173,7 @@ public Object onText(WsRequest wsRequest, String text, ChannelContext context) {
* @param bytes 消息类容
* @return ByteBuffer
*/
private static synchronized ByteBuffer getMqttBody(WriteBuffer wsBody, byte[] bytes) {
private static synchronized ByteBuffer getMqttBody(FastByteBuffer wsBody, byte[] bytes) {
wsBody.writeBytes(bytes);
int length = wsBody.size();
if (length < 2) {
Expand Down

0 comments on commit 7136bc0

Please sign in to comment.