-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprojection.go
155 lines (125 loc) · 2.98 KB
/
projection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package eventstore
import (
"context"
"errors"
"io"
"log"
"sync"
"time"
)
// EventStreamer represents an event stream that can be subscribed to
// This package offers EventStore as EventStreamer implementation
type EventStreamer interface {
SubscribeAll(context.Context, ...SubAllOpt) (Subscription, error)
}
// NewProjector constructs a Projector
// TODO Configure logger, pollInterval, and retry
func NewProjector(s EventStreamer) *Projector {
return &Projector{
streamer: s,
logger: log.Default(),
}
}
// Projector is an event projector which will subscribe to an
// event stream (evet store) and project events to each
// individual projection in an asynchronous manner
type Projector struct {
streamer EventStreamer
projections []Projection
logger *log.Logger
}
// Projection is basically a function which needs to handle a stored event.
// It will be called for each event that comes in
type Projection func(StoredEvent) error
// Add effectively registers a projection with the projector
// Make sure to add all of your projections before calling Run
func (p *Projector) Add(projections ...Projection) {
p.projections = append(p.projections, projections...)
}
// Run will start the projector
func (p *Projector) Run(ctx context.Context) error {
var wg sync.WaitGroup
for _, projection := range p.projections {
wg.Add(1)
go func(projection Projection) {
defer wg.Done()
for {
// TODO retry with backoff
sub, err := p.streamer.SubscribeAll(ctx)
if err != nil {
p.logErr(err)
return
}
if err := p.run(ctx, sub, projection); err != nil {
sub.Close()
continue
}
sub.Close()
return
}
}(projection)
}
wg.Wait()
return nil
}
func (p *Projector) run(ctx context.Context, sub Subscription, projection Projection) error {
for {
select {
case data := <-sub.EventData:
err := projection(data)
if err != nil {
p.logErr(err)
// TODO retry with backoff
return err
}
case err := <-sub.Err:
if err != nil {
if errors.Is(err, io.EOF) {
break
}
if errors.Is(err, ErrSubscriptionClosedByClient) {
return nil
}
p.logErr(err)
}
case <-ctx.Done():
return nil
}
}
}
func (p *Projector) logErr(err error) {
p.logger.Printf("projector error: %v", err)
}
// FlushAfter wraps the projection passed in, and it calls
// the projection itself as new events come (as usual) in addition to calling
// the provided flush function periodically each time flush interval expires
func FlushAfter(
p Projection,
flush func() error,
flushInt time.Duration) Projection {
work := make(chan StoredEvent, 1)
errs := make(chan error, 2)
go func() {
for {
select {
case <-time.After(flushInt):
if err := flush(); err != nil {
errs <- err
}
case w := <-work:
if err := p(w); err != nil {
errs <- err
}
}
}
}()
return func(data StoredEvent) error {
select {
case err := <-errs:
return err
default:
work <- data
}
return nil
}
}