Skip to content

Commit

Permalink
Merge pull request Azure#57 from yvgopal/vijay-work
Browse files Browse the repository at this point in the history
Bug fixes
  • Loading branch information
binzywu authored May 30, 2017
2 parents 21a0f7a + 72394e5 commit 3533662
Show file tree
Hide file tree
Showing 28 changed files with 504 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public CompletableFuture<Void> setStateAsync(byte[] sessionState) {
}

@Override
public CompletableFuture<Void> renewLockAsync() {
public CompletableFuture<Void> renewSessionLockAsync() {
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

Expand Down Expand Up @@ -113,7 +113,7 @@ public CompletableFuture<IMessage> receiveAsync(Duration serverWaitTime)
}

@Override
public CompletableFuture<IMessage> receiveAsync(long sequenceNumber)
public CompletableFuture<IMessage> receiveBySequenceNumberAsync(long sequenceNumber)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static IMessageSender createMessageSenderFromConnectionStringBuilder(Conn
return Utils.completeFuture(createMessageSenderFromConnectionStringBuilderAsync(amqpConnectionStringBuilder));
}

public static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException
static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException
{
return Utils.completeFuture(createMessageSenderFromEntityPathAsync(messagingFactory, entityPath));
}
Expand All @@ -39,7 +39,7 @@ public static CompletableFuture<IMessageSender> createMessageSenderFromConnectio
return sender.initializeAsync().thenApply((v) -> sender);
}

public static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath)
static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath)
{
Utils.assertNonNull("messagingFactory", messagingFactory);
MessageSender sender = new MessageSender(messagingFactory, entityPath);
Expand Down Expand Up @@ -68,12 +68,12 @@ public static IMessageReceiver createMessageReceiverFromConnectionStringBuilder(
return Utils.completeFuture(createMessageReceiverFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, receiveMode));
}

public static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException
static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException
{
return createMessageReceiverFromEntityPath(messagingFactory, entityPath, DEFAULTRECEIVEMODE);
}

public static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
{
return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, receiveMode));
}
Expand Down Expand Up @@ -101,12 +101,12 @@ public static CompletableFuture<IMessageReceiver> createMessageReceiverFromConne
return receiver.initializeAsync().thenApply((v) -> receiver);
}

public static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath)
static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath)
{
return createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, DEFAULTRECEIVEMODE);
}

public static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode)
static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode)
{
Utils.assertNonNull("messagingFactory", messagingFactory);
MessageReceiver receiver = new MessageReceiver(messagingFactory, entityPath, receiveMode);
Expand Down Expand Up @@ -134,12 +134,12 @@ public static IMessageSession acceptSessionFromConnectionStringBuilder(Connectio
return Utils.completeFuture(acceptSessionFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, sessionId, receiveMode));
}

public static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId) throws InterruptedException, ServiceBusException
static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId) throws InterruptedException, ServiceBusException
{
return acceptSessionFromEntityPath(messagingFactory, entityPath, sessionId, DEFAULTRECEIVEMODE);
}

public static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
{
return Utils.completeFuture(acceptSessionFromEntityPathAsync(messagingFactory, entityPath, sessionId, receiveMode));
}
Expand Down Expand Up @@ -167,12 +167,12 @@ public static CompletableFuture<IMessageSession> acceptSessionFromConnectionStri
return session.initializeAsync().thenApply((v) -> session);
}

public static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId)
static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId)
{
return acceptSessionFromEntityPathAsync(messagingFactory, entityPath, sessionId, DEFAULTRECEIVEMODE);
}

