-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdb_producer.go
45 lines (38 loc) · 1.04 KB
/
db_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package consumer
import (
"context"
"sync"
"github.com/inviqa/kafka-consumer-go/data/failure/model"
"github.com/inviqa/kafka-consumer-go/log"
)
// databaseProducer receives failed push attempts to kafka from a channel and
// forwards them to the database, via its retry manager, so that they can be
// retried later.
type databaseProducer struct {
	// retryManager is what each received failure is published to.
	retryManager retryManager
	// fch is the receive-only channel that failures arrive on.
	fch <-chan model.Failure
	// logger is used for the start-up message and for publish errors.
	logger log.Logger
}
// newDatabaseProducer builds a databaseProducer that reads failures from fch,
// publishes them through rm and reports through logger. It only wires the
// dependencies together; nothing is started until listenForFailures is called.
func newDatabaseProducer(rm retryManager, fch <-chan model.Failure, logger log.Logger) *databaseProducer {
	producer := new(databaseProducer)
	producer.retryManager = rm
	producer.fch = fch
	producer.logger = logger
	return producer
}
// listenForFailures starts a background goroutine that receives failures from
// d.fch and hands each one to d.retryManager.PublishFailure. A publish error is
// logged and the loop carries on with the next failure.
//
// The goroutine is added to wg before it is started and marks itself done when
// it exits, which happens when ctx is cancelled or when d.fch is closed. The
// method itself returns as soon as the goroutine has been started.
func (d databaseProducer) listenForFailures(ctx context.Context, wg *sync.WaitGroup) {
	d.logger.Info("starting database retry producer")

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case f, ok := <-d.fch:
				if !ok {
					// A closed channel is always ready to receive and yields zero
					// values; without this check the loop would spin and publish
					// empty failures until ctx is cancelled.
					d.logger.Info("failure channel closed, stopping database retry producer")
					return
				}
				// NOTE(review): the publish uses context.Background() rather than
				// ctx, presumably so that a failure already taken off the channel
				// is still persisted during shutdown - confirm this is intended.
				if err := d.retryManager.PublishFailure(context.Background(), f); err != nil {
					d.logger.Errorf("error publishing a failure to database for retry: %s", err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}