Skip to content

Commit

Permalink
Shutdown consumer streams when the underlying channel closes
Browse files Browse the repository at this point in the history
This commit ensures that if a channel closes, either because the
channel's close() method was invoked or because a channel-closing
exception occurred, any pending in-flight operations as well as any
active consumers are properly terminated.
  • Loading branch information
achilleasa committed Aug 11, 2023
1 parent 398840c commit e66a146
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
17 changes: 12 additions & 5 deletions lib/src/client/impl/channel_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ class _ChannelImpl implements Channel {
writeMessage(closeRequest, completer: _channelClosed, futurePayload: this);
_channelClosed!.future
.then((_) => _basicReturnStream.close())
.then((_) => _abortOperationsAndCloseConsumers(ChannelException(
"Channel closed", channelId, ErrorType.CHANNEL_ERROR)))
.then((_) => _client._removeChannel(channelId));
return _channelClosed!.future;
}
Expand Down Expand Up @@ -422,12 +424,8 @@ class _ChannelImpl implements Channel {
ErrorType.valueOf(closeResponse.replyCode));
}

// Mark the channel as closed
_channelClosed ??= Completer<Channel>();
if (!_channelClosed!.isCompleted) {
_channelClosed!.complete(this);
}
_channelCloseException = ex;
handleException(ex);

break;
}
Expand Down Expand Up @@ -479,7 +477,10 @@ class _ChannelImpl implements Channel {
if (_client.handshaking) {
return;
}
_abortOperationsAndCloseConsumers(exception);
}

void _abortOperationsAndCloseConsumers(Exception exception) {
// Abort any pending operations unless we are currently opening the channel
for (Completer completer in _pendingOperations) {
if (!completer.isCompleted) {
Expand All @@ -488,6 +489,12 @@ class _ChannelImpl implements Channel {
}
_pendingOperations.clear();
_pendingOperationPayloads.clear();

// Close any active consumers.
for (_ConsumerImpl consumer in _consumers.values) {
consumer.close();
}
_consumers.clear();
}

/// Close the channel and return a [Future<Channel>] to be completed when the channel is closed.
Expand Down
12 changes: 10 additions & 2 deletions lib/src/client/impl/consumer_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,22 @@ class _ConsumerImpl implements Consumer {
Completer<Consumer> completer = Completer<Consumer>();
channel.writeMessage(cancelRequest,
completer: completer, futurePayload: this, noWait: noWait);
completer.future.then((_) => _controller.close());
completer.future.then((_) => close());
return completer.future;
}

void onMessage(DecodedMessageImpl serverMessage) {
// Ignore message if the stream is closed
if (_controller.isClosed) {
return;
}

// Ensure that messate contains a non-null property object
serverMessage.properties ??= MessageProperties();

_controller.add(_AmqpMessageImpl.fromDecodedMessage(this, serverMessage));
}

void close() {
_controller.close();
}
}
34 changes: 34 additions & 0 deletions test/lib/channel_test.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
library dart_amqp.test.channels;

import "dart:async";

import "package:test/test.dart";

import "package:dart_amqp/src/client.dart";
Expand Down Expand Up @@ -88,6 +90,38 @@ main({bool enableLogger = true}) {
Channel channel = await client.channel();
channel = await channel.recover(true);
});

test(
"channel-closing exceptions should close any active consumer streams",
() async {
Channel channel = await client.channel();
Queue queue = await channel.queue("test-close-consumer-on-exception",
autoDelete: true);
Consumer consumer = await queue.consume();
Completer listenerDone = Completer();
consumer.listen((event) {}, onDone: () {
listenerDone.complete(true);
});

expect(() => channel.queue("bogus", passive: true),
throwsA((e) => e is QueueNotFoundException));
expect(listenerDone.future, completion(equals(true)));
});

test("closing a channel should close any active consumer streams",
() async {
Channel channel = await client.channel();
Queue queue = await channel
.queue("test-close-consumer-on-channel-close", autoDelete: true);
Consumer consumer = await queue.consume();
Completer listenerDone = Completer();
consumer.listen((event) {}, onDone: () {
listenerDone.complete(true);
});

await channel.close();
expect(listenerDone.future, completion(equals(true)));
});
});
});
}

0 comments on commit e66a146

Please sign in to comment.