Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor WebSockets to fix window of duplicate delivery after reconect #123

Merged
merged 17 commits into from
Feb 13, 2024

Conversation

peterbroadhurst
Copy link
Contributor

@peterbroadhurst peterbroadhurst commented Jan 27, 2024

The WebSocketChannels interface has been around a long time, all the way back to the early days of EthConnect.

It provides a protocol for:

  • Multiplexing multiple streams onto a single WebSocket
  • Providing acknowledged at-least-once delivery of batches across multiple WebSocket connections on a single stream
    • With workload balancing of deliveries across multiple connections to the server
  • Broadcasting a copy of a message to every websocket (without ack)

But there is a problem with the complex way that the channels are used as a code interface, per this function (which is derived from similar code from EthConnect, FFTM/EVMConnect and FireFly core):

func (w *webSocketAction[DT]) AttemptDispatch(ctx context.Context, attempt int, batch *EventBatch[DT]) error {
var err error
// Get a blocking channel to send and receive on our chosen namespace
sender, broadcaster, receiver := w.wsChannels.GetChannels(w.topic)
var channel chan<- interface{}
isBroadcast := *w.spec.DistributionMode == DistributionModeBroadcast
if isBroadcast {
channel = broadcaster
} else {
channel = sender
}
// Send the batch of events
select {
case channel <- batch:
break
case <-ctx.Done():
err = i18n.NewError(ctx, i18n.MsgWebSocketInterruptedSend)
}
if err == nil && !isBroadcast {
log.L(ctx).Infof("Batch %d dispatched (len=%d,attempt=%d)", batch.BatchNumber, len(batch.Events), attempt)
err = w.waitForAck(ctx, receiver, batch.BatchNumber)
}
// Pass back any exception due
if err != nil {
log.L(ctx).Infof("WebSocket event batch %d delivery failed (len=%d,attempt=%d): %s", batch.BatchNumber, len(batch.Events), attempt, err)
return err
}
log.L(ctx).Infof("WebSocket event batch %d complete (len=%d,attempt=%d)", batch.BatchNumber, len(batch.Events), attempt)
return nil
}

Because after a disconnect when messages arrive that are candidates for delivery, a send wiil be queued into the channel when there are no WebSockets connected to receive it. Then when a WebSocket connects, that websocket will receive the payload/batch queued into the channel for it. However, the AttemptDispatch will immediately fail in waitForAck (without waiting for an ack at all) because the WebSocket ID has changed, even thought the disconnection happened some time ago.

This does not affect the resilience of the idempotent delivery interface between FFTM and FireFly, but with the new eventstreams interface in microservices it is highly likely to occur and be frustrating to applications consuming the stream.

So this PR proposes to fix the problem by simplifying the API interface of the wsserver package (and resulting in a re-write of the stream selection and dispatch logic of wsserver).

The code used to be a complex pull model, where each webscoket connection tracks the streams it's started on, and pulls from a separate channel for each stream. This required uncommon code constructs, like dynamic select against an array of potential channels.
This PR proposes a much more straight forward model where a RoundTrip object is defined and the parent server-level code keeps track of the connections available for each stream - so it can pick a different one for each round-trip. It then pushes that send into a unique send channel for that webscoket. The parent is responsible for making sure each stream can have just one round-trip in-flight at any given time, which is cancelled if the in-bound WebSocket processing it disconnects.

After the change, the consumer code that uses the wsserver API looks like:

func (w *webSocketAction[DT]) AttemptDispatch(ctx context.Context, attempt int, batch *EventBatch[DT]) error {
var err error
isBroadcast := *w.spec.DistributionMode == DistributionModeBroadcast
if isBroadcast {
w.wsProtocol.Broadcast(ctx, w.topic, batch)
return nil
}
_, err = w.wsProtocol.RoundTrip(ctx, w.topic, batch)
// Pass back any exception due
if err != nil {
log.L(ctx).Infof("WebSocket event batch %d delivery failed (len=%d,attempt=%d): %s", batch.BatchNumber, len(batch.Events), attempt, err)
return err
}
log.L(ctx).Infof("WebSocket event batch %d complete (len=%d,attempt=%d)", batch.BatchNumber, len(batch.Events), attempt)
return nil
}

The problem being solve is illustrated in the below log snippet.

