-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtypes.go
171 lines (141 loc) · 6.25 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package sink
import (
"context"
"fmt"
"github.com/streamingfast/bstream"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"go.uber.org/zap/zapcore"
)
type sinkerHandlers struct {
handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error
handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error
}
func (h sinkerHandlers) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error {
return h.handleBlockScopedData(ctx, data, isLive, cursor)
}
func (h sinkerHandlers) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error {
return h.handleBlockUndoSignal(ctx, undoSignal, cursor)
}
func NewSinkerHandlers(
handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error,
handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error,
) SinkerHandler {
return sinkerHandlers{handleBlockScopedData, handleBlockUndoSignal}
}
type SinkerHandler interface {
// HandleBlockScopedData defines the callback that will handle Substreams `BlockScopedData` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `data` contains the block scoped data that was received from the Substreams API, refer to it's definition for proper usage.
// - `isLive` will be non-nil if a [LivenessChecker] has been configured on the [Sinker] instance that call the handler.
// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
//
// The [HandleBlockScopedData] must be non-nil, the [Sinker] enforces this.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
// the [Sinker] that it should retry from last valid cursor. It's your responsibility to ensure no data was persisted prior the
// the error.
/**/ HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error
// HandleBlockUndoSignal defines the callback that will handle Substreams `BlockUndoSignal` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `undoSignal` contains the last valid block that is still valid, any data saved after this last saved block should be discarded.
// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
//
// The [HandleBlockUndoSignal] can be nil if the sinker is configured to stream final blocks only, otherwise it must be set,
// the [Sinker] enforces this.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
// the [Sinker] that it should retry from last valid cursor. It's your responsibility to ensure no data was persisted prior the
// the error.
HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error
}
// SinkerCompletionHandler defines an extra interface that can be implemented on top of `SinkerHandler` where the
// callback will be invoked when the sinker is done processing the requested range. This is useful to implement
// a checkpointing mechanism where when the range has correctly fully processed, you can do something meaningful.
type SinkerCompletionHandler interface {
// HandleBlockRangeCompletion is called when the sinker is done processing the requested range, only when
// the stream has correctly reached its end block. If the sinker is configured to stream live, this callback
// will never be called.
//
// If the sinker terminates with an error, this callback will not be called.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error
}
type Cursor struct {
*bstream.Cursor
}
func NewCursor(cursor string) (*Cursor, error) {
if cursor == "" {
return blankCursor, nil
}
decoded, err := bstream.CursorFromOpaque(cursor)
if err != nil {
return nil, fmt.Errorf("decode %q: %w", cursor, err)
}
return &Cursor{decoded}, nil
}
func MustNewCursor(cursor string) *Cursor {
decoded, err := NewCursor(cursor)
if err != nil {
panic(err)
}
return decoded
}
var blankCursor = (*Cursor)(nil)
func NewBlankCursor() *Cursor {
return blankCursor
}
func (c *Cursor) Block() bstream.BlockRef {
if c.IsBlank() {
return unsetBlockRef{}
}
return c.Cursor.Block
}
func (c *Cursor) IsBlank() bool {
return c == nil || c == blankCursor
}
func (c *Cursor) IsEqualTo(other *Cursor) bool {
if c.IsBlank() && other.IsBlank() {
return true
}
// We know both are not equal, so if either side is `nil`, we are sure the other is not, so not equal
if !c.IsBlank() || !other.IsBlank() {
return false
}
// Both side are non-nil here
actual := c.Cursor
candidate := other.Cursor
return actual.Equals(candidate)
}
func (c *Cursor) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
cursor := "<Blank>"
if !c.IsBlank() {
cursor = c.String()
}
encoder.AddString("cursor", cursor)
return nil
}
// String returns a string representation suitable for handling a Firehose request
// meaning a blank cursor returns "".
func (c *Cursor) String() string {
if c.IsBlank() {
return ""
}
return c.Cursor.ToOpaque()
}
//go:generate go-enum -f=$GOFILE --marshal --names
// ENUM(
//
// Development
// Production
//
// )
type SubstreamsMode uint