diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java index c48c47eb1..122081bd6 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java @@ -63,6 +63,15 @@ private static Optional getReceiveMaximum(MqttProperties properties) { return Optional.ofNullable(property.value()); } + public static Optional getMaximumPacketSize(MqttProperties properties) { + MqttProperties.MqttProperty property = properties + .getProperty(MqttProperties.MqttPropertyType.MAXIMUM_PACKET_SIZE.value()); + if (property == null) { + return Optional.empty(); + } + return Optional.ofNullable(property.value()); + } + public static void parsePropertiesToStuffRestriction( ClientRestrictions.ClientRestrictionsBuilder clientRestrictionsBuilder, MqttConnectMessage connectMessage) @@ -78,6 +87,9 @@ public static void parsePropertiesToStuffRestriction( } else { receiveMaximum.ifPresent(clientRestrictionsBuilder::receiveMaximum); } + // parse maximum packet size + Optional maximumPacketSize = getMaximumPacketSize(properties); + maximumPacketSize.ifPresent(clientRestrictionsBuilder::maximumPacketSize); } /** diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java index 7aa49b3e9..8c836d16d 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java @@ -33,6 +33,7 @@ public class ClientRestrictions { private Integer keepAliveTime; @Getter private boolean cleanSession; + private Integer maximumPacketSize; public int getSessionExpireInterval() { return Optional.ofNullable(sessionExpireInterval) @@ -47,6 +48,14 @@ public int getKeepAliveTime() { return Optional.ofNullable(keepAliveTime).orElse(0); } + public int getMaximumPacketSize() { + return Optional.ofNullable(maximumPacketSize).orElse(0); + } + + public boolean exceedMaximumPacketSize(int readableBytes) { + return getMaximumPacketSize() != 0 ? readableBytes > maximumPacketSize : false; + } + public void updateExpireInterval(int newExpireInterval) throws InvalidSessionExpireIntervalException { if (sessionExpireInterval <= SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime() || newExpireInterval < SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index 655655305..adc5a2385 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -463,8 +463,8 @@ private void registerRegexTopicFilterListener(MqttTopicSubscription subTopic) { private CompletableFuture createAndSubConsumer(Subscription sub, MqttTopicSubscription subTopic, - String changedTopicName) { - MQTTConsumer consumer = new MQTTConsumer(sub, subTopic.topicName(), changedTopicName, connection, serverCnx, + String pulsarTopicName) { + MQTTConsumer consumer = new MQTTConsumer(sub, subTopic.topicName(), pulsarTopicName, connection, serverCnx, subTopic.qualityOfService(), packetIdGenerator, outstandingPacketContainer, metricsCollector); return sub.addConsumer(consumer).thenAccept(__ -> { consumer.addAllPermits(); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java index 9c3663b51..8e9c6d7b6 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java @@ -104,7 +104,15 @@ public ChannelPromise sendMessages(List entries, EntryBatchSizes batchSiz log.debug("[{}] [{}] [{}] Send MQTT message {} to subscriber", pulsarTopicName, mqttTopicName, super.getSubscription().getName(), msg); } - metricsCollector.addReceived(msg.payload().readableBytes()); + int readableBytes = msg.payload().readableBytes(); + metricsCollector.addReceived(readableBytes); + if (clientRestrictions.exceedMaximumPacketSize(readableBytes)) { + log.warn("discard msg {}, because it exceeds maximum packet size : {}, msg size {}", msg, + clientRestrictions.getMaximumPacketSize(), readableBytes); + getSubscription().acknowledgeMessage(Collections.singletonList(entry.getPosition()), + CommandAck.AckType.Individual, Collections.emptyMap()); + continue; + } cnx.ctx().channel().write(new MqttAdapterMessage(connection.getClientId(), msg, connection.isFromProxy())); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java index 5306faffc..51867028e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java @@ -18,11 +18,14 @@ import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.datatypes.MqttTopic; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -139,4 +142,36 @@ public void testDynamicUpdateSubscribe() throws InterruptedException, PulsarAdmi client2.disconnect(); } + @Test(timeOut = TIMEOUT) + public void testMaximumPacketSize() throws Exception { + final String topic = "maximumPacketSize"; + final String identifier = "maximum-packet-size"; + Mqtt5BlockingClient client = Mqtt5Client.builder() + .identifier(identifier) + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client.connectWith() + .restrictions( + Mqtt5ConnectRestrictions.builder().maximumPacketSize(20).build()) + .send(); + client.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + byte[] msg = "payload_123456789_123456789".getBytes(); + client.publishWith() + .topic(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .payload(msg) + .send(); + + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) { + Optional received = publishes.receive(3, TimeUnit.SECONDS); + Assert.assertFalse(received.isPresent()); + } + Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(identifier).getUnackedMessages(), 0); + client.unsubscribeWith().topicFilter(topic).send(); + client.disconnect(); + } }