Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise HeartbeatFailedException after missing maxMissedHeartbeats (new tuning param) consecutive heartbeats #111

Merged
merged 2 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ you don't have to deal with tuning at the application level. The client will use

The class exposes the following parameters:

| Param name | Default value | Description
|-----------------|------------------|----------------------
| maxChannels | 0 (no limit) | The maximum number of channels that can be opened by the client. When set to zero, the maximum number of channels is 65535.
| maxFrameSize | 4096 bytes | The maximum frame size that can be parsed by the client. According to the spec, this is set to a high-enough initial value so that the client can parse the messages exchanged during the handshake. The client will negotiate with the server during the handshake phase and adjust this value upwards.
| heartbeatPeriod | 0 sec | The preferred heartbeat period (or `Duration.zero` to disable heartbeats) expressed as a [Duration](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:core.Duration) object.
| Param name | Default value | Description
|---------------------|---------------|
| maxChannels | 0 (no limit) | The maximum number of channels that can be opened by the client. When set to zero, the maximum number of channels is 65535.
| maxFrameSize | 4096 bytes | The maximum frame size that can be parsed by the client. According to the spec, this is set to a high-enough initial value so that the client can parse the messages exchanged during the handshake. The client will negotiate with the server during the handshake phase and adjust this value upwards.
| heartbeatPeriod | 0 sec | The preferred heartbeat period (or `Duration.zero` to disable heartbeats) expressed as a [Duration](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:core.Duration) object.
| maxMissedHeartbeats | 3 | If heartbeats are enabled, raise an exception if `maxMissedHeartbeats` consecutive heartbeats have been sent by the client without receiving any response from the broker.


### Creating a new client
Expand Down
16 changes: 12 additions & 4 deletions lib/src/client/impl/client_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ class _ClientImpl implements Client {
final _error = StreamController<Exception>.broadcast();

// The heartbeattRecvTimer is reset every time we receive _any_ message from
// the server. If the timer expires, a HeartbeatFailed exception will be raised.
// the server. If the timer expires, and a HeartbeatFailed exception will be
// raised.
//
// The timer is set to a multiple of the negotiated interval to reset the
// connection if we have not received any message from the server for a
// consecutive number of maxMissedHeartbeats (see tuningSettings).
RestartableTimer? _heartbeatRecvTimer;

_ClientImpl({ConnectionSettings? settings}) {
Expand Down Expand Up @@ -138,14 +143,17 @@ class _ClientImpl implements Client {
// period has been configured, start monitoring incoming heartbeats.
if (serverMessage.message is ConnectionOpenOk &&
tuningSettings.heartbeatPeriod.inSeconds > 0) {
// Raise an exception if we miss maxMissedHeartbeats consecutive
// heartbeats.
Duration missInterval =
tuningSettings.heartbeatPeriod * tuningSettings.maxMissedHeartbeats;
_heartbeatRecvTimer?.cancel();
_heartbeatRecvTimer =
RestartableTimer(tuningSettings.heartbeatPeriod, () {
_heartbeatRecvTimer = RestartableTimer(missInterval, () {
// Set the timer to null to avoid accidentally resetting it while
// shutting down.
_heartbeatRecvTimer = null;
_handleException(HeartbeatFailedException(
"Server did not respond to heartbeats for ${tuningSettings.heartbeatPeriod.inSeconds}s"));
"Server did not respond to heartbeats for ${tuningSettings.heartbeatPeriod.inSeconds}s (missed consecutive heartbeats: ${tuningSettings.maxMissedHeartbeats})"));
});
}

Expand Down
8 changes: 5 additions & 3 deletions lib/src/protocol/io/tuning_settings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ class TuningSettings {
// min(client hb period, server hb period). In other words, clients may force
// a lower heartbeat period but they are never allowed to increase it beyond
// the value suggested by the remote server.
//
Duration heartbeatPeriod = Duration.zero;

// When a non-zero heartbeat period is negotiated with the remote server, a
// [HeartbeatFailedException] will be raised if the server does not respond
// to heartbeat requests within the configured heartbeat period.
Duration heartbeatPeriod = Duration.zero;
// to [maxMissedHeartbeats] consecutive heartbeat requests.
int maxMissedHeartbeats = 3;

TuningSettings({
this.maxChannels = 0,
this.maxFrameSize = 4096,
this.heartbeatPeriod = Duration.zero,
this.maxMissedHeartbeats = 3,
});
}
4 changes: 2 additions & 2 deletions test/lib/auth_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ main({bool enableLogger = true}) {
tuningSettings = TuningSettings();
frameWriter = FrameWriter(tuningSettings);
server = mock.MockServer();
client = Client(settings: ConnectionSettings(port: 9000));
return server.listen('127.0.0.1', 9000);
client = Client(settings: ConnectionSettings(port: 9001));
return server.listen('127.0.0.1', 9001);
});

tearDown(() async {
Expand Down
9 changes: 6 additions & 3 deletions test/lib/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,15 @@ main({bool enableLogger = true}) {
expect(client.tuningSettings.heartbeatPeriod,
equals(const Duration(seconds: 1)));

// Perform a blocking call until the heartbeat timer expires.
// Perform a blocking call until we miss
// tuningSettings.maxMissedHeartbeats consecutive heartbeats.
await client.channel();
} catch (e) {
expect(e, const TypeMatcher<HeartbeatFailedException>());
expect((e as HeartbeatFailedException).message,
equals("Server did not respond to heartbeats for 1s"));
expect(
(e as HeartbeatFailedException).message,
equals(
"Server did not respond to heartbeats for 1s (missed consecutive heartbeats: 3)"));

// Encode final connection close
frameWriter.writeMessage(0, mock.ConnectionCloseOkMock());
Expand Down
2 changes: 1 addition & 1 deletion test/lib/mocks/server_mocks.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MockServer {
Future listen(String host, int port) async {
mockLogger.info("Binding MockServer to $host:$port");

_server = await ServerSocket.bind(host, port);
_server = await ServerSocket.bind(host, port, shared: true);
mockLogger.info("[$host:$port] Listening for incoming connections");
_server!.listen(_handleConnection);
}
Expand Down