Skip to content

Commit

Permalink
Clean-up integration tests (#5383)
Browse files Browse the repository at this point in the history
* Use builder to expose ways to create client.

* Replace manual creation of EventHubClient with createBuilder().

* Remove TestUtils.getEventsAsList

* Move test data set up for integration tests into IntegrationTestBase.

* Use shared parallel scheduler instead of creating new ones.
  • Loading branch information
conniey authored Sep 16, 2019
1 parent 3ad6c1f commit b8b555d
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.IntegrationTestBase;
import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider;
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;
import org.apache.qpid.proton.Proton;
Expand All @@ -24,7 +22,6 @@

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -63,10 +60,7 @@ protected String getTestName() {

@Override
protected void beforeTest() {
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider());
final TracerProvider tracerProvider = new TracerProvider(Collections.emptyList());

client = new EventHubAsyncClient(getConnectionOptions(), getReactorProvider(), handlerProvider, tracerProvider);
client = createBuilder().buildAsyncClient();
consumer = client.createConsumer(EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME, PARTITION_ID, EventPosition.latest());

final EventHubProducerOptions producerOptions = new EventHubProducerOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ErrorContextProvider;
import com.azure.messaging.eventhubs.implementation.IntegrationTestBase;
import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.junit.Assert;
Expand All @@ -22,7 +20,6 @@
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -57,10 +54,7 @@ protected String getTestName() {
protected void beforeTest() {
MockitoAnnotations.initMocks(this);

final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider());
final TracerProvider tracerProvider = new TracerProvider(Collections.emptyList());

client = new EventHubAsyncClient(getConnectionOptions(), getReactorProvider(), handlerProvider, tracerProvider);
client = createBuilder().buildAsyncClient();
producer = client.createProducer();
}

Expand Down Expand Up @@ -92,7 +86,8 @@ public void sendSmallEventsFullBatch() {
}

/**
* Test for sending a message batch that is {@link EventHubAsyncProducer#MAX_MESSAGE_LENGTH_BYTES} with partition key.
* Test for sending a message batch that is {@link EventHubAsyncProducer#MAX_MESSAGE_LENGTH_BYTES} with partition
* key.
*/
@Test
public void sendSmallEventsFullBatchPartitionKey() {
Expand Down Expand Up @@ -157,9 +152,8 @@ public void sendBatchPartitionKeyValidate() throws InterruptedException {
logger.warning(String.format("Event[%s] matched partition key, but not GUID. Expected: %s. Actual: %s",
event.getSequenceNumber(), messageValue, event.getProperties().get(MESSAGE_TRACKING_ID)));
}
}, error -> {
Assert.fail("An error should not have occurred:" + error.toString());
}, () -> {
}, error -> Assert.fail("An error should not have occurred:" + error.toString()),
() -> {
logger.info("Disposing of consumer now that the receive is complete.");
dispose(consumer);
});
Expand All @@ -171,7 +165,7 @@ public void sendBatchPartitionKeyValidate() throws InterruptedException {
subscriptions.addAll(consumerSubscriptions);

// Act
producer.send(batch.getEvents(), sendOptions).block(TIMEOUT);
producer.send(batch.getEvents(), sendOptions).block();

// Assert
// Wait for all the events we sent to be received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.TransportType;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ConnectionOptions;
import com.azure.messaging.eventhubs.implementation.IntegrationTestBase;
import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider;
import com.azure.messaging.eventhubs.implementation.IntegrationTestEventData;
import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;
Expand All @@ -24,16 +22,13 @@
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.messaging.eventhubs.EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME;
import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent;
Expand All @@ -45,6 +40,8 @@
@RunWith(Parameterized.class)
public class EventHubAsyncClientIntegrationTest extends IntegrationTestBase {
private static final int NUMBER_OF_EVENTS = 5;
private final TransportType transportType;
private EventHubClientBuilder builder;

@Parameterized.Parameters(name = "{index}: transportType={0}")
public static Iterable<Object> getTransportTypes() {
Expand All @@ -53,8 +50,7 @@ public static Iterable<Object> getTransportTypes() {

private static final String PARTITION_ID = "1";
private static final AtomicBoolean HAS_PUSHED_EVENTS = new AtomicBoolean();
private static final AtomicReference<Instant> MESSAGES_PUSHED_INSTANT = new AtomicReference<>();
private static final String MESSAGE_TRACKING_VALUE = UUID.randomUUID().toString();
private static volatile IntegrationTestEventData testData = null;

private EventHubAsyncClient client;

Expand All @@ -63,8 +59,7 @@ public static Iterable<Object> getTransportTypes() {

public EventHubAsyncClientIntegrationTest(TransportType transportType) {
super(new ClientLogger(EventHubAsyncClientIntegrationTest.class));

setTransportType(transportType);
this.transportType = transportType;
}

@Override
Expand All @@ -74,13 +69,16 @@ protected String getTestName() {

@Override
protected void beforeTest() {
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider());
final ConnectionOptions connectionOptions = getConnectionOptions();
final TracerProvider tracerProvider = new TracerProvider(Collections.emptyList());

client = new EventHubAsyncClient(connectionOptions, getReactorProvider(), handlerProvider, tracerProvider);
builder = createBuilder()
.transportType(transportType);
client = builder.buildAsyncClient();

setupEventTestData(client);
if (HAS_PUSHED_EVENTS.getAndSet(true)) {
logger.info("Already pushed events to partition. Skipping.");
} else {
final EventHubProducerOptions options = new EventHubProducerOptions().setPartitionId(PARTITION_ID);
testData = setupEventTestData(client, NUMBER_OF_EVENTS, options);
}
}

@Override
Expand All @@ -103,10 +101,11 @@ public void receiveMessage() {
final EventHubConsumerOptions options = new EventHubConsumerOptions()
.setPrefetchCount(2);
final EventHubAsyncConsumer consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, PARTITION_ID,
EventPosition.fromEnqueuedTime(MESSAGES_PUSHED_INSTANT.get()), options);
EventPosition.fromEnqueuedTime(testData.getEnqueuedTime()), options);

// Act & Assert
StepVerifier.create(consumer.receive().filter(x -> isMatchingEvent(x, MESSAGE_TRACKING_VALUE)).take(NUMBER_OF_EVENTS))
StepVerifier.create(consumer.receive().filter(x -> isMatchingEvent(x, testData.getMessageTrackingId()))
.take(NUMBER_OF_EVENTS))
.expectNextCount(NUMBER_OF_EVENTS)
.verifyComplete();
}
Expand All @@ -131,7 +130,7 @@ public void parallelEventHubClients() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(numberOfClients);
final EventHubAsyncClient[] clients = new EventHubAsyncClient[numberOfClients];
for (int i = 0; i < numberOfClients; i++) {
clients[i] = new EventHubAsyncClient(getConnectionOptions(), getReactorProvider(), new ReactorHandlerProvider(getReactorProvider()), null);
clients[i] = builder.buildAsyncClient();
}

final EventHubAsyncProducer producer = clients[0].createProducer(new EventHubProducerOptions().setPartitionId(PARTITION_ID));
Expand All @@ -149,7 +148,8 @@ public void parallelEventHubClients() throws InterruptedException {
&& messageTrackingValue.equals(event.getProperties().get(messageTrackingId));
}).take(numberOfEvents).subscribe(event -> {
logger.info("Event[{}] matched.", event.getSequenceNumber());
}, error -> Assert.fail("An error should not have occurred:" + error.toString()), () -> {
}, error -> Assert.fail("An error should not have occurred:" + error.toString()),
() -> {
long count = countDownLatch.getCount();
logger.info("Finished consuming events. Counting down: {}", count);
countDownLatch.countDown();
Expand All @@ -176,28 +176,4 @@ public void parallelEventHubClients() throws InterruptedException {
dispose(clients);
}
}

/**
* When we run this test, we check if there have been events already pushed to the partition, if not, we push some
* events there.
*/
private void setupEventTestData(EventHubAsyncClient client) {
if (HAS_PUSHED_EVENTS.getAndSet(true)) {
logger.info("Already pushed events to partition. Skipping.");
return;
}

logger.info("Pushing events to partition. Message tracking value: {}", MESSAGE_TRACKING_VALUE);

final EventHubProducerOptions producerOptions = new EventHubProducerOptions().setPartitionId(PARTITION_ID);
final EventHubAsyncProducer producer = client.createProducer(producerOptions);
final Flux<EventData> events = TestUtils.getEvents(NUMBER_OF_EVENTS, MESSAGE_TRACKING_VALUE);

try {
MESSAGES_PUSHED_INSTANT.set(Instant.now());
producer.send(events).block(TIMEOUT);
} finally {
dispose(producer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,12 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.TransportType;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ConnectionOptions;
import com.azure.messaging.eventhubs.implementation.ConnectionStringProperties;
import com.azure.messaging.eventhubs.implementation.IntegrationTestBase;
import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider;
import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ProxyConfiguration;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
Expand All @@ -23,13 +17,11 @@
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand All @@ -42,8 +34,8 @@
import static com.azure.messaging.eventhubs.EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME;

/**
* Integration tests with Azure Event Hubs service. There are other tests that also test {@link EventHubAsyncConsumer} in
* other scenarios.
* Integration tests with Azure Event Hubs service. There are other tests that also test {@link EventHubAsyncConsumer}
* in other scenarios.
*
* @see SetPrefetchCountTest
* @see EventPositionIntegrationTest
Expand All @@ -70,14 +62,7 @@ protected String getTestName() {

@Override
protected void beforeTest() {
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider());
final ConnectionStringProperties properties = new ConnectionStringProperties(getConnectionString());
final ConnectionOptions connectionOptions = new ConnectionOptions(properties.getEndpoint().getHost(),
properties.getEventHubName(), getTokenCredential(), getAuthorizationType(), TransportType.AMQP,
RETRY_OPTIONS, ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.parallel());
final TracerProvider tracerProvider = new TracerProvider(Collections.emptyList());

client = new EventHubAsyncClient(connectionOptions, getReactorProvider(), handlerProvider, tracerProvider);
client = createBuilder().buildAsyncClient();
}

@Override
Expand Down Expand Up @@ -110,9 +95,8 @@ public void parallelCreationOfReceivers() {

final Disposable subscription = consumer.receive().take(numberOfEvents).subscribe(event -> {
logger.info("Event[{}] received. partition: {}", event.getSequenceNumber(), partitionId);
}, error -> {
Assert.fail("An error should not have occurred:" + error.toString());
}, () -> {
}, error -> Assert.fail("An error should not have occurred:" + error.toString()),
() -> {
logger.info("Disposing of consumer now that the receive is complete.");
countDownLatch.countDown();
});
Expand Down
Loading

0 comments on commit b8b555d

Please sign in to comment.