[2024-01-26T18:42:18.650Z] TRACE Received: {Type:start Stream:sub1 Message: BatchNumber:1} pid=1 wsc=4a00e7ae-5c9b-4066-8a2c-5d7e01e71201
[2024-01-26T18:42:18.651Z]  INFO Batch 2 dispatched (len=1,attempt=0) eventstream=sub1 pid=1
[2024-01-26T18:42:18.651Z]  INFO WebSocket event batch 2 delivery failed (len=1,attempt=0): FF00228: WebSocket 'a87c0ea3-905d-4dcf-afcf-e739707c206c' closed eventstream=sub1 pid=1
[2024-01-26T18:42:18.651Z] ERROR Batch 2 attempt 0 failed. err=FF00228: WebSocket 'a87c0ea3-905d-4dcf-afcf-e739707c206c' closed eventstream=sub1 pid=1
[2024-01-26T18:42:18.651Z] ERROR action attempt 1: FF00228: WebSocket 'a87c0ea3-905d-4dcf-afcf-e739707c206c' closed eventstream=sub1 pid=1
[2024-01-26T18:42:18.901Z] DEBUG Batch 2 attempt 1 dispatching. Len=1 eventstream=sub1 pid=1
[2024-01-26T18:42:18.901Z]  INFO Batch 2 dispatched (len=1,attempt=1) eventstream=sub1 pid=1
[2024-01-26T18:42:18.901Z]  INFO WebSocket event batch 2 delivery failed (len=1,attempt=1): FF00228: WebSocket 'a87c0ea3-905d-4dcf-afcf-e739707c206c' closed eventstream=sub1 pid=1
[2024-01-26T18:42:18.901Z] ERROR Batch 2 attempt 1 failed. err=FF00228: WebSocket 'a87c0ea3-905d-4dcf-afcf-e739707c206c' closed eventstream=sub1 pid=1
[2024-01-26T18:42:18.901Z] ERROR action attempt 2: FF00228: WebSocket 'a87c0ea3-905d-4dcf-afcf-e739707c206c' closed eventstream=sub1 pid=1
[2024-01-26T18:42:19.402Z] DEBUG Batch 2 attempt 2 dispatching. Len=1 eventstream=sub1 pid=1
[2024-01-26T18:42:19.402Z]  INFO Batch 2 dispatched (len=1,attempt=2) eventstream=sub1 pid=1

Additional change - extensibility for JSON based query

Sorry @awrichar, I meant to split this out to a separate PR.
I've added an extra interface to the ffapi.QueryJSON interface, to allow extensibility for a use case where some of the fields are names that need to be resolved to ID before the query runs.

The interface gives you access to the whole query tree at the level you are at, which in my use case is because you might need to look at (/require) extra equal: [...] options at the same level in the tree to scope the name lookup.

This extensibility is only added for the complex JSON format of query, because the problem is much simpler in the path + query-field case and I'm not sure there's an obvious way to join the logic (without a big restructure).

…nect

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
@codecov-commenter
Copy link

codecov-commenter commented Jan 27, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (80d20a1) 99.98% compared to head (fd2e3f4) 100.00%.

Additional details and impacted files
@@             Coverage Diff             @@
##             main      #123      +/-   ##
===========================================
+ Coverage   99.98%   100.00%   +0.01%     
===========================================
  Files          78        79       +1     
  Lines        6435      6473      +38     
===========================================
+ Hits         6434      6473      +39     
+ Misses          1         0       -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@EnriqueL8 EnriqueL8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing stuff!

type streamState struct {
wlmCounter int64
inflight *roundTrip
conns []*webSocketConnection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be useful making this a map with the id being the key as we iterate over this a few times to delete and add

Copy link
Contributor Author

@peterbroadhurst peterbroadhurst Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this actually changed away from a map in this PR, because the performance sensitive part is a copy of the list that has to happen on every broadcast. That can be a simple mem-copy now.

The other side of the coin is as you point out, the need to iterate on the lifecycle action of close.

Let me know what you think after considering, as it was a big thinking point as I worked on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see - so we copy so that we don't retain a mutex.Lock in case a new connection is added or deleted whilst we are in the middle of a broadcast. Interesting to me that even with a copy() the channels in each ws connection still work, tested in Go Playground. Happy for this to be a copy instead of a map 😃

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Comment on lines +172 to +173
payload.GetBatchHeader().BatchNumber = ss.wlmCounter
payload.GetBatchHeader().Stream = stream
Copy link
Contributor

@EnriqueL8 EnriqueL8 Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems odd to me that it's a Get and we update in place. I guess my java side is twitching a little but it might be fine for go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because Go doesn't have an object model to allow something like class inheritance (which would be available in Java), so when combining with Generics this is the best patterned I've found for mandating embedding of a sub-structure into a parent.

Really GetBatchHeader() is GetBatchHeaderPtr() - but that felt unnecessary.

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
@@ -0,0 +1,40 @@
// Copyright © 2023 Kaleido, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the linter doesn't check test files 🙂

c.server.connectionClosed(c)
log.L(c.ctx).Infof("Disconnected")
}

func (c *webSocketConnection) sender() {
defer c.close()
buildCases := func() []reflect.SelectCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am so happy to see this go! 👋

@nguyer nguyer merged commit ea9c63d into main Feb 13, 2024
2 checks passed
@nguyer nguyer deleted the eventstreams-ws branch February 13, 2024 21:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants