Skip to content

Commit

Permalink
Support mqtt5 connect maximum packet size property. (#631)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored May 14, 2022
1 parent 9a9ea54 commit 4a8be74
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ private static Optional<Integer> getReceiveMaximum(MqttProperties properties) {
return Optional.ofNullable(property.value());
}

public static Optional<Integer> getMaximumPacketSize(MqttProperties properties) {
MqttProperties.MqttProperty<Integer> property = properties
.getProperty(MqttProperties.MqttPropertyType.MAXIMUM_PACKET_SIZE.value());
if (property == null) {
return Optional.empty();
}
return Optional.ofNullable(property.value());
}

public static void parsePropertiesToStuffRestriction(
ClientRestrictions.ClientRestrictionsBuilder clientRestrictionsBuilder,
MqttConnectMessage connectMessage)
Expand All @@ -78,6 +87,9 @@ public static void parsePropertiesToStuffRestriction(
} else {
receiveMaximum.ifPresent(clientRestrictionsBuilder::receiveMaximum);
}
// parse maximum packet size
Optional<Integer> maximumPacketSize = getMaximumPacketSize(properties);
maximumPacketSize.ifPresent(clientRestrictionsBuilder::maximumPacketSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ClientRestrictions {
private Integer keepAliveTime;
@Getter
private boolean cleanSession;
private Integer maximumPacketSize;

public int getSessionExpireInterval() {
return Optional.ofNullable(sessionExpireInterval)
Expand All @@ -47,6 +48,14 @@ public int getKeepAliveTime() {
return Optional.ofNullable(keepAliveTime).orElse(0);
}

public int getMaximumPacketSize() {
return Optional.ofNullable(maximumPacketSize).orElse(0);
}

public boolean exceedMaximumPacketSize(int readableBytes) {
return getMaximumPacketSize() != 0 ? readableBytes > maximumPacketSize : false;
}

public void updateExpireInterval(int newExpireInterval) throws InvalidSessionExpireIntervalException {
if (sessionExpireInterval <= SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime()
|| newExpireInterval < SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,8 @@ private void registerRegexTopicFilterListener(MqttTopicSubscription subTopic) {

private CompletableFuture<Void> createAndSubConsumer(Subscription sub,
MqttTopicSubscription subTopic,
String changedTopicName) {
MQTTConsumer consumer = new MQTTConsumer(sub, subTopic.topicName(), changedTopicName, connection, serverCnx,
String pulsarTopicName) {
MQTTConsumer consumer = new MQTTConsumer(sub, subTopic.topicName(), pulsarTopicName, connection, serverCnx,
subTopic.qualityOfService(), packetIdGenerator, outstandingPacketContainer, metricsCollector);
return sub.addConsumer(consumer).thenAccept(__ -> {
consumer.addAllPermits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,15 @@ public ChannelPromise sendMessages(List<Entry> entries, EntryBatchSizes batchSiz
log.debug("[{}] [{}] [{}] Send MQTT message {} to subscriber", pulsarTopicName,
mqttTopicName, super.getSubscription().getName(), msg);
}
metricsCollector.addReceived(msg.payload().readableBytes());
int readableBytes = msg.payload().readableBytes();
metricsCollector.addReceived(readableBytes);
if (clientRestrictions.exceedMaximumPacketSize(readableBytes)) {
log.warn("discard msg {}, because it exceeds maximum packet size : {}, msg size {}", msg,
clientRestrictions.getMaximumPacketSize(), readableBytes);
getSubscription().acknowledgeMessage(Collections.singletonList(entry.getPosition()),
CommandAck.AckType.Individual, Collections.emptyMap());
continue;
}
cnx.ctx().channel().write(new MqttAdapterMessage(connection.getClientId(), msg,
connection.isFromProxy()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -139,4 +142,36 @@ public void testDynamicUpdateSubscribe() throws InterruptedException, PulsarAdmi
client2.disconnect();
}

@Test(timeOut = TIMEOUT)
public void testMaximumPacketSize() throws Exception {
final String topic = "maximumPacketSize";
final String identifier = "maximum-packet-size";
Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(identifier)
.serverHost("127.0.0.1")
.serverPort(getMqttBrokerPortList().get(0))
.buildBlocking();
client.connectWith()
.restrictions(
Mqtt5ConnectRestrictions.builder().maximumPacketSize(20).build())
.send();
client.subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
byte[] msg = "payload_123456789_123456789".getBytes();
client.publishWith()
.topic(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.payload(msg)
.send();

try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> received = publishes.receive(3, TimeUnit.SECONDS);
Assert.assertFalse(received.isPresent());
}
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(identifier).getUnackedMessages(), 0);
client.unsubscribeWith().topicFilter(topic).send();
client.disconnect();
}
}

0 comments on commit 4a8be74

Please sign in to comment.