Skip to content

Commit

Permalink
Merge branch '5.x.x-stable'
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Aug 2, 2018
2 parents 835ae90 + 2e042e5 commit e8300ab
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
private final ConnectionParams params;
private volatile RecoveryAwareAMQConnection delegate;

private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<ShutdownListener>());
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>());
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>());
private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<>());
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<>());
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<>());

// Records topology changes
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>());
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>());
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<>());
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<>());
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<>());
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<>());
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<>());
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<>());

private final TopologyRecoveryFilter topologyRecoveryFilter;

Expand Down Expand Up @@ -143,28 +143,7 @@ public void run() {
}

private TopologyRecoveryFilter letAllPassFilter() {
return new TopologyRecoveryFilter() {

@Override
public boolean filterExchange(RecordedExchange recordedExchange) {
return true;
}

@Override
public boolean filterQueue(RecordedQueue recordedQueue) {
return true;
}

@Override
public boolean filterBinding(RecordedBinding recordedBinding) {
return true;
}

@Override
public boolean filterConsumer(RecordedConsumer recordedConsumer) {
return true;
}
};
return new TopologyRecoveryFilter() {};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,35 @@ public interface TopologyRecoveryFilter {
* @param recordedExchange
* @return true to recover the exchange, false otherwise
*/
boolean filterExchange(RecordedExchange recordedExchange);
default boolean filterExchange(RecordedExchange recordedExchange) {
return true;
}

/**
* Decides whether a queue is recovered or not.
* @param recordedQueue
* @return true to recover the queue, false otherwise
*/
boolean filterQueue(RecordedQueue recordedQueue);
default boolean filterQueue(RecordedQueue recordedQueue) {
return true;
}

/**
* Decides whether a binding is recovered or not.
* @param recordedBinding
* @return true to recover the binding, false otherwise
*/
boolean filterBinding(RecordedBinding recordedBinding);
default boolean filterBinding(RecordedBinding recordedBinding) {
return true;
}

/**
* Decides whether a consumer is recovered or not.
* @param recordedConsumer
* @return true to recover the consumer, false otherwise
*/
boolean filterConsumer(RecordedConsumer recordedConsumer);
default boolean filterConsumer(RecordedConsumer recordedConsumer) {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,26 +112,18 @@ private static boolean resourceExists(Callable<Channel> callback) throws Excepti
}

private static boolean queueExists(final String queue, final Connection connection) throws Exception {
return resourceExists(new Callable<Channel>() {

@Override
public Channel call() throws Exception {
Channel channel = connection.createChannel();
channel.queueDeclarePassive(queue);
return channel;
}
return resourceExists(() -> {
Channel channel = connection.createChannel();
channel.queueDeclarePassive(queue);
return channel;
});
}

private static boolean exchangeExists(final String exchange, final Connection connection) throws Exception {
return resourceExists(new Callable<Channel>() {

@Override
public Channel call() throws Exception {
Channel channel = connection.createChannel();
channel.exchangeDeclarePassive(exchange);
return channel;
}
return resourceExists(() -> {
Channel channel = connection.createChannel();
channel.exchangeDeclarePassive(exchange);
return channel;
});
}

Expand Down

0 comments on commit e8300ab

Please sign in to comment.