Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into arm-temp-ta
  • Loading branch information
samvaity committed Jan 28, 2020
2 parents 9664b78 + 34cbb46 commit 649113a
Show file tree
Hide file tree
Showing 70 changed files with 1,528 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,12 @@
<!-- Checkstyle rules should not check files in generated-test-sources -->
<suppress checks="[a-zA-Z0-9]*" files="[/\\]generated-test-sources[/\\]"/>

<!-- Allows the SasTokenCredentialPolicy in Implementation -->
<!-- Allows the HttpPipelinePolicy derived class in Implementation folder -->
<suppress checks="com.azure.tools.checkstyle.checks.HttpPipelinePolicy" files="com.azure.storage.common.implementation.policy.SasTokenCredentialPolicy.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.HttpPipelinePolicy" files="com.azure.security.keyvault.secrets.implementation.KeyVaultCredentialPolicy.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.HttpPipelinePolicy" files="com.azure.security.keyvault.certificates.implementation.KeyVaultCredentialPolicy.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.HttpPipelinePolicy" files="com.azure.security.keyvault.keys.implementation.KeyVaultCredentialPolicy.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.HttpPipelinePolicy" files="com.azure.ai.textanalytics.implementation.SubscriptionKeyCredentialPolicy.java"/>

<!-- Empty while loop waiting for Reactor stream completion -->
<suppress checks="EmptyBlock" files="com.azure.storage.blob.batch.BlobBatch.java"/>
Expand Down
5 changes: 0 additions & 5 deletions eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,6 @@
<artifactId>azure-storage-blob-cryptography</artifactId>
<version>12.4.0-beta.1</version> <!-- {x-version-update;com.azure:azure-storage-blob-cryptography;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob-nio</artifactId>
<version>12.0.0-beta.1</version> <!-- {x-version-update;com.azure:azure-storage-blob-nio;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-share</artifactId>
Expand Down
36 changes: 36 additions & 0 deletions eng/pipelines/aggregate-reports.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pr: none

jobs:
- job: Generate
timeoutInMinutes: 180
variables:
- template: templates/variables/globals.yml
pool:
Expand Down Expand Up @@ -50,6 +51,41 @@ jobs:
publishJUnitResults: false
goals: 'install site:site site:stage'

- task: Maven@3
displayName: 'Generate aggregate code coverage report'
inputs:
mavenPomFile: 'pom.client.xml'
options: '$(DefaultOptions) -Dinclude-non-shipping-modules -Dgpg.skip -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true -Dspotbugs.skip=true'
mavenOptions: '-Xmx3072m $(LoggingOptions)'
javaHomeOption: 'JDKVersion'
jdkVersionOption: '1.11'
jdkArchitectureOption: 'x64'
publishJUnitResults: false
goals: 'verify'

- task: PublishTestResults@2
condition: succeededOrFailed()
inputs:
mergeTestResults: true
testRunTitle: 'Linux on Java 1.11'

# Azure DevOps only seems to respect the last code coverage result published, so only do this for Linux + Java LTS.
# Code coverage reporting is setup only for Track 2 modules.
- task: PublishCodeCoverageResults@1
inputs:
codeCoverageTool: JaCoCo
summaryFileLocation: eng/jacoco-test-coverage/target/site/test-coverage/jacoco.xml
reportDirectory: eng/jacoco-test-coverage/target/site/test-coverage/
failIfCoverageEmpty: true

# Publish code coverage to an artifact so it can be uploaded to the Maven site.
# Do this for track 2 modules only. Code coverage from previous step is only published for Linux + Java LTS.
- task: PublishPipelineArtifact@0
displayName: 'Publish coverage artifact'
inputs:
artifactName: coverage
targetPath: eng/jacoco-test-coverage/target/site/

- script: |
git clone https://github.com/JonathanGiles/DependencyChecker.git
mkdir input && cp eng/DependencyChecker/client_dependencies.json input/dependencies.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, Parti
*/
@Override
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
if (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null) {
if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) {
throw logger.logExceptionAsWarning(Exceptions
.propagate(new IllegalStateException(
"Both sequence number and offset cannot be null when updating a checkpoint")));
Expand Down Expand Up @@ -260,13 +260,19 @@ private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem)
.info(Messages.BLOB_OWNER_INFO, blobItem.getName(), blobItem.getMetadata().getOrDefault(OWNER_ID, ""));

BlobItemProperties blobProperties = blobItem.getProperties();

String ownerId = blobItem.getMetadata().getOrDefault(OWNER_ID, "");
if (ownerId == null) {
return Mono.empty();
}

PartitionOwnership partitionOwnership = new PartitionOwnership()
.setFullyQualifiedNamespace(names[0])
.setEventHubName(names[1])
.setConsumerGroup(names[2])
// names[3] is "ownership"
.setPartitionId(names[4])
.setOwnerId(blobItem.getMetadata().getOrDefault(OWNER_ID, ""))
.setOwnerId(ownerId)
.setLastModifiedTime(blobProperties.getLastModified().toInstant().toEpochMilli())
.setETag(blobProperties.getETag());
return Mono.just(partitionOwnership);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,29 @@

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.util.logging.ClientLogger;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import com.azure.core.util.CoreUtils;
import java.util.Map;

/**
* I18n messages loaded from the messages.properties file located within the same package.
*/
public enum Messages {
;
private static final ClientLogger LOGGER = new ClientLogger(Messages.class);
private static Properties properties;
private static final String PATH = "com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties";
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = "No metadata available for blob {}";
public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}";
public static final String FOUND_BLOB_FOR_PARTITION = "Found blob for partition {}";
public static final String BLOB_OWNER_INFO = "Blob {} is owned by {}";
public static final String CHECKPOINT_INFO = "Blob {} has checkpoint with sequence number {} and offset {}";
private static final String PATH = "eventhubs-checkpointstore-blob-messages.properties";
private static final Map<String, String> PROPERTIES = CoreUtils.getProperties(PATH);

private static synchronized Properties getProperties() {
if (properties != null) {
return properties;
}
properties = new Properties();
try (InputStream inputStream =
Thread.currentThread().getContextClassLoader().getResourceAsStream(PATH)) {
if (inputStream != null) {
properties.load(inputStream);
} else {
LOGGER.error("Message properties [{}] not found", PATH); //NON-NLS
}
} catch (IOException exception) {
LOGGER.error("Error loading message properties [{}]", PATH, exception); //NON-NLS
}
return properties;
}
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = getMessage("NO_METADATA_AVAILABLE_FOR_BLOB");
public static final String CLAIM_ERROR = getMessage("CLAIM_ERROR");
public static final String FOUND_BLOB_FOR_PARTITION = getMessage("FOUND_BLOB_FOR_PARTITION");
public static final String BLOB_OWNER_INFO = getMessage("BLOB_OWNER_INFO");
public static final String CHECKPOINT_INFO = getMessage("CHECKPOINT_INFO");

/**
* @param key the key of the message to retrieve
* @return the message matching the given key
*/
public static String getMessage(String key) {
return String.valueOf(getProperties().getOrDefault(key, key));
return PROPERTIES.getOrDefault(key, key);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.ListBlobsOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
Expand All @@ -35,6 +36,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;
Expand All @@ -61,11 +63,13 @@ public void setup() {
@Test
public void testListOwnerShip() {
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
BlobItem blobItem = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/ownership/0");
BlobItem blobItem2 = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem blobItem = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/ownership/0"); // valid blob
BlobItem blobItem2 = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/0"); // invalid name
BlobItem blobItem3 = new BlobItem().setName("ns/eh/cg/ownership/5"); // no metadata

PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem, blobItem2), null,
Arrays.asList(blobItem, blobItem2, blobItem3), null,
null)));
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);

