Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MQTT-5 Auth command. #693

Merged
merged 1 commit into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public AuthenticationResult authenticate(MqttConnectPayload payload) {
return new AuthenticationResult(authenticated, userRole);
}

private AuthenticationResult authenticate(String clientIdentifier,
public AuthenticationResult authenticate(String clientIdentifier,
String authMethod,
AuthenticationDataCommand command) {
AuthenticationProvider authenticationProvider = authenticationProviders.get(authMethod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public void channelRead(ChannelHandlerContext ctx, Object message) {
case PINGREQ:
processor.processPingReq(adapterMsg);
break;
case AUTH:
processor.processAuthReq(adapterMsg);
break;
default:
throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ public interface ProtocolMethodProcessor {
void processUnSubscribe(MqttAdapterMessage msg);

void processPingReq(MqttAdapterMessage msg);

void processAuthReq(MqttAdapterMessage msg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
public enum Mqtt5DisConnReasonCode implements MqttReasonCode {
NORMAL(0x0),
WITH_WILL_MESSAGE(0x04),
CONTINUE_AUTHENTICATION(0x18),
UNSPECIFIED_ERROR(0x80),
MALFORMED_PACKET(0x81),
PROTOCOL_ERROR(0x82),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.streamnative.pulsar.handlers.mqtt.MQTTAuthenticationService;
import io.streamnative.pulsar.handlers.mqtt.ProtocolMethodProcessor;
import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.exception.restrictions.InvalidReceiveMaximumException;
import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils;
import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttConnectAck;
import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5DisConnReasonCode;
import io.streamnative.pulsar.handlers.mqtt.restrictions.ClientRestrictions;
import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils;
import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils;
import io.streamnative.pulsar.handlers.mqtt.utils.NettyUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;

/**
* Common protocol method processor.
Expand Down Expand Up @@ -173,4 +178,43 @@ public void processPubComp(MqttAdapterMessage msg) {
log.debug("[PubComp] [{}]", NettyUtils.getConnection(channel).getClientId());
}
}

@Override
public void processAuthReq(MqttAdapterMessage adapter) {
if (log.isDebugEnabled()) {
log.debug("[AUTH] [{}]", NettyUtils.getConnection(channel).getClientId());
}
MqttMessage mqttMessage = adapter.getMqttMessage();
MqttProperties properties = ((MqttReasonCodeAndPropertiesVariableHeader) mqttMessage.variableHeader())
.properties();
MqttProperties.StringProperty authMethodProperty = (MqttProperties.StringProperty) properties
.getProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value());
MqttProperties.BinaryProperty authDataProperty = (MqttProperties.BinaryProperty) properties
.getProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value());
MQTTAuthenticationService.AuthenticationResult authResult = authenticationService.authenticate(
adapter.getClientId(), authMethodProperty.value(),
new AuthenticationDataCommand(new String(authDataProperty.value())));
if (authResult.isFailed()) {
log.error("[AUTH] auth failed. CId={}", adapter.getClientId());
MqttMessage mqttAuthSFailure = MqttMessageBuilders.auth()
.properties(properties)
.reasonCode(Mqtt5DisConnReasonCode.CONTINUE_AUTHENTICATION.byteValue()).build();
adapter.setMqttMessage(mqttAuthSFailure);
channel.writeAndFlush(adapter).addListener(future -> {
if (!future.isSuccess()) {
log.warn("send auth result failed", future.cause());
}
});
} else {
MqttMessage mqttAuthSuccess = MqttMessageBuilders.auth()
.properties(properties)
.reasonCode(Mqtt5DisConnReasonCode.NORMAL.byteValue()).build();
adapter.setMqttMessage(mqttAuthSuccess);
channel.writeAndFlush(adapter).addListener(future -> {
if (!future.isSuccess()) {
log.warn("send auth result failed", future.cause());
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public void testAuthenticate() throws Exception {
}

@Test(timeOut = TIMEOUT)
public void testAuthenticateWithAuthMethod() throws Exception {
String topic = "persistent://public/default/testAuthenticateWithAuthMethod";
public void testAuthenticateWithConnAuthMethod() throws Exception {
String topic = "persistent://public/default/testAuthenticateWithConnAuthMethod";
Mqtt5BlockingClient client1 = Mqtt5Client.builder()
.identifier("abc")
.serverHost("127.0.0.1")
Expand Down Expand Up @@ -113,7 +113,7 @@ public int getTimeout() {
@Override
public @NotNull CompletableFuture<Boolean> onReAuthSuccess(@NotNull Mqtt5ClientConfig clientConfig,
@NotNull Mqtt5Auth auth) {
return null;
return CompletableFuture.completedFuture(true);
}

@Override
Expand Down Expand Up @@ -150,6 +150,97 @@ public void onReAuthError(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Thro
client1.disconnect();
}

@Test(timeOut = TIMEOUT)
public void testAuthenticateWithReAuthMethod() throws Exception {
String topic = "persistent://public/default/testAuthenticateWithReAuthMethod";
Mqtt5BlockingClient client1 = Mqtt5Client.builder()
.identifier("abc")
.serverHost("127.0.0.1")
.serverPort(getMqttBrokerPortList().get(0))
.buildBlocking();
client1.connectWith().enhancedAuth(new Mqtt5EnhancedAuthMechanism() {
@Override
public @NotNull MqttUtf8String getMethod() {
return MqttUtf8String.of("basic");
}

@Override
public int getTimeout() {
return 20;
}

@Override
public @NotNull CompletableFuture<Void> onAuth(@NotNull Mqtt5ClientConfig clientConfig,
@NotNull Mqtt5Connect connect,
@NotNull Mqtt5EnhancedAuthBuilder authBuilder) {
authBuilder.data("superUser:supepass".getBytes(StandardCharsets.UTF_8));
return CompletableFuture.completedFuture(null);
}

@Override
public @NotNull CompletableFuture<Void> onReAuth(@NotNull Mqtt5ClientConfig clientConfig,
@NotNull Mqtt5AuthBuilder authBuilder) {
authBuilder.data("superUser:Xsupepass".getBytes(StandardCharsets.UTF_8));
authBuilder.userProperties().add("key1", "value1").applyUserProperties();
return CompletableFuture.completedFuture(null);
}

@Override
public @NotNull CompletableFuture<Boolean> onContinue(@NotNull Mqtt5ClientConfig clientConfig,
@NotNull Mqtt5Auth auth,
@NotNull Mqtt5AuthBuilder authBuilder) {
authBuilder.data("superUser:supepass".getBytes(StandardCharsets.UTF_8));
authBuilder.userProperties().add("key1", "value1").applyUserProperties();
return CompletableFuture.completedFuture(true);
}

@Override
public @NotNull CompletableFuture<Boolean> onAuthSuccess(@NotNull Mqtt5ClientConfig clientConfig,
@NotNull Mqtt5ConnAck connAck) {
return CompletableFuture.completedFuture(true);
}

@Override
public @NotNull CompletableFuture<Boolean> onReAuthSuccess(@NotNull Mqtt5ClientConfig clientConfig,
@NotNull Mqtt5Auth auth) {
return CompletableFuture.completedFuture(true);
}

@Override
public void onAuthRejected(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Mqtt5ConnAck connAck) {
// NOP
}

@Override
public void onReAuthRejected(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Mqtt5Disconnect disconnect) {
// NOP
}

@Override
public void onAuthError(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Throwable cause) {
// NOP
}

@Override
public void onReAuthError(@NotNull Mqtt5ClientConfig clientConfig, @NotNull Throwable cause) {
// NOP
}
}).send();
client1.reauth();
Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic)
.qos(MqttQos.AT_LEAST_ONCE).build();
client1.subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL);
client1.publish(publishMessage);
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
publishes.close();
client1.disconnect();
}

@Test(expectedExceptions = {MQTTException.class}, timeOut = TIMEOUT)
public void testNoAuthenticated() throws Exception {
MQTT mqtt = createMQTTClient();
Expand Down