Skip to content

Commit

Permalink
Connects proton-j to service and contacts CBS node for authorization (#…
Browse files Browse the repository at this point in the history
…3795)

Remove types that are no longer needed

Moving EventDataUtil into EventData.

Add ability to deserialize AMQP message into EventData

Adding AmqpException for EventDataBatch

Update ReactorDispatcher to use friendly ways of fetching work.

Add AmqpConnection, AmqpSession, AmqpLink, ConnectionState,

Add ConnectionHandler, CustomIOHandler, DispatchHandler

Add initial way to create a connection and reactor into EventHubClient

Update Sender test with correct exception and add initial EventHubClientTest

ReactorFactory: Add ability to createConnection.

Add ReactorExecutor which is responsible for looping and processing reactor work.

Add ReactorProvider and ReactorHandlerProvider that will handle creation of the Reactor instances and handlers.

Adding AmqpShutdownSignal, and AmqpEndpointStateNotifierBase for state changes.

Adding Track 2 SDK to pom.client.xml
  • Loading branch information
conniey authored Jun 4, 2019
1 parent 504d454 commit 210eaa2
Show file tree
Hide file tree
Showing 57 changed files with 3,309 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,27 @@ public enum MessageConstant {
/**
* This is a client-specific id that is used so that client can send replies to this message to a specific group.
*/
REPLY_TO_GROUP_ID("reply-to-group-id");
REPLY_TO_GROUP_ID("reply-to-group-id"),
/**
* The offset of a message within a given partition.
*/
OFFSET_ANNOTATION_NAME("x-opt-offset"),
/**
* The date and time, in UTC, that a message was enqueued.
*/
ENQUEUED_TIME_UTC_ANNOTATION_NAME("x-opt-enqueued-time"),
/**
* The identifier associated with a given partition.
*/
PARTITION_KEY_ANNOTATION_NAME("x-opt-partition-key"),
/**
* The sequence number assigned to a message.
*/
SEQUENCE_NUMBER_ANNOTATION_NAME("x-opt-sequence-number"),
/**
* The name of the entity that published a message.
*/
PUBLISHER_ANNOTATION_NAME("x-opt-publisher");

private static final Map<String, MessageConstant> RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ public AmqpException(boolean isTransient, ErrorCondition errorCondition, Throwab
this.isTransient = isTransient;
}

/**
* Initializes a new instance of the AmqpException class.
*
* @param isTransient A boolean indicating if the exception is a transient error or not. If true, then the request
* can be retried; otherwise not.
* @param message Text containing any supplementary details not indicated by the condition field. This text can
* be logged as an aid to resolving issues.
* @param cause The Throwable which caused the creation of this AmqpException.
*/
public AmqpException(boolean isTransient, String message, Throwable cause) {
super(message, cause);
this.errorCondition = null;
this.isTransient = isTransient;
}

@Override
public String getMessage() {
final String baseMessage = super.getMessage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.exception;

import com.azure.core.implementation.util.ImplUtils;

import java.io.Serializable;
import java.util.Locale;
import java.util.Objects;

/**
* Provides context for an {@link AmqpException}.
Expand All @@ -17,18 +17,25 @@ public class ErrorContext implements Serializable {
private static final long serialVersionUID = -2819764407122954922L;

private final String namespaceName;
private final Throwable exception;

/**
* Creates a new instance with the provided {@code namespaceName}.
*
* @param namespaceName Namespace of the error context.
* @param exception Exception that caused this error.
* @param namespaceName Event Hub namespace of the error context.
* @throws NullPointerException when {@code exception} is {@code null}.
* @throws IllegalArgumentException when {@code namespaceName} is {@code null} or empty.
*/
public ErrorContext(final String namespaceName) {
public ErrorContext(final Throwable exception, final String namespaceName) {
Objects.requireNonNull(exception);

if (ImplUtils.isNullOrEmpty(namespaceName)) {
throw new IllegalArgumentException("'namespaceName' cannot be null or empty");
}

this.namespaceName = namespaceName;
this.exception = exception;
}

/**
Expand All @@ -40,11 +47,20 @@ public String namespaceName() {
return namespaceName;
}

/**
* Gets the exception wrapped in this context.
*
* @return The exception that caused the error.
*/
public Throwable exception() {
return exception;
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
return String.format(Locale.US, "NS: %s", this.namespaceName);
return String.format(Locale.US, "NS: %s. Exception: %s", namespaceName, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public final class ExceptionUtil {
* @param errorCondition The error condition string.
* @param description The error message.
* @return An exception that maps to the {@code errorCondition} provided.
* @throws IllegalArgumentException when 'errorCondition' is {@code null} or empty, cannot be translated into an
* {@link ErrorCondition}, or cannot be determined whether the {@link ErrorCondition} is transient or not.
* @see ErrorCondition
*/
public static Exception toException(String errorCondition, String description) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.util.Map;

/**
* Represents a TCP connection between the client and a service that uses the AMQP protocol.
*/
public interface AmqpConnection extends EndpointStateNotifier, Closeable {
/**
* Gets the connection identifier.
*
* @return The connection identifier.
*/
String getIdentifier();

/**
* Gets the host for the AMQP connection.
*
* @return The host for the AMQP connection.
*/
String getHost();

/**
* Gets the maximum framesize for the connection.
*
* @return The maximum frame size for the connection.
*/
int getMaxFrameSize();

/**
* Gets the connection properties.
*
* @return Properties associated with this connection.
*/
Map<String, Object> getConnectionProperties();

/**
* Gets the claims-based security (CBS) node that authorizes access to resources.
*
* @return Provider that authorizes access to AMQP resources.
*/
Mono<CBSNode> getCBSNode();

/**
* Creates a new session with the given entity path.
*
* @param sessionName Name of the session.
* @return The AMQP session that was created.
*/
Mono<AmqpSession> createSession(String sessionName);

/**
* Removes a session with the {@code sessionName} from the AMQP connection.
*
* @param sessionName Name of the session to remove.
* @return {@code true} if a session with the name was removed; {@code false} otherwise.
*/
boolean removeSession(String sessionName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

/**
* Represents a state for a connection, session, or link.
*/
public enum AmqpEndpointState {
/**
* The endpoint has not been initialized.
*/
UNINITIALIZED,
/**
* The endpoint is active.
*/
ACTIVE,
/**
* The endpoint is closed.
*/
CLOSED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import com.azure.core.amqp.exception.ErrorContext;
import com.azure.core.implementation.logging.ServiceLogger;

import java.util.Objects;

/**
* Handles exceptions generated by AMQP connections, sessions, and/or links.
*/
public abstract class AmqpExceptionHandler {
private final ServiceLogger logger;

/**
* Creates a new instance of an exception handler.
*/
protected AmqpExceptionHandler() {
this.logger = new ServiceLogger(AmqpExceptionHandler.class);
}

/**
* Creates the exception handler with the provided logger.
*
* @param logger Logger to use when issuing logs.
*/
protected AmqpExceptionHandler(ServiceLogger logger) {
Objects.requireNonNull(logger);
this.logger = logger;
}

/**
* Notifies the exception handler of an endpoint error.
*
* @param context The error context that caused the error.
*/
public void onConnectionError(ErrorContext context) {
logger.asWarning().log("Connection error: {}", context);
}

/**
* Notifies the exception handler of an exception.
*
* @param exception The exception that caused the connection error.
*/
public void onConnectionError(Throwable exception) {
logger.asWarning().log("Connection exception encountered: {}", exception);
}

/**
* Notifies the exception handler that a shutdown signal occurred.
*
* @param shutdownSignal The shutdown signal that was received.
*/
public void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) {
logger.asInformational().log("Shutdown received: {}", shutdownSignal);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import java.io.Closeable;

/**
* Represents a unidirectional AMQP link.
*/
public interface AmqpLink extends Closeable {
/**
* Gets the name of the link.
*
* @return The name of the link.
*/
String getLinkName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.time.Duration;

/**
* An AMQP session representing bidirectional communication that supports multiple {@link AmqpLink}.
*/
public interface AmqpSession extends EndpointStateNotifier, Closeable {
/**
* Gets the entity path for this AMQP session.
*
* @return The entity path for the AMQP session.
*/
String getSessionName();

/**
* Gets the operation timeout for starting the AMQP session.
*
* @return The timeout for starting the AMQP session.
*/
Duration getOpenTimeout();

/**
* Creates a new AMQP sender link.
*
* @param linkName Name of the link.
* @param timeout Timeout required for creating and opening AMPQ link.
* @return A newly created AMQP link.
*/
Mono<AmqpLink> createSender(String linkName, Duration timeout);

/**
* Creates a new AMQP receiver link.
*
* @param linkName Name of the link.
* @param timeout Timeout required for creating and opening AMPQ link.
* @return A newly created AMQP link.
*/
Mono<AmqpLink> createReceiver(String linkName, Duration timeout);

/**
* Removes a {@link AmqpLink} with the given {@code linkName}.
*
* @param linkName Name of the link to remove.
* @return {@code true} if the link was removed; {@code false} otherwise.
*/
boolean removeLink(String linkName);
}
Loading

0 comments on commit 210eaa2

Please sign in to comment.