Skip to content

Commit

Permalink
Merge pull request #78 from hoylen/bind-queue-properties
Browse files Browse the repository at this point in the history
Added queue properties to bindQueueConsumer and bindPrivateQueueConsumer
  • Loading branch information
achilleasa authored Nov 2, 2022
2 parents f1844ca + 34a13ab commit 9e41b8e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
30 changes: 26 additions & 4 deletions lib/src/client/exchange.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,16 @@ abstract class Exchange {
///
/// The [noAck] flag will notify the server whether the consumer is expected to acknowledge incoming
/// messages or not.
Future<Consumer> bindPrivateQueueConsumer(List<String>? routingKeys,
{String consumerTag, bool noAck = true});
///
/// The [noWait] and [arguments] parameters are used in the same way
/// as they are in [Channel.privateQueue].
Future<Consumer> bindPrivateQueueConsumer(
List<String>? routingKeys, {
String consumerTag,
bool noAck = true,
bool noWait = false,
Map<String, Object>? arguments,
});

/// Allocate a named [Queue], bind it to this exchange using the supplied [routingKeys],
/// allocate a [Consumer] and return a [Future<Consumer>].
Expand All @@ -53,6 +61,20 @@ abstract class Exchange {
///
/// The [noAck] flag will notify the server whether the consumer is expected to acknowledge incoming
/// messages or not.
Future<Consumer> bindQueueConsumer(String queueName, List<String> routingKeys,
{String consumerTag, bool noAck = true});
///
/// The [passive], [durable], [exclusive], [autoDelete], [noWait] and
/// [declare] parameters are used in the same way as they are in
/// [Channel.queue].
Future<Consumer> bindQueueConsumer(
String queueName,
List<String> routingKeys, {
String consumerTag,
bool noAck = true,
bool passive = false,
bool durable = false,
bool exclusive = false,
bool autoDelete = false,
bool noWait = false,
bool declare = true,
});
}
33 changes: 27 additions & 6 deletions lib/src/client/impl/exchange_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ class _ExchangeImpl implements Exchange {
}

@override
Future<Consumer> bindPrivateQueueConsumer(List<String>? routingKeys,
{String? consumerTag, bool noAck = true}) async {
Future<Consumer> bindPrivateQueueConsumer(
List<String>? routingKeys, {
String? consumerTag,
bool noAck = true,
bool noWait = false,
Map<String, Object>? arguments,
}) async {
// Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified
if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) &&
(routingKeys == null || routingKeys.isEmpty)) {
Expand All @@ -63,7 +68,8 @@ class _ExchangeImpl implements Exchange {
"One or more routing keys needs to be specified for this exchange type");
}

Queue queue = await channel.privateQueue();
Queue queue =
await channel.privateQueue(noWait: noWait, arguments: arguments);
for (String routingKey in routingKeys) {
await queue.bind(this, routingKey);
}
Expand All @@ -72,8 +78,17 @@ class _ExchangeImpl implements Exchange {

@override
Future<Consumer> bindQueueConsumer(
String queueName, List<String>? routingKeys,
{String? consumerTag, bool noAck = true}) async {
String queueName,
List<String>? routingKeys, {
String? consumerTag,
bool noAck = true,
bool passive = false,
bool durable = false,
bool exclusive = false,
bool autoDelete = false,
bool noWait = false,
bool declare = true,
}) async {
// Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified
if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) &&
(routingKeys == null || routingKeys.isEmpty)) {
Expand All @@ -85,7 +100,13 @@ class _ExchangeImpl implements Exchange {
"One or more routing keys needs to be specified for this exchange type");
}

Queue queue = await channel.queue(queueName);
Queue queue = await channel.queue(queueName,
passive: passive,
durable: durable,
exclusive: exclusive,
autoDelete: autoDelete,
noWait: noWait,
declare: declare);
for (String routingKey in routingKeys) {
await queue.bind(this, routingKey);
}
Expand Down

0 comments on commit 9e41b8e

Please sign in to comment.