From 9aee1b2198b53e237dbc8662f93e51b20f6ab1b4 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 10 Jun 2022 19:37:17 +0800 Subject: [PATCH 1/2] Support MQTT-5 Publish with user properties. --- .../mqtt/utils/PulsarMessageConverter.java | 28 ++++++++ .../base/MQTT5PublishRelatedProtocolTest.java | 70 +++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 77812f1d3..5caefe44c 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.FastThreadLocal; @@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodecProvider; @@ -65,6 +67,17 @@ protected MessageMetadata initialValue() throws Exception { public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mqttMsg) { MessageMetadata metadata = LOCAL_MESSAGE_METADATA.get(); metadata.clear(); + MqttProperties properties = mqttMsg.variableHeader().properties(); + if (properties != null) { + properties.listAll().forEach(prop -> { + if (MqttProperties.MqttPropertyType.USER_PROPERTY.value() == prop.propertyId()) { + MqttProperties.UserProperties userProperties = (MqttProperties.UserProperties) prop; + userProperties.value().forEach(pair -> { + metadata.addProperty().setKey(pair.key).setValue(pair.value); + }); + } + }); + } return MessageImpl.create(metadata, mqttMsg.payload().nioBuffer(), SCHEMA, topic.getName()); } @@ -72,6 +85,15 @@ public static List toMqttMessages(String topicName, Entry en PacketIdGenerator packetIdGenerator, MqttQoS qos) { ByteBuf metadataAndPayload = entry.getDataBuffer(); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); + MqttProperties properties = null; + if (metadata.getPropertiesCount() > 0) { + properties = new MqttProperties(); + MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); + for (KeyValue kv : metadata.getPropertiesList()) { + userProperties.add(kv.getKey(), kv.getValue()); + } + properties.add(userProperties); + } if (metadata.hasNumMessagesInBatch()) { int batchSize = metadata.getNumMessagesInBatch(); List response = new ArrayList<>(batchSize); @@ -87,6 +109,7 @@ public static List toMqttMessages(String topicName, Entry en .payload(singleMessagePayload) .topicName(topicName) .qos(qos) + .properties(properties) .retained(false) .build()); } @@ -102,6 +125,7 @@ public static List toMqttMessages(String topicName, Entry en .payload(metadataAndPayload) .topicName(topicName) .qos(qos) + .properties(properties) .retained(false) .build()); } @@ -129,6 +153,10 @@ public static ByteBuf messageToByteBuf(Message message) { metadata.setProducerName(FAKE_MQTT_PRODUCER_NAME); } + msg.getProperties().forEach((k, v) -> { + metadata.addProperty().setKey(k).setValue(v); + }); + metadata.setCompression( CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE)); metadata.setUncompressedSize(payload.readableBytes()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java new file mode 100644 index 000000000..cf7d32ad4 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java @@ -0,0 +1,70 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.base; + +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import lombok.extern.slf4j.Slf4j; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +public class MQTT5PublishRelatedProtocolTest extends MQTTTestBase { + + @Test + public void testUserProperties() throws Exception { + final String topic = "testUserProperties"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + Mqtt5UserProperties userProperty = Mqtt5UserProperties.builder() + .add("user-1", "value-1") + .add("user-2", "value-2") + .build(); + Mqtt5UserProperty userProperty1 = Mqtt5UserProperty.of("user-1", "value-1"); + Mqtt5UserProperty userProperty2 = Mqtt5UserProperty.of("user-2", "value-2"); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(MqttQos.AT_LEAST_ONCE).userProperties(userProperty).build(); + + Mqtt5BlockingClient client2 = Mqtt5Client.builder() + .identifier( "ccc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client2.connectWith().send(); + client2.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client2.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + Assert.assertEquals(message.getUserProperties().asList().get(0).compareTo(userProperty1), 0); + Assert.assertEquals(message.getUserProperties().asList().get(1).compareTo(userProperty2), 0); + publishes.close(); + client2.unsubscribeWith().topicFilter(topic).send(); + client1.disconnect(); + client2.disconnect(); + } +} From ce977c737c1ed843e724f1b2ac3f42a2848672db Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 13 Jun 2022 10:19:27 +0800 Subject: [PATCH 2/2] fix checkstyle. --- .../mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java index cf7d32ad4..0746ccb91 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java @@ -43,10 +43,11 @@ public void testUserProperties() throws Exception { Mqtt5UserProperty userProperty1 = Mqtt5UserProperty.of("user-1", "value-1"); Mqtt5UserProperty userProperty2 = Mqtt5UserProperty.of("user-2", "value-2"); client1.connectWith().send(); - Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(MqttQos.AT_LEAST_ONCE).userProperties(userProperty).build(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(MqttQos.AT_LEAST_ONCE) + .userProperties(userProperty).build(); Mqtt5BlockingClient client2 = Mqtt5Client.builder() - .identifier( "ccc") + .identifier("ccc") .serverHost("127.0.0.1") .serverPort(getMqttBrokerPortList().get(0)) .buildBlocking();