public static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode)
static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode)
{
Utils.assertNonNull("messagingFactory", messagingFactory);
MessageSession session = new MessageSession(messagingFactory, entityPath, sessionId, receiveMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface IMessage {

public void setSessionId(String sessionId);

public byte[] getContent();
public byte[] getBody();

public void setContent(byte[] content);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ interface IMessageAndSessionPump

CompletableFuture<Void> completeAsync(UUID lockToken);

void defer(UUID lockToken) throws InterruptedException, ServiceBusException;

void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;
// void defer(UUID lockToken) throws InterruptedException, ServiceBusException;
//
// void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;

CompletableFuture<Void> deferAsync(UUID lockToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface IMessageReceiver extends IMessageEntity, IMessageBrowser{

IMessage receive(Duration serverWaitTime) throws InterruptedException, ServiceBusException;

IMessage receive(long sequenceNumber) throws InterruptedException, ServiceBusException;
IMessage receiveBySequenceNumber(long sequenceNumber) throws InterruptedException, ServiceBusException;

Collection<IMessage> receiveBatch(int maxMessageCount) throws InterruptedException, ServiceBusException;

Expand All @@ -68,7 +68,7 @@ public interface IMessageReceiver extends IMessageEntity, IMessageBrowser{

CompletableFuture<IMessage> receiveAsync(Duration serverWaitTime);

CompletableFuture<IMessage> receiveAsync(long sequenceNumber);
CompletableFuture<IMessage> receiveBySequenceNumberAsync(long sequenceNumber);

CompletableFuture<Collection<IMessage>> receiveBatchAsync(int maxMessageCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public interface IMessageSession extends IMessageReceiver {

Instant getLockedUntilUtc();

void renewLock() throws InterruptedException, ServiceBusException;
void renewSessionLock() throws InterruptedException, ServiceBusException;

CompletableFuture<Void> renewLockAsync();
CompletableFuture<Void> renewSessionLockAsync();

void setState(byte[] state) throws InterruptedException, ServiceBusException;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.microsoft.azure.servicebus;

public interface IQueueClient extends IMessageSender, IMessageSessionEntity, IMessageAndSessionPump, IMessageEntity
public interface IQueueClient extends IMessageSender, IMessageAndSessionPump, IMessageEntity
{
public ReceiveMode getReceiveMode();

public String getQueueName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.microsoft.azure.servicebus.rules.Filter;
import com.microsoft.azure.servicebus.rules.RuleDescription;

public interface ISubscriptionClient extends IMessageEntity, IMessageSessionEntity, IMessageAndSessionPump
public interface ISubscriptionClient extends IMessageEntity, IMessageAndSessionPump
{
public ReceiveMode getReceiveMode();
public void addRule(RuleDescription ruleDescription) throws InterruptedException, ServiceBusException;
Expand All @@ -15,4 +15,6 @@ public interface ISubscriptionClient extends IMessageEntity, IMessageSessionEnti
public CompletableFuture<Void> addRuleAsync(String ruleName, Filter filter);
public CompletableFuture<Void> removeRuleAsync(String ruleName);
public void removeRule(String ruleName) throws InterruptedException, ServiceBusException;
public String getTopicName();
public String getSubscriptionName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

// Should we allow browse/peek on topic?
public interface ITopicClient extends IMessageSender, IMessageBrowser, IMessageEntity {

public String getTopicName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void setSessionId(String sessionId) {
}

@Override
public byte[] getContent() {
public byte[] getBody() {
return this.content;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ protected void loop()
if(renewInterval != null && !renewInterval.isNegative())
{
this.timerFuture = Timer.schedule(() -> {
this.session.renewLockAsync().handleAsync((v, renewLockEx) ->
this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) ->
{
if(renewLockEx != null)
{
Expand Down Expand Up @@ -664,14 +664,14 @@ public CompletableFuture<Void> completeAsync(UUID lockToken) {
return this.innerReceiver.completeAsync(lockToken);
}

@Override
public void defer(UUID lockToken) throws InterruptedException, ServiceBusException {
// @Override
void defer(UUID lockToken) throws InterruptedException, ServiceBusException {
this.checkInnerReceiveCreated();
this.innerReceiver.defer(lockToken);
}

@Override
public void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
// @Override
void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
this.checkInnerReceiveCreated();
this.innerReceiver.defer(lockToken, propertiesToModify);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class MessageConverter
public static org.apache.qpid.proton.message.Message convertBrokeredMessageToAmqpMessage(Message brokeredMessage)
{
org.apache.qpid.proton.message.Message amqpMessage = Proton.message();
if(brokeredMessage.getContent() != null)
if(brokeredMessage.getBody() != null)
{
amqpMessage.setBody(new Data(new Binary(brokeredMessage.getContent())));
amqpMessage.setBody(new Data(new Binary(brokeredMessage.getBody())));
}

if(brokeredMessage.getProperties() != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ public IMessage receive(Duration serverWaitTime) throws InterruptedException, Se
}

@Override
public IMessage receive(long sequenceNumber) throws InterruptedException, ServiceBusException{
return Utils.completeFuture(this.receiveAsync(sequenceNumber));
public IMessage receiveBySequenceNumber(long sequenceNumber) throws InterruptedException, ServiceBusException{
return Utils.completeFuture(this.receiveBySequenceNumberAsync(sequenceNumber));
}

@Override
Expand Down Expand Up @@ -392,7 +392,7 @@ else if (c.isEmpty())
}

@Override
public CompletableFuture<IMessage> receiveAsync(long sequenceNumber) {
public CompletableFuture<IMessage> receiveBySequenceNumberAsync(long sequenceNumber) {
ArrayList<Long> list = new ArrayList<>();
list.add(sequenceNumber);
return this.receiveBatchAsync(list).thenApplyAsync(c ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public Instant getLockedUntilUtc() {
}

@Override
public void renewLock() throws InterruptedException, ServiceBusException {
Utils.completeFuture(this.renewLockAsync());
public void renewSessionLock() throws InterruptedException, ServiceBusException {
Utils.completeFuture(this.renewSessionLockAsync());
}

@Override
public CompletableFuture<Void> renewLockAsync() {
public CompletableFuture<Void> renewSessionLockAsync() {
return this.getInternalReceiver().renewSessionLocksAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
public final class QueueClient extends InitializableEntity implements IQueueClient
{
private final ReceiveMode receiveMode;
private MessagingFactory factory;
private IMessageSender sender;
private MessageAndSessionPump messageAndSessionPump;
private SessionBrowser sessionBrowser;
Expand All @@ -27,22 +28,22 @@ private QueueClient(ReceiveMode receiveMode)
this.receiveMode = receiveMode;
}

public QueueClient(String amqpConnectionString, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
public QueueClient(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
{
this(receiveMode);
ConnectionStringBuilder builder = new ConnectionStringBuilder(amqpConnectionString);
CompletableFuture<MessagingFactory> factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(builder);
Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createInternals(f, builder.getEntityPath(), receiveMode)));
CompletableFuture<MessagingFactory> factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder);
Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createInternals(f, amqpConnectionStringBuilder.getEntityPath(), receiveMode)));
}

public QueueClient(MessagingFactory factory, String queuePath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
QueueClient(MessagingFactory factory, String queuePath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException
{
this(receiveMode);
Utils.completeFuture(this.createInternals(factory, queuePath, receiveMode));
}

private CompletableFuture<Void> createInternals(MessagingFactory factory, String queuePath, ReceiveMode receiveMode)
{
this.factory = factory;
CompletableFuture<IMessageSender> senderFuture = ClientFactory.createMessageSenderFromEntityPathAsync(factory, queuePath);
CompletableFuture<Void> postSenderFuture = senderFuture.thenAcceptAsync((s) -> {
this.sender = s;
Expand Down Expand Up @@ -137,26 +138,26 @@ CompletableFuture<Void> initializeAsync() throws Exception {

@Override
protected CompletableFuture<Void> onClose() {
return this.messageAndSessionPump.closeAsync().thenCompose((v) -> this.sender.closeAsync().thenCompose((u) -> this.miscRequestResponseHandler.closeAsync()));
return this.messageAndSessionPump.closeAsync().thenCompose((v) -> this.sender.closeAsync().thenCompose((u) -> this.miscRequestResponseHandler.closeAsync().thenCompose((w) -> this.factory.closeAsync())));
}

@Override
public Collection<IMessageSession> getMessageSessions() throws InterruptedException, ServiceBusException {
// @Override
Collection<IMessageSession> getMessageSessions() throws InterruptedException, ServiceBusException {
return Utils.completeFuture(this.getMessageSessionsAsync());
}

@Override
public Collection<IMessageSession> getMessageSessions(Instant lastUpdatedTime) throws InterruptedException, ServiceBusException {
// @Override
Collection<IMessageSession> getMessageSessions(Instant lastUpdatedTime) throws InterruptedException, ServiceBusException {
return Utils.completeFuture(this.getMessageSessionsAsync(lastUpdatedTime));
}

@Override
public CompletableFuture<Collection<IMessageSession>> getMessageSessionsAsync() {
// @Override
CompletableFuture<Collection<IMessageSession>> getMessageSessionsAsync() {
return this.sessionBrowser.getMessageSessionsAsync();
}

@Override
public CompletableFuture<Collection<IMessageSession>> getMessageSessionsAsync(Instant lastUpdatedTime) {
// @Override
CompletableFuture<Collection<IMessageSession>> getMessageSessionsAsync(Instant lastUpdatedTime) {
return this.sessionBrowser.getMessageSessionsAsync(Date.from(lastUpdatedTime));
}

Expand Down Expand Up @@ -190,13 +191,13 @@ public CompletableFuture<Void> completeAsync(UUID lockToken) {
return this.messageAndSessionPump.completeAsync(lockToken);
}

@Override
public void defer(UUID lockToken) throws InterruptedException, ServiceBusException {
// @Override
void defer(UUID lockToken) throws InterruptedException, ServiceBusException {
this.messageAndSessionPump.defer(lockToken);
}

@Override
public void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
// @Override
void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
this.messageAndSessionPump.defer(lockToken, propertiesToModify);
}

Expand Down Expand Up @@ -258,5 +259,11 @@ public int getPrefetchCount() {
@Override
public void setPrefetchCount(int prefetchCount) throws ServiceBusException {
this.messageAndSessionPump.setPrefetchCount(prefetchCount);
}
}

@Override
public String getQueueName()
{
return this.getEntityPath();
}
}
Loading

0 comments on commit 3533662

Please sign in to comment.