From 9894437d89cdac7468bd3f69aeb63a1e1d3aaa53 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 28 Jun 2022 18:23:23 +0800 Subject: [PATCH] Support MQTT-5 Auth command. --- .../mqtt/MQTTAuthenticationService.java | 2 +- .../mqtt/MQTTCommonInboundHandler.java | 3 + .../mqtt/ProtocolMethodProcessor.java | 2 + .../codes/mqtt5/Mqtt5DisConnReasonCode.java | 1 + ...AbstractCommonProtocolMethodProcessor.java | 44 +++++++++ .../base/BasicAuthenticationTest.java | 97 ++++++++++++++++++- 6 files changed, 145 insertions(+), 4 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java index 52f91be6d..5014c8fda 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java @@ -94,7 +94,7 @@ public AuthenticationResult authenticate(MqttConnectPayload payload) { return new AuthenticationResult(authenticated, userRole); } - private AuthenticationResult authenticate(String clientIdentifier, + public AuthenticationResult authenticate(String clientIdentifier, String authMethod, AuthenticationDataCommand command) { AuthenticationProvider authenticationProvider = authenticationProviders.get(authMethod); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonInboundHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonInboundHandler.java index 7c0fb7eed..c2bba2d33 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonInboundHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonInboundHandler.java @@ -96,6 +96,9 @@ public void channelRead(ChannelHandlerContext ctx, Object message) { case PINGREQ: processor.processPingReq(adapterMsg); break; + case AUTH: + processor.processAuthReq(adapterMsg); + break; default: throw new UnsupportedOperationException("Unknown MessageType: " + messageType); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/ProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/ProtocolMethodProcessor.java index f2ccda85a..44f3050e7 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/ProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/ProtocolMethodProcessor.java @@ -41,4 +41,6 @@ public interface ProtocolMethodProcessor { void processUnSubscribe(MqttAdapterMessage msg); void processPingReq(MqttAdapterMessage msg); + + void processAuthReq(MqttAdapterMessage msg); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/codes/mqtt5/Mqtt5DisConnReasonCode.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/codes/mqtt5/Mqtt5DisConnReasonCode.java index 3d68f318e..2e0e002b6 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/codes/mqtt5/Mqtt5DisConnReasonCode.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/codes/mqtt5/Mqtt5DisConnReasonCode.java @@ -23,6 +23,7 @@ public enum Mqtt5DisConnReasonCode implements MqttReasonCode { NORMAL(0x0), WITH_WILL_MESSAGE(0x04), + CONTINUE_AUTHENTICATION(0x18), UNSPECIFIED_ERROR(0x80), MALFORMED_PACKET(0x81), PROTOCOL_ERROR(0x82), diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java index ac37124ca..38b079e80 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java @@ -18,12 +18,16 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; import io.streamnative.pulsar.handlers.mqtt.MQTTAuthenticationService; import io.streamnative.pulsar.handlers.mqtt.ProtocolMethodProcessor; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.exception.restrictions.InvalidReceiveMaximumException; import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils; import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttConnectAck; +import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5DisConnReasonCode; import io.streamnative.pulsar.handlers.mqtt.restrictions.ClientRestrictions; import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; @@ -31,6 +35,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; /** * Common protocol method processor. @@ -173,4 +178,43 @@ public void processPubComp(MqttAdapterMessage msg) { log.debug("[PubComp] [{}]", NettyUtils.getConnection(channel).getClientId()); } } + + @Override + public void processAuthReq(MqttAdapterMessage adapter) { + if (log.isDebugEnabled()) { + log.debug("[AUTH] [{}]", NettyUtils.getConnection(channel).getClientId()); + } + MqttMessage mqttMessage = adapter.getMqttMessage(); + MqttProperties properties = ((MqttReasonCodeAndPropertiesVariableHeader) mqttMessage.variableHeader()) + .properties(); + MqttProperties.StringProperty authMethodProperty = (MqttProperties.StringProperty) properties + .getProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()); + MqttProperties.BinaryProperty authDataProperty = (MqttProperties.BinaryProperty) properties + .getProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); + MQTTAuthenticationService.AuthenticationResult authResult = authenticationService.authenticate( + adapter.getClientId(), authMethodProperty.value(), + new AuthenticationDataCommand(new String(authDataProperty.value()))); + if (authResult.isFailed()) { + log.error("[AUTH] auth failed. CId={}", adapter.getClientId()); + MqttMessage mqttAuthSFailure = MqttMessageBuilders.auth() + .properties(properties) + .reasonCode(Mqtt5DisConnReasonCode.CONTINUE_AUTHENTICATION.byteValue()).build(); + adapter.setMqttMessage(mqttAuthSFailure); + channel.writeAndFlush(adapter).addListener(future -> { + if (!future.isSuccess()) { + log.warn("send auth result failed", future.cause()); + } + }); + } else { + MqttMessage mqttAuthSuccess = MqttMessageBuilders.auth() + .properties(properties) + .reasonCode(Mqtt5DisConnReasonCode.NORMAL.byteValue()).build(); + adapter.setMqttMessage(mqttAuthSuccess); + channel.writeAndFlush(adapter).addListener(future -> { + if (!future.isSuccess()) { + log.warn("send auth result failed", future.cause()); + } + }); + } + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/BasicAuthenticationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/BasicAuthenticationTest.java index 307ae1031..4d57603e7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/BasicAuthenticationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/BasicAuthenticationTest.java @@ -65,8 +65,8 @@ public void testAuthenticate() throws Exception { } @Test(timeOut = TIMEOUT) - public void testAuthenticateWithAuthMethod() throws Exception { - String topic = "persistent://public/default/testAuthenticateWithAuthMethod"; + public void testAuthenticateWithConnAuthMethod() throws Exception { + String topic = "persistent://public/default/testAuthenticateWithConnAuthMethod"; Mqtt5BlockingClient client1 = Mqtt5Client.builder() .identifier("abc") .serverHost("127.0.0.1") @@ -113,7 +113,7 @@ public int getTimeout() { @Override public @NotNull CompletableFuture onReAuthSuccess(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Mqtt5Auth auth) { - return null; + return CompletableFuture.completedFuture(true); } @Override @@ -150,6 +150,97 @@ public void onReAuthError(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Thro client1.disconnect(); } + @Test(timeOut = TIMEOUT) + public void testAuthenticateWithReAuthMethod() throws Exception { + String topic = "persistent://public/default/testAuthenticateWithReAuthMethod"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().enhancedAuth(new Mqtt5EnhancedAuthMechanism() { + @Override + public @NotNull MqttUtf8String getMethod() { + return MqttUtf8String.of("basic"); + } + + @Override + public int getTimeout() { + return 20; + } + + @Override + public @NotNull CompletableFuture onAuth(@NotNull Mqtt5ClientConfig clientConfig, + @NotNull Mqtt5Connect connect, + @NotNull Mqtt5EnhancedAuthBuilder authBuilder) { + authBuilder.data("superUser:supepass".getBytes(StandardCharsets.UTF_8)); + return CompletableFuture.completedFuture(null); + } + + @Override + public @NotNull CompletableFuture onReAuth(@NotNull Mqtt5ClientConfig clientConfig, + @NotNull Mqtt5AuthBuilder authBuilder) { + authBuilder.data("superUser:Xsupepass".getBytes(StandardCharsets.UTF_8)); + authBuilder.userProperties().add("key1", "value1").applyUserProperties(); + return CompletableFuture.completedFuture(null); + } + + @Override + public @NotNull CompletableFuture onContinue(@NotNull Mqtt5ClientConfig clientConfig, + @NotNull Mqtt5Auth auth, + @NotNull Mqtt5AuthBuilder authBuilder) { + authBuilder.data("superUser:supepass".getBytes(StandardCharsets.UTF_8)); + authBuilder.userProperties().add("key1", "value1").applyUserProperties(); + return CompletableFuture.completedFuture(true); + } + + @Override + public @NotNull CompletableFuture onAuthSuccess(@NotNull Mqtt5ClientConfig clientConfig, + @NotNull Mqtt5ConnAck connAck) { + return CompletableFuture.completedFuture(true); + } + + @Override + public @NotNull CompletableFuture onReAuthSuccess(@NotNull Mqtt5ClientConfig clientConfig, + @NotNull Mqtt5Auth auth) { + return CompletableFuture.completedFuture(true); + } + + @Override + public void onAuthRejected(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Mqtt5ConnAck connAck) { + // NOP + } + + @Override + public void onReAuthRejected(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Mqtt5Disconnect disconnect) { + // NOP + } + + @Override + public void onAuthError(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Throwable cause) { + // NOP + } + + @Override + public void onReAuthError(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Throwable cause) { + // NOP + } + }).send(); + client1.reauth(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + publishes.close(); + client1.disconnect(); + } + @Test(expectedExceptions = {MQTTException.class}, timeOut = TIMEOUT) public void testNoAuthenticated() throws Exception { MQTT mqtt = createMQTTClient();