From 95e2c61f9572c25d8d9348d554cff3c619fc10f8 Mon Sep 17 00:00:00 2001 From: Connie Yau Date: Sat, 6 Jun 2020 20:40:57 -0700 Subject: [PATCH] Adding ServiceBusManagementClient and builder. (#11836) * Adding ServiceBusManagementClient and ServiceBusManagementAsyncClient. * Adding builder and runtime info * Add tests for sync and async client * Making policy constructor public. * Making classes final. * Adding null check for empty body feeds. * Regenerating AuthorizationRule. * Fixing encoding bug in SerializerEncoding. * Fixing test for azure-core SerializerEncodingTests. --- .../util/serializer/SerializerEncoding.java | 4 +- .../serializer/SerializerEncodingTests.java | 6 +- .../ServiceBusManagementAsyncClient.java | 668 ++++++++++++++++++ .../ServiceBusManagementClient.java | 243 +++++++ .../ServiceBusManagementClientBuilder.java | 324 +++++++++ .../ServiceBusManagementSerializer.java | 12 +- .../ServiceBusTokenCredentialHttpPolicy.java | 15 +- .../servicebus/models/AuthorizationRule.java | 49 +- .../servicebus/models/QueueRuntimeInfo.java | 104 +++ .../azure-messaging-servicebus.properties | 2 + ...sManagementAsyncClientIntegrationTest.java | 71 ++ .../ServiceBusManagementAsyncClientTest.java | 383 ++++++++++ .../ServiceBusManagementClientTest.java | 287 ++++++++ ...sManagementClientImplIntegrationTests.java | 10 +- .../session-records/createQueue.json | 2 +- ...ueue.json => createQueueExistingName.json} | 0 ...ue.json => createQueueImplementation.json} | 2 +- .../deleteQueueImplementation.json | 4 + ...eues.json => editQueueImplementation.json} | 0 .../getQueueImplementation.json | 4 + .../listQueuesImplementation.json | 4 + 21 files changed, 2156 insertions(+), 38 deletions(-) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClient.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClient.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClientBuilder.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/QueueRuntimeInfo.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientIntegrationTest.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientTest.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementClientTest.java rename sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/{editQueue.json => createQueueExistingName.json} (100%) rename sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/{deleteQueue.json => createQueueImplementation.json} (52%) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/deleteQueueImplementation.json rename sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/{listQueues.json => editQueueImplementation.json} (100%) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/getQueueImplementation.json create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/listQueuesImplementation.json diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/serializer/SerializerEncoding.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/serializer/SerializerEncoding.java index 287a3a5c8bfe..7ce2f9c92015 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/serializer/SerializerEncoding.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/serializer/SerializerEncoding.java @@ -56,12 +56,12 @@ public static SerializerEncoding fromHeaders(HttpHeaders headers) { return DEFAULT_ENCODING; } - final SerializerEncoding encoding = SUPPORTED_MIME_TYPES.get(mimeContentType); + final String[] parts = mimeContentType.split(";"); + final SerializerEncoding encoding = SUPPORTED_MIME_TYPES.get(parts[0]); if (encoding != null) { return encoding; } - final String[] parts = mimeContentType.split(";"); final String[] mimeTypeParts = parts[0].split("/"); if (mimeTypeParts.length != 2) { LOGGER.warning("Content-Type '{}' does not match mime-type formatting 'type'/'subtype'. " diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/util/serializer/SerializerEncodingTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/util/serializer/SerializerEncodingTests.java index 06f425da099d..bcf76b4d1cca 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/util/serializer/SerializerEncodingTests.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/util/serializer/SerializerEncodingTests.java @@ -17,7 +17,8 @@ class SerializerEncodingTests { private static final String CONTENT_TYPE = "Content-Type"; @ParameterizedTest - @ValueSource(strings = {"application/xml", "application/atom+xml", "text/xml", "application/foo+XML", "TEXT/XML"}) + @ValueSource(strings = {"application/xml", "application/atom+xml", "text/xml", "application/foo+XML", "TEXT/XML", + "application/xml;charset=utf-8", "application/atom+xml; charset=utf-32"}) void recognizeXml(String mimeType) { // Arrange HttpHeaders headers = new HttpHeaders(Collections.singletonMap(CONTENT_TYPE, mimeType)); @@ -27,7 +28,8 @@ void recognizeXml(String mimeType) { } @ParameterizedTest - @ValueSource(strings = {"application/json", "application/kv+json", "APPLICATION/JSON", "application/FOO+JSON"}) + @ValueSource(strings = {"application/json", "application/kv+json", "APPLICATION/JSON", "application/FOO+JSON", + "application/json;charset=utf-8", "application/config+json; charset=utf-32"}) void recognizeJson(String mimeType) { // Arrange HttpHeaders headers = new HttpHeaders(Collections.singletonMap(CONTENT_TYPE, mimeType)); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClient.java new file mode 100644 index 000000000000..6f2b4734c031 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClient.java @@ -0,0 +1,668 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.annotation.ReturnType; +import com.azure.core.annotation.ServiceClient; +import com.azure.core.annotation.ServiceMethod; +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpRequest; +import com.azure.core.http.rest.PagedFlux; +import com.azure.core.http.rest.PagedResponse; +import com.azure.core.http.rest.Response; +import com.azure.core.http.rest.SimpleResponse; +import com.azure.core.util.Context; +import com.azure.core.util.IterableStream; +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.implementation.QueuesImpl; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementClientImpl; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementSerializer; +import com.azure.messaging.servicebus.implementation.models.CreateQueueBody; +import com.azure.messaging.servicebus.implementation.models.CreateQueueBodyContent; +import com.azure.messaging.servicebus.implementation.models.QueueDescriptionFeed; +import com.azure.messaging.servicebus.implementation.models.ResponseLink; +import com.azure.messaging.servicebus.models.QueueDescription; +import com.azure.messaging.servicebus.models.QueueRuntimeInfo; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static com.azure.core.util.FluxUtil.monoError; +import static com.azure.core.util.FluxUtil.withContext; +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; + +/** + * An asynchronous client for managing a Service Bus namespace. + * + * @see ServiceBusManagementClient ServiceBusManagementClient for a synchronous client. + */ +@ServiceClient(builder = ServiceBusManagementClientBuilder.class, isAsync = true) +public final class ServiceBusManagementAsyncClient { + // See https://docs.microsoft.com/azure/azure-resource-manager/management/azure-services-resource-providers + // for more information on Azure resource provider namespaces. + private static final String SERVICE_BUS_TRACING_NAMESPACE_VALUE = "Microsoft.ServiceBus"; + private static final String CONTENT_TYPE = "application/xml"; + + // Name of the entity type when listing queues. + private static final String QUEUES_ENTITY_TYPE = "queues"; + private static final int NUMBER_OF_ELEMENTS = 10; + + private final ServiceBusManagementClientImpl managementClient; + private final QueuesImpl queuesClient; + private final ClientLogger logger = new ClientLogger(ServiceBusManagementAsyncClient.class); + private final ServiceBusManagementSerializer serializer; + + /** + * Creates a new instance with the given management client and serializer. + * + * @param managementClient Client to make management calls. + * @param serializer Serializer to deserialize ATOM XML responses. + */ + ServiceBusManagementAsyncClient(ServiceBusManagementClientImpl managementClient, + ServiceBusManagementSerializer serializer) { + this.managementClient = Objects.requireNonNull(managementClient, "'managementClient' cannot be null."); + this.queuesClient = managementClient.getQueues(); + this.serializer = serializer; + } + + /** + * Creates a queue the {@link QueueDescription}. + * + * @param queue Information about the queue to create. + * + * @return A Mono that completes with information about the created queue. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono createQueue(QueueDescription queue) { + return createQueueWithResponse(queue).map(Response::getValue); + } + + /** + * Creates a queue and returns the created queue in addition to the HTTP response. + * + * @param queue The queue to create. + * + * @return A Mono that returns the created queue in addition to the HTTP response. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono> createQueueWithResponse(QueueDescription queue) { + return withContext(context -> createQueueWithResponse(queue, context)); + } + + /** + * Deletes a queue the matching {@code queueName}. + * + * @param queueName Name of queue to delete. + * + * @return A Mono that completes when the queue is deleted. + * @throws NullPointerException if {@code queueName} is null. + * @throws IllegalArgumentException if {@code queueName} is an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono deleteQueue(String queueName) { + return deleteQueueWithResponse(queueName).then(); + } + + /** + * Deletes a queue the matching {@code queueName} and returns the HTTP response. + * + * @param queueName Name of queue to delete. + * + * @return A Mono that completes when the queue is deleted and returns the HTTP response. + * @throws NullPointerException if {@code queueName} is null. + * @throws IllegalArgumentException if {@code queueName} is an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono> deleteQueueWithResponse(String queueName) { + return withContext(context -> deleteQueueWithResponse(queueName, context)); + } + + /** + * Gets information about the queue. + * + * @param queueName Name of queue to get information about. + * + * @return A Mono that completes with information about the queue. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono getQueue(String queueName) { + return getQueueWithResponse(queueName).map(Response::getValue); + } + + /** + * Gets information about the queue along with its HTTP response. + * + * @param queueName Name of queue to get information about. + * + * @return A Mono that completes with information about the queue and the associated HTTP response. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono> getQueueWithResponse(String queueName) { + return withContext(context -> getQueueWithResponse(queueName, context)); + } + + /** + * Gets runtime information about the queue. + * + * @param queueName Name of queue to get information about. + * + * @return A Mono that completes with runtime information about the queue. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono getQueueRuntimeInfo(String queueName) { + return getQueueWithResponse(queueName).map(response -> new QueueRuntimeInfo(response.getValue())); + } + + /** + * Gets runtime information about the queue along with its HTTP response. + * + * @param queueName Name of queue to get information about. + * + * @return A Mono that completes with runtime information about the queue and the associated HTTP response. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono> getQueueRuntimeInfoWithResponse(String queueName) { + return withContext(context -> getQueueRuntimeInfoWithResponse(queueName, context)); + } + + /** + * Fetches all the queues in the Service Bus namespace. + * + * @return A Flux of {@link QueueDescription queues} in the Service Bus namespace. + */ + @ServiceMethod(returns = ReturnType.COLLECTION) + public PagedFlux listQueues() { + return new PagedFlux<>( + () -> withContext(context -> listQueuesFirstPage(context)), + token -> withContext(context -> listQueuesNextPage(token, context))); + } + + /** + * Updates a queue with the given {@link QueueDescription}. The {@link QueueDescription} must be fully populated as + * all of the properties are replaced. + * + * The suggested flow is: + *
    + *
  1. {@link #getQueue(String) Get queue description.}
  2. + *
  3. Update the required elements.
  4. + *
  5. Pass the updated description into this method.
  6. + *
+ * + *

+ * There are a subset of properties that can be updated. They are: + *

    + *
  • {@link QueueDescription#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}
  • + *
  • {@link QueueDescription#setLockDuration(Duration) LockDuration}
  • + *
  • {@link QueueDescription#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow} + *
  • + *
  • {@link QueueDescription#setMaxDeliveryCount(Integer) MaxDeliveryCount}
  • + *
+ * + * @see Update Queue + * @param queue Information about the queue to update. + * + * @return A Mono that completes with information about the created queue. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono updateQueue(QueueDescription queue) { + return updateQueueWithResponse(queue).map(Response::getValue); + } + + /** + * Updates a queue with the given {@link QueueDescription}. The {@link QueueDescription} must be fully populated as + * all of the properties are replaced. + * + * The suggested flow is: + *
    + *
  1. {@link #getQueue(String) Get queue description.}
  2. + *
  3. Update the required elements.
  4. + *
  5. Pass the updated description into this method.
  6. + *
+ * + *

+ * There are a subset of properties that can be updated. They are: + *

    + *
  • {@link QueueDescription#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}
  • + *
  • {@link QueueDescription#setLockDuration(Duration) LockDuration}
  • + *
  • {@link QueueDescription#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow} + *
  • + *
  • {@link QueueDescription#setMaxDeliveryCount(Integer) MaxDeliveryCount}
  • + *
+ * + * @see Update Queue + * @param queue The queue to create. + * + * @return A Mono that returns the updated queue in addition to the HTTP response. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono> updateQueueWithResponse(QueueDescription queue) { + return withContext(context -> updateQueueWithResponse(queue, context)); + } + + /** + * Creates a queue with its context. + * + * @param queue Queue to create. + * @param context Context to pass into request. + * + * @return A Mono that completes with the created {@link QueueDescription}. + */ + Mono> createQueueWithResponse(QueueDescription queue, Context context) { + if (queue == null) { + return monoError(logger, new NullPointerException("'queue' cannot be null")); + } else if (queue.getName() == null || queue.getName().isEmpty()) { + return monoError(logger, new IllegalArgumentException("'queue.getName' cannot be null or empty.")); + } else if (context == null) { + return monoError(logger, new NullPointerException("'context' cannot be null.")); + } + + final CreateQueueBodyContent content = new CreateQueueBodyContent() + .setType(CONTENT_TYPE) + .setQueueDescription(queue); + final CreateQueueBody createEntity = new CreateQueueBody() + .setContent(content); + + final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE); + + try { + return queuesClient.putWithResponseAsync(queue.getName(), createEntity, null, withTracing) + .map(response -> deserializeQueue(response, queue.getName())); + } catch (RuntimeException ex) { + return monoError(logger, ex); + } + } + + /** + * Deletes a queue with its context. + * + * @param queueName Name of queue to delete. + * @param context Context to pass into request. + * + * @return A Mono that completes with the created {@link QueueDescription}. + */ + Mono> deleteQueueWithResponse(String queueName, Context context) { + if (queueName == null) { + return monoError(logger, new NullPointerException("'queueName' cannot be null")); + } else if (queueName.isEmpty()) { + return monoError(logger, new IllegalArgumentException("'queueName' cannot be empty.")); + } else if (context == null) { + return monoError(logger, new NullPointerException("'context' cannot be null.")); + } + + final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE); + + try { + return queuesClient.deleteWithResponseAsync(queueName, withTracing) + .map(response -> { + return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), + response.getHeaders(), null); + }); + } catch (RuntimeException ex) { + return monoError(logger, ex); + } + } + + /** + * Gets a queue with its context. + * + * @param queueName Name of queue to fetch information for. + * @param context Context to pass into request. + * + * @return A Mono that completes with the {@link QueueDescription}. + */ + Mono> getQueueRuntimeInfoWithResponse(String queueName, Context context) { + if (queueName == null) { + return monoError(logger, new NullPointerException("'queueName' cannot be null")); + } else if (queueName.isEmpty()) { + return monoError(logger, new IllegalArgumentException("'queueName' cannot be empty.")); + } else if (context == null) { + return monoError(logger, new NullPointerException("'context' cannot be null.")); + } + + final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE); + + try { + return queuesClient.getWithResponseAsync(queueName, true, withTracing) + .map(response -> { + final Response deserializeQueue = deserializeQueue(response, queueName); + final QueueRuntimeInfo runtimeInfo = deserializeQueue.getValue() != null + ? new QueueRuntimeInfo(deserializeQueue.getValue()) + : null; + + return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), + runtimeInfo); + }); + } catch (RuntimeException ex) { + return monoError(logger, ex); + } + } + + /** + * Gets a queue with its context. + * + * @param queueName Name of queue to fetch information for. + * @param context Context to pass into request. + * + * @return A Mono that completes with the {@link QueueDescription}. + */ + Mono> getQueueWithResponse(String queueName, Context context) { + if (queueName == null) { + return monoError(logger, new NullPointerException("'queueName' cannot be null")); + } else if (queueName.isEmpty()) { + return monoError(logger, new IllegalArgumentException("'queueName' cannot be empty.")); + } else if (context == null) { + return monoError(logger, new NullPointerException("'context' cannot be null.")); + } + + final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE); + + try { + return queuesClient.getWithResponseAsync(queueName, true, withTracing) + .map(response -> deserializeQueue(response, queueName)); + } catch (RuntimeException ex) { + return monoError(logger, ex); + } + } + + /** + * Gets the first page of queues with context. + * + * @param context Context to pass into request. + * + * @return A Mono that completes with a page of queues. + */ + Mono> listQueuesFirstPage(Context context) { + final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE); + + try { + return listQueues(0, NUMBER_OF_ELEMENTS, withTracing); + } catch (RuntimeException e) { + return monoError(logger, e); + } + } + + /** + * Gets the next page of queues with context. + * + * @param continuationToken Number of items to skip in feed. + * @param context Context to pass into request. + * + * @return A Mono that completes with a page of queues or empty if there are no items left. + */ + Mono> listQueuesNextPage(String continuationToken, Context context) { + if (continuationToken == null || continuationToken.isEmpty()) { + return Mono.empty(); + } + + try { + final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE); + final int skip = Integer.parseInt(continuationToken); + + return listQueues(skip, NUMBER_OF_ELEMENTS, withTracing); + } catch (RuntimeException e) { + return monoError(logger, e); + } + } + + /** + * Updates a queue with its context. + * + * @param queue Queue to update + * @param context Context to pass into request. + * + * @return A Mono that completes with the updated {@link QueueDescription}. + */ + Mono> updateQueueWithResponse(QueueDescription queue, Context context) { + if (queue == null) { + return monoError(logger, new NullPointerException("'queue' cannot be null")); + } else if (queue.getName() == null || queue.getName().isEmpty()) { + return monoError(logger, new IllegalArgumentException("'queue.getName' cannot be null or empty.")); + } else if (context == null) { + return monoError(logger, new NullPointerException("'context' cannot be null.")); + } + + final CreateQueueBodyContent content = new CreateQueueBodyContent() + .setType(CONTENT_TYPE) + .setQueueDescription(queue); + final CreateQueueBody createEntity = new CreateQueueBody() + .setContent(content); + final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE); + + try { + // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour. + return queuesClient.putWithResponseAsync(queue.getName(), createEntity, "*", withTracing) + .map(response -> deserializeQueue(response, queue.getName())); + } catch (RuntimeException ex) { + return monoError(logger, ex); + } + } + + /** + * Given an HTTP response, will deserialize it into a strongly typed Response object. + * + * @param response HTTP response to deserialize response body from. + * @param clazz Class to deserialize response type into. + * @param Class type to deserialize response into. + * + * @return A Response with a strongly typed response value. + */ + private Response deserialize(Response response, Class clazz) { + final Object body = response.getValue(); + if (body == null) { + return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null); + } + + final String contents = String.valueOf(body); + if (contents.isEmpty()) { + return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null); + } + + final T responseBody; + try { + responseBody = serializer.deserialize(contents, clazz); + } catch (IOException e) { + throw logger.logExceptionAsError(new RuntimeException(String.format( + "Exception while deserializing. Body: [%s]. Class: %s", contents, clazz), e)); + } + + if (responseBody == null) { + throw logger.logExceptionAsError(new IllegalArgumentException(String.format( + "'deserialize' should not be null. Body: [%s]. Class: [%s]", contents, clazz))); + } + + return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), + responseBody); + } + + /** + * Helper method that sets the convenience model properties on {@link QueueDescription}. + * + * @param queueName Name of the queue. + * @param response HTTP Response to deserialize. + * + * @return The corresponding HTTP response with convenience properties set. + */ + private Response deserializeQueue(Response response, String queueName) { + final Response queueDescription = deserialize(response, QueueDescription.class); + final QueueDescription value = queueDescription.getValue(); + + // This was an empty response (ie. 204). + if (value == null) { + return queueDescription; + } + + // The queue name is a property we artificially added to the REST model. + if (value.getName() == null || value.getName().isEmpty()) { + value.setName(queueName); + } + + return queueDescription; + } + + /** + * Creates a {@link FeedPage} given the elements and a set of response links to get the next link from. + * + * @param entities Entities in the feed. + * @param responseLinks Links returned from the feed. + * @param Type of Service Bus entities in page. + * + * @return A {@link FeedPage} indicating whether this can be continued or not. + * @throws MalformedURLException if the "next" page link does not contain a well-formed URL. + */ + private FeedPage extractPage(Response response, List entities, + List responseLinks) + throws MalformedURLException, UnsupportedEncodingException { + final Optional nextLink = responseLinks.stream() + .filter(link -> link.getRel().equalsIgnoreCase("next")) + .findFirst(); + + if (!nextLink.isPresent()) { + return new FeedPage<>(response.getStatusCode(), response.getHeaders(), response.getRequest(), entities); + } + + final URL url = new URL(nextLink.get().getHref()); + final String decode = URLDecoder.decode(url.getQuery(), StandardCharsets.UTF_8.name()); + final Optional skipParameter = Arrays.stream(decode.split("&|&")) + .map(part -> part.split("=", 2)) + .filter(parts -> parts[0].equalsIgnoreCase("$skip") && parts.length == 2) + .map(parts -> Integer.valueOf(parts[1])) + .findFirst(); + + if (skipParameter.isPresent()) { + return new FeedPage<>(response.getStatusCode(), response.getHeaders(), response.getRequest(), entities, + skipParameter.get()); + } else { + logger.warning("There should have been a skip parameter for the next page."); + return new FeedPage<>(response.getStatusCode(), response.getHeaders(), response.getRequest(), entities); + } + } + + /** + * Helper method that invokes the service method, extracts the data and translates it to a PagedResponse. + * + * @param skip Number of elements to skip. + * @param top Number of elements to fetch. + * @param context Context for the query. + * + * @return A Mono that completes with a paged response of queues. + */ + private Mono> listQueues(int skip, int top, Context context) { + return managementClient.listEntitiesWithResponseAsync(QUEUES_ENTITY_TYPE, skip, top, context) + .flatMap(response -> { + final Response feedResponse = deserialize(response, QueueDescriptionFeed.class); + final QueueDescriptionFeed feed = feedResponse.getValue(); + if (feed == null) { + logger.warning("Could not deserialize QueueDescriptionFeed. skip {}, top: {}", skip, top); + return Mono.empty(); + } + + final List entities = feed.getEntry().stream() + .filter(e -> e.getContent() != null && e.getContent().getQueueDescription() != null) + .map(e -> e.getContent().getQueueDescription()) + .collect(Collectors.toList()); + try { + return Mono.just(extractPage(feedResponse, entities, feed.getLink())); + } catch (MalformedURLException | UnsupportedEncodingException error) { + return Mono.error(new RuntimeException("Could not parse response into FeedPage", + error)); + } + }); + } + + /** + * A page of Service Bus entities. + * + * @param The entity description from Service Bus. + */ + private static final class FeedPage implements PagedResponse { + private final int statusCode; + private final HttpHeaders header; + private final HttpRequest request; + private final IterableStream entries; + private final String continuationToken; + + /** + * Creates a page that does not have any more pages. + * + * @param entries Items in the page. + */ + private FeedPage(int statusCode, HttpHeaders header, HttpRequest request, List entries) { + this.statusCode = statusCode; + this.header = header; + this.request = request; + this.entries = new IterableStream<>(entries); + this.continuationToken = null; + } + + /** + * Creates an instance that has additional pages to fetch. + * + * @param entries Items in the page. + * @param skip Number of elements to "skip". + */ + private FeedPage(int statusCode, HttpHeaders header, HttpRequest request, List entries, int skip) { + this.statusCode = statusCode; + this.header = header; + this.request = request; + this.entries = new IterableStream<>(entries); + this.continuationToken = String.valueOf(skip); + } + + @Override + public IterableStream getElements() { + return entries; + } + + @Override + public String getContinuationToken() { + return continuationToken; + } + + @Override + public int getStatusCode() { + return statusCode; + } + + @Override + public HttpHeaders getHeaders() { + return header; + } + + @Override + public HttpRequest getRequest() { + return request; + } + + @Override + public void close() { + } + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClient.java new file mode 100644 index 000000000000..a6bab9077fc4 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClient.java @@ -0,0 +1,243 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.annotation.ReturnType; +import com.azure.core.annotation.ServiceClient; +import com.azure.core.annotation.ServiceMethod; +import com.azure.core.http.rest.PagedFlux; +import com.azure.core.http.rest.PagedIterable; +import com.azure.core.http.rest.Response; +import com.azure.core.util.Context; +import com.azure.messaging.servicebus.models.QueueDescription; +import com.azure.messaging.servicebus.models.QueueRuntimeInfo; + +import java.time.Duration; +import java.util.Objects; + +/** + * A synchronous client for managing a Service Bus namespace. + * + * @see ServiceBusManagementAsyncClient ServiceBusManagementAsyncClient for an asynchronous client. + */ +@ServiceClient(builder = ServiceBusManagementClientBuilder.class) +public final class ServiceBusManagementClient { + private final ServiceBusManagementAsyncClient asyncClient; + + /** + * Creates a new instance with the given client. + * + * @param asyncClient Asynchronous client to perform management calls through. + */ + ServiceBusManagementClient(ServiceBusManagementAsyncClient asyncClient) { + this.asyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null."); + } + + /** + * Creates a queue the {@link QueueDescription}. + * + * @param queue Information about the queue to create. + * + * @return The created queue. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public QueueDescription createQueue(QueueDescription queue) { + return asyncClient.createQueue(queue).block(); + } + + /** + * Creates a queue and returns the created queue in addition to the HTTP response. + * + * @param queue The queue to create. + * @param context Additional context that is passed through the HTTP pipeline during the service call. + * + * @return The created queue in addition to the HTTP response. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Response createQueueWithResponse(QueueDescription queue, Context context) { + return asyncClient.createQueueWithResponse(queue, context).block(); + } + + /** + * Deletes a queue the matching {@code queueName}. + * + * @param queueName Name of queue to delete. + * + * @throws NullPointerException if {@code queueName} is null. + * @throws IllegalArgumentException if {@code queueName} is an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public void deleteQueue(String queueName) { + asyncClient.deleteQueue(queueName).block(); + } + + /** + * Deletes a queue the matching {@code queueName} and returns the HTTP response. + * + * @param queueName Name of queue to delete. + * @param context Additional context that is passed through the HTTP pipeline during the service call. + * + * @return The HTTP response when the queue is successfully deleted. + * @throws NullPointerException if {@code queueName} is null. + * @throws IllegalArgumentException if {@code queueName} is an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Response deleteQueueWithResponse(String queueName, Context context) { + return asyncClient.deleteQueueWithResponse(queueName, context).block(); + } + + /** + * Gets information about the queue. + * + * @param queueName Name of queue to get information about. + * + * @return Information about the queue. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public QueueDescription getQueue(String queueName) { + return asyncClient.getQueue(queueName).block(); + } + + /** + * Gets information about the queue along with its HTTP response. + * + * @param queueName Name of queue to get information about. + * @param context Additional context that is passed through the HTTP pipeline during the service call. + * + * @return Information about the queue and the associated HTTP response. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Response getQueueWithResponse(String queueName, Context context) { + return asyncClient.getQueueWithResponse(queueName, context).block(); + } + + /** + * Gets runtime information about the queue. + * + * @param queueName Name of queue to get information about. + * + * @return Runtime information about the queue. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public QueueRuntimeInfo getQueueRuntimeInfo(String queueName) { + return asyncClient.getQueueRuntimeInfo(queueName).block(); + } + + /** + * Gets runtime information about the queue along with its HTTP response. + * + * @param queueName Name of queue to get information about. + * @param context Additional context that is passed through the HTTP pipeline during the service call. + * + * @return Runtime information about the queue and the associated HTTP response. + * @throws NullPointerException if {@code queueName} is null or an empty string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Response getQueueRuntimeInfoWithResponse(String queueName, Context context) { + return asyncClient.getQueueRuntimeInfoWithResponse(queueName, context).block(); + } + + /** + * Fetches all the queues in the Service Bus namespace. + * + * @return A PagedIterable of {@link QueueDescription queues} in the Service Bus namespace. + */ + @ServiceMethod(returns = ReturnType.COLLECTION) + public PagedIterable listQueues() { + return new PagedIterable<>(asyncClient.listQueues()); + } + + /** + * Fetches all the queues in the Service Bus namespace. + * + * @param context Additional context that is passed through the HTTP pipeline during the service call. + * @return A PagedIterable of {@link QueueDescription queues} in the Service Bus namespace. + */ + @ServiceMethod(returns = ReturnType.COLLECTION) + public PagedIterable listQueues(Context context) { + final PagedFlux pagedFlux = new PagedFlux<>( + () -> asyncClient.listQueuesFirstPage(context), + continuationToken -> asyncClient.listQueuesNextPage(continuationToken, context)); + + return new PagedIterable<>(pagedFlux); + } + + /** + * Updates a queue with the given {@link QueueDescription}. The {@link QueueDescription} must be fully populated as + * all of the properties are replaced. + * + * The suggested flow is: + *
    + *
  1. {@link #getQueue(String) Get queue description.}
  2. + *
  3. Update the required elements.
  4. + *
  5. Pass the updated description into this method.
  6. + *
+ * + *

+ * There are a subset of properties that can be updated. They are: + *

    + *
  • {@link QueueDescription#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}
  • + *
  • {@link QueueDescription#setLockDuration(Duration) LockDuration}
  • + *
  • {@link QueueDescription#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow} + *
  • + *
  • {@link QueueDescription#setMaxDeliveryCount(Integer) MaxDeliveryCount}
  • + *
+ * + * @see Update Queue + * @param queue Information about the queue to update. + * + * @return The updated queue. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public QueueDescription updateQueue(QueueDescription queue) { + return asyncClient.updateQueue(queue).block(); + } + + /** + * Updates a queue with the given {@link QueueDescription}. The {@link QueueDescription} must be fully populated as + * all of the properties are replaced. + * + * The suggested flow is: + *
    + *
  1. {@link #getQueue(String) Get queue description.}
  2. + *
  3. Update the required elements.
  4. + *
  5. Pass the updated description into this method.
  6. + *
+ * + *

+ * There are a subset of properties that can be updated. They are: + *

    + *
  • {@link QueueDescription#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}
  • + *
  • {@link QueueDescription#setLockDuration(Duration) LockDuration}
  • + *
  • {@link QueueDescription#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow} + *
  • + *
  • {@link QueueDescription#setMaxDeliveryCount(Integer) MaxDeliveryCount}
  • + *
+ * + * @see Update Queue + * @param queue The queue to update. + * @param context Additional context that is passed through the HTTP pipeline during the service call. + * + * @return The updated queue with its HTTP response. + * @throws NullPointerException if {@code queue} is null. + * @throws IllegalArgumentException if {@link QueueDescription#getName() queue.getName()} is null or an empty + * string. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Response updateQueueWithResponse(QueueDescription queue, Context context) { + return asyncClient.updateQueueWithResponse(queue, context).block(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClientBuilder.java new file mode 100644 index 000000000000..bd78ca662631 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusManagementClientBuilder.java @@ -0,0 +1,324 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.amqp.implementation.ConnectionStringProperties; +import com.azure.core.annotation.ServiceClientBuilder; +import com.azure.core.credential.TokenCredential; +import com.azure.core.exception.AzureException; +import com.azure.core.http.HttpClient; +import com.azure.core.http.HttpPipeline; +import com.azure.core.http.HttpPipelineBuilder; +import com.azure.core.http.policy.AddHeadersFromContextPolicy; +import com.azure.core.http.policy.HttpLogDetailLevel; +import com.azure.core.http.policy.HttpLogOptions; +import com.azure.core.http.policy.HttpLoggingPolicy; +import com.azure.core.http.policy.HttpPipelinePolicy; +import com.azure.core.http.policy.HttpPolicyProviders; +import com.azure.core.http.policy.RetryPolicy; +import com.azure.core.http.policy.UserAgentPolicy; +import com.azure.core.util.Configuration; +import com.azure.core.util.CoreUtils; +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.implementation.ServiceBusConstants; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementClientImpl; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementClientImplBuilder; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementSerializer; +import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential; +import com.azure.messaging.servicebus.implementation.ServiceBusTokenCredentialHttpPolicy; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * This class provides a fluent builder API to help aid the configuration and instantiation of {@link + * ServiceBusManagementClient} and {@link ServiceBusManagementAsyncClient}. Call {@link #buildClient() buildClient} and + * {@link #buildAsyncClient() buildAsyncClient} respectively to construct an instance of the desired client. + * + * @see ServiceBusManagementClient + * @see ServiceBusManagementAsyncClient + */ +@ServiceClientBuilder(serviceClients = {ServiceBusManagementClient.class, ServiceBusManagementAsyncClient.class}) +public class ServiceBusManagementClientBuilder { + // We only allow one service version because the ATOM API was brought forward for legacy reasons. + private static final String API_VERSION = "2017-04"; + private final ClientLogger logger = new ClientLogger(ServiceBusManagementClientBuilder.class); + private final ServiceBusManagementSerializer serializer = new ServiceBusManagementSerializer(); + private final List userPolicies = new ArrayList<>(); + private final Map properties = + CoreUtils.getProperties("azure-messaging-servicebus.properties"); + + private Configuration configuration; + + // Endpoint of the Service Bus resource. It will be the fully-qualified domain name of the Service Bus namespace. + private String endpoint; + private HttpClient httpClient; + private HttpLogOptions httpLogOptions = new HttpLogOptions(); + private HttpPipeline pipeline; + private HttpPipelinePolicy retryPolicy; + private TokenCredential tokenCredential; + + /** + * Constructs a builder with the default parameters. + */ + public ServiceBusManagementClientBuilder() { + } + + /** + * Creates a {@link ServiceBusManagementAsyncClient} based on options set in the builder. Every time {@code + * buildAsyncClient} is invoked, a new instance of the client is created. + * + *

If {@link #pipeline(HttpPipeline) pipeline} is set, then the {@code pipeline} and + * {@link #endpoint(String) endpoint} are used to create the {@link ServiceBusManagementAsyncClient client}. All + * other builder settings are ignored.

+ * + * @return A {@link ServiceBusManagementAsyncClient} with the options set in the builder. + * @throws NullPointerException if {@code endpoint} has not been set. This is automatically set when {@link + * #connectionString(String) connectionString} is set. Or, explicitly through {@link #endpoint(String)}. + * @throws IllegalStateException If {@link #connectionString(String) connectionString} has not been set. + */ + public ServiceBusManagementAsyncClient buildAsyncClient() { + if (endpoint == null) { + throw logger.logExceptionAsError(new NullPointerException("'endpoint' cannot be null.")); + } + + final HttpPipeline httpPipeline = createPipeline(); + final ServiceBusManagementClientImpl client = new ServiceBusManagementClientImplBuilder() + .pipeline(httpPipeline) + .serializer(serializer) + .endpoint(endpoint) + .apiVersion(API_VERSION) + .buildClient(); + + return new ServiceBusManagementAsyncClient(client, serializer); + } + + /** + * Creates a {@link ServiceBusManagementClient} based on options set in the builder. Every time {@code + * buildAsyncClient} is invoked, a new instance of the client is created. + * + *

If {@link #pipeline(HttpPipeline) pipeline} is set, then the {@code pipeline} and + * {@link #endpoint(String) endpoint} are used to create the {@link ServiceBusManagementClient client}. All other + * builder settings are ignored.

+ * + * @return A {@link ServiceBusManagementClient} with the options set in the builder. + * @throws NullPointerException if {@code endpoint} has not been set. This is automatically set when {@link + * #connectionString(String) connectionString} is set. Or it can be set explicitly through {@link + * #endpoint(String)}. + * @throws IllegalStateException If {@link #connectionString(String) connectionString} has not been set. + */ + public ServiceBusManagementClient buildClient() { + return new ServiceBusManagementClient(buildAsyncClient()); + } + + /** + * Adds a policy to the set of existing policies that are executed after required policies. + * + * @param policy The retry policy for service requests. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + * @throws NullPointerException If {@code policy} is {@code null}. + */ + public ServiceBusManagementClientBuilder addPolicy(HttpPipelinePolicy policy) { + Objects.requireNonNull(policy); + userPolicies.add(policy); + return this; + } + + /** + * Sets the service endpoint for the Service Bus namespace. + * + * @param endpoint The URL of the Service Bus namespace. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + * @throws NullPointerException if {@code endpoint} is null. + * @throws IllegalArgumentException if {@code endpoint} cannot be parsed into a valid URL. + */ + public ServiceBusManagementClientBuilder endpoint(String endpoint) { + final URL url; + try { + url = new URL(Objects.requireNonNull(endpoint, "'endpoint' cannot be null.")); + } catch (MalformedURLException ex) { + throw logger.logExceptionAsWarning(new IllegalArgumentException("'endpoint' must be a valid URL")); + } + + this.endpoint = url.getHost(); + return this; + } + + /** + * Sets the configuration store that is used during construction of the service client. + * + * The default configuration store is a clone of the {@link Configuration#getGlobalConfiguration() global + * configuration store}, use {@link Configuration#NONE} to bypass using configuration settings during construction. + * + * @param configuration The configuration store used to + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + */ + public ServiceBusManagementClientBuilder configuration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + /** + * Sets the connection string for a Service Bus namespace or a specific Service Bus resource. + * + * @param connectionString Connection string for a Service Bus namespace or a specific Service Bus resource. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + * @throws NullPointerException If {@code connectionString} is {@code null}. + * @throws IllegalArgumentException If {@code connectionString} is an entity specific connection string, and not + * a {@code connectionString} for the Service Bus namespace. + */ + public ServiceBusManagementClientBuilder connectionString(String connectionString) { + Objects.requireNonNull(connectionString, "'connectionString' cannot be null."); + + final ConnectionStringProperties properties = new ConnectionStringProperties(connectionString); + final TokenCredential tokenCredential; + try { + tokenCredential = new ServiceBusSharedKeyCredential(properties.getSharedAccessKeyName(), + properties.getSharedAccessKey(), ServiceBusConstants.TOKEN_VALIDITY); + } catch (Exception e) { + throw logger.logExceptionAsError( + new AzureException("Could not create the ServiceBusSharedKeyCredential.", e)); + } + + this.endpoint = properties.getEndpoint().getHost(); + if (properties.getEntityPath() != null && !properties.getEntityPath().isEmpty()) { + throw logger.logExceptionAsError(new IllegalArgumentException( + "'connectionString' cannot contain an EntityPath. It should be a namespace connection string.")); + } + + return credential(properties.getEndpoint().getHost(), tokenCredential); + } + + /** + * Sets the credential used to authenticate HTTP requests to the Service Bus namespace. + * + * @param fullyQualifiedNamespace for the Service Bus. + * @param credential {@link TokenCredential} to be used for authentication. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + */ + public ServiceBusManagementClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential) { + this.endpoint = Objects.requireNonNull(fullyQualifiedNamespace, + "'fullyQualifiedNamespace' cannot be null."); + this.tokenCredential = Objects.requireNonNull(credential, "'credential' cannot be null."); + + if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) { + throw logger.logExceptionAsError( + new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string.")); + } + + return this; + } + + /** + * Sets the HTTP client to use for sending and receiving requests to and from the service. + * + * @param client The HTTP client to use for requests. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + */ + public ServiceBusManagementClientBuilder httpClient(HttpClient client) { + if (this.httpClient != null && client == null) { + logger.info("HttpClient is being set to 'null' when it was previously configured."); + } + + this.httpClient = client; + return this; + } + + /** + * Sets the logging configuration for HTTP requests and responses. + * + *

If logLevel is not provided, default value of {@link HttpLogDetailLevel#NONE} is set.

+ * + * @param logOptions The logging configuration to use when sending and receiving HTTP requests/responses. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + */ + public ServiceBusManagementClientBuilder httpLogOptions(HttpLogOptions logOptions) { + httpLogOptions = logOptions; + return this; + } + + /** + * Sets the HTTP pipeline to use for the service client. + * + * If {@code pipeline} is set, all other settings are ignored, aside from {@link + * ServiceBusManagementClientBuilder#endpoint(String) endpoint} to build {@link ServiceBusManagementClient} or + * {@link ServiceBusManagementAsyncClient}. + * + * @param pipeline The HTTP pipeline to use for sending service requests and receiving responses. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + */ + public ServiceBusManagementClientBuilder pipeline(HttpPipeline pipeline) { + if (this.pipeline != null && pipeline == null) { + logger.info("HttpPipeline is being set to 'null' when it was previously configured."); + } + + this.pipeline = pipeline; + return this; + } + + /** + * Sets the {@link HttpPipelinePolicy} that is used when each request is sent. + * + * The default retry policy will be used if not provided {@link #buildAsyncClient()} + * to build {@link ServiceBusManagementClient} or {@link ServiceBusManagementAsyncClient}. + * + * @param retryPolicy user's retry policy applied to each request. + * + * @return The updated {@link ServiceBusManagementClientBuilder} object. + */ + public ServiceBusManagementClientBuilder retryPolicy(HttpPipelinePolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + /** + * Builds a new HTTP pipeline if none is set, or returns a user-provided one. + * + * @return A new HTTP pipeline or the user-defined one from {@link #pipeline(HttpPipeline)}. + */ + private HttpPipeline createPipeline() { + if (pipeline != null) { + return pipeline; + } + + final Configuration buildConfiguration = configuration == null + ? Configuration.getGlobalConfiguration().clone() + : configuration; + + // Closest to API goes first, closest to wire goes last. + final List httpPolicies = new ArrayList<>(); + final String clientName = properties.getOrDefault("name", "UnknownName"); + final String clientVersion = properties.getOrDefault("version", "UnknownVersion"); + + httpPolicies.add(new UserAgentPolicy(httpLogOptions.getApplicationId(), clientName, clientVersion, + buildConfiguration)); + httpPolicies.add(new ServiceBusTokenCredentialHttpPolicy(tokenCredential)); + httpPolicies.add(new AddHeadersFromContextPolicy()); + + HttpPolicyProviders.addBeforeRetryPolicies(httpPolicies); + + httpPolicies.add(retryPolicy == null ? new RetryPolicy() : retryPolicy); + httpPolicies.addAll(userPolicies); + httpPolicies.add(new HttpLoggingPolicy(httpLogOptions)); + + HttpPolicyProviders.addAfterRetryPolicies(httpPolicies); + + return new HttpPipelineBuilder() + .policies(httpPolicies.toArray(new HttpPipelinePolicy[0])) + .httpClient(httpClient) + .build(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementSerializer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementSerializer.java index 057daacd22f6..4b566f842723 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementSerializer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementSerializer.java @@ -8,6 +8,7 @@ import com.azure.core.util.serializer.JacksonAdapter; import com.azure.core.util.serializer.SerializerAdapter; import com.azure.core.util.serializer.SerializerEncoding; +import com.azure.messaging.servicebus.implementation.models.ServiceBusManagementError; import java.io.IOException; import java.lang.reflect.Type; @@ -41,11 +42,16 @@ public T deserialize(String value, Type type) throws IOException { @Override @SuppressWarnings("unchecked") public T deserialize(String value, Type type, SerializerEncoding encoding) throws IOException { - if (encoding == SerializerEncoding.XML) { - return (T) value; + if (encoding != SerializerEncoding.XML) { + return jacksonAdapter.deserialize(value, type, encoding); } - return jacksonAdapter.deserialize(value, type, encoding); + if (ServiceBusManagementError.class == type) { + final ServiceBusManagementError error = deserialize(value, type); + return (T) error; + } + + return (T) value; } @Override diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusTokenCredentialHttpPolicy.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusTokenCredentialHttpPolicy.java index f8a7385b1753..d644bd136e19 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusTokenCredentialHttpPolicy.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusTokenCredentialHttpPolicy.java @@ -17,10 +17,23 @@ public class ServiceBusTokenCredentialHttpPolicy implements HttpPipelinePolicy { private final TokenCredential tokenCredential; - ServiceBusTokenCredentialHttpPolicy(TokenCredential tokenCredential) { + /** + * Creates an instance that authorizes with the tokenCredential. + * + * @param tokenCredential Credential to get access token. + */ + public ServiceBusTokenCredentialHttpPolicy(TokenCredential tokenCredential) { this.tokenCredential = tokenCredential; } + /** + * Adds the authorization header to a Service Bus management request. + * + * @param context HTTP request context. + * @param next The next HTTP policy in the pipeline. + * + * @return A mono that completes with the HTTP response. + */ @Override public Mono process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) { final String url = context.getHttpRequest().getUrl().toString(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/AuthorizationRule.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/AuthorizationRule.java index f90ebe7f044b..19778421c7ed 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/AuthorizationRule.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/AuthorizationRule.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; + import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; @@ -19,15 +20,17 @@ @Fluent public final class AuthorizationRule { /* - * The Type property. + * The type property. */ - @JacksonXmlProperty(localName = "Type", namespace = "http://www.w3.org/2001/XMLSchema-instance") + @JacksonXmlProperty(localName = "type", isAttribute = true) private String type; /* * The ClaimType property. */ - @JacksonXmlProperty(localName = "ClaimType", isAttribute = true) + @JacksonXmlProperty( + localName = "ClaimType", + namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect") private String claimType; /* @@ -49,7 +52,7 @@ private RightsWrapper(@JacksonXmlProperty(localName = "AccessRights") List getRights() { } /** - * Set the rights property: Access rights of the entity. + * Set the rights property: Access rights of the entity. Values are 'Send', 'Listen', or 'Manage'. * * @param rights the rights value to set. * @return the AuthorizationRule object itself. @@ -180,7 +183,7 @@ public AuthorizationRule setRights(List rights) { } /** - * Get the createdTime property: The CreatedTime property. + * Get the createdTime property: The date and time when the authorization rule was created. * * @return the createdTime value. */ @@ -189,7 +192,7 @@ public OffsetDateTime getCreatedTime() { } /** - * Set the createdTime property: The CreatedTime property. + * Set the createdTime property: The date and time when the authorization rule was created. * * @param createdTime the createdTime value to set. * @return the AuthorizationRule object itself. @@ -200,7 +203,7 @@ public AuthorizationRule setCreatedTime(OffsetDateTime createdTime) { } /** - * Get the modifiedTime property: The ModifiedTime property. + * Get the modifiedTime property: The date and time when the authorization rule was modified. * * @return the modifiedTime value. */ @@ -209,7 +212,7 @@ public OffsetDateTime getModifiedTime() { } /** - * Set the modifiedTime property: The ModifiedTime property. + * Set the modifiedTime property: The date and time when the authorization rule was modified. * * @param modifiedTime the modifiedTime value to set. * @return the AuthorizationRule object itself. @@ -220,7 +223,7 @@ public AuthorizationRule setModifiedTime(OffsetDateTime modifiedTime) { } /** - * Get the keyName property: The KeyName property. + * Get the keyName property: The authorization rule key name. * * @return the keyName value. */ @@ -229,7 +232,7 @@ public String getKeyName() { } /** - * Set the keyName property: The KeyName property. + * Set the keyName property: The authorization rule key name. * * @param keyName the keyName value to set. * @return the AuthorizationRule object itself. @@ -240,7 +243,7 @@ public AuthorizationRule setKeyName(String keyName) { } /** - * Get the primaryKey property: The PrimaryKey property. + * Get the primaryKey property: The primary key of the authorization rule. * * @return the primaryKey value. */ @@ -249,7 +252,7 @@ public String getPrimaryKey() { } /** - * Set the primaryKey property: The PrimaryKey property. + * Set the primaryKey property: The primary key of the authorization rule. * * @param primaryKey the primaryKey value to set. * @return the AuthorizationRule object itself. @@ -260,7 +263,7 @@ public AuthorizationRule setPrimaryKey(String primaryKey) { } /** - * Get the secondaryKey property: The SecondaryKey property. + * Get the secondaryKey property: The primary key of the authorization rule. * * @return the secondaryKey value. */ @@ -269,7 +272,7 @@ public String getSecondaryKey() { } /** - * Set the secondaryKey property: The SecondaryKey property. + * Set the secondaryKey property: The primary key of the authorization rule. * * @param secondaryKey the secondaryKey value to set. * @return the AuthorizationRule object itself. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/QueueRuntimeInfo.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/QueueRuntimeInfo.java new file mode 100644 index 000000000000..eb25bae0b314 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/QueueRuntimeInfo.java @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus.models; + +import com.azure.core.annotation.Immutable; + +import java.time.Instant; +import java.util.Objects; + +/** + * Runtime information about the queue. + */ +@Immutable +public class QueueRuntimeInfo { + private final String name; + private final MessageCountDetails details; + private final long messageCount; + private final long sizeInBytes; + private final Instant accessAt; + private final Instant createdAt; + private final Instant updatedAt; + + /** + * Creates a new instance with runtime properties extracted from the given QueueDescription. + * + * @param queueDescription Queue description to extract runtime information from. + * + * @throws NullPointerException if {@code queueDescription} is null. + */ + public QueueRuntimeInfo(QueueDescription queueDescription) { + Objects.requireNonNull(queueDescription, "'queueDescription' cannot be null."); + this.name = queueDescription.getName(); + this.details = queueDescription.getMessageCountDetails(); + this.messageCount = queueDescription.getMessageCount(); + this.sizeInBytes = queueDescription.getSizeInBytes(); + this.accessAt = queueDescription.getAccessedAt().toInstant(); + this.createdAt = queueDescription.getCreatedAt().toInstant(); + this.updatedAt = queueDescription.getUpdatedAt().toInstant(); + } + + /** + * Gets the last time a message was sent, or the last time there was a receive request to this queue. + * + * @return The last time a message was sent, or the last time there was a receive request to this queue. + */ + public Instant getAccessAt() { + return accessAt; + } + + /** + * Gets the exact time the queue was created. + * + * @return The exact time the queue was created. + */ + public Instant getCreatedAt() { + return createdAt; + } + + /** + * Gets details about the message counts in queue. + * + * @return Details about the message counts in queue. + */ + public MessageCountDetails getDetails() { + return details; + } + + /** + * Gets the number of messages in the queue. + * + * @return The number of messages in the queue. + */ + public long getMessageCount() { + return messageCount; + } + + /** + * Gets the name of the queue. + * + * @return The name of the queue. + */ + public String getName() { + return name; + } + + /** + * Gets the size of the queue, in bytes. + * + * @return The size of the queue, in bytes. + */ + public long getSizeInBytes() { + return sizeInBytes; + } + + /** + * Gets the exact time a message was updated in the queue. + * + * @return The exact time a message was updated in the queue. + */ + public Instant getUpdatedAt() { + return updatedAt; + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties index 0d59bb6019eb..0600b337ca0e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties @@ -1,3 +1,5 @@ +name=${project.artifactId} +version=${project.version} CLASS_NOT_A_SUPPORTED_TYPE=Class '%s' is not a supported deserializable type. MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not setting body contents. REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientIntegrationTest.java new file mode 100644 index 000000000000..bd467d8d5f14 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientIntegrationTest.java @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.http.policy.HttpLogDetailLevel; +import com.azure.core.http.policy.HttpLogOptions; +import com.azure.core.test.TestBase; +import com.azure.core.test.TestMode; +import com.azure.messaging.servicebus.implementation.models.ServiceBusManagementErrorException; +import com.azure.messaging.servicebus.models.QueueDescription; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ServiceBusManagementAsyncClientIntegrationTest extends TestBase { + private ServiceBusManagementAsyncClient client; + + static void beforeAll() { + StepVerifier.setDefaultTimeout(Duration.ofSeconds(30)); + } + + @AfterAll + static void afterAll() { + StepVerifier.resetDefaultTimeout(); + } + + @Override + protected void beforeTest() { + Assumptions.assumeTrue(getTestMode() != TestMode.PLAYBACK, + "Current record/playback does not support persisting XML calls."); + + client = new ServiceBusManagementClientBuilder() + .connectionString(TestUtils.getConnectionString()) + .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS)) + .buildAsyncClient(); + } + + @Test + void getQueue() { + // Arrange + String queueName = TestUtils.getQueueName(); + + // Act & Assert + StepVerifier.create(client.getQueue(queueName)) + .assertNext(queueDescription -> { + assertEquals(queueName, queueDescription.getName()); + }) + .verifyComplete(); + } + + @Test + void createQueueExistingName() { + // Arrange + String queueName = TestUtils.getQueueName(); + QueueDescription queueDescription = new QueueDescription().setName(queueName); + + // Act & Assert + StepVerifier.create(client.createQueue(queueDescription)) + .consumeErrorWith(error -> { + assertTrue(error instanceof ServiceBusManagementErrorException); + }) + .verify(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientTest.java new file mode 100644 index 000000000000..94c687860300 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementAsyncClientTest.java @@ -0,0 +1,383 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpMethod; +import com.azure.core.http.HttpRequest; +import com.azure.core.http.rest.Response; +import com.azure.core.util.Context; +import com.azure.messaging.servicebus.implementation.QueuesImpl; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementClientImpl; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementSerializer; +import com.azure.messaging.servicebus.implementation.models.CreateQueueBody; +import com.azure.messaging.servicebus.implementation.models.CreateQueueBodyContent; +import com.azure.messaging.servicebus.implementation.models.QueueDescriptionEntry; +import com.azure.messaging.servicebus.implementation.models.QueueDescriptionEntryContent; +import com.azure.messaging.servicebus.implementation.models.QueueDescriptionFeed; +import com.azure.messaging.servicebus.implementation.models.ResponseLink; +import com.azure.messaging.servicebus.models.QueueDescription; +import com.azure.messaging.servicebus.models.QueueRuntimeInfo; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ServiceBusManagementAsyncClient}. + */ +class ServiceBusManagementAsyncClientTest { + @Mock + private ServiceBusManagementClientImpl serviceClient; + @Mock + private QueuesImpl queuesClient; + @Mock + private ServiceBusManagementSerializer serializer; + @Mock + private Response objectResponse; + @Mock + private Response secondObjectResponse; + + private final String queueName = "some-queue"; + private final String responseString = "some-xml-response-string"; + private final String secondResponseString = "second-xml-response"; + private final HttpHeaders httpHeaders = new HttpHeaders().put("foo", "baz"); + private final HttpRequest httpRequest; + + private ServiceBusManagementAsyncClient client; + + ServiceBusManagementAsyncClientTest() { + try { + httpRequest = new HttpRequest(HttpMethod.TRACE, new URL("https://something.com")); + } catch (MalformedURLException e) { + throw new RuntimeException("Could not form URL.", e); + } + } + + @BeforeAll + static void beforeAll() { + StepVerifier.setDefaultTimeout(Duration.ofSeconds(5)); + } + + @AfterAll + static void afterAll() { + StepVerifier.resetDefaultTimeout(); + } + + @BeforeEach + void beforeEach() { + MockitoAnnotations.initMocks(this); + + when(objectResponse.getValue()).thenReturn(responseString); + int statusCode = 202; + when(objectResponse.getStatusCode()).thenReturn(statusCode); + when(objectResponse.getHeaders()).thenReturn(httpHeaders); + when(objectResponse.getRequest()).thenReturn(httpRequest); + + when(secondObjectResponse.getValue()).thenReturn(secondResponseString); + when(secondObjectResponse.getStatusCode()).thenReturn(430); + when(secondObjectResponse.getHeaders()).thenReturn(httpHeaders); + when(secondObjectResponse.getRequest()).thenReturn(httpRequest); + + when(serviceClient.getQueues()).thenReturn(queuesClient); + + client = new ServiceBusManagementAsyncClient(serviceClient, serializer); + } + + @AfterEach + void afterEach() { + Mockito.framework().clearInlineMocks(); + } + + @Test + void createQueue() throws IOException { + // Arrange + final QueueDescription description = new QueueDescription().setName(queueName); + final QueueDescription expected = new QueueDescription().setName("some-new-name"); + + when(queuesClient.putWithResponseAsync(eq(queueName), + argThat(arg -> createBodyContentEquals(arg, description)), isNull(), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.createQueue(description)) + .expectNext(expected) + .verifyComplete(); + } + + @Test + void createQueueWithResponse() throws IOException { + // Arrange + final QueueDescription description = new QueueDescription().setName(queueName); + final QueueDescription expected = new QueueDescription().setName("some-new-name"); + + when(queuesClient.putWithResponseAsync(eq(queueName), + argThat(arg -> createBodyContentEquals(arg, description)), isNull(), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.createQueueWithResponse(description)) + .assertNext(response -> { + assertResponse(objectResponse, response); + assertEquals(expected, response.getValue()); + }) + .verifyComplete(); + } + + @Test + void deleteQueue() { + // Arrange + when(queuesClient.deleteWithResponseAsync(eq(queueName), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + // Act & Assert + StepVerifier.create(client.deleteQueue(queueName)) + .verifyComplete(); + } + + @Test + void deleteQueueWithResponse() { + // Arrange + when(queuesClient.deleteWithResponseAsync(eq(queueName), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + // Act & Assert + StepVerifier.create(client.deleteQueueWithResponse(queueName)) + .assertNext(response -> assertResponse(objectResponse, response)) + .verifyComplete(); + } + + @Test + void getQueue() throws IOException { + // Arrange + final QueueDescription expected = new QueueDescription().setName(queueName); + + when(queuesClient.getWithResponseAsync(eq(queueName), eq(true), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.getQueue(queueName)) + .expectNext(expected) + .verifyComplete(); + } + + @Test + void getQueueWithResponse() throws IOException { + // Arrange + final QueueDescription expected = new QueueDescription().setName(queueName); + + when(queuesClient.getWithResponseAsync(eq(queueName), eq(true), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.getQueueWithResponse(queueName)) + .assertNext(response -> { + assertResponse(objectResponse, response); + assertEquals(expected, response.getValue()); + }) + .verifyComplete(); + } + + @Test + void getQueueRuntimeInfo() throws IOException { + // Arrange + final QueueDescription expected = new QueueDescription() + .setName(queueName) + .setMessageCount(100) + .setSizeInBytes(1053) + .setAccessedAt(OffsetDateTime.of(2020, 10, 6, 12, 1, 20, 300, ZoneOffset.UTC)) + .setCreatedAt(OffsetDateTime.of(2010, 12, 10, 12, 1, 20, 300, ZoneOffset.UTC)) + .setUpdatedAt(OffsetDateTime.of(2019, 4, 25, 7, 1, 20, 300, ZoneOffset.UTC)); + + when(queuesClient.getWithResponseAsync(eq(queueName), eq(true), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.getQueueRuntimeInfo(queueName)) + .assertNext(info -> { + assertEquals(expected.getName(), info.getName()); + assertEquals(Long.valueOf(expected.getMessageCount()), info.getMessageCount()); + assertEquals(Long.valueOf(expected.getSizeInBytes()), info.getSizeInBytes()); + assertEquals(expected.getCreatedAt().toInstant(), info.getCreatedAt()); + assertEquals(expected.getUpdatedAt().toInstant(), info.getUpdatedAt()); + assertEquals(expected.getAccessedAt().toInstant(), info.getAccessAt()); + }) + .verifyComplete(); + } + + @Test + void getQueueRuntimeInfoWithResponse() throws IOException { + // Arrange + final QueueDescription expected = new QueueDescription() + .setName(queueName) + .setMessageCount(100) + .setSizeInBytes(1053) + .setAccessedAt(OffsetDateTime.of(2020, 10, 6, 12, 1, 20, 300, ZoneOffset.UTC)) + .setCreatedAt(OffsetDateTime.of(2010, 12, 10, 12, 1, 20, 300, ZoneOffset.UTC)) + .setUpdatedAt(OffsetDateTime.of(2019, 4, 25, 7, 1, 20, 300, ZoneOffset.UTC)); + + when(queuesClient.getWithResponseAsync(eq(queueName), eq(true), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.getQueueRuntimeInfoWithResponse(queueName)) + .assertNext(response -> { + assertResponse(objectResponse, response); + + final QueueRuntimeInfo info = response.getValue(); + assertEquals(expected.getName(), info.getName()); + assertEquals(Long.valueOf(expected.getMessageCount()), info.getMessageCount()); + assertEquals(Long.valueOf(expected.getSizeInBytes()), info.getSizeInBytes()); + assertEquals(expected.getCreatedAt().toInstant(), info.getCreatedAt()); + assertEquals(expected.getUpdatedAt().toInstant(), info.getUpdatedAt()); + assertEquals(expected.getAccessedAt().toInstant(), info.getAccessAt()); + }) + .verifyComplete(); + } + + @Test + void listQueues() throws IOException { + // Arrange + final int firstEntities = 7; + final String entityType = "queues"; + final List firstEntries = IntStream.range(0, 4).mapToObj(number -> { + QueueDescription description = new QueueDescription().setName(String.valueOf(number)); + QueueDescriptionEntryContent content = new QueueDescriptionEntryContent().setQueueDescription(description); + return new QueueDescriptionEntry().setContent(content); + }).collect(Collectors.toList()); + final List links = Arrays.asList( + new ResponseLink().setRel("self").setHref("foo"), + new ResponseLink().setRel("bar").setHref("baz"), + new ResponseLink().setRel("next").setHref("https://foo.bar.net?api-version=2017-04&$skip=" + firstEntities) + ); + final QueueDescriptionFeed firstFeed = new QueueDescriptionFeed() + .setLink(links) + .setEntry(firstEntries) + .setId("first-id"); + + final List secondEntries = IntStream.range(5, 7).mapToObj(number -> { + QueueDescription description = new QueueDescription().setName(String.valueOf(number)); + QueueDescriptionEntryContent content = new QueueDescriptionEntryContent().setQueueDescription(description); + return new QueueDescriptionEntry().setContent(content); + }).collect(Collectors.toList()); + final List secondLinks = Arrays.asList( + new ResponseLink().setRel("self").setHref("foo"), + new ResponseLink().setRel("bar").setHref("baz")); + final QueueDescriptionFeed secondFeed = new QueueDescriptionFeed() + .setEntry(secondEntries) + .setLink(secondLinks) + .setId("second-id"); + + when(serviceClient.listEntitiesWithResponseAsync(eq(entityType), eq(0), anyInt(), any(Context.class))) + .thenReturn(Mono.fromCallable(() -> objectResponse)); + when(serviceClient.listEntitiesWithResponseAsync(eq(entityType), eq(firstEntities), anyInt(), any(Context.class))) + .thenReturn(Mono.fromCallable(() -> secondObjectResponse)); + + when(serializer.deserialize(responseString, QueueDescriptionFeed.class)) + .thenReturn(firstFeed); + when(serializer.deserialize(secondResponseString, QueueDescriptionFeed.class)) + .thenReturn(secondFeed); + + // Act & Assert + StepVerifier.create(client.listQueues()) + .expectNextCount(firstEntries.size()) + .expectNextCount(secondEntries.size()) + .verifyComplete(); + } + + @Test + void updateQueue() throws IOException { + // Arrange + final QueueDescription description = new QueueDescription().setName(queueName); + final QueueDescription expected = new QueueDescription().setName("some-new-name"); + + when(queuesClient.putWithResponseAsync(eq(queueName), + argThat(arg -> createBodyContentEquals(arg, description)), eq("*"), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.updateQueue(description)) + .expectNext(expected) + .verifyComplete(); + } + + @Test + void updateQueueWithResponse() throws IOException { + // Arrange + final QueueDescription description = new QueueDescription().setName(queueName); + final QueueDescription expected = new QueueDescription().setName("some-new-name"); + + when(queuesClient.putWithResponseAsync(eq(queueName), + argThat(arg -> createBodyContentEquals(arg, description)), eq("*"), any(Context.class))) + .thenReturn(Mono.just(objectResponse)); + + when(serializer.deserialize(responseString, QueueDescription.class)).thenReturn(expected); + + // Act & Assert + StepVerifier.create(client.updateQueueWithResponse(description)) + .assertNext(response -> { + assertResponse(objectResponse, response); + assertEquals(expected, response.getValue()); + }) + .verifyComplete(); + } + + private static void assertResponse(Response expected, Response actual) { + assertEquals(expected.getStatusCode(), actual.getStatusCode()); + assertEquals(expected.getHeaders(), actual.getHeaders()); + assertEquals(expected.getRequest(), actual.getRequest()); + } + + private static boolean createBodyContentEquals(Object requestBody, QueueDescription expected) { + if (!(requestBody instanceof CreateQueueBody)) { + return false; + } + + final CreateQueueBody body = (CreateQueueBody) requestBody; + final CreateQueueBodyContent content = body.getContent(); + return content != null + && Objects.equals(expected, content.getQueueDescription()) + && "application/xml".equals(content.getType()); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementClientTest.java new file mode 100644 index 000000000000..ec6dfec7989c --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusManagementClientTest.java @@ -0,0 +1,287 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.rest.PagedFlux; +import com.azure.core.http.rest.PagedIterable; +import com.azure.core.http.rest.PagedResponse; +import com.azure.core.http.rest.Response; +import com.azure.core.util.Context; +import com.azure.core.util.IterableStream; +import com.azure.messaging.servicebus.models.QueueDescription; +import com.azure.messaging.servicebus.models.QueueRuntimeInfo; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ServiceBusManagementClient}. + */ +class ServiceBusManagementClientTest { + @Mock + private ServiceBusManagementAsyncClient asyncClient; + @Mock + private Response queueDescriptionResponse; + @Mock + private Response queueRuntimeInfoResponse; + @Mock + private Response voidResponse; + @Mock + private PagedFlux queuePagedFlux; + @Mock + private PagedResponse pagedResponse; + @Mock + private PagedResponse continuationPagedResponse; + + private final Context context = new Context("foo", "bar").addData("baz", "boo"); + private final String queueName = "some-queue"; + private ServiceBusManagementClient client; + + @BeforeEach + void beforeEach() { + MockitoAnnotations.initMocks(this); + client = new ServiceBusManagementClient(asyncClient); + } + + @AfterEach + void afterEach() { + Mockito.framework().clearInlineMocks(); + } + + @Test + void createQueue() { + // Arrange + final QueueDescription description = new QueueDescription() + .setName(queueName) + .setMaxDeliveryCount(10) + .setAutoDeleteOnIdle(Duration.ofSeconds(10)); + final QueueDescription result = new QueueDescription() + .setName("queue-name-2") + .setMaxDeliveryCount(4) + .setAutoDeleteOnIdle(Duration.ofSeconds(30)) + .setAccessedAt(OffsetDateTime.now()); + + when(asyncClient.createQueue(description)).thenReturn(Mono.just(result)); + + // Act + final QueueDescription actual = client.createQueue(description); + + // Assert + assertEquals(result, actual); + } + + @Test + void createQueueWithResponse() { + // Arrange + final QueueDescription description = mock(QueueDescription.class); + final QueueDescription result = mock(QueueDescription.class); + + when(queueDescriptionResponse.getValue()).thenReturn(result); + when(asyncClient.createQueueWithResponse(description, context)).thenReturn(Mono.just(queueDescriptionResponse)); + + // Act + final Response actual = client.createQueueWithResponse(description, context); + + // Assert + assertEquals(queueDescriptionResponse, actual); + assertEquals(result, actual.getValue()); + } + + @Test + void deleteQueue() { + // Arrange + when(asyncClient.deleteQueue(queueName)).thenReturn(Mono.empty()); + + // Act + client.deleteQueue(queueName); + + // Assert + verify(asyncClient).deleteQueue(queueName); + } + + @Test + void deleteQueueWithResponse() { + // Arrange + when(asyncClient.deleteQueueWithResponse(queueName, context)).thenReturn(Mono.just(voidResponse)); + + // Act + final Response actual = client.deleteQueueWithResponse(queueName, context); + + // Assert + assertEquals(voidResponse, actual); + } + + @Test + void getQueue() { + // Arrange + final QueueDescription result = mock(QueueDescription.class); + + when(asyncClient.getQueue(queueName)).thenReturn(Mono.just(result)); + + // Act + final QueueDescription actual = client.getQueue(queueName); + + // Assert + assertEquals(result, actual); + } + + @Test + void getQueueWithResponse() { + // Arrange + final QueueDescription result = mock(QueueDescription.class); + + when(queueDescriptionResponse.getValue()).thenReturn(result); + when(asyncClient.getQueueWithResponse(queueName, context)).thenReturn(Mono.just(queueDescriptionResponse)); + + // Act + final Response actual = client.getQueueWithResponse(queueName, context); + + // Assert + assertEquals(result, actual.getValue()); + } + + @Test + void getQueueRuntimeInfo() { + // Arrange + final QueueRuntimeInfo result = mock(QueueRuntimeInfo.class); + + when(asyncClient.getQueueRuntimeInfo(queueName)).thenReturn(Mono.just(result)); + + // Act + final QueueRuntimeInfo actual = client.getQueueRuntimeInfo(queueName); + + // Assert + assertEquals(result, actual); + } + + @Test + void getQueueRuntimeInfoWithResponse() { + // Arrange + final QueueRuntimeInfo result = mock(QueueRuntimeInfo.class); + + when(queueRuntimeInfoResponse.getValue()).thenReturn(result); + when(asyncClient.getQueueRuntimeInfoWithResponse(queueName, context)) + .thenReturn(Mono.just(queueRuntimeInfoResponse)); + + // Act + final Response actual = client.getQueueRuntimeInfoWithResponse(queueName, context); + + // Assert + assertEquals(result, actual.getValue()); + } + + @Test + void listQueues() { + // Arrange + final List queues = Arrays.asList(mock(QueueDescription.class), mock(QueueDescription.class)); + when(pagedResponse.getElements()).thenReturn(new IterableStream<>(queues)); + when(pagedResponse.getValue()).thenReturn(queues); + when(pagedResponse.getStatusCode()).thenReturn(200); + when(pagedResponse.getHeaders()).thenReturn(new HttpHeaders()); + when(pagedResponse.getContinuationToken()).thenReturn(""); + + final PagedFlux pagedFlux = new PagedFlux<>(() -> Mono.just(pagedResponse)); + when(asyncClient.listQueues()).thenReturn(pagedFlux); + + // Act + final PagedIterable queueDescriptions = client.listQueues(); + + // Assert + final long size = queueDescriptions.stream().count(); + assertEquals(queues.size(), size); + } + + @Test + void listQueuesWithContext() { + // Arrange + final String continuationToken = "foo"; + final String lastToken = "last"; + final List firstPage = Arrays.asList(mock(QueueDescription.class), + mock(QueueDescription.class)); + final List secondPage = Arrays.asList(mock(QueueDescription.class), + mock(QueueDescription.class), mock(QueueDescription.class)); + + when(pagedResponse.getElements()).thenReturn(new IterableStream<>(firstPage)); + when(pagedResponse.getValue()).thenReturn(firstPage); + when(pagedResponse.getStatusCode()).thenReturn(200); + when(pagedResponse.getHeaders()).thenReturn(new HttpHeaders()); + when(pagedResponse.getContinuationToken()).thenReturn(continuationToken); + + when(continuationPagedResponse.getElements()).thenReturn(new IterableStream<>(secondPage)); + when(continuationPagedResponse.getValue()).thenReturn(firstPage); + when(continuationPagedResponse.getStatusCode()).thenReturn(200); + when(continuationPagedResponse.getHeaders()).thenReturn(new HttpHeaders()); + when(continuationPagedResponse.getContinuationToken()).thenReturn(lastToken); + + when(asyncClient.listQueuesFirstPage(context)).thenReturn(Mono.just(pagedResponse)); + when(asyncClient.listQueuesNextPage(continuationToken, context)) + .thenReturn(Mono.just(continuationPagedResponse)); + when(asyncClient.listQueuesNextPage(lastToken, context)) + .thenReturn(Mono.empty()); + + // Act + final PagedIterable queueDescriptions = client.listQueues(context); + + // Assert + final long size = queueDescriptions.stream().count(); + final long expectedSize = firstPage.size() + secondPage.size(); + + assertEquals(expectedSize, size); + } + + + @Test + void updateQueue() { + // Arrange + final QueueDescription description = new QueueDescription() + .setName(queueName) + .setMaxDeliveryCount(10) + .setAutoDeleteOnIdle(Duration.ofSeconds(10)); + final QueueDescription result = new QueueDescription() + .setName("queue-name-2") + .setMaxDeliveryCount(4) + .setAutoDeleteOnIdle(Duration.ofSeconds(30)) + .setAccessedAt(OffsetDateTime.now()); + + when(asyncClient.updateQueue(description)).thenReturn(Mono.just(result)); + + // Act + final QueueDescription actual = client.updateQueue(description); + + // Assert + assertEquals(result, actual); + } + + @Test + void updateQueueWithResponse() { + // Arrange + final QueueDescription description = mock(QueueDescription.class); + final QueueDescription result = mock(QueueDescription.class); + + when(queueDescriptionResponse.getValue()).thenReturn(result); + when(asyncClient.updateQueueWithResponse(description, context)).thenReturn(Mono.just(queueDescriptionResponse)); + + // Act + final Response actual = client.updateQueueWithResponse(description, context); + + // Assert + assertEquals(queueDescriptionResponse, actual); + assertEquals(result, actual.getValue()); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementClientImplIntegrationTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementClientImplIntegrationTests.java index 3861905f64e9..88a455700e58 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementClientImplIntegrationTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementClientImplIntegrationTests.java @@ -89,7 +89,7 @@ protected void beforeTest() { * Verifies we can get queue information. */ @Test - void getQueue() { + void getQueueImplementation() { // Arrange String queueName = TestUtils.getQueueName(); @@ -111,7 +111,7 @@ void getQueue() { * Verifies we can create a queue. */ @Test - void createQueue() { + void createQueueImplementation() { // Arrange String queueName = testResourceNamer.randomName("test", 7); QueueDescription description = new QueueDescription().setMaxDeliveryCount(15); @@ -144,7 +144,7 @@ void createQueue() { * Verifies we can delete a queue. */ @Test - void deleteQueue() { + void deleteQueueImplementation() { // Arrange String queueName = testResourceNamer.randomName("test", 7); QueueDescription description = new QueueDescription().setMaxDeliveryCount(15); @@ -173,7 +173,7 @@ void deleteQueue() { * Verifies that we can edit properties on an existing queue. */ @Test - void editQueue() { + void editQueueImplementation() { // Arrange final String queueName = "q-5"; final Response response = queuesClient.getWithResponseAsync(queueName, true, Context.NONE) @@ -208,7 +208,7 @@ void editQueue() { * Verifies we can list queues. */ @Test - void listQueues() { + void listQueuesImplementation() { // Arrange String entityType = "queues"; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueue.json b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueue.json index 6c3a853334e5..ba5f37f8f855 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueue.json +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueue.json @@ -1,4 +1,4 @@ { "networkCallRecords" : [ ], - "variables" : [ "b739ef1" ] + "variables" : [ ] } \ No newline at end of file diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/editQueue.json b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueueExistingName.json similarity index 100% rename from sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/editQueue.json rename to sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueueExistingName.json diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/deleteQueue.json b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueueImplementation.json similarity index 52% rename from sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/deleteQueue.json rename to sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueueImplementation.json index 829ece784d8f..4a02647a9052 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/deleteQueue.json +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/createQueueImplementation.json @@ -1,4 +1,4 @@ { "networkCallRecords" : [ ], - "variables" : [ "1b72a33" ] + "variables" : [ "4875d06" ] } \ No newline at end of file diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/deleteQueueImplementation.json b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/deleteQueueImplementation.json new file mode 100644 index 000000000000..80d0ff9552e7 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/deleteQueueImplementation.json @@ -0,0 +1,4 @@ +{ + "networkCallRecords" : [ ], + "variables" : [ "0bdb318" ] +} \ No newline at end of file diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/listQueues.json b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/editQueueImplementation.json similarity index 100% rename from sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/listQueues.json rename to sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/editQueueImplementation.json diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/getQueueImplementation.json b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/getQueueImplementation.json new file mode 100644 index 000000000000..ba5f37f8f855 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/getQueueImplementation.json @@ -0,0 +1,4 @@ +{ + "networkCallRecords" : [ ], + "variables" : [ ] +} \ No newline at end of file diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/listQueuesImplementation.json b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/listQueuesImplementation.json new file mode 100644 index 000000000000..ba5f37f8f855 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/resources/session-records/listQueuesImplementation.json @@ -0,0 +1,4 @@ +{ + "networkCallRecords" : [ ], + "variables" : [ ] +} \ No newline at end of file