
Commit

Improve logging and update consumer close logic (#7175)
* Improve logging and update consumer close logic

* Update localization text message

* Fix unit test

* Fix checkpointstore tests
srnagar authored Jan 7, 2020
1 parent e378be3 commit f821fa7
Showing 6 changed files with 77 additions and 47 deletions.
@@ -41,9 +41,9 @@
*/
public class BlobCheckpointStore implements CheckpointStore {

private static final String SEQUENCE_NUMBER = "SequenceNumber";
private static final String OFFSET = "Offset";
private static final String OWNER_ID = "OwnerId";
private static final String SEQUENCE_NUMBER = "sequencenumber";
private static final String OFFSET = "offset";
private static final String OWNER_ID = "ownerid";
private static final String ETAG = "eTag";

private static final String BLOB_PATH_SEPARATOR = "/";
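Note on the key rename above: blob metadata keys are case-insensitive on the service side, and the metadata map returned by a blob listing may come back with lowercased keys, so a case-sensitive Map.get against the old mixed-case spelling can miss values that are actually present. A minimal standalone sketch of that lookup concern, under the assumption of lowercased keys (values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: if the listing reports the key as "sequencenumber", a lookup
    // with the old mixed-case constant returns null even though the value exists.
    public class MetadataKeySketch {
        public static void main(String[] args) {
            Map<String, String> metadata = new HashMap<>();
            metadata.put("sequencenumber", "15");               // as returned by the listing
            System.out.println(metadata.get("SequenceNumber")); // null, case-sensitive miss
            System.out.println(metadata.get("sequencenumber")); // 15
        }
    }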
@@ -109,14 +109,27 @@ private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
}

Map<String, String> metadata = blobItem.getMetadata();
logger.info(Messages.CHECKPOINT_INFO, blobItem.getName(), metadata.get(SEQUENCE_NUMBER),
metadata.get(OFFSET));

Long sequenceNumber = null;
Long offset = null;
if (!CoreUtils.isNullOrEmpty(metadata.get(SEQUENCE_NUMBER))) {
sequenceNumber = Long.parseLong(metadata.get(SEQUENCE_NUMBER));
}

if (!CoreUtils.isNullOrEmpty(metadata.get(OFFSET))) {
offset = Long.parseLong(metadata.get(OFFSET));
}

Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace(names[0])
.setEventHubName(names[1])
.setConsumerGroup(names[2])
// names[3] is "checkpoint"
.setPartitionId(names[4])
.setSequenceNumber(Long.parseLong(metadata.get(SEQUENCE_NUMBER)))
.setOffset(Long.parseLong(metadata.get(OFFSET)));
.setSequenceNumber(sequenceNumber)
.setOffset(offset);

return Mono.just(checkpoint);
}
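The guard added above matters because Long.parseLong(null) throws a NumberFormatException, so a blob whose metadata lacks a sequence number or offset (for example one that has not been checkpointed yet) would previously have failed conversion instead of producing a checkpoint with null fields. A small standalone sketch of the parse-if-present pattern (names and values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: only parse metadata values that are present, otherwise leave the field null.
    public class ParseIfPresentSketch {
        public static void main(String[] args) {
            Map<String, String> metadata = new HashMap<>();      // no "offset" entry yet
            String raw = metadata.get("offset");
            Long offset = (raw == null || raw.isEmpty()) ? null : Long.parseLong(raw);
            System.out.println("offset = " + offset);            // prints "offset = null" instead of throwing
        }
    }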
@@ -134,38 +147,44 @@ private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {

return Flux.fromIterable(requestedPartitionOwnerships).flatMap(partitionOwnership -> {
String partitionId = partitionOwnership.getPartitionId();
String blobName = getBlobName(partitionOwnership.getFullyQualifiedNamespace(),
partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId,
OWNERSHIP_PATH);

if (!blobClients.containsKey(blobName)) {
blobClients.put(blobName, blobContainerAsyncClient.getBlobAsyncClient(blobName));
}

BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

Map<String, String> metadata = new HashMap<>();
metadata.put(OWNER_ID, partitionOwnership.getOwnerId());

BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
if (CoreUtils.isNullOrEmpty(partitionOwnership.getETag())) {
// New blob should be created
blobRequestConditions.setIfNoneMatch("*");
return blobAsyncClient.getBlockBlobAsyncClient()
.uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null, blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
} else {
// update existing blob
blobRequestConditions.setIfMatch(partitionOwnership.getETag());
return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
try {
String partitionId = partitionOwnership.getPartitionId();
String blobName = getBlobName(partitionOwnership.getFullyQualifiedNamespace(),
partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId,
OWNERSHIP_PATH);

if (!blobClients.containsKey(blobName)) {
blobClients.put(blobName, blobContainerAsyncClient.getBlobAsyncClient(blobName));
}

BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

Map<String, String> metadata = new HashMap<>();
metadata.put(OWNER_ID, partitionOwnership.getOwnerId());

BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
if (CoreUtils.isNullOrEmpty(partitionOwnership.getETag())) {
// New blob should be created
blobRequestConditions.setIfNoneMatch("*");
return blobAsyncClient.getBlockBlobAsyncClient()
.uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null,
blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
} else {
// update existing blob
blobRequestConditions.setIfMatch(partitionOwnership.getETag());
return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error);
return Mono.empty();
}, Mono::empty);
}
} catch (Exception ex) {
logger.warning(Messages.CLAIM_ERROR, partitionOwnership.getPartitionId(), ex);
return Mono.empty();
}
});
}
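Wrapping the whole flatMap body in try/catch means a synchronous failure while preparing one claim request (building the blob name, reading the ownership fields) is logged and mapped to an empty publisher, so the remaining ownership requests in the batch are still attempted instead of the whole Flux terminating with onError. A minimal Reactor sketch of the per-element pattern (the parsing below is only a stand-in for the real claim work):

    import java.util.Arrays;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    // Sketch: catch per-element failures inside flatMap and return Mono.empty()
    // so one bad element does not cancel processing of the rest.
    public class PerElementCatchSketch {
        public static void main(String[] args) {
            Flux.fromIterable(Arrays.asList("0", "not-a-number", "2"))
                .flatMap(id -> {
                    try {
                        return Mono.just("claimed partition " + Integer.parseInt(id));
                    } catch (Exception ex) {
                        System.out.println("Couldn't claim ownership of partition " + id + ": " + ex);
                        return Mono.empty();                     // skip this element, keep the stream alive
                    }
                })
                .doOnNext(System.out::println)
                .blockLast();
        }
    }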
@@ -237,6 +256,8 @@ private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem)
logger.warning(Messages.NO_METADATA_AVAILABLE_FOR_BLOB, blobItem.getName());
return Mono.empty();
}
logger
.info(Messages.BLOB_OWNER_INFO, blobItem.getName(), blobItem.getMetadata().getOrDefault(OWNER_ID, ""));

BlobItemProperties blobProperties = blobItem.getProperties();
PartitionOwnership partitionOwnership = new PartitionOwnership()
Expand All @@ -245,7 +266,7 @@ private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem)
.setConsumerGroup(names[2])
// names[3] is "ownership"
.setPartitionId(names[4])
.setOwnerId(blobItem.getMetadata().get(OWNER_ID))
.setOwnerId(blobItem.getMetadata().getOrDefault(OWNER_ID, ""))
.setLastModifiedTime(blobProperties.getLastModified().toInstant().toEpochMilli())
.setETag(blobProperties.getETag());
return Mono.just(partitionOwnership);
@@ -18,8 +18,10 @@ public enum Messages {
private static Properties properties;
private static final String PATH = "com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties";
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = "No metadata available for blob {}";
public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}, error {}";
public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}";
public static final String FOUND_BLOB_FOR_PARTITION = "Found blob for partition {}";
public static final String BLOB_OWNER_INFO = "Blob {} is owned by {}";
public static final String CHECKPOINT_INFO = "Blob {} has checkpoint with sequence number {} and offset {}";

private static synchronized Properties getProperties() {
if (properties != null) {
@@ -1,3 +1,5 @@
NO_METADATA_AVAILABLE_FOR_BLOB=No metadata available for blob {}
CLAIM_ERROR=Couldn't claim ownership of partition {}, error {}
CLAIM_ERROR=Couldn't claim ownership of partition {}
FOUND_BLOB_FOR_PARTITION=Found blob for partition {}
BLOB_OWNER_INFO=Blob {} is owned by {}
CHECKPOINT_INFO=Blob {} has checkpoint with sequence number {} and offset {}
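The ", error {}" placeholder is dropped from CLAIM_ERROR because the claim call sites above now hand the error itself to the logger as a trailing argument instead of formatting its message into the text. Assuming the SLF4J convention that azure-core's ClientLogger delegates to, a trailing Throwable with no matching {} is logged as an exception with its stack trace. A small standalone sketch of that convention using plain SLF4J (names and values are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch: "3" fills the single {} placeholder; the exception, having no
    // placeholder of its own, is logged as a Throwable with a stack trace.
    public class ClaimErrorLogSketch {
        private static final Logger LOGGER = LoggerFactory.getLogger(ClaimErrorLogSketch.class);

        public static void main(String[] args) {
            Exception ex = new IllegalStateException("etag mismatch");
            LOGGER.warn("Couldn't claim ownership of partition {}", "3", ex);
        }
    }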
@@ -210,9 +210,9 @@ private BlobItem getBlobItem(String owner, String sequenceNumber, String offset,

private Map<String, String> getMetadata(String owner, String sequenceNumber, String offset) {
Map<String, String> metadata = new HashMap<>();
metadata.put("OwnerId", owner);
metadata.put("SequenceNumber", sequenceNumber);
metadata.put("Offset", offset);
metadata.put("ownerid", owner);
metadata.put("sequencenumber", sequenceNumber);
metadata.put("offset", offset);
return metadata;
}
}
@@ -205,7 +205,8 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
if (isLoadBalanced(minPartitionsPerEventProcessor, numberOfEventProcessorsWithAdditionalPartition,
ownerPartitionMap)) {
// If the partitions are evenly distributed among all active event processors, no change required.
logger.info("Load is balanced");
logger.info("Load is balanced with this event processor owning {} partitions",
ownerPartitionMap.get(ownerId).size());
// renew ownership of already owned partitions
checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
.stream()
@@ -231,7 +232,8 @@
// If we have reached this stage, this event processor has to claim/steal ownership of at least 1
// more partition
logger.info(
"Load is unbalanced and this event processor should own more partitions");
"Load is unbalanced and this event processor owns {} partitions and should own more partitions",
ownerPartitionMap.get(ownerId).size());
/*
* If some partitions are unclaimed, this could be because an event processor is down and
its partitions are now available for others to own or because event processors are just
@@ -148,8 +148,11 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
partitionEvent),
/* EventHubConsumer receive() returned an error */
ex -> handleError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext),
() -> partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN)));
() -> {
partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN));
cleanup(claimedOwnership, eventHubConsumer);
});
} catch (Exception ex) {
if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) {
cleanup(claimedOwnership, partitionPumps.get(claimedOwnership.getPartitionId()));
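This hunk is the "consumer close logic" part of the commit title: when the consumer's receive() stream completes normally, the completion callback now cleans up the claimed ownership and the EventHubConsumer in addition to closing the partition processor with CloseReason.EVENT_PROCESSOR_SHUTDOWN, matching the cleanup already performed in the catch block shown just above. A minimal Reactor sketch of that subscribe shape (the println calls stand in for the real processor and cleanup calls):

    import reactor.core.publisher.Flux;

    // Sketch: a receive pipeline whose onComplete callback both notifies the
    // processor of shutdown and releases the consumer (stand-ins only).
    public class CloseOnCompleteSketch {
        public static void main(String[] args) {
            Flux<String> receive = Flux.just("event-1", "event-2");   // stand-in for consumer.receive()

            receive.subscribe(
                event -> System.out.println("process " + event),       // onNext: hand the event to the processor
                error -> System.out.println("handle error: " + error), // onError: report to the partition processor
                () -> {                                                 // onComplete: close the processor, then clean up
                    System.out.println("partitionProcessor.close(EVENT_PROCESSOR_SHUTDOWN)");
                    System.out.println("cleanup(claimedOwnership, eventHubConsumer)");
                });
        }
    }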
