This repository has been archived by the owner on Sep 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathworkers.go
451 lines (373 loc) · 14.7 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
Refactored: 1
*/
package mirbft
import (
"fmt"
"github.com/hyperledger-labs/mirbft/pkg/events"
"github.com/hyperledger-labs/mirbft/pkg/pb/eventpb"
"github.com/hyperledger-labs/mirbft/pkg/pb/statuspb"
t "github.com/hyperledger-labs/mirbft/pkg/types"
"github.com/pkg/errors"
"runtime/debug"
)
// Input and output channels for the modules within the Node.
// the Node.process() method reads and writes events
// to and from these channels to rout them between the Node's modules.
type workChans struct {
// There is one channel per module to feed events into the module.
clients chan *events.EventList
protocol chan *events.EventList
wal chan *events.EventList
hash chan *events.EventList
crypto chan *events.EventList
net chan *events.EventList
app chan *events.EventList
reqStore chan *events.EventList
// All modules write their output events in a common channel, from where the node processor reads and redistributes
// the events to their respective workItems buffers.
// External events are also funneled through this channel towards the workItems buffers.
workItemInput chan *events.EventList
}
// Allocate and return a new workChans structure.
func newWorkChans() workChans {
return workChans{
clients: make(chan *events.EventList),
protocol: make(chan *events.EventList),
wal: make(chan *events.EventList),
hash: make(chan *events.EventList),
crypto: make(chan *events.EventList),
net: make(chan *events.EventList),
app: make(chan *events.EventList),
reqStore: make(chan *events.EventList),
workItemInput: make(chan *events.EventList),
}
}
// A function type used for performing the work of a module.
// It usually reads events from a work channel and writes the output to another work channel.
// Any error that occurs while performing the work is returned.
// When the exitC channel is closed the function should return ErrStopped
type workFunc func(exitC <-chan struct{}) error
// Calls the passed work function repeatedly in an infinite loop until the work function returns an non-nil error.
// doUntilErr then sets the error in the Node's workErrNotifier and returns.
func (n *Node) doUntilErr(work workFunc) {
for {
err := work(n.workErrNotifier.ExitC())
if err != nil {
n.workErrNotifier.Fail(err)
return
}
}
}
// eventProcessor defines the type of the function that processes a single input events.EventList,
// producing a single output events.EventList.
// There is one such function defined for each Module that is executed in a loop by a worker goroutine.
type eventProcessor func(*events.EventList) (*events.EventList, error)
// processEvents reads a single list of input Events from a work channel, strips off all associated follow-up Events,
// and processes the bare content of the list using the passed processing function.
// processEvents writes all the stripped off follow-up events along with any Events generated by the processing
// to the workItemInput channel, so they will be added to the workItems buffer for further processing.
//
// If the Node is configured to use an Interceptor, after having removed all follow-up Events,
// processEvents passes the list of input Events to the Interceptor.
//
// If exitC is closed, returns ErrStopped.
func (n *Node) processEvents(
processFunc eventProcessor,
eventSource <-chan *events.EventList,
exitC <-chan struct{},
) error {
var eventsIn *events.EventList
// Read input.
select {
case eventsIn = <-eventSource:
case <-exitC:
return ErrStopped
}
// Remove follow-up Events from the input EventList,
// in order to re-insert them in the processing loop after the input events have been processed.
followUps := eventsIn.StripFollowUps()
// Intercept the (stripped of all follow-ups) events that are about to be processed.
// This is only for debugging / diagnostic purposes.
n.interceptEvents(eventsIn)
// Process events.
newEvents, err := processFunc(eventsIn)
if err != nil {
return errors.WithMessage(err, "could not process events")
}
// Merge the pending follow-up Events with the newly generated Events.
out := followUps.PushBackList(newEvents)
// Return if no output was generated.
// This is only an optimization to prevent the processor loop from handling empty EventLists.
if out.Len() == 0 {
return nil
}
// Write output.
select {
case n.workChans.workItemInput <- out:
case <-exitC:
return ErrStopped
}
return nil
}
// Module-specific wrappers for Node.ProcessEvents,
// associating each Module's processing function with its corresponding work channel.
// On top of that, the Protocol processing wrapper additionally sets the Node's exit status when done.
func (n *Node) doWALWork(exitC <-chan struct{}) error {
return n.processEvents(n.processWALEvents, n.workChans.wal, exitC)
}
func (n *Node) doClientWork(exitC <-chan struct{}) error {
return n.processEvents(n.processClientEvents, n.workChans.clients, exitC)
}
func (n *Node) doHashWork(exitC <-chan struct{}) error {
return n.processEvents(n.processHashEvents, n.workChans.hash, exitC)
}
func (n *Node) doCryptoWork(exitC <-chan struct{}) error {
return n.processEvents(n.processCryptoEvents, n.workChans.crypto, exitC)
}
func (n *Node) doSendingWork(exitC <-chan struct{}) error {
return n.processEvents(n.processSendEvents, n.workChans.net, exitC)
}
func (n *Node) doAppWork(exitC <-chan struct{}) error {
return n.processEvents(n.processAppEvents, n.workChans.app, exitC)
}
func (n *Node) doReqStoreWork(exitC <-chan struct{}) error {
return n.processEvents(n.processReqStoreEvents, n.workChans.reqStore, exitC)
}
func (n *Node) doProtocolWork(exitC <-chan struct{}) (err error) {
// On returning, sets the exit status of the protocol state machine in the work error notifier.
defer func() {
if err != nil {
s, err := n.modules.Protocol.Status()
n.workErrNotifier.SetExitStatus(&statuspb.NodeStatus{Protocol: s}, err)
// TODO: Clean up status-related code.
}
}()
return n.processEvents(n.processProtocolEvents, n.workChans.protocol, exitC)
}
// TODO: Document the functions below.
func (n *Node) processWALEvents(eventsIn *events.EventList) (*events.EventList, error) {
// If no WAL implementation is present, do nothing and return immediately.
if n.modules.WAL == nil {
return &events.EventList{}, nil
}
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
// Perform the necessary action based on event type.
switch e := event.Type.(type) {
case *eventpb.Event_WalAppend:
if err := n.modules.WAL.Append(e.WalAppend.Event, t.WALRetIndex(e.WalAppend.RetentionIndex)); err != nil {
return nil, fmt.Errorf("could not persist event (retention index %d) to WAL: %w",
e.WalAppend.RetentionIndex, err)
}
case *eventpb.Event_PersistDummyBatch:
if err := n.modules.WAL.Append(event, 0); err != nil {
return nil, fmt.Errorf("could not persist dummy batch: %w", err)
}
default:
return nil, errors.Errorf("unexpected type of WAL event: %T", event.Type)
}
}
// Then we sync the WAL
if err := n.modules.WAL.Sync(); err != nil {
return nil, errors.WithMessage(err, "failed to sync WAL")
}
return eventsOut, nil
}
func (n *Node) processClientEvents(eventsIn *events.EventList) (*events.EventList, error) {
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
newEvents, err := n.safeApplyClientEvent(event)
if err != nil {
return nil, errors.WithMessage(err, "err applying client event")
}
eventsOut.PushBackList(newEvents)
}
return eventsOut, nil
}
func (n *Node) safeApplyClientEvent(event *eventpb.Event) (result *events.EventList, err error) {
defer func() {
if r := recover(); r != nil {
if rErr, ok := r.(error); ok {
err = fmt.Errorf("panic in client tracker: %w\nStack trace:\n%s", rErr, string(debug.Stack()))
} else {
err = fmt.Errorf("panic in client tracker: %v\nStack trace:\n%s", r, string(debug.Stack()))
}
}
}()
return n.modules.ClientTracker.ApplyEvent(event), nil
}
func (n *Node) processHashEvents(eventsIn *events.EventList) (*events.EventList, error) {
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
switch e := event.Type.(type) {
case *eventpb.Event_HashRequest:
// HashRequest is the only event understood by the hasher module.
// Hash all the data and create a hashResult event.
h := n.modules.Hasher.New()
for _, data := range e.HashRequest.Data {
h.Write(data)
}
eventsOut.PushBack(events.HashResult(h.Sum(nil), e.HashRequest.Origin))
default:
// Complain about all other incoming event types.
return nil, errors.Errorf("unexpected type of Hash event: %T", event.Type)
}
}
return eventsOut, nil
}
func (n *Node) processCryptoEvents(eventsIn *events.EventList) (*events.EventList, error) {
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
switch e := event.Type.(type) {
case *eventpb.Event_VerifyRequestSig:
// Verify client request signature.
// The signature is only computed (and verified) over the digest of a request.
// The other fields can safely be ignored.
// Convenience variable
reqRef := e.VerifyRequestSig.RequestRef
// Verify signature.
err := n.modules.Crypto.VerifyClientSig(
[][]byte{reqRef.Digest},
e.VerifyRequestSig.Signature,
t.ClientID(reqRef.ClientId))
// Create result event, depending on verification outcome.
if err == nil {
eventsOut.PushBack(events.RequestSigVerified(reqRef, true, ""))
} else {
eventsOut.PushBack(events.RequestSigVerified(reqRef, false, err.Error()))
}
default:
// Complain about all other incoming event types.
return nil, errors.Errorf("unexpected type of Crypto event: %T", event.Type)
}
}
return eventsOut, nil
}
func (n *Node) processSendEvents(eventsIn *events.EventList) (*events.EventList, error) {
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
switch e := event.Type.(type) {
case *eventpb.Event_SendMessage:
for _, destId := range e.SendMessage.Destinations {
if t.NodeID(destId) == n.ID {
eventsOut.PushBack(events.MessageReceived(n.ID, e.SendMessage.Msg))
} else {
n.modules.Net.Send(t.NodeID(destId), e.SendMessage.Msg)
// TODO: Handle sending errors.
}
}
default:
return nil, errors.Errorf("unexpected type of Net event: %T", event.Type)
}
}
return eventsOut, nil
}
func (n *Node) processAppEvents(eventsIn *events.EventList) (*events.EventList, error) {
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
switch e := event.Type.(type) {
case *eventpb.Event_AnnounceDummyBatch:
if err := n.modules.App.Apply(e.AnnounceDummyBatch.Batch); err != nil {
return nil, fmt.Errorf("app error: %w", err)
}
case *eventpb.Event_Deliver:
if err := n.modules.App.Apply(e.Deliver.Batch); err != nil {
return nil, fmt.Errorf("app batch delivery error: %w", err)
}
case *eventpb.Event_AppSnapshotRequest:
if data, err := n.modules.App.Snapshot(); err != nil {
return nil, fmt.Errorf("app snapshot error: %w", err)
} else {
return (&events.EventList{}).PushBack(events.AppSnapshot(t.SeqNr(e.AppSnapshotRequest.Sn), data)), nil
}
default:
return nil, errors.Errorf("unexpected type of App event: %T", event.Type)
}
}
return eventsOut, nil
}
func (n *Node) processReqStoreEvents(eventsIn *events.EventList) (*events.EventList, error) {
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
// Process event based on its type.
switch e := event.Type.(type) {
case *eventpb.Event_StoreVerifiedRequest:
storeEvent := e.StoreVerifiedRequest
// Store request data.
if err := n.modules.RequestStore.PutRequest(storeEvent.RequestRef, storeEvent.Data); err != nil {
return nil, fmt.Errorf("cannot store request (c%dr%d) data: %w",
storeEvent.RequestRef.ClientId,
storeEvent.RequestRef.ReqNo,
err)
}
// Mark request as authenticated.
if err := n.modules.RequestStore.SetAuthenticated(storeEvent.RequestRef); err != nil {
return nil, fmt.Errorf("cannot mark request (c%dr%d) as authenticated: %w",
storeEvent.RequestRef.ClientId,
storeEvent.RequestRef.ReqNo,
err)
}
// Store request authenticator.
if err := n.modules.RequestStore.PutAuthenticator(storeEvent.RequestRef, storeEvent.Authenticator); err != nil {
return nil, fmt.Errorf("cannot store authenticator (c%dr%d) of request: %w",
storeEvent.RequestRef.ClientId,
storeEvent.RequestRef.ReqNo,
err)
}
case *eventpb.Event_StoreDummyRequest:
storeEvent := e.StoreDummyRequest // Helper variable for convenience
// Store request data.
if err := n.modules.RequestStore.PutRequest(storeEvent.RequestRef, storeEvent.Data); err != nil {
return nil, fmt.Errorf("cannot store dummy request data: %w", err)
}
// Mark request as authenticated.
if err := n.modules.RequestStore.SetAuthenticated(storeEvent.RequestRef); err != nil {
return nil, fmt.Errorf("cannot mark dummy request as authenticated: %w", err)
}
// Associate a dummy authenticator with the request
if err := n.modules.RequestStore.PutAuthenticator(storeEvent.RequestRef, []byte{0}); err != nil {
return nil, fmt.Errorf("cannot store authenticator of dummy request: %w", err)
}
eventsOut.PushBack(events.RequestReady(storeEvent.RequestRef))
}
}
// Then sync the request store, ensuring that all updates to its state are persisted.
if err := n.modules.RequestStore.Sync(); err != nil {
return nil, errors.WithMessage(err, "could not sync request store, unsafe to continue")
}
return eventsOut, nil
}
func (n *Node) processProtocolEvents(eventsIn *events.EventList) (*events.EventList, error) {
eventsOut := &events.EventList{}
iter := eventsIn.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
newEvents, err := n.safeApplyProtocolEvent(event)
if err != nil {
return nil, errors.WithMessage(err, "error applying protocol event")
}
eventsOut.PushBackList(newEvents)
}
return eventsOut, nil
}
func (n *Node) safeApplyProtocolEvent(event *eventpb.Event) (result *events.EventList, err error) {
defer func() {
if r := recover(); r != nil {
if rErr, ok := r.(error); ok {
err = fmt.Errorf("panic in protocol state machine: %w\nStack trace:\n%s", rErr, string(debug.Stack()))
} else {
err = fmt.Errorf("panic in protocol state machine: %v\nStack trace:\n%s", r, string(debug.Stack()))
}
}
}()
return n.modules.Protocol.ApplyEvent(event), nil
}