-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
221 lines (195 loc) · 6.29 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package kafkauniverse
import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/cloudtrust/kafka-client/misc"
"github.com/google/uuid"
)
// KafkaMessageHandler is the callback type clients implement to process each
// consumed message. A non-nil error marks the message as failed to handle.
type KafkaMessageHandler func(context.Context, KafkaMessage) error
// KafkaMessageMapper transforms a message's content before it reaches the
// handler. Mappers are chained: the output of one is the input of the next.
// messageOffset is the Kafka offset of the message being mapped.
type KafkaMessageMapper func(ctx context.Context, messageOffset int64, in any) (any, error)
// KafkaContextInitializer lets clients enrich the context used for each
// consumed message (e.g. with correlation IDs) before handling starts.
type KafkaContextInitializer func(context.Context) context.Context
// consumer wraps a sarama consumer group for a single topic, with optional
// content mapping, delayed consumption and failure-topic forwarding.
type consumer struct {
	initialized bool // set by initialize(); guards double init and Close on an uninitialized consumer
	cluster     *cluster
	id          string
	enabled     bool   // false when the cluster or this consumer is disabled in configuration
	topic       string // topic consumed by this consumer
	// Name of the producer used to forward messages whose mapping failed;
	// nil when no failure topic is configured.
	failureProducerName *string
	failureProducer     *producer
	// Minimum age a message must reach before being processed; nil disables the delay.
	consumptionDelay *time.Duration
	consumerGroup    sarama.ConsumerGroup
	mappers          []KafkaMessageMapper // applied in order to each message's content
	autoCommit       bool                 // when true, offsets are marked after each message
	handler          KafkaMessageHandler
	contextInit      KafkaContextInitializer
	logger           Logger
	// Emit a progress log line every logEventRate processed offsets.
	logEventRate int64
}
// newConsumer builds a consumer from its configuration representation.
// The consumer is disabled when either the cluster or the representation
// disables it. A literal "<UUID>" in the configured group name is replaced
// by a freshly generated UUID, so each instance can get its own group.
// Returns an error when a required configuration field (ID, Topic,
// ConsumerGroupName) is missing, instead of panicking on a nil dereference.
func newConsumer(cluster *cluster, consumerRep KafkaConsumerRepresentation, logger Logger) (*consumer, error) {
	// Validate required pointer fields before dereferencing them.
	if consumerRep.ID == nil || consumerRep.Topic == nil || consumerRep.ConsumerGroupName == nil {
		return nil, errors.New("consumer configuration requires id, topic and consumer group name")
	}
	var enabled = true
	if !cluster.enabled || (consumerRep.Enabled != nil && !*consumerRep.Enabled) {
		enabled = false
	}
	groupName := *consumerRep.ConsumerGroupName
	// Replace <UUID> in groupName with a random UUID
	groupName = strings.Replace(groupName, "<UUID>", uuid.New().String(), 1)
	return &consumer{
		initialized:         false,
		cluster:             cluster,
		id:                  *consumerRep.ID,
		enabled:             enabled,
		topic:               *consumerRep.Topic,
		consumerGroupName:   groupName,
		failureProducerName: consumerRep.FailureProducer,
		failureProducer:     nil,
		consumptionDelay:    consumerRep.ConsumptionDelay,
		consumerGroup:       nil,
		mappers:             nil,
		autoCommit:          true,
		handler:             func(ctx context.Context, msg KafkaMessage) error { return errors.New("handler not implemented") },
		contextInit:         func(ctx context.Context) context.Context { return ctx },
		logger:              logger,
		logEventRate:        1000,
	}, nil
}
// Close shuts down the underlying consumer group. It is a no-op for a
// consumer that was never initialized or is disabled. A close failure is
// logged and returned to the caller.
func (c *consumer) Close() error {
	if !c.initialized || !c.enabled {
		return nil
	}
	err := c.consumerGroup.Close()
	if err != nil {
		c.logger.Warn(context.Background(), "msg", "Failed to close consumer group", "group", c.consumerGroupName, "err", err)
	}
	return err
}
// initialize prepares the consumer group. Calling it twice is an error.
// A disabled consumer gets a no-op group so that later calls stay safe.
func (c *consumer) initialize() error {
	if c.initialized {
		return fmt.Errorf("consumer %s already initialized", c.id)
	}
	if !c.enabled {
		c.consumerGroup = &misc.NoopKafkaConsumerGroup{}
		c.initialized = true
		return nil
	}
	group, err := c.cluster.getConsumerGroup(c.consumerGroupName)
	if err != nil {
		return err
	}
	c.consumerGroup = group
	c.initialized = true
	return nil
}
// SetHandler registers the callback invoked for each consumed message,
// replacing the default "handler not implemented" handler. Returns the
// consumer to allow call chaining.
func (c *consumer) SetHandler(handler KafkaMessageHandler) *consumer {
	c.handler = handler
	return c
}
// SetLogEventRate configures how often a progress log line is emitted
// (every `rate` offsets). Non-positive values are ignored and the current
// rate is kept. Returns the consumer to allow call chaining.
func (c *consumer) SetLogEventRate(rate int64) *consumer {
	if rate <= 0 {
		return c
	}
	c.logEventRate = rate
	return c
}
// SetContextInitializer registers the function used to enrich the context
// before each message is handled. Returns the consumer to allow call chaining.
func (c *consumer) SetContextInitializer(ctxInitializer KafkaContextInitializer) *consumer {
	c.contextInit = ctxInitializer
	return c
}
// AddContentMapper appends a mapper to the content-mapping chain; mappers
// run in registration order. Returns the consumer to allow call chaining.
func (c *consumer) AddContentMapper(mapper KafkaMessageMapper) *consumer {
	c.mappers = append(c.mappers, mapper)
	return c
}
// SetAutoCommit enables or disables automatic offset marking after each
// processed message (enabled by default). When disabled, the client is
// responsible for committing offsets itself.
func (c *consumer) SetAutoCommit(enabled bool) {
	c.autoCommit = enabled
}
// Go starts a goroutine that consumes the topic forever. It does nothing
// for a consumer that is not initialized or not enabled. A failure reported
// on the consumer group error channel terminates the process.
func (c *consumer) Go() {
	if !c.initialized || !c.enabled {
		return
	}
	go func() {
		failureTopic := "none"
		if c.failureProducerName != nil {
			failureTopic = *c.failureProducerName
		}
		c.logger.Info(context.Background(), "msg", "Just started thread to consume queue", "topic", c.topic, "failure-topic", failureTopic)
		for {
			// Consume blocks until a rebalance or an error; per sarama's
			// contract it must be called again in a loop. The returned error
			// was previously discarded silently — log it so failures such as
			// a closed consumer group are visible.
			if err := c.consumerGroup.Consume(context.Background(), []string{c.topic}, c); err != nil {
				c.logger.Warn(context.Background(), "msg", "Consume returned an error", "err", err, "topic", c.topic)
			}
			select {
			case err := <-c.consumerGroup.Errors():
				c.logger.Error(context.Background(), "msg", "Failure during message processing. Exit", "err", err, "topic", c.topic)
				os.Exit(1)
			default:
			}
		}
	}()
}
// applyMappers runs the registered mappers in order over the raw message
// value. The first mapper error aborts the chain: it is logged together with
// the message coordinates and returned with a nil content.
func (c *consumer) applyMappers(ctx context.Context, kafkaMsg *sarama.ConsumerMessage) (any, error) {
	var payload any = kafkaMsg.Value
	for i, mapFn := range c.mappers {
		mapped, err := mapFn(ctx, kafkaMsg.Offset, payload)
		if err != nil {
			c.logger.Error(ctx, "msg", fmt.Sprintf("Mapper #%d failed to map content", i+1),
				"err", err, "topic", c.topic, "offset", kafkaMsg.Offset,
				"partition", kafkaMsg.Partition, "contentLength", len(kafkaMsg.Value))
			return nil, err
		}
		payload = mapped
	}
	return payload, nil
}
// Setup implements sarama.ConsumerGroupHandler; no per-session setup is needed.
func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup implements sarama.ConsumerGroupHandler; no per-session cleanup is needed.
func (c *consumer) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim implements sarama.ConsumerGroupHandler. For each message of
// the claimed partition it: optionally sleeps until the configured
// consumption delay has elapsed since production, applies the content
// mappers, then either forwards the message to the failure topic (mapping
// error) or invokes the handler. When autoCommit is set the offset is marked
// after every message. A handler error with abort requested stops the claim
// and is returned to sarama.
// This function is called in several goroutines ==> needs to be thread safe
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for kafkaMsg := range claim.Messages() {
		ctx := c.contextInit(context.Background())
		if c.consumptionDelay != nil {
			// Ensure the message is at least consumptionDelay old before processing.
			sinceMessageProduction := time.Since(kafkaMsg.Timestamp)
			if sinceMessageProduction < *c.consumptionDelay {
				pauseDuration := *c.consumptionDelay - sinceMessageProduction
				c.logger.Info(ctx, "msg", "pause consumption because of consumption delay", "pauseDuration", pauseDuration, "consumptionDelay", *c.consumptionDelay, "consumerGroupName", c.consumerGroupName)
				time.Sleep(pauseDuration)
			}
		}
		var content, err = c.applyMappers(ctx, kafkaMsg)
		var msg = &consumedMessage{
			msg:      kafkaMsg,
			content:  content,
			consumer: c,
			session:  session,
			abort:    false,
		}
		if err != nil {
			// Mapping failed: route the raw message to the failure topic.
			msg.SendToFailureTopic()
		} else {
			err = c.handler(ctx, msg)
			if err != nil {
				c.logger.Error(ctx, "msg", "Failed to handle event", "err", err.Error(), "topic", claim.Topic())
				if msg.abort {
					return err
				}
			}
			if kafkaMsg.Offset%c.logEventRate == 0 {
				logMsg := fmt.Sprintf("Messages from %d to %d offset are processed", kafkaMsg.Offset-c.logEventRate, kafkaMsg.Offset)
				// Bug fix: the structured log previously passed the "topic" key
				// twice (c.topic and claim.Topic()); keep a single topic key.
				c.logger.Info(ctx, "msg", logMsg, "topic", claim.Topic(), "partition", kafkaMsg.Partition)
			}
		}
		// Commit event
		if c.autoCommit {
			session.MarkMessage(kafkaMsg, "")
		}
	}
	return nil
}