Skip to content

Commit

Permalink
Close the consumer and then remove partition (#16886)
Browse files Browse the repository at this point in the history
  • Loading branch information
srnagar authored Oct 27, 2020
1 parent 28d3937 commit edac273
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,19 @@ void stopAllPartitionPumps() {
* @param ownership The partition ownership information for which the connection state will be verified.
*/
void verifyPartitionConnection(PartitionOwnership ownership) {
if (partitionPumps.containsKey(ownership.getPartitionId())) {
EventHubConsumerAsyncClient consumerClient = partitionPumps.get(ownership.getPartitionId());
String partitionId = ownership.getPartitionId();
if (partitionPumps.containsKey(partitionId)) {
EventHubConsumerAsyncClient consumerClient = partitionPumps.get(partitionId);
if (consumerClient.isConnectionClosed()) {
logger.info("Connection closed for {}, partition {}. Removing the consumer.",
ownership.getEventHubName(), ownership.getPartitionId());
partitionPumps.remove(ownership.getPartitionId());
ownership.getEventHubName(), partitionId);
try {
partitionPumps.get(partitionId).close();
} catch (Exception ex) {
logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex);
} finally {
partitionPumps.remove(partitionId);
}
}
}
}
Expand Down

0 comments on commit edac273

Please sign in to comment.