-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuniverse.go
142 lines (131 loc) · 4.04 KB
/
universe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package kafkauniverse
import (
"context"
"errors"
"fmt"
)
// KafkaUniverse struct
type KafkaUniverse struct {
clusters []*cluster
producers map[string]*producer
consumers map[string]*consumer
}
// Logger interface for logging with level
type Logger interface {
Debug(ctx context.Context, keyvals ...any)
Info(ctx context.Context, keyvals ...any)
Warn(ctx context.Context, keyvals ...any)
Error(ctx context.Context, keyvals ...any)
}
// ConfigurationProvider interface
type ConfigurationProvider func(target any) error
// NewKafkaUniverse creates a KafkaUniverse from a provided configuration
func NewKafkaUniverse(ctx context.Context, logger Logger, envKeyPrefix string, confUnmarshal ConfigurationProvider) (*KafkaUniverse, error) {
var clusterRepresentations = []KafkaClusterRepresentation{}
var err error
if err = confUnmarshal(&clusterRepresentations); err != nil {
logger.Error(ctx, "msg", "Failed to unmarshal Kafka configuration", "err", err)
return nil, err
}
if len(clusterRepresentations) == 0 {
logger.Error(ctx, "msg", "Kafka universe is empty")
return nil, errors.New("kafka universe is empty")
}
for idx, clusterRep := range clusterRepresentations {
if err = clusterRep.Validate(); err != nil {
logger.Error(ctx, "msg", "Loaded Kafka configuration is invalid", "err", err, "idx", idx)
return nil, err
}
}
var res = KafkaUniverse{
producers: map[string]*producer{},
consumers: map[string]*consumer{},
}
for _, clusterRepresentation := range clusterRepresentations {
var cluster, err = newCluster(ctx, clusterRepresentation, envKeyPrefix, logger)
if err != nil {
return nil, err
}
res.clusters = append(res.clusters, cluster)
for _, producerRep := range clusterRepresentation.Producers {
res.producers[*producerRep.ID], err = newProducer(cluster, producerRep, logger)
if err != nil {
return nil, err
}
}
for _, consumerRep := range clusterRepresentation.Consumers {
var consumer, err = newConsumer(cluster, consumerRep, logger)
if err != nil {
return nil, err
}
res.consumers[*consumerRep.ID] = consumer
if consumerRep.FailureProducer != nil {
var ok bool
if consumer.failureProducer, ok = res.producers[*consumerRep.FailureProducer]; !ok {
return nil, fmt.Errorf("invalid failure producer %s for consumer %s", *consumerRep.FailureProducer, *consumerRep.ID)
}
}
}
}
return &res, nil
}
// Close releases all instantiated resources
func (ku *KafkaUniverse) Close() error {
var anError error
for _, consumer := range ku.consumers {
if err := consumer.Close(); err != nil {
anError = err
}
}
for _, producer := range ku.producers {
if err := producer.Close(); err != nil {
anError = err
}
}
for _, cluster := range ku.clusters {
if err := cluster.Close(); err != nil {
anError = err
}
}
return anError
}
// InitializeProducers initializes all specified producers
func (ku *KafkaUniverse) InitializeProducers(producerIDs ...string) error {
for _, producerName := range producerIDs {
if producer, ok := ku.producers[producerName]; ok {
if err := producer.initialize(); err != nil {
return err
}
} else {
return fmt.Errorf("unknown producer %s", producerIDs)
}
}
return nil
}
// InitializeConsumers initializes all specified consumers
func (ku *KafkaUniverse) InitializeConsumers(consumerIDs ...string) error {
for _, consumerID := range consumerIDs {
if consumer, ok := ku.consumers[consumerID]; ok {
if err := consumer.initialize(); err != nil {
return err
}
} else {
return fmt.Errorf("unknown consumer %s", consumerID)
}
}
return nil
}
// GetProducer gets the specified producer
func (ku *KafkaUniverse) GetProducer(producerID string) *producer {
return ku.producers[producerID]
}
// GetConsumer gets the specified consumer
func (ku *KafkaUniverse) GetConsumer(consumerID string) *consumer {
return ku.consumers[consumerID]
}
// StartConsumers starts all specified consumers
func (ku *KafkaUniverse) StartConsumers(consumerIDs ...string) {
for _, consumerName := range consumerIDs {
ku.GetConsumer(consumerName).Go()
}
}