diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java index fe633430e023..cf17e4b25262 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java @@ -62,7 +62,7 @@ public CompletableFuture setStateAsync(byte[] sessionState) { } @Override - public CompletableFuture renewLockAsync() { + public CompletableFuture renewSessionLockAsync() { throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); } @@ -113,7 +113,7 @@ public CompletableFuture receiveAsync(Duration serverWaitTime) } @Override - public CompletableFuture receiveAsync(long sequenceNumber) + public CompletableFuture receiveBySequenceNumberAsync(long sequenceNumber) { throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java index dc7daf681c76..ba34992b12e0 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java @@ -21,7 +21,7 @@ public static IMessageSender createMessageSenderFromConnectionStringBuilder(Conn return Utils.completeFuture(createMessageSenderFromConnectionStringBuilderAsync(amqpConnectionStringBuilder)); } - public static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException + static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromEntityPathAsync(messagingFactory, entityPath)); } @@ -39,7 +39,7 @@ public static CompletableFuture createMessageSenderFromConnectio return sender.initializeAsync().thenApply((v) -> sender); } - public static CompletableFuture createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) + static CompletableFuture createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) { Utils.assertNonNull("messagingFactory", messagingFactory); MessageSender sender = new MessageSender(messagingFactory, entityPath); @@ -68,12 +68,12 @@ public static IMessageReceiver createMessageReceiverFromConnectionStringBuilder( return Utils.completeFuture(createMessageReceiverFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, receiveMode)); } - public static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException + static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException { return createMessageReceiverFromEntityPath(messagingFactory, entityPath, DEFAULTRECEIVEMODE); } - public static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException + static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, receiveMode)); } @@ -101,12 +101,12 @@ public static CompletableFuture createMessageReceiverFromConne return receiver.initializeAsync().thenApply((v) -> receiver); } - public static CompletableFuture createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) + static CompletableFuture createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) { return createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, DEFAULTRECEIVEMODE); } - public static CompletableFuture createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) + static CompletableFuture createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) { Utils.assertNonNull("messagingFactory", messagingFactory); MessageReceiver receiver = new MessageReceiver(messagingFactory, entityPath, receiveMode); @@ -134,12 +134,12 @@ public static IMessageSession acceptSessionFromConnectionStringBuilder(Connectio return Utils.completeFuture(acceptSessionFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, sessionId, receiveMode)); } - public static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId) throws InterruptedException, ServiceBusException + static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId) throws InterruptedException, ServiceBusException { return acceptSessionFromEntityPath(messagingFactory, entityPath, sessionId, DEFAULTRECEIVEMODE); } - public static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException + static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(acceptSessionFromEntityPathAsync(messagingFactory, entityPath, sessionId, receiveMode)); } @@ -167,12 +167,12 @@ public static CompletableFuture acceptSessionFromConnectionStri return session.initializeAsync().thenApply((v) -> session); } - public static CompletableFuture acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId) + static CompletableFuture acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId) { return acceptSessionFromEntityPathAsync(messagingFactory, entityPath, sessionId, DEFAULTRECEIVEMODE); } - public static CompletableFuture acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) + static CompletableFuture acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) { Utils.assertNonNull("messagingFactory", messagingFactory); MessageSession session = new MessageSession(messagingFactory, entityPath, sessionId, receiveMode); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java index 9b9fb13ebe3b..65552eb3b1bc 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java @@ -36,7 +36,7 @@ public interface IMessage { public void setSessionId(String sessionId); - public byte[] getContent(); + public byte[] getBody(); public void setContent(byte[] content); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java index 5ef5befa3cd2..67f701d392cc 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java @@ -28,9 +28,9 @@ interface IMessageAndSessionPump CompletableFuture completeAsync(UUID lockToken); - void defer(UUID lockToken) throws InterruptedException, ServiceBusException; - - void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException; +// void defer(UUID lockToken) throws InterruptedException, ServiceBusException; +// +// void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException; CompletableFuture deferAsync(UUID lockToken); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java index 0e8af3f92b0e..864208664164 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java @@ -56,7 +56,7 @@ public interface IMessageReceiver extends IMessageEntity, IMessageBrowser{ IMessage receive(Duration serverWaitTime) throws InterruptedException, ServiceBusException; - IMessage receive(long sequenceNumber) throws InterruptedException, ServiceBusException; + IMessage receiveBySequenceNumber(long sequenceNumber) throws InterruptedException, ServiceBusException; Collection receiveBatch(int maxMessageCount) throws InterruptedException, ServiceBusException; @@ -68,7 +68,7 @@ public interface IMessageReceiver extends IMessageEntity, IMessageBrowser{ CompletableFuture receiveAsync(Duration serverWaitTime); - CompletableFuture receiveAsync(long sequenceNumber); + CompletableFuture receiveBySequenceNumberAsync(long sequenceNumber); CompletableFuture> receiveBatchAsync(int maxMessageCount); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java index 95291cc5441d..b1331d7fb70c 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java @@ -11,9 +11,9 @@ public interface IMessageSession extends IMessageReceiver { Instant getLockedUntilUtc(); - void renewLock() throws InterruptedException, ServiceBusException; + void renewSessionLock() throws InterruptedException, ServiceBusException; - CompletableFuture renewLockAsync(); + CompletableFuture renewSessionLockAsync(); void setState(byte[] state) throws InterruptedException, ServiceBusException; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IQueueClient.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IQueueClient.java index 0fb841418193..508dc734e73f 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IQueueClient.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IQueueClient.java @@ -1,6 +1,8 @@ package com.microsoft.azure.servicebus; -public interface IQueueClient extends IMessageSender, IMessageSessionEntity, IMessageAndSessionPump, IMessageEntity +public interface IQueueClient extends IMessageSender, IMessageAndSessionPump, IMessageEntity { public ReceiveMode getReceiveMode(); + + public String getQueueName(); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ISubscriptionClient.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ISubscriptionClient.java index 3f2057644b71..d1f1111b715d 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ISubscriptionClient.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ISubscriptionClient.java @@ -6,7 +6,7 @@ import com.microsoft.azure.servicebus.rules.Filter; import com.microsoft.azure.servicebus.rules.RuleDescription; -public interface ISubscriptionClient extends IMessageEntity, IMessageSessionEntity, IMessageAndSessionPump +public interface ISubscriptionClient extends IMessageEntity, IMessageAndSessionPump { public ReceiveMode getReceiveMode(); public void addRule(RuleDescription ruleDescription) throws InterruptedException, ServiceBusException; @@ -15,4 +15,6 @@ public interface ISubscriptionClient extends IMessageEntity, IMessageSessionEnti public CompletableFuture addRuleAsync(String ruleName, Filter filter); public CompletableFuture removeRuleAsync(String ruleName); public void removeRule(String ruleName) throws InterruptedException, ServiceBusException; + public String getTopicName(); + public String getSubscriptionName(); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ITopicClient.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ITopicClient.java index 6de619c3afe8..9b7ba183d81c 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ITopicClient.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ITopicClient.java @@ -2,5 +2,5 @@ // Should we allow browse/peek on topic? public interface ITopicClient extends IMessageSender, IMessageBrowser, IMessageEntity { - + public String getTopicName(); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java index 7424afd5f3d1..305e6215a767 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java @@ -183,7 +183,7 @@ public void setSessionId(String sessionId) { } @Override - public byte[] getContent() { + public byte[] getBody() { return this.content; } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java index 081f5074a1b4..549e4ac5103c 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java @@ -602,7 +602,7 @@ protected void loop() if(renewInterval != null && !renewInterval.isNegative()) { this.timerFuture = Timer.schedule(() -> { - this.session.renewLockAsync().handleAsync((v, renewLockEx) -> + this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) -> { if(renewLockEx != null) { @@ -664,14 +664,14 @@ public CompletableFuture completeAsync(UUID lockToken) { return this.innerReceiver.completeAsync(lockToken); } - @Override - public void defer(UUID lockToken) throws InterruptedException, ServiceBusException { +// @Override + void defer(UUID lockToken) throws InterruptedException, ServiceBusException { this.checkInnerReceiveCreated(); this.innerReceiver.defer(lockToken); } - @Override - public void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException { +// @Override + void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException { this.checkInnerReceiveCreated(); this.innerReceiver.defer(lockToken, propertiesToModify); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java index 8f793ee394b6..78ff0a9fc520 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java @@ -25,9 +25,9 @@ public class MessageConverter public static org.apache.qpid.proton.message.Message convertBrokeredMessageToAmqpMessage(Message brokeredMessage) { org.apache.qpid.proton.message.Message amqpMessage = Proton.message(); - if(brokeredMessage.getContent() != null) + if(brokeredMessage.getBody() != null) { - amqpMessage.setBody(new Data(new Binary(brokeredMessage.getContent()))); + amqpMessage.setBody(new Data(new Binary(brokeredMessage.getBody()))); } if(brokeredMessage.getProperties() != null) diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index 0c533d6199ad..9f5e04c96085 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -319,8 +319,8 @@ public IMessage receive(Duration serverWaitTime) throws InterruptedException, Se } @Override - public IMessage receive(long sequenceNumber) throws InterruptedException, ServiceBusException{ - return Utils.completeFuture(this.receiveAsync(sequenceNumber)); + public IMessage receiveBySequenceNumber(long sequenceNumber) throws InterruptedException, ServiceBusException{ + return Utils.completeFuture(this.receiveBySequenceNumberAsync(sequenceNumber)); } @Override @@ -392,7 +392,7 @@ else if (c.isEmpty()) } @Override - public CompletableFuture receiveAsync(long sequenceNumber) { + public CompletableFuture receiveBySequenceNumberAsync(long sequenceNumber) { ArrayList list = new ArrayList<>(); list.add(sequenceNumber); return this.receiveBatchAsync(list).thenApplyAsync(c -> diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSession.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSession.java index d81a9a9c9363..a9f2eacc6e13 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSession.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSession.java @@ -51,12 +51,12 @@ public Instant getLockedUntilUtc() { } @Override - public void renewLock() throws InterruptedException, ServiceBusException { - Utils.completeFuture(this.renewLockAsync()); + public void renewSessionLock() throws InterruptedException, ServiceBusException { + Utils.completeFuture(this.renewSessionLockAsync()); } @Override - public CompletableFuture renewLockAsync() { + public CompletableFuture renewSessionLockAsync() { return this.getInternalReceiver().renewSessionLocksAsync(); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java index 25b2768dfb69..0d33fc2f67a2 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java @@ -16,6 +16,7 @@ public final class QueueClient extends InitializableEntity implements IQueueClient { private final ReceiveMode receiveMode; + private MessagingFactory factory; private IMessageSender sender; private MessageAndSessionPump messageAndSessionPump; private SessionBrowser sessionBrowser; @@ -27,15 +28,14 @@ private QueueClient(ReceiveMode receiveMode) this.receiveMode = receiveMode; } - public QueueClient(String amqpConnectionString, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException + public QueueClient(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(receiveMode); - ConnectionStringBuilder builder = new ConnectionStringBuilder(amqpConnectionString); - CompletableFuture factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(builder); - Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createInternals(f, builder.getEntityPath(), receiveMode))); + CompletableFuture factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder); + Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createInternals(f, amqpConnectionStringBuilder.getEntityPath(), receiveMode))); } - public QueueClient(MessagingFactory factory, String queuePath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException + QueueClient(MessagingFactory factory, String queuePath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(receiveMode); Utils.completeFuture(this.createInternals(factory, queuePath, receiveMode)); @@ -43,6 +43,7 @@ public QueueClient(MessagingFactory factory, String queuePath, ReceiveMode recei private CompletableFuture createInternals(MessagingFactory factory, String queuePath, ReceiveMode receiveMode) { + this.factory = factory; CompletableFuture senderFuture = ClientFactory.createMessageSenderFromEntityPathAsync(factory, queuePath); CompletableFuture postSenderFuture = senderFuture.thenAcceptAsync((s) -> { this.sender = s; @@ -137,26 +138,26 @@ CompletableFuture initializeAsync() throws Exception { @Override protected CompletableFuture onClose() { - return this.messageAndSessionPump.closeAsync().thenCompose((v) -> this.sender.closeAsync().thenCompose((u) -> this.miscRequestResponseHandler.closeAsync())); + return this.messageAndSessionPump.closeAsync().thenCompose((v) -> this.sender.closeAsync().thenCompose((u) -> this.miscRequestResponseHandler.closeAsync().thenCompose((w) -> this.factory.closeAsync()))); } - @Override - public Collection getMessageSessions() throws InterruptedException, ServiceBusException { +// @Override + Collection getMessageSessions() throws InterruptedException, ServiceBusException { return Utils.completeFuture(this.getMessageSessionsAsync()); } - @Override - public Collection getMessageSessions(Instant lastUpdatedTime) throws InterruptedException, ServiceBusException { +// @Override + Collection getMessageSessions(Instant lastUpdatedTime) throws InterruptedException, ServiceBusException { return Utils.completeFuture(this.getMessageSessionsAsync(lastUpdatedTime)); } - @Override - public CompletableFuture> getMessageSessionsAsync() { +// @Override + CompletableFuture> getMessageSessionsAsync() { return this.sessionBrowser.getMessageSessionsAsync(); } - @Override - public CompletableFuture> getMessageSessionsAsync(Instant lastUpdatedTime) { +// @Override + CompletableFuture> getMessageSessionsAsync(Instant lastUpdatedTime) { return this.sessionBrowser.getMessageSessionsAsync(Date.from(lastUpdatedTime)); } @@ -190,13 +191,13 @@ public CompletableFuture completeAsync(UUID lockToken) { return this.messageAndSessionPump.completeAsync(lockToken); } - @Override - public void defer(UUID lockToken) throws InterruptedException, ServiceBusException { +// @Override + void defer(UUID lockToken) throws InterruptedException, ServiceBusException { this.messageAndSessionPump.defer(lockToken); } - @Override - public void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException { +// @Override + void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException { this.messageAndSessionPump.defer(lockToken, propertiesToModify); } @@ -258,5 +259,11 @@ public int getPrefetchCount() { @Override public void setPrefetchCount(int prefetchCount) throws ServiceBusException { this.messageAndSessionPump.setPrefetchCount(prefetchCount); - } + } + + @Override + public String getQueueName() + { + return this.getEntityPath(); + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java index 4a7ff2d70341..c7704ae8ba8d 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.MessagingFactory; @@ -17,8 +18,10 @@ public final class SubscriptionClient extends InitializableEntity implements ISubscriptionClient { + private static final String SUBSCRIPTIONS_DELIMITER = "/subscriptions/"; private final ReceiveMode receiveMode; private String subscriptionPath; + private MessagingFactory factory; private MessageAndSessionPump messageAndSessionPump; private SessionBrowser sessionBrowser; private MiscRequestResponseOperationHandler miscRequestResponseHandler; @@ -29,16 +32,15 @@ private SubscriptionClient(ReceiveMode receiveMode) this.receiveMode = receiveMode; } - public SubscriptionClient(String amqpConnectionString, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException + public SubscriptionClient(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { - this(receiveMode); - ConnectionStringBuilder builder = new ConnectionStringBuilder(amqpConnectionString); - this.subscriptionPath = builder.getEntityPath(); - CompletableFuture factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(builder); + this(receiveMode); + this.subscriptionPath = amqpConnectionStringBuilder.getEntityPath(); + CompletableFuture factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder); Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createPumpAndBrowserAsync(f))); } - public SubscriptionClient(MessagingFactory factory, String subscriptionPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException + SubscriptionClient(MessagingFactory factory, String subscriptionPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(receiveMode); this.subscriptionPath = subscriptionPath; @@ -46,7 +48,8 @@ public SubscriptionClient(MessagingFactory factory, String subscriptionPath, Rec } private CompletableFuture createPumpAndBrowserAsync(MessagingFactory factory) - { + { + this.factory = factory; CompletableFuture postSessionBrowserFuture = MiscRequestResponseOperationHandler.create(factory, this.subscriptionPath).thenAcceptAsync((msoh) -> { this.miscRequestResponseHandler = msoh; this.sessionBrowser = new SessionBrowser(factory, this.subscriptionPath, msoh); @@ -126,26 +129,26 @@ CompletableFuture initializeAsync() throws Exception { @Override protected CompletableFuture onClose() { - return this.messageAndSessionPump.closeAsync().thenCompose((v) -> this.miscRequestResponseHandler.closeAsync()); + return this.messageAndSessionPump.closeAsync().thenCompose((v) -> this.miscRequestResponseHandler.closeAsync().thenCompose((w) -> this.factory.closeAsync())); } - @Override - public Collection getMessageSessions() throws InterruptedException, ServiceBusException { +// @Override + Collection getMessageSessions() throws InterruptedException, ServiceBusException { return Utils.completeFuture(this.getMessageSessionsAsync()); } - @Override - public Collection getMessageSessions(Instant lastUpdatedTime) throws InterruptedException, ServiceBusException { +// @Override + Collection getMessageSessions(Instant lastUpdatedTime) throws InterruptedException, ServiceBusException { return Utils.completeFuture(this.getMessageSessionsAsync(lastUpdatedTime)); } - @Override - public CompletableFuture> getMessageSessionsAsync() { +// @Override + CompletableFuture> getMessageSessionsAsync() { return this.sessionBrowser.getMessageSessionsAsync(); } - @Override - public CompletableFuture> getMessageSessionsAsync(Instant lastUpdatedTime) { +// @Override + CompletableFuture> getMessageSessionsAsync(Instant lastUpdatedTime) { return this.sessionBrowser.getMessageSessionsAsync(Date.from(lastUpdatedTime)); } @@ -179,13 +182,13 @@ public CompletableFuture completeAsync(UUID lockToken) { return this.messageAndSessionPump.completeAsync(lockToken); } - @Override - public void defer(UUID lockToken) throws InterruptedException, ServiceBusException { +// @Override + void defer(UUID lockToken) throws InterruptedException, ServiceBusException { this.messageAndSessionPump.defer(lockToken); } - @Override - public void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException { +// @Override + void defer(UUID lockToken, Map propertiesToModify) throws InterruptedException, ServiceBusException { this.messageAndSessionPump.defer(lockToken, propertiesToModify); } @@ -248,4 +251,25 @@ public int getPrefetchCount() { public void setPrefetchCount(int prefetchCount) throws ServiceBusException { this.messageAndSessionPump.setPrefetchCount(prefetchCount); } + + @Override + public String getTopicName() { + String entityPath = this.getEntityPath(); + String[] parts = Pattern.compile(SUBSCRIPTIONS_DELIMITER, Pattern.CASE_INSENSITIVE).split(entityPath, 2); + return parts[0]; + } + + @Override + public String getSubscriptionName() { + String entityPath = this.getEntityPath(); + String[] parts = Pattern.compile(SUBSCRIPTIONS_DELIMITER, Pattern.CASE_INSENSITIVE).split(entityPath, 2); + if(parts.length == 2) + { + return parts[1]; + } + else + { + throw new RuntimeException("Invalid entity path in the subscription client."); + } + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java index 9452a94762c7..1696443ffe7a 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java @@ -4,6 +4,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.MessagingFactory; import com.microsoft.azure.servicebus.primitives.ServiceBusException; import com.microsoft.azure.servicebus.primitives.StringUtil; @@ -18,14 +19,14 @@ private TopicClient() super(StringUtil.getShortRandomString(), null); } - public TopicClient(String amqpConnectionString) throws InterruptedException, ServiceBusException + public TopicClient(ConnectionStringBuilder amqpConnectionStringBuilder) throws InterruptedException, ServiceBusException { this(); - this.sender = ClientFactory.createMessageSenderFromConnectionString(amqpConnectionString); + this.sender = ClientFactory.createMessageSenderFromConnectionStringBuilder(amqpConnectionStringBuilder); this.browser = new MessageBrowser((MessageSender)sender); } - public TopicClient(MessagingFactory factory, String topicPath) throws InterruptedException, ServiceBusException + TopicClient(MessagingFactory factory, String topicPath) throws InterruptedException, ServiceBusException { this(); this.sender = ClientFactory.createMessageSenderFromEntityPath(factory, topicPath); @@ -127,4 +128,9 @@ CompletableFuture initializeAsync() throws Exception { protected CompletableFuture onClose() { return this.sender.closeAsync(); } + + @Override + public String getTopicName() { + return this.getEntityPath(); + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java index 7858d6ad0ede..c062eee10f25 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java @@ -106,7 +106,7 @@ public void onConnectionRemoteOpen(Event event) TRACE_LOGGER.log(Level.FINE, "Connection.onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() +"]"); } - this.messagingFactory.onOpenComplete(null); + this.messagingFactory.onOpenComplete(); } @Override diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java index a931739f4458..7dc50b42a8d8 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java @@ -9,7 +9,7 @@ public interface IAmqpConnection { - void onOpenComplete(Exception exception); + void onOpenComplete(); void onConnectionError(ErrorCondition error); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index f68048bc7d9b..9f57412b9201 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -94,6 +94,7 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, private Instant lastKnownErrorReportedAt; private int nextCreditToFlow; private ScheduledFuture sasTokenRenewTimerFuture; + private CompletableFuture requestResponseLinkCreationFuture; private final Runnable timedOutUpdateStateRequestsDaemon; @@ -230,20 +231,33 @@ public void onEvent() private CompletableFuture createRequestResponseLink() { - synchronized (this.requestResonseLinkCreationLock) - { - if(this.requestResponseLink == null) - { - String requestResponseLinkPath = RequestResponseLink.getManagementNodeLinkPath(this.receivePath); - CompletableFuture crateAndAssignRequestResponseLink = - RequestResponseLink.createAsync(this.underlyingFactory, this.getClientId() + "-RequestResponse", requestResponseLinkPath).thenAccept((rrlink) -> {this.requestResponseLink = rrlink;}); - return crateAndAssignRequestResponseLink; - } - else - { - return CompletableFuture.completedFuture(null); - } - } + synchronized (this.requestResonseLinkCreationLock) { + if(this.requestResponseLinkCreationFuture == null) + { + this.requestResponseLinkCreationFuture = new CompletableFuture(); + String requestResponseLinkPath = RequestResponseLink.getManagementNodeLinkPath(this.receivePath); + RequestResponseLink.createAsync(this.underlyingFactory, this.getClientId() + "-RequestResponse", requestResponseLinkPath).handleAsync((rrlink, ex) -> + { + if(ex == null) + { + this.requestResponseLink = rrlink; + this.requestResponseLinkCreationFuture.complete(null); + } + else + { + this.requestResponseLinkCreationFuture.completeExceptionally(ExceptionUtil.extractAsyncCompletionCause(ex)); + // Set it to null so next call will retry rr link creation + synchronized (this.requestResonseLinkCreationLock) + { + this.requestResponseLinkCreationFuture = null; + } + } + return null; + }); + } + + return this.requestResponseLinkCreationFuture; + } } private void createReceiveLink() diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java index 1f6711b159c4..1269e229cf6a 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java @@ -78,6 +78,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr private Exception lastKnownLinkError; private Instant lastKnownErrorReportedAt; private ScheduledFuture sasTokenRenewTimerFuture; + private CompletableFuture requestResponseLinkCreationFuture; public static CompletableFuture create( final MessagingFactory factory, @@ -121,20 +122,34 @@ public void onEvent() } private CompletableFuture createRequestResponseLink() - { + { synchronized (this.requestResonseLinkCreationLock) { - if(this.requestResponseLink == null) - { - String requestResponseLinkPath = RequestResponseLink.getManagementNodeLinkPath(this.sendPath); - CompletableFuture crateAndAssignRequestResponseLink = - RequestResponseLink.createAsync(this.underlyingFactory, this.getClientId() + "-RequestResponse", requestResponseLinkPath).thenAccept((rrlink) -> {this.requestResponseLink = rrlink;}); - return crateAndAssignRequestResponseLink; - } - else - { - return CompletableFuture.completedFuture(null); - } - } + if(this.requestResponseLinkCreationFuture == null) + { + this.requestResponseLinkCreationFuture = new CompletableFuture(); + String requestResponseLinkPath = RequestResponseLink.getManagementNodeLinkPath(this.sendPath); + RequestResponseLink.createAsync(this.underlyingFactory, this.getClientId() + "-RequestResponse", requestResponseLinkPath).handleAsync((rrlink, ex) -> + { + if(ex == null) + { + this.requestResponseLink = rrlink; + this.requestResponseLinkCreationFuture.complete(null); + } + else + { + this.requestResponseLinkCreationFuture.completeExceptionally(ExceptionUtil.extractAsyncCompletionCause(ex)); + // Set it to null so next call will retry rr link creation + synchronized (this.requestResonseLinkCreationLock) + { + this.requestResponseLinkCreationFuture = null; + } + } + return null; + }); + } + + return this.requestResponseLinkCreationFuture; + } } private CoreMessageSender(final MessagingFactory factory, final String sendLinkName, final String senderPath) diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java index d46be6fc7d16..38e9a1a83041 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java @@ -43,10 +43,10 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection, I public static final Duration DefaultOperationTimeout = Duration.ofSeconds(30); private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE); - private final Object requestResponseLinkCreationLock = new Object(); + private static final int MAX_CBS_LINK_CREATION_ATTEMPTS = 3; private final ConnectionStringBuilder builder; private final String hostName; - private final CompletableFuture closeTask; + private final CompletableFuture connetionCloseFuture; private final ConnectionHandler connectionHandler; private final ReactorHandler reactorHandler; private final LinkedList registeredLinks; @@ -58,9 +58,11 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection, I private Duration operationTimeout; private RetryPolicy retryPolicy; - private CompletableFuture open; - private CompletableFuture openConnection; + private CompletableFuture factoryOpenFuture; + private CompletableFuture cbsLinkCreationFuture; private RequestResponseLink cbsLink; + private int cbsLinkCreationAttempts = 0; + private Throwable lastCBSLinkCreationException = null; /** * @param reactor parameter reactor is purely for testing purposes and the SDK code should always set it to null @@ -76,11 +78,11 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection, I this.operationTimeout = builder.getOperationTimeout(); this.retryPolicy = builder.getRetryPolicy(); this.registeredLinks = new LinkedList(); - this.closeTask = new CompletableFuture(); + this.connetionCloseFuture = new CompletableFuture(); this.reactorLock = new Object(); this.connectionHandler = new ConnectionHandler(this); - this.open = new CompletableFuture(); - this.openConnection = new CompletableFuture(); + this.factoryOpenFuture = new CompletableFuture(); + this.cbsLinkCreationFuture = new CompletableFuture(); this.reactorHandler = new ReactorHandler() { @@ -125,7 +127,7 @@ private void startReactor(ReactorHandler reactorHandler) throws IOException this.reactorScheduler = new ReactorDispatcher(newReactor); } - final Thread reactorThread = new Thread(new RunReactor(newReactor)); + final Thread reactorThread = new Thread(new RunReactor()); reactorThread.start(); } @@ -157,9 +159,9 @@ public static CompletableFuture createFromConnectionStringBuil messagingFactory.startReactor(messagingFactory.reactorHandler); } catch (IOException e) { e.printStackTrace(); - messagingFactory.open.completeExceptionally(e); + messagingFactory.factoryOpenFuture.completeExceptionally(e); } - return messagingFactory.open; + return messagingFactory.factoryOpenFuture; } public static CompletableFuture createFromConnectionStringAsync(final String connectionString) @@ -179,74 +181,48 @@ public static MessagingFactory createFromConnectionString(final String connectio } @Override - public void onOpenComplete(Exception exception) + public void onOpenComplete() { - if (exception == null) - { - AsyncUtil.completeFuture(this.open, this); - AsyncUtil.completeFuture(this.openConnection, this.connection); - } - else - { - AsyncUtil.completeFutureExceptionally(this.open, exception); - AsyncUtil.completeFutureExceptionally(this.openConnection, exception); - } + if(!factoryOpenFuture.isDone()) + { + AsyncUtil.completeFuture(this.factoryOpenFuture, this); + } + + // Connection opened. Initiate new cbs link creation + this.createCBSLinkAsync(); } @Override public void onConnectionError(ErrorCondition error) { - if (!this.open.isDone()) + if (!this.factoryOpenFuture.isDone()) { - this.onOpenComplete(ExceptionUtil.toException(error)); + AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, ExceptionUtil.toException(error)); } else { - final Connection currentConnection = this.connection; - Link[] links = this.registeredLinks.toArray(new Link[0]); - - this.openConnection = new CompletableFuture(); - - for(Link link : links) - { - if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) - { - link.close(); - } - } - - if (currentConnection.getLocalState() != EndpointState.CLOSED && currentConnection.getRemoteState() != EndpointState.CLOSED) - { - currentConnection.close(); - } - - for(Link link : links) - { - Handler handler = BaseHandler.getHandler(link); - if (handler != null && handler instanceof BaseLinkHandler) - { - BaseLinkHandler linkHandler = (BaseLinkHandler) handler; - linkHandler.processOnClose(link, error); - } - } + this.closeConnection(error, null); } - if (this.getIsClosingOrClosed() && !this.closeTask.isDone()) + if (this.getIsClosingOrClosed() && !this.connetionCloseFuture.isDone()) { - this.closeTask.complete(null); + this.connetionCloseFuture.complete(null); Timer.unregister(this.getClientId()); } } private void onReactorError(Exception cause) { - if (!this.open.isDone()) + if (!this.factoryOpenFuture.isDone()) { - this.onOpenComplete(cause); + AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, cause); } else { - final Connection currentConnection = this.connection; + if(this.getIsClosingOrClosed()) + { + return; + } try { @@ -254,90 +230,144 @@ private void onReactorError(Exception cause) } catch (IOException e) { - TRACE_LOGGER.log(Level.SEVERE, ExceptionUtil.toStackTraceString(e, "Re-starting reactor failed with error")); - + TRACE_LOGGER.log(Level.SEVERE, ExceptionUtil.toStackTraceString(e, "Re-starting reactor failed with error")); this.onReactorError(cause); - } - - Link[] links = this.registeredLinks.toArray(new Link[0]); - - for(Link link : links) - { - if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) - { - link.close(); - } - } - - if (currentConnection.getLocalState() != EndpointState.CLOSED && currentConnection.getRemoteState() != EndpointState.CLOSED) - { - currentConnection.close(); } - - for(Link link : links) - { - Handler handler = BaseHandler.getHandler(link); - if (handler != null && handler instanceof BaseLinkHandler) - { - BaseLinkHandler linkHandler = (BaseLinkHandler) handler; - linkHandler.processOnClose(link, cause); - } - } + + this.closeConnection(null, cause); } } + + // One of the parameters must be null + private void closeConnection(ErrorCondition error, Exception cause) + { + // Important to copy the reference of the connection as a call to getConnection might create a new connection while we are still in this method + Connection currentConnection = this.connection; + if(connection != null) + { + Link[] links = this.registeredLinks.toArray(new Link[0]); + + for(Link link : links) + { + if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) + { + link.close(); + } + } + + if(this.cbsLink != null) + { + try { + this.cbsLink.close(); + } catch (ServiceBusException e) { + // Ignore this exception + } + } + + if(this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone()) + { + AsyncUtil.completeFutureExceptionally(this.cbsLinkCreationFuture, new Exception("Connection closed.")); + } + + this.cbsLinkCreationFuture = new CompletableFuture(); + + if (currentConnection.getLocalState() != EndpointState.CLOSED && currentConnection.getRemoteState() != EndpointState.CLOSED) + { + currentConnection.close(); + } + + for(Link link : links) + { + Handler handler = BaseHandler.getHandler(link); + if (handler != null && handler instanceof BaseLinkHandler) + { + BaseLinkHandler linkHandler = (BaseLinkHandler) handler; + if(error != null) + { + linkHandler.processOnClose(link, error); + } + else + { + linkHandler.processOnClose(link, cause); + } + } + } + } + } @Override protected CompletableFuture onClose() { if (!this.getIsClosed()) { - if (this.connection != null && this.connection.getRemoteState() != EndpointState.CLOSED) - { - try { - this.scheduleOnReactorThread(new DispatchHandler() - { - @Override - public void onEvent() - { - if (MessagingFactory.this.connection != null && MessagingFactory.this.connection.getLocalState() != EndpointState.CLOSED) - { - MessagingFactory.this.connection.close(); - } - } - }); - } catch (IOException e) { - AsyncUtil.completeFutureExceptionally(this.closeTask, e); - } - - Timer.schedule(new Runnable() - { - @Override - public void run() - { - if (!MessagingFactory.this.closeTask.isDone()) - { - MessagingFactory.this.closeTask.completeExceptionally(new TimeoutException("Closing MessagingFactory timed out.")); - } - } - }, - this.operationTimeout, TimerType.OneTimeRun); - } - else if(this.connection == null || this.connection.getRemoteState() == EndpointState.CLOSED) - { - AsyncUtil.completeFuture(this.closeTask, null); - } - } - - return this.closeTask; + CompletableFuture cbsLinkCloseFuture; + if(this.cbsLink == null) + { + cbsLinkCloseFuture = CompletableFuture.completedFuture(null); + } + else + { + cbsLinkCloseFuture = this.cbsLink.closeAsync(); + } + + cbsLinkCloseFuture.thenRun(() -> { + if(this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone()) + { + AsyncUtil.completeFutureExceptionally(this.cbsLinkCreationFuture, new Exception("Connection closed.")); + } + + if (this.connection != null && this.connection.getRemoteState() != EndpointState.CLOSED) + { + try { + this.scheduleOnReactorThread(new DispatchHandler() + { + @Override + public void onEvent() + { + if (MessagingFactory.this.connection != null && MessagingFactory.this.connection.getLocalState() != EndpointState.CLOSED) + { + MessagingFactory.this.connection.close(); + } + } + }); + } catch (IOException e) { + AsyncUtil.completeFutureExceptionally(this.connetionCloseFuture, e); + } + + Timer.schedule(new Runnable() + { + @Override + public void run() + { + if (!MessagingFactory.this.connetionCloseFuture.isDone()) + { + MessagingFactory.this.connetionCloseFuture.completeExceptionally(new TimeoutException("Closing MessagingFactory timed out.")); + } + } + }, + this.operationTimeout, TimerType.OneTimeRun); + } + else if(this.connection == null || this.connection.getRemoteState() == EndpointState.CLOSED) + { + this.connetionCloseFuture.complete(null); + } + }); + + return this.connetionCloseFuture; + } + else + { + return CompletableFuture.completedFuture(null); + } } private class RunReactor implements Runnable { final private Reactor rctr; - public RunReactor(final Reactor reactor) + public RunReactor() { - this.rctr = reactor; + this.rctr = MessagingFactory.this.getReactor(); } public void run() @@ -351,7 +381,16 @@ public void run() { this.rctr.setTimeout(3141); this.rctr.start(); - while(!Thread.interrupted() && this.rctr.process()) {} + boolean continuteProcessing = true; + while(!Thread.interrupted() && continuteProcessing) + { + // If factory is closed, stop reactor too + if(MessagingFactory.this.getIsClosed()) + { + break; + } + continuteProcessing = this.rctr.process(); + } this.rctr.stop(); } catch (HandlerException handlerException) @@ -436,7 +475,7 @@ CompletableFuture> sendSASTokenAndSetRenewTimer(String sasTok final String finalSasToken = sasToken; final boolean finalIsSasTokenGenerated = isSasTokenGenerated; - CompletableFuture sendTokenFuture = this.createCBSLink().thenComposeAsync((v) -> { + CompletableFuture sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> { return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.operationTimeout), finalSasToken, ClientConstants.SAS_TOKEN_TYPE, sasTokenAudienceURI); }); return sendTokenFuture.thenApplyAsync((v) -> { @@ -454,20 +493,33 @@ CompletableFuture> sendSASTokenAndSetRenewTimer(String sasTok }); } - private CompletableFuture createCBSLink() + private CompletableFuture createCBSLinkAsync() { - synchronized (this.requestResponseLinkCreationLock) { - if(this.cbsLink == null) - { - String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath(); - CompletableFuture crateAndAssignRequestResponseLink = - RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath).thenAccept((rrlink) -> {this.cbsLink = rrlink;}); - return crateAndAssignRequestResponseLink; - } - else - { - return CompletableFuture.completedFuture(null); - } - } + if(++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS ) + { + Throwable completionEx = this.lastCBSLinkCreationException == null ? new Exception("CBS link creation failed multiple times.") : this.lastCBSLinkCreationException; + this.cbsLinkCreationFuture.completeExceptionally(completionEx); + return CompletableFuture.completedFuture(null); + } + else + { + String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath(); + CompletableFuture crateAndAssignRequestResponseLink = + RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath).handleAsync((cbsLink, ex) -> + { + if(ex == null) + { + this.cbsLink = cbsLink; + this.cbsLinkCreationFuture.complete(null); + } + else + { + this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(ex); + this.createCBSLinkAsync(); + } + return null; + }); + return crateAndAssignRequestResponseLink; + } } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java index e4aae24035ee..e4a1d1cf2215 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java @@ -1,9 +1,14 @@ package com.microsoft.azure.servicebus.primitives; +import java.time.ZonedDateTime; import java.util.Date; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.qpid.proton.message.Message; @@ -11,10 +16,15 @@ public final class MiscRequestResponseOperationHandler extends ClientEntity { + private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE); + private final Object requestResonseLinkCreationLock = new Object(); private final String entityPath; + private final String sasTokenAudienceURI; private final MessagingFactory underlyingFactory; private RequestResponseLink requestResponseLink; + private CompletableFuture requestResponseLinkCreationFuture; + private ScheduledFuture sasTokenRenewTimerFuture; private MiscRequestResponseOperationHandler(MessagingFactory factory, String linkName, String entityPath) { @@ -22,36 +32,105 @@ private MiscRequestResponseOperationHandler(MessagingFactory factory, String lin this.underlyingFactory = factory; this.entityPath = entityPath; + this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), entityPath); } - public static CompletableFuture create( - MessagingFactory factory, - String entityPath) + public static CompletableFuture create(MessagingFactory factory, String entityPath) { - MiscRequestResponseOperationHandler sessionBrowser = new MiscRequestResponseOperationHandler(factory, StringUtil.getShortRandomString(), entityPath); - return CompletableFuture.completedFuture(sessionBrowser); + CompletableFuture creationFuture = new CompletableFuture(); + MiscRequestResponseOperationHandler requestResponseOperationHandler = new MiscRequestResponseOperationHandler(factory, StringUtil.getShortRandomString(), entityPath); + requestResponseOperationHandler.sendSASTokenAndSetRenewTimer().handleAsync((v, ex) -> { + if(ex == null) + { + creationFuture.complete(requestResponseOperationHandler); + } + else + { + creationFuture.completeExceptionally(ExceptionUtil.extractAsyncCompletionCause(ex)); + } + return null; + }); + + Timer.schedule( + new Runnable() + { + public void run() + { + if (!creationFuture.isDone()) + { + requestResponseOperationHandler.cancelSASTokenRenewTimer(); + Exception operationTimedout = new TimeoutException( + String.format(Locale.US, "Open operation on CBSLink(%s) on Entity(%s) timed out at %s.", requestResponseOperationHandler.getClientId(), requestResponseOperationHandler.entityPath, ZonedDateTime.now().toString())); + if (TRACE_LOGGER.isLoggable(Level.WARNING)) + { + TRACE_LOGGER.log(Level.WARNING, operationTimedout.getMessage()); + } + + creationFuture.completeExceptionally(operationTimedout); + } + } + } + , factory.getOperationTimeout() + , TimerType.OneTimeRun); + return creationFuture; } @Override protected CompletableFuture onClose() { + this.cancelSASTokenRenewTimer(); return this.requestResponseLink == null ? CompletableFuture.completedFuture(null) : this.requestResponseLink.closeAsync(); } + CompletableFuture sendSASTokenAndSetRenewTimer() + { + if(this.getIsClosingOrClosed()) + { + return CompletableFuture.completedFuture(null); + } + else + { + CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSASTokenAndSetRenewTimer(this.sasTokenAudienceURI, () -> this.sendSASTokenAndSetRenewTimer()); + return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;}); + } + } + + private void cancelSASTokenRenewTimer() + { + if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) + { + this.sasTokenRenewTimerFuture.cancel(true); + } + } + private CompletableFuture createRequestResponseLink() { - synchronized (this.requestResonseLinkCreationLock) { - if(this.requestResponseLink == null) - { - String requestResponseLinkPath = RequestResponseLink.getManagementNodeLinkPath(this.entityPath); - CompletableFuture crateAndAssignRequestResponseLink = - RequestResponseLink.createAsync(this.underlyingFactory, this.getClientId() + "-RequestResponse", requestResponseLinkPath).thenAccept((rrlink) -> {this.requestResponseLink = rrlink;}); - return crateAndAssignRequestResponseLink; - } - else - { - return CompletableFuture.completedFuture(null); - } - } + synchronized (this.requestResonseLinkCreationLock) { + if(this.requestResponseLinkCreationFuture == null) + { + this.requestResponseLinkCreationFuture = new CompletableFuture(); + String requestResponseLinkPath = RequestResponseLink.getManagementNodeLinkPath(this.entityPath); + RequestResponseLink.createAsync(this.underlyingFactory, this.getClientId() + "-RequestResponse", requestResponseLinkPath).handleAsync((rrlink, ex) -> + { + if(ex == null) + { + this.requestResponseLink = rrlink; + this.requestResponseLinkCreationFuture.complete(null); + } + else + { + this.requestResponseLinkCreationFuture.completeExceptionally(ExceptionUtil.extractAsyncCompletionCause(ex)); + // Set it to null so next call will retry rr link creation + synchronized (this.requestResonseLinkCreationLock) + { + this.requestResponseLinkCreationFuture = null; + } + } + return null; + }); + } + + return this.requestResponseLinkCreationFuture; + } } public CompletableFuture> getMessageSessionsAsync(Date lastUpdatedTime, int skip, int top, String lastSessionId) diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueClientTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueClientTests.java index 26b940fc2796..cc621ef6b616 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueClientTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueClientTests.java @@ -44,12 +44,12 @@ private void createSessionfulQueueClient() throws InterruptedException, ServiceB private void createQueueClient(ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { - this.queueClient = new QueueClient(TestUtils.getNonPartitionedQueueConnectionStringBuilder().toString(), receiveMode); + this.queueClient = new QueueClient(TestUtils.getNonPartitionedQueueConnectionStringBuilder(), receiveMode); } private void createSessionfulQueueClient(ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { - this.sessionfulQueueClient = new QueueClient(TestUtils.getNonPartitionedSessionfulQueueConnectionStringBuilder().toString(), receiveMode); + this.sessionfulQueueClient = new QueueClient(TestUtils.getNonPartitionedSessionfulQueueConnectionStringBuilder(), receiveMode); } @Test diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java index 2786663d6fbc..97ccb7d3b37d 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java @@ -209,10 +209,10 @@ public void testRenewSessionLock() throws InterruptedException, ServiceBusExcept String sessionId = getRandomString(); this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); Instant initialValidity = this.session.getLockedUntilUtc(); - this.session.renewLock(); + this.session.renewSessionLock(); Instant renewedValidity = this.session.getLockedUntilUtc(); Assert.assertTrue("RenewSessionLock did not renew session lockeduntil time.", renewedValidity.isAfter(initialValidity)); - this.session.renewLock(); + this.session.renewSessionLock(); Instant renewedValidity2 = this.session.getLockedUntilUtc(); Assert.assertTrue("RenewSessionLock did not renew session lockeduntil time.", renewedValidity2.isAfter(renewedValidity)); } diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientTests.java index d7c4e96dccd4..254455ba2f17 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientTests.java @@ -63,14 +63,14 @@ private void createSessionfulSubscriptionClient() throws InterruptedException, S private void createSubscriptionClient(ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { - this.topicClient = new TopicClient(TestUtils.getNonPartitionedTopicConnectionStringBuilder().toString()); - this.subscriptionClient = new SubscriptionClient(TestUtils.getNonPartitionedSubscriptionConnectionStringBuilder().toString(), receiveMode); + this.topicClient = new TopicClient(TestUtils.getNonPartitionedTopicConnectionStringBuilder()); + this.subscriptionClient = new SubscriptionClient(TestUtils.getNonPartitionedSubscriptionConnectionStringBuilder(), receiveMode); } private void createSessionfulSubscriptionClient(ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { - this.sessionfulTopicClient = new TopicClient(TestUtils.getNonPartitionedSessionfulTopicConnectionStringBuilder().toString()); - this.sessionfulSubscriptionClient = new SubscriptionClient(TestUtils.getNonPartitionedSessionfulSubscriptionConnectionStringBuilder().toString(), receiveMode); + this.sessionfulTopicClient = new TopicClient(TestUtils.getNonPartitionedSessionfulTopicConnectionStringBuilder()); + this.sessionfulSubscriptionClient = new SubscriptionClient(TestUtils.getNonPartitionedSessionfulSubscriptionConnectionStringBuilder(), receiveMode); } @Test @@ -215,4 +215,11 @@ public void testSessionPumpRenewLock() throws InterruptedException, ServiceBusEx this.createSessionfulSubscriptionClient(); MessageAndSessionPumpTests.testSessionPumpRenewLock(this.sessionfulTopicClient, this.sessionfulSubscriptionClient); } + + @Test + public void testSubscriptionNameSplitting() throws InterruptedException, ServiceBusException + { + this.subscriptionClient = new SubscriptionClient(TestUtils.getNonPartitionedSubscriptionConnectionStringBuilder(), ReceiveMode.PeekLock); + Assert.assertEquals("Wrong subscription name returned.", TestUtils.getProperty(TestUtils.SUBSCRIPTION_NAME_PROPERTY), this.subscriptionClient.getSubscriptionName()); + } } diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java index 2631067fbb51..356e1402f072 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java @@ -394,7 +394,7 @@ public static void testReceiveBySequenceNumberAndComplete(IMessageSender sender, receiver.defer(receivedMessage.getLockToken()); // Now receive by sequence number - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); receiver.complete(receivedMessage.getLockToken()); @@ -402,7 +402,7 @@ public static void testReceiveBySequenceNumberAndComplete(IMessageSender sender, // Try to receive by sequence number again try { - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.fail("Message recieved by sequnce number was not properly completed."); } catch(MessageNotFoundException e) @@ -426,14 +426,14 @@ public static void testReceiveBySequenceNumberAndAbandon(IMessageSender sender, receiver.defer(receivedMessage.getLockToken()); // Now receive by sequence number - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); long deliveryCount = receivedMessage.getDeliveryCount(); receiver.abandon(receivedMessage.getLockToken()); // Try to receive by sequence number again - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.assertEquals("Abandon didn't increase the delivery count for the message received by sequence number.", deliveryCount + 1, receivedMessage.getDeliveryCount()); receiver.complete(receivedMessage.getLockToken()); } @@ -463,7 +463,7 @@ public static void testReceiveBySequenceNumberAndDefer(IMessageSender sender, St receiver.defer(receivedMessage.getLockToken(), customProperties); // Now receive by sequence number - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); Assert.assertEquals("Defer didn't update properties of the message received by sequence number", firstDeferredPhase, receivedMessage.getProperties().get(phaseKey)); @@ -471,7 +471,7 @@ public static void testReceiveBySequenceNumberAndDefer(IMessageSender sender, St receiver.defer(receivedMessage.getLockToken(), customProperties); // Try to receive by sequence number again - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message after deferrring", sequenceNumber, receivedMessage.getSequenceNumber()); Assert.assertEquals("Defer didn't update properties of the message received by sequence number", secondDeferredPhase, receivedMessage.getProperties().get(phaseKey)); receiver.complete(receivedMessage.getLockToken()); @@ -492,7 +492,7 @@ public static void testReceiveBySequenceNumberAndDeadletter(IMessageSender sende receiver.defer(receivedMessage.getLockToken()); // Now receive by sequence number - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); String deadLetterReason = "java client deadletter test"; @@ -501,7 +501,7 @@ public static void testReceiveBySequenceNumberAndDeadletter(IMessageSender sende // Try to receive by sequence number again try { - receivedMessage = receiver.receive(sequenceNumber); + receivedMessage = receiver.receiveBySequenceNumber(sequenceNumber); Assert.fail("Message received by sequence number was not properly deadlettered"); } catch(MessageNotFoundException e) @@ -510,7 +510,7 @@ public static void testReceiveBySequenceNumberAndDeadletter(IMessageSender sende } } - public static void testGetMessageSessions(IMessageSender sender, IMessageSessionEntity sessionsClient) throws InterruptedException, ServiceBusException + public static void testGetMessageSessions(IMessageSender sender, Object sessionsClient) throws InterruptedException, ServiceBusException { int numSessions = 110; // More than default page size String[] sessionIds = new String[numSessions]; @@ -522,7 +522,12 @@ public static void testGetMessageSessions(IMessageSender sender, IMessageSession sender.send(message); } - Collection sessions = Utils.completeFuture(sessionsClient.getMessageSessionsAsync()); + Collection sessions; + if(sessionsClient instanceof QueueClient) + sessions = Utils.completeFuture(((QueueClient)sessionsClient).getMessageSessionsAsync()); + else + sessions = Utils.completeFuture(((SubscriptionClient)sessionsClient).getMessageSessionsAsync()); + Assert.assertTrue("GetMessageSessions didnot return all sessions", numSessions <= sessions.size()); // There could be sessions left over from other tests IMessageSession anySession = (IMessageSession)sessions.toArray()[0]; @@ -585,7 +590,7 @@ public static void drainAllMessagesFromReceiver(IMessageReceiver receiver, boole { try { - IMessage message = receiver.receive(peekedMessage.getSequenceNumber()); + IMessage message = receiver.receiveBySequenceNumber(peekedMessage.getSequenceNumber()); if(receiver.getReceiveMode() == ReceiveMode.PeekLock) { receiver.complete(message.getLockToken()); @@ -610,17 +615,18 @@ public static void drainAllMessages(ConnectionStringBuilder connectionStringBuil public static void drainAllSessions(ConnectionStringBuilder connectionStringBuilder, boolean isQueue) throws InterruptedException, ServiceBusException { int numParallelSessionDrains = 5; - IMessageSessionEntity sessionsClient; + Collection browsableSessions; if(isQueue) { - sessionsClient = new QueueClient(connectionStringBuilder.toString(), ReceiveMode.ReceiveAndDelete); + QueueClient qc = new QueueClient(connectionStringBuilder, ReceiveMode.ReceiveAndDelete); + browsableSessions = qc.getMessageSessions(); } else { - sessionsClient = new SubscriptionClient(connectionStringBuilder.toString(), ReceiveMode.ReceiveAndDelete); - } + SubscriptionClient sc = new SubscriptionClient(connectionStringBuilder, ReceiveMode.ReceiveAndDelete); + browsableSessions = sc.getMessageSessions(); + } - Collection browsableSessions = sessionsClient.getMessageSessions(); if(browsableSessions != null && browsableSessions.size() > 0) { CompletableFuture[] drainFutures = new CompletableFuture[numParallelSessionDrains]; diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java index 0c04ae813837..7de27d5957ea 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java @@ -21,7 +21,7 @@ public class TestUtils { //Topic and Subscription private static final String NON_PARTITIONED_TOPIC_NAME_PROPERTY = "non.partitioned.topic.name"; - private static final String SUBSCRIPTION_NAME_PROPERTY = "subscription.name"; + static final String SUBSCRIPTION_NAME_PROPERTY = "subscription.name"; //Sessionful Topic and Subscription private static final String NON_PARTITIONED_SESSIONFUL_TOPIC_NAME_PROPERTY = "session.non.partitioned.topic.name"; @@ -52,7 +52,7 @@ public class TestUtils { } } - private static String getProperty(String propertyName) + static String getProperty(String propertyName) { String defaultValue = ""; return accessProperties.getProperty(propertyName, defaultValue);