From 79dd56c32ac9654f99b0e8c5068f9a4ee3e68e0f Mon Sep 17 00:00:00 2001 From: Ali Jafari Date: Sat, 23 Mar 2024 17:58:45 +0100 Subject: [PATCH 1/3] Add declare flag to exchange (similar to queue) --- lib/src/client/channel.dart | 1 + lib/src/client/impl/channel_impl.dart | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/lib/src/client/channel.dart b/lib/src/client/channel.dart index 927f134..83f8a2a 100644 --- a/lib/src/client/channel.dart +++ b/lib/src/client/channel.dart @@ -47,6 +47,7 @@ abstract class Channel { {bool passive = false, bool durable = false, bool noWait = false, + bool declare = true, Map arguments}); /// Setup the [prefetchSize] and [prefetchCount] QoS parameters. The value diff --git a/lib/src/client/impl/channel_impl.dart b/lib/src/client/impl/channel_impl.dart index d132112..052e701 100644 --- a/lib/src/client/impl/channel_impl.dart +++ b/lib/src/client/impl/channel_impl.dart @@ -563,6 +563,7 @@ class _ChannelImpl implements Channel { {bool passive = false, bool durable = false, bool noWait = false, + bool declare = true, Map? arguments}) { if (name.isEmpty) { throw ArgumentError("The name of the exchange cannot be empty"); @@ -579,6 +580,12 @@ class _ChannelImpl implements Channel { ..arguments = arguments; Completer opCompleter = Completer(); + + if (!declare) { + opCompleter.complete(_ExchangeImpl(this, name, type)); + return opCompleter.future; + } + writeMessage(exchangeRequest, completer: opCompleter, futurePayload: _ExchangeImpl(this, name, type), From d031346204188c33cc145673e9ba4ec5eb96fee8 Mon Sep 17 00:00:00 2001 From: Ali Jafari Date: Sat, 23 Mar 2024 23:26:08 +0100 Subject: [PATCH 2/3] Update doc block --- lib/src/client/channel.dart | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/src/client/channel.dart b/lib/src/client/channel.dart index 83f8a2a..e6c335b 100644 --- a/lib/src/client/channel.dart +++ b/lib/src/client/channel.dart @@ -43,6 +43,9 @@ abstract class Channel { /// returned future will fail with a [ExchangeNotFoundException] if the exchange does not exist. /// /// The [durable] flag will enable the exchange to persist across server restarts. + /// + /// The [declare] flag can be set to false to skip the exchange declaration step + /// for clients with read-only access to the broker. Future exchange(String name, ExchangeType type, {bool passive = false, bool durable = false, From 007aaa135273b8eff93cf2b7390666becce13580 Mon Sep 17 00:00:00 2001 From: Ali Jafari Date: Sat, 23 Mar 2024 23:28:08 +0100 Subject: [PATCH 3/3] Add test for exchange without declaration scenario --- test/lib/exchange_test.dart | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/test/lib/exchange_test.dart b/test/lib/exchange_test.dart index 6246f77..bca8840 100644 --- a/test/lib/exchange_test.dart +++ b/test/lib/exchange_test.dart @@ -203,6 +203,38 @@ main({bool enableLogger = true}) { await boundQueue.unbind(exchange, ""); }); + test("publish to exchange without exchange declaration", () async { + Completer testCompleter = Completer(); + + // Use the second client to define the queue and the exchange in advance + Channel channel2 = await client2.channel(); + Queue queue = await channel2.queue("q_test_ro"); + Exchange exchange = await channel2.exchange("exc_test_ro", ExchangeType.FANOUT); + queue.bind(exchange, ''); + + // Pretend we are a RO consumer that cannot declare the exchange but + // should still be able to publish to it. + Channel channel = await client.channel(); + Exchange exchangeRo = await channel + .exchange("exc_test_ro", ExchangeType.FANOUT, declare: false); + + expect(exchangeRo.channel, const TypeMatcher()); + expect(exchangeRo.type, equals(ExchangeType.FANOUT)); + expect(exchangeRo.name, "exc_test_ro"); + + exchangeRo.publish("Test payload", ""); + + Consumer consumer = await queue.consume(); + consumer.listen((AmqpMessage reply) { + expect(reply.payloadAsString, equals("Test payload")); + + // Pass! + testCompleter.complete(); + }); + + return testCompleter.future; + }); + group("exceptions", () { test("missing exchange name", () async { Channel channel = await client.channel();