Expand All @@ -79,6 +83,27 @@ public void testListOwnerShip() {
}).verifyComplete();
}

@Test
public void testListCheckpoint() {
// Verifies that listCheckpoints() parses checkpoint blobs named "ns/eh/cg/checkpoint/<partition>"
// into Checkpoint objects, and (per the single assertNext below) emits only the blob that
// carries offset/sequence-number metadata — blobItem2 has a valid name but no metadata and
// is presumably filtered out by the store. TODO confirm the skip-on-missing-metadata behavior
// against BlobCheckpointStore.
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
BlobItem blobItem2 = new BlobItem().setName("ns/eh/cg/checkpoint/1");
// Mock the container listing to return a single page holding both blobs.
PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem, blobItem2), null,
null)));
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);

// Expect exactly one Checkpoint, with fields decoded from the blob name segments
// (eventHub/consumerGroup/partition) and from the "offset"/"sequencenumber" metadata.
StepVerifier.create(blobCheckpointStore.listCheckpoints("ns", "eh", "cg"))
.assertNext(checkpoint -> {
assertEquals("0", checkpoint.getPartitionId());
assertEquals("eh", checkpoint.getEventHubName());
assertEquals("cg", checkpoint.getConsumerGroup());
assertEquals(1L, checkpoint.getSequenceNumber());
assertEquals(230L, checkpoint.getOffset());
}).verifyComplete();
}

