Skip to content

Commit

Permalink
Track 2: Event Hubs Client Library (#3655)
Browse files Browse the repository at this point in the history
* Adding azure-messaging-eventhubs and azure-core-amqp pom.xml.

* Adding EventHubClient, EventHubClientBuilder, EventHubConsumer, EventHubConsumerOptions, EventHubProducer, 
and EventHubProducerOptions.

* Adding configuration options for EventHubClientBuilder.

* Adding azure-core-amqp with common classes to AMQP for Exceptions, Sessions, Links, Connections, TransportTypes, etc.

* Adding authorization with CBS node and getting Event Hub metadata.

* Adding functionality for EventHubConsumer and EventHubProducer.

* Adding support for Azure Identity and TokenCredential.

* Adding a bare set of tests for Azure Event Hubs client library.
  • Loading branch information
conniey authored Jun 21, 2019
1 parent 8015101 commit 47bceb5
Show file tree
Hide file tree
Showing 114 changed files with 11,129 additions and 9 deletions.
42 changes: 42 additions & 0 deletions core/azure-core-amqp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Azure Core AMQP client library for Java

Azure Core AMQP client library is a collection of classes common to the AMQP protocol. It help developers create their
own AMQP client library that abstracts from the underlying transport library's implementation.

## Getting started

### Prerequisites

- Java Development Kit (JDK) with version 8 or above

### Adding the package to your product

```xml
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
```

## Key concepts

The concepts for AMQP are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version
1.0](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html).

## Examples

## Troubleshooting

## Next steps

## Contributing

