From 19836bbb8120df9fe570d16a3dbd37c5d3806443 Mon Sep 17 00:00:00 2001 From: Achilleas Anagnostopoulos Date: Tue, 2 Jan 2024 09:27:00 +0000 Subject: [PATCH 1/2] Introduce maxMissedHeartbeats tuning parameter This commit implements a suggestion from https://github.com/achilleasa/dart_amqp/issues/106#issuecomment-1846280852. The client-side heartbeat implementation is modified via the introduction of a new tuning parameter called `maxMissedHeartbeats` (default: 3). If heartbeats are enabled and the client sends `maxMissedHeartbeats` consecutive heartbeat messages at the interval negotiated with the broker without receiving any message (heartbeat or regular traffic) back, the client assumes that the server is not available and will raise a HeartbeatFailedException. --- API.md | 11 ++++++----- lib/src/client/impl/client_impl.dart | 16 ++++++++++++---- lib/src/protocol/io/tuning_settings.dart | 8 +++++--- test/lib/client_test.dart | 9 ++++++--- 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/API.md b/API.md index 862c752..b924809 100644 --- a/API.md +++ b/API.md @@ -34,11 +34,12 @@ you don't have to deal with tuning at the application level. The client will use The class exposes the following parameters: -| Param name | Default value | Description -|-----------------|------------------|---------------------- -| maxChannels | 0 (no limit) | The maximum number of channels that can be opened by the client. When set to zero, the maximum number of channels is 65535. -| maxFrameSize | 4096 bytes | The maximum frame size that can be parsed by the client. According to the spec, this is set to a high-enough initial value so that the client can parse the messages exchanged during the handshake. The client will negotiate with the server during the handshake phase and adjust this value upwards. -| heartbeatPeriod | 0 sec | The preferred heartbeat period (or `Duration.zero` to disable heartbeats) expressed as a [Duration](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:core.Duration) object. +| Param name | Default value | Description +|---------------------|---------------| +| maxChannels | 0 (no limit) | The maximum number of channels that can be opened by the client. When set to zero, the maximum number of channels is 65535. +| maxFrameSize | 4096 bytes | The maximum frame size that can be parsed by the client. According to the spec, this is set to a high-enough initial value so that the client can parse the messages exchanged during the handshake. The client will negotiate with the server during the handshake phase and adjust this value upwards. +| heartbeatPeriod | 0 sec | The preferred heartbeat period (or `Duration.zero` to disable heartbeats) expressed as a [Duration](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:core.Duration) object. +| maxMissedHeartbeats | 3 | If heartbeats are enabled, raise an exception if `maxMissedHeartbeats` consecutive heartbeats have been sent by the client without receiving any response from the broker. ### Creating a new client diff --git a/lib/src/client/impl/client_impl.dart b/lib/src/client/impl/client_impl.dart index 6d3421c..ccb34e7 100644 --- a/lib/src/client/impl/client_impl.dart +++ b/lib/src/client/impl/client_impl.dart @@ -24,7 +24,12 @@ class _ClientImpl implements Client { final _error = StreamController.broadcast(); // The heartbeattRecvTimer is reset every time we receive _any_ message from - // the server. If the timer expires, a HeartbeatFailed exception will be raised. + // the server. If the timer expires, and a HeartbeatFailed exception will be + // raised. + // + // The timer is set to a multiple of the negotiated interval to reset the + // connection if we have not received any message from the server for a + // consecutive number of maxMissedHeartbeats (see tuningSettings). RestartableTimer? _heartbeatRecvTimer; _ClientImpl({ConnectionSettings? settings}) { @@ -138,14 +143,17 @@ class _ClientImpl implements Client { // period has been configured, start monitoring incoming heartbeats. if (serverMessage.message is ConnectionOpenOk && tuningSettings.heartbeatPeriod.inSeconds > 0) { + // Raise an exception if we miss maxMissedHeartbeats consecutive + // heartbeats. + Duration missInterval = + tuningSettings.heartbeatPeriod * tuningSettings.maxMissedHeartbeats; _heartbeatRecvTimer?.cancel(); - _heartbeatRecvTimer = - RestartableTimer(tuningSettings.heartbeatPeriod, () { + _heartbeatRecvTimer = RestartableTimer(missInterval, () { // Set the timer to null to avoid accidentally resetting it while // shutting down. _heartbeatRecvTimer = null; _handleException(HeartbeatFailedException( - "Server did not respond to heartbeats for ${tuningSettings.heartbeatPeriod.inSeconds}s")); + "Server did not respond to heartbeats for ${tuningSettings.heartbeatPeriod.inSeconds}s (missed consecutive heartbeats: ${tuningSettings.maxMissedHeartbeats})")); }); } diff --git a/lib/src/protocol/io/tuning_settings.dart b/lib/src/protocol/io/tuning_settings.dart index e2e1733..66405bd 100644 --- a/lib/src/protocol/io/tuning_settings.dart +++ b/lib/src/protocol/io/tuning_settings.dart @@ -14,15 +14,17 @@ class TuningSettings { // min(client hb period, server hb period). In other words, clients may force // a lower heartbeat period but they are never allowed to increase it beyond // the value suggested by the remote server. - // + Duration heartbeatPeriod = Duration.zero; + // When a non-zero heartbeat period is negotiated with the remote server, a // [HeartbeatFailedException] will be raised if the server does not respond - // to heartbeat requests within the configured heartbeat period. - Duration heartbeatPeriod = Duration.zero; + // to [maxMissedHeartbeats] consecutive heartbeat requests. + int maxMissedHeartbeats = 3; TuningSettings({ this.maxChannels = 0, this.maxFrameSize = 4096, this.heartbeatPeriod = Duration.zero, + this.maxMissedHeartbeats = 3, }); } diff --git a/test/lib/client_test.dart b/test/lib/client_test.dart index de128fd..a412565 100644 --- a/test/lib/client_test.dart +++ b/test/lib/client_test.dart @@ -211,12 +211,15 @@ main({bool enableLogger = true}) { expect(client.tuningSettings.heartbeatPeriod, equals(const Duration(seconds: 1))); - // Perform a blocking call until the heartbeat timer expires. + // Perform a blocking call until we miss + // tuningSettings.maxMissedHeartbeats consecutive heartbeats. await client.channel(); } catch (e) { expect(e, const TypeMatcher()); - expect((e as HeartbeatFailedException).message, - equals("Server did not respond to heartbeats for 1s")); + expect( + (e as HeartbeatFailedException).message, + equals( + "Server did not respond to heartbeats for 1s (missed consecutive heartbeats: 3)")); // Encode final connection close frameWriter.writeMessage(0, mock.ConnectionCloseOkMock()); From 9e52ca9217fe93ab1da3108b03bc0aefafc9ab95 Mon Sep 17 00:00:00 2001 From: Achilleas Anagnostopoulos Date: Tue, 2 Jan 2024 09:31:00 +0000 Subject: [PATCH 2/2] Apply drive-by fixes to existing tests --- test/lib/auth_test.dart | 4 ++-- test/lib/mocks/server_mocks.dart | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/lib/auth_test.dart b/test/lib/auth_test.dart index 7289fda..8c1ac5d 100644 --- a/test/lib/auth_test.dart +++ b/test/lib/auth_test.dart @@ -58,8 +58,8 @@ main({bool enableLogger = true}) { tuningSettings = TuningSettings(); frameWriter = FrameWriter(tuningSettings); server = mock.MockServer(); - client = Client(settings: ConnectionSettings(port: 9000)); - return server.listen('127.0.0.1', 9000); + client = Client(settings: ConnectionSettings(port: 9001)); + return server.listen('127.0.0.1', 9001); }); tearDown(() async { diff --git a/test/lib/mocks/server_mocks.dart b/test/lib/mocks/server_mocks.dart index 8e57ddd..4fe845b 100644 --- a/test/lib/mocks/server_mocks.dart +++ b/test/lib/mocks/server_mocks.dart @@ -45,7 +45,7 @@ class MockServer { Future listen(String host, int port) async { mockLogger.info("Binding MockServer to $host:$port"); - _server = await ServerSocket.bind(host, port); + _server = await ServerSocket.bind(host, port, shared: true); mockLogger.info("[$host:$port] Listening for incoming connections"); _server!.listen(_handleConnection); }