@Test
public void testUpdateCheckpoint() {
Checkpoint checkpoint = new Checkpoint()
Expand All @@ -89,9 +114,7 @@ public void testUpdateCheckpoint() {
.setSequenceNumber(2L)
.setOffset(100L);

Map<String, String> headers = new HashMap<>();
headers.put("eTag", "etag2");
BlobItem blobItem = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/checkpoint/0");
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem), null,
Expand All @@ -105,7 +128,44 @@ public void testUpdateCheckpoint() {
.thenReturn(Mono.empty());

BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();
}

@Test
public void testInvalidCheckpoint() {
// updateCheckpoint must reject unusable input with IllegalStateException:
// both a null Checkpoint and a Checkpoint whose sequence number AND offset are
// both null (an empty Checkpoint) carry no position to persist.
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
Assertions.assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(null));
Assertions
.assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(new Checkpoint()));
}

@Test
public void testUpdateCheckpointForNewPartition() {
// Covers the path where the checkpoint blob does not exist yet
// (blobAsyncClient.exists() is stubbed to false): the store is expected to
// create the blob via a zero-length block-blob upload that carries the
// checkpoint state as blob metadata, and complete without emitting a value.
Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace("ns")
.setEventHubName("eh")
.setConsumerGroup("cg")
.setPartitionId("0")
.setSequenceNumber(2L)
.setOffset(100L);

// Response headers returned by the mocked upload; "eTag" mimics the service's ETag header.
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.put("eTag", "etag2");
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem), null,
null)));

when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/checkpoint/0")).thenReturn(blobAsyncClient);
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
// exists() == false drives the create-new-blob branch under test.
when(blobAsyncClient.exists()).thenReturn(Mono.just(false));
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
// eq(0L): the new checkpoint blob is uploaded with zero content length;
// anyMap(): the checkpoint offset/sequence number travel as blob metadata.
when(blockBlobAsyncClient.uploadWithResponse(ArgumentMatchers.<Flux<ByteBuffer>>any(), eq(0L),
isNull(), anyMap(), isNull(), isNull(), isNull()))
.thenReturn(Mono.just(new ResponseBase<>(null, 200, httpHeaders, null, null)));
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();
}

Expand Down Expand Up @@ -195,9 +255,9 @@ private PartitionOwnership createPartitionOwnership(String fullyQualifiedNamespa
.setOwnerId(ownerId);
}

private BlobItem getBlobItem(String owner, String sequenceNumber, String offset, String etag, String blobName) {
Map<String, String> metadata = getMetadata(owner, sequenceNumber, offset);

private BlobItem getOwnershipBlobItem(String owner, String etag, String blobName) {
Map<String, String> metadata = new HashMap<>();
metadata.put("ownerid", owner);
BlobItemProperties properties = new BlobItemProperties()
.setLastModified(OffsetDateTime.now())
.setETag(etag);
Expand All @@ -208,11 +268,12 @@ private BlobItem getBlobItem(String owner, String sequenceNumber, String offset,
.setProperties(properties);
}

private Map<String, String> getMetadata(String owner, String sequenceNumber, String offset) {
private BlobItem getCheckpointBlobItem(String offset, String sequenceNumber, String blobName) {
Map<String, String> metadata = new HashMap<>();
metadata.put("ownerid", owner);
metadata.put("sequencenumber", sequenceNumber);
metadata.put("offset", offset);
return metadata;
return new BlobItem()
.setName(blobName)
.setMetadata(metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.Messages;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand Down
Loading

0 comments on commit 649113a

Please sign in to comment.