If you would like to become an active contributor to this project please follow the instructions provided in [Microsoft
Azure Projects Contribution Guidelines](http://azure.github.io/guidelines.html).

1. Fork it
1. Create your feature branch (`git checkout -b my-new-feature`)
1. Commit your changes (`git commit -am 'Add some feature'`)
1. Push to the branch (`git push origin my-new-feature`)
1. Create new Pull Request
78 changes: 78 additions & 0 deletions core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<!--
~ Copyright (c) Microsoft Corporation. All rights reserved.
~ Licensed under the MIT License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.azure</groupId>
<artifactId>azure-core-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
<description>This package contains core types for Azure Java AMQP clients.</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<licenses>
<license>
<name>The MIT License (MIT)</name>
<url>http://opensource.org/licenses/MIT</url>
<distribution>repo</distribution>
</license>
</licenses>

<distributionManagement>
<site>
<id>azure-java-build-docs</id>
<url>${site.url}/site/${project.artifactId}</url>
</site>
</distributionManagement>

<scm>
<url>scm:git:https://github.com/Azure/azure-sdk-for-java</url>
</scm>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<legal><![CDATA[[INFO] Any downloads listed may be third party software. Microsoft grants you no rights for third party software.]]></legal>
</properties>

<developers>
<developer>
<id>microsoft</id>
<name>Microsoft</name>
</developer>
</developers>

<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.util.Map;

/**
* Represents a TCP connection between the client and a service that uses the AMQP protocol.
*/
public interface AmqpConnection extends EndpointStateNotifier, Closeable {
/**
* Gets the connection identifier.
*
* @return The connection identifier.
*/
String getIdentifier();

/**
* Gets the host for the AMQP connection.
*
* @return The host for the AMQP connection.
*/
String getHost();

/**
* Gets the maximum frame size for the connection.
*
* @return The maximum frame size for the connection.
*/
int getMaxFrameSize();

/**
* Gets the connection properties.
*
* @return Properties associated with this connection.
*/
Map<String, Object> getConnectionProperties();

/**
* Gets the claims-based security (CBS) node that authorizes access to resources.
*
* @return Provider that authorizes access to AMQP resources.
*/
Mono<CBSNode> getCBSNode();

/**
* Creates a new session with the given session name.
*
* @param sessionName Name of the session.
* @return The AMQP session that was created.
*/
Mono<AmqpSession> createSession(String sessionName);

/**
* Removes a session with the {@code sessionName} from the AMQP connection.
*
* @param sessionName Name of the session to remove.
* @return {@code true} if a session with the name was removed; {@code false} otherwise.
*/
boolean removeSession(String sessionName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

/**
* Represents a state for a connection, session, or link.
*/
public enum AmqpEndpointState {
/**
* The endpoint has not been initialized.
*/
UNINITIALIZED,
/**
* The endpoint is active.
*/
ACTIVE,
/**
* The endpoint is closed.
*/
CLOSED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import com.azure.core.util.logging.ClientLogger;

/**
* Handles exceptions generated by AMQP connections, sessions, and/or links.
*/
public abstract class AmqpExceptionHandler {
private final ClientLogger logger = new ClientLogger(AmqpExceptionHandler.class);

/**
* Creates a new instance of the exception handler.
*/
protected AmqpExceptionHandler() {
}

/**
* Notifies the exception handler of an exception.
*
* @param exception The exception that caused the connection error.
*/
public void onConnectionError(Throwable exception) {
logger.asWarning().log("Connection exception encountered: " + exception.toString(), exception);
}

/**
* Notifies the exception handler that a shutdown signal occurred.
*
* @param shutdownSignal The shutdown signal that was received.
*/
public void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) {
logger.asInfo().log("Shutdown received: {}", shutdownSignal);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import java.io.Closeable;

/**
* Represents a unidirectional AMQP link.
*/
public interface AmqpLink extends EndpointStateNotifier, Closeable {
/**
* Gets the name of the link.
*
* @return The name of the link.
*/
String getLinkName();

/**
* The remote endpoint path this link is connected to.
*
* @return The remote endpoint path this link is connected to.
*/
String getEntityPath();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.time.Duration;

/**
* An AMQP session representing bidirectional communication that supports multiple {@link AmqpLink AMQP links}.
*/
public interface AmqpSession extends EndpointStateNotifier, Closeable {
/**
* Gets the name for this AMQP session.
*
* @return The name for the AMQP session.
*/
String getSessionName();

/**
* Gets the operation timeout for starting the AMQP session.
*
* @return The timeout for starting the AMQP session.
*/
Duration getOperationTimeout();

/**
* Creates a new AMQP link that publishes events to the message broker.
*
* @param linkName Name of the link.
* @param entityPath The entity path this link connects to when producing events.
* @param timeout Timeout required for creating and opening AMQP link.
* @param retry The retry policy to use when sending messages.
* @return A newly created AMQP link.
*/
Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, Retry retry);

/**
* Creates a new AMQP link that consumes events from the message broker.
*
* @param linkName Name of the link.
* @param entityPath The entity path this link connects to, so that it may read events from the message broker.
* @param timeout Timeout required for creating and opening an AMQP link.
* @param retry The retry policy to use when consuming messages.
* @return A newly created AMQP link.
*/
Mono<AmqpLink> createConsumer(String linkName, String entityPath, Duration timeout, Retry retry);

/**
* Removes an {@link AmqpLink} with the given {@code linkName}.
*
* @param linkName Name of the link to remove.
* @return {@code true} if the link was removed; {@code false} otherwise.
*/
boolean removeLink(String linkName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import java.util.Locale;

/**
* Represents a signal that caused the AMQP connection to shutdown.
*/
public class AmqpShutdownSignal {
private final boolean isTransient;
private final boolean isInitiatedByClient;
private final String message;

/**
* Creates a new instance of the AmqpShutdownSignal.
*
* @param isTransient Whether the shutdown signal can be retried or not.
* @param isInitiatedByClient {@code true} if the shutdown was initiated by the client; {@code false} otherwise.
* @param message Message associated with the shutdown.
*/
public AmqpShutdownSignal(boolean isTransient, boolean isInitiatedByClient, String message) {
this.isTransient = isTransient;
this.isInitiatedByClient = isInitiatedByClient;
this.message = message;
}

/**
* Gets whether or not this shutdown signal is transient or if it can be restarted.
*
* @return {@code true} if the shutdown signal is transient and the connection, session, or link can be recreated.
* {@code false} otherwise.
*/
public boolean isTransient() {
return isTransient;
}

/**
* Gets whether or not this shutdown signal was initiated by the client.
*
* @return {@code true} if the shutdown signal was initiated by the client, {@code false} if the shutdown signal
* occurred in the underlying AMQP layer or from the AMQP message broker.
*/
public boolean isInitiatedByClient() {
return isInitiatedByClient;
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
return String.format(Locale.US, "%s, isTransient[%s], initiatedByClient[%s]", message, isTransient, isInitiatedByClient);
}
}
Loading

0 comments on commit 47bceb5

Please sign in to comment.