Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connects proton-j to service and contacts CBS node for authorization #3795

Merged
merged 14 commits into from
Jun 4, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,27 @@ public enum MessageConstant {
/**
* This is a client-specific id that is used so that client can send replies to this message to a specific group.
*/
REPLY_TO_GROUP_ID("reply-to-group-id");
REPLY_TO_GROUP_ID("reply-to-group-id"),
/**
* The offset of a message within a given partition.
*/
OFFSET_ANNOTATION_NAME("x-opt-offset"),
/**
* The date and time, in UTC, that a message was enqueued.
*/
ENQUEUED_TIME_UTC_ANNOTATION_NAME("x-opt-enqueued-time"),
/**
* The identifier associated with a given partition.
*/
PARTITION_KEY_ANNOTATION_NAME("x-opt-partition-key"),
/**
* The sequence number assigned to a message.
*/
SEQUENCE_NUMBER_ANNOTATION_NAME("x-opt-sequence-number"),
/**
* The name of the entity that published a message.
*/
PUBLISHER_ANNOTATION_NAME("x-opt-publisher");

private static final Map<String, MessageConstant> RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ public AmqpException(boolean isTransient, ErrorCondition errorCondition, Throwab
this.isTransient = isTransient;
}

/**
* Initializes a new instance of the AmqpException class.
*
* @param isTransient A boolean indicating if the exception is a transient error or not. If true, then the request
* can be retried; otherwise not.
* @param message Text containing any supplementary details not indicated by the condition field. This text can
* be logged as an aid to resolving issues.
* @param cause The Throwable which caused the creation of this AmqpException.
*/
public AmqpException(boolean isTransient, String message, Throwable cause) {
super(message, cause);
this.errorCondition = null;
this.isTransient = isTransient;
}

@Override
public String getMessage() {
final String baseMessage = super.getMessage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.exception;

import com.azure.core.implementation.util.ImplUtils;

import java.io.Serializable;
import java.util.Locale;
import java.util.Objects;

/**
* Provides context for an {@link AmqpException}.
Expand All @@ -17,18 +17,25 @@ public class ErrorContext implements Serializable {
private static final long serialVersionUID = -2819764407122954922L;

private final String namespaceName;
private final Throwable exception;

/**
* Creates a new instance with the provided {@code namespaceName}.
*
* @param namespaceName Namespace of the error context.
* @param exception Exception that caused this error.
* @param namespaceName Event Hub namespace of the error context.
* @throws NullPointerException when {@code exception} is {@code null}.
* @throws IllegalArgumentException when {@code namespaceName} is {@code null} or empty.
*/
public ErrorContext(final String namespaceName) {
public ErrorContext(final Throwable exception, final String namespaceName) {
Objects.requireNonNull(exception);

if (ImplUtils.isNullOrEmpty(namespaceName)) {
throw new IllegalArgumentException("'namespaceName' cannot be null or empty");
}

this.namespaceName = namespaceName;
this.exception = exception;
}

/**
Expand All @@ -40,11 +47,20 @@ public String namespaceName() {
return namespaceName;
}

/**
* Gets the exception wrapped in this context.
*
* @return The exception that caused the error.
*/
public Throwable exception() {
return exception;
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
return String.format(Locale.US, "NS: %s", this.namespaceName);
return String.format(Locale.US, "NS: %s. Exception: %s", namespaceName, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public final class ExceptionUtil {
* @param errorCondition The error condition string.
* @param description The error message.
* @return An exception that maps to the {@code errorCondition} provided.
* @throws IllegalArgumentException when 'errorCondition' is {@code null} or empty, cannot be translated into an
* {@link ErrorCondition}, or cannot be determined whether the {@link ErrorCondition} is transient or not.
* @see ErrorCondition
*/
public static Exception toException(String errorCondition, String description) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.util.Map;

/**
* Represents a TCP connection between the client and a service that uses the AMQP protocol.
*/
public interface AmqpConnection extends StateNotifier, Closeable {
/**
* Gets the connection identifier.
*
* @return The connection identifier.
*/
String getIdentifier();

/**
* Gets the host for the AMQP connection.
*
* @return The host for the AMQP connection.
*/
String getHost();

/**
* Gets the maximum framesize for the connection.
*
* @return The maximum frame size for the connection.
*/
int getMaxFrameSize();

/**
* Gets the connection properties.
*
* @return Properties associated with this connection.
*/
Map<String, Object> getConnectionProperties();

/**
* Gets the claims-based security (CBS) node that authorizes access to resources.
*
* @return Provider that authorizes access to AMQP resources.
*/
Mono<CBSNode> getCBSNode();

/**
* Creates a new session with the given entity path.
*
* @param sessionName Name of the session.
* @return The AMQP session that was created.
*/
Mono<AmqpSession> createSession(String sessionName);

/**
* Removes a session with the {@code sessionName} from the AMQP connection.
*
* @param sessionName Name of the session to remove.
* @return {@code true} if a session with the name was removed; {@code false} otherwise.
*/
boolean removeSession(String sessionName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import java.io.Closeable;

/**
* Represents a unidirectional AMQP link.
*/
public interface AmqpLink extends Closeable {
/**
* Gets the name of the link.
*
* @return The name of the link.
*/
String linkName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.time.Duration;

/**
* An AMQP session representing bidirectional communication that supports multiple {@link AmqpLink}.
*/
public interface AmqpSession extends StateNotifier, Closeable {
/**
* Gets the entity path for this AMQP session.
*
* @return The entity path for the AMQP session.
*/
String sessionName();

/**
* Gets the operation timeout for starting the AMQP session.
*
* @return The timeout for starting the AMQP session.
*/
Duration openTimeout();

/**
* Creates a new AMQP sender link.
*
* @param linkName Name of the link.
* @param timeout Timeout required for creating and opening AMPQ link.
* @return A newly created AMQP link.
*/
Mono<AmqpLink> createSender(String linkName, Duration timeout);

/**
* Creates a new AMQP receiver link.
*
* @param linkName Name of the link.
* @param timeout Timeout required for creating and opening AMPQ link.
* @return A newly created AMQP link.
*/
Mono<AmqpLink> createReceiver(String linkName, Duration timeout);

/**
* Removes a {@link AmqpLink} with the given {@code linkName}.
*
* @param linkName Name of the link to remove.
* @return {@code true} if the link was removed; {@code false} otherwise.
*/
boolean removeLink(String linkName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.time.Duration;

/**
* Claims-based security (CBS) node that authorizes connections with AMQP services.
*
* @see <a href="https://www.oasis-open.org/committees/download.php/62097/amqp-cbs-v1.0-wd05.doc">
* AMPQ Claims-based Security v1.0</a>
*/
public interface CBSNode extends Closeable {
/**
* Authorizes the caller with the CBS node to access resources for the {@code audience} with the provided
* {@code timeToLive}.
*
* @param audience Resource that the callee needs access to.
* @param timeToLive Time to live for the callee's token.
* @return A Mono that completes when the authorization is successful and errors if the authorization was
* unsuccessful.
*/
Mono<Void> authorize(String audience, Duration timeToLive);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

/**
* Represents a state for a connection, session, or link.
*/
public enum ConnectionState {
/**
* The endpoint has not been initialized.
*/
UNINITIALIZED,
/**
* The endpoint is active.
*/
ACTIVE,
/**
* The endpoint is closed.
*/
CLOSED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import com.azure.core.amqp.exception.ErrorContext;
import com.azure.core.implementation.logging.ServiceLogger;

import java.util.Objects;

/**
* Handles exceptions generated by AMQP connections, sessions, and/or links.
*/
public abstract class ExceptionHandler {
private final ServiceLogger logger;

/**
* Creates a new instance of an exception handler.
*/
protected ExceptionHandler() {
this.logger = new ServiceLogger(ExceptionHandler.class);
}

/**
* Creates the exception handler with the provided logger.
*
* @param logger Logger to use when issuing logs.
*/
protected ExceptionHandler(ServiceLogger logger) {
Objects.requireNonNull(logger);
this.logger = logger;
}

/**
* Notifies the exception handler of an endpoint error.
*
* @param context The error context that caused the error.
*/
public void onConnectionError(ErrorContext context) {
logger.asWarning().log("Connection error: {}", context);
}

/**
* Notifies the exception handler of an exception.
*
* @param exception The exception that caused the connection error.
*/
public void onConnectionError(Throwable exception) {
logger.asWarning().log("Connection exception encountered: {}", exception);
}

/**
* Notifies the exception handler that a shutdown signal occurred.
*
* @param shutdownSignal The shutdown signal that was received.
*/
public void onConnectionShutdown(ShutdownSignal shutdownSignal) {
logger.asInformational().log("Shutdown received: {}", shutdownSignal);
}
}
Loading