
Commit

Small enhancements, add missing functions, ...
fperot74 authored May 2, 2024
1 parent 948d225 commit 49e36d2
Showing 14 changed files with 72 additions and 64 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -53,19 +53,19 @@ my-kafka-key:
var c = getViperConfiguration()
var err error
kafkaUniverse, err = kafkauniverse.NewKafkaUniverse(ctx, kafkaLogger, "ENV_", func(value interface{}) error {
kafkaUniverse, err = kafkauniverse.NewKafkaUniverse(ctx, kafkaLogger, "ENV_", func(value any) error {
return c.UnmarshalKey("my-kafka-key", value)
})
if err != nil {
kafkaLogger.Error(ctx, "msg", "could not configure Kafka", "err", err)
return
}
logger.Info(ctx, "msg", "Kafka configuration loaded", "obj", *kafkaUniverse)
logger.Info(ctx, "msg", "Kafka configuration loaded")
}
defer kafkaUniverse.Close()
```

Note that the client secret can be replaced by an environment variable... in the previous example, ENV_ will be the prefix of the environment variable and suffix will be the cluster ID with uppercase and - replaced by _. In this example, the environment variable should be ENV_CLUSTER1.
Note that the client secret can be replaced by an environment variable... in the previous example, the variable name is built from the prefix ENV_, the cluster ID uppercased with - replaced by _, and the suffix _CLIENT_SECRET. In this example, the environment variable should be ENV_CLUSTER1_CLIENT_SECRET.
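
For instance, a minimal sketch of the naming rule (the cluster ID cluster1 is taken from the example above):

```
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Naming rule: prefix + cluster ID (uppercased, "-" replaced by "_") + "_CLIENT_SECRET".
	var clusterID = "cluster1"
	var key = "ENV_" + strings.ReplaceAll(strings.ToUpper(clusterID), "-", "_") + "_CLIENT_SECRET"
	fmt.Println(key) // ENV_CLUSTER1_CLIENT_SECRET
}
```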

## Initialize your producers

@@ -96,10 +96,10 @@ Note that the client secret can be replaced by an environment variable...
// Add content mappers. By default, the content will be a slice of bytes containing the raw message consumed from a Kafka topic.
// You can add mappers to transform it into something more comfortable to use.
// By default, no mapper is configured. A pre-defined mapper is available to decode the raw message from Base64: mappers.DecodeBase64Bytes
var mapBytesToString = func(ctx context.Context, in interface{}) (interface{}, error) {
var mapBytesToString = func(ctx context.Context, in any) (any, error) {
return string(in.([]byte)), nil
}
var mapStringToInt = func(ctx context.Context, in interface{}) (interface{}, error) {
var mapStringToInt = func(ctx context.Context, in any) (any, error) {
return strconv.Atoi(in.(string))
}
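
Mappers run in the order they are added; a hedged sketch of wiring the two mappers above onto a consumer (the consumer ID my-consumer is hypothetical; GetConsumer, AddContentMapper and SetHandler appear in the diffs below):

```
var consumer = kafkaUniverse.GetConsumer("my-consumer")
consumer.AddContentMapper(mapBytesToString) // []byte -> string
consumer.AddContentMapper(mapStringToInt)   // string -> int
consumer.SetHandler(func(ctx context.Context, msg kafkauniverse.KafkaMessage) error {
	var count = msg.GetContent().(int) // content is an int after both mappers
	kafkaLogger.Info(ctx, "msg", "received", "count", count)
	msg.Commit()
	return nil
})
```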
1 change: 1 addition & 0 deletions build.pipeline
@@ -0,0 +1 @@
goLibBuildPipeline('back-kafka-client', true, 'ssh://git@github.com/cloudtrust/kafka-client')
12 changes: 6 additions & 6 deletions cluster.go
@@ -19,8 +19,8 @@ type cluster struct {
}

func newCluster(ctx context.Context, conf KafkaClusterRepresentation, envKeyPrefix string, logger Logger) (*cluster, error) {
var secret = getEnvVariable(envKeyPrefix, *conf.ID)
if secret != nil && *secret != "" {
var secret = getEnvVariable(envKeyPrefix, *conf.ID, "_CLIENT_SECRET")
if secret != nil {
conf.Security.ClientSecret = secret
}
var saramaConfig, err = newSaramaConfig(ctx, conf, logger)
@@ -37,17 +37,17 @@ func newCluster(ctx context.Context, conf KafkaClusterRepresentation, envKeyPrefix string, logger Logger) (*cluster, error) {
}, nil
}

func getEnvVariable(prefix string, clusterID string) *string {
var key = getEnvVariableName(prefix, clusterID)
func getEnvVariable(prefix string, clusterID string, suffix string) *string {
var key = prefix + getEnvVariableName(clusterID) + suffix
var value = os.Getenv(key)
if value != "" {
return &value
}
return nil
}

func getEnvVariableName(prefix string, clusterID string) string {
return prefix + strings.ReplaceAll(strings.ToUpper(clusterID), "-", "_")
func getEnvVariableName(clusterID string) string {
return strings.ReplaceAll(strings.ToUpper(clusterID), "-", "_")
}

func newSaramaConfig(ctx context.Context, conf KafkaClusterRepresentation, logger Logger) (*sarama.Config, error) {
4 changes: 2 additions & 2 deletions cluster_test.go
@@ -7,6 +7,6 @@ import (
)

func TestGetEnvVariable(t *testing.T) {
assert.Equal(t, "PATH_TO_VARIABLE2", getEnvVariableName("PATH_", "to-variable2"))
assert.NotNil(t, getEnvVariable("PA", "th")) // will match env variable PATH
assert.Equal(t, "TO_VARIABLE2", getEnvVariableName("to-variable2"))
assert.NotNil(t, getEnvVariable("PA", "t", "H")) // will match env variable PATH
}
6 changes: 3 additions & 3 deletions consumed_message.go
@@ -8,7 +8,7 @@ import (

// KafkaMessage interface
type KafkaMessage interface {
GetContent() interface{}
GetContent() any
GetOffset() int64
Commit()
CommitWithMessage(message string)
@@ -18,14 +18,14 @@ type KafkaMessage interface {

type consumedMessage struct {
msg *sarama.ConsumerMessage
content interface{}
content any
consumer *consumer
session sarama.ConsumerGroupSession
abort bool
}

// GetContent returns the content of the consumed message. Mappers have already been applied to the original received content.
func (cm *consumedMessage) GetContent() interface{} {
func (cm *consumedMessage) GetContent() any {
return cm.content
}

17 changes: 12 additions & 5 deletions consumer.go
@@ -13,7 +13,10 @@ import (
// KafkaMessageHandler interface shall be implemented by clients
type KafkaMessageHandler func(context.Context, KafkaMessage) error

type KafkaMessageMapper func(context.Context, interface{}) (interface{}, error)
// KafkaMessageMapper function type
type KafkaMessageMapper func(context.Context, any) (any, error)

// KafkaContextInitializer function type
type KafkaContextInitializer func(context.Context) context.Context

type consumer struct {
@@ -119,7 +122,11 @@ func (c *consumer) SetAutoCommit(enabled bool) {
func (c *consumer) Go() {
if c.initialized && c.enabled {
go func() {
c.logger.Info(context.Background(), "msg", "Just started thread to consume queue", "topic", c.topic, "failure-topic", *c.failureProducerName)
var failureTopic = "none"
if c.failureProducerName != nil {
failureTopic = *c.failureProducerName
}
c.logger.Info(context.Background(), "msg", "Just started thread to consume queue", "topic", c.topic, "failure-topic", failureTopic)
for {
c.consumerGroup.Consume(context.Background(), []string{c.topic}, c)
select {
@@ -133,8 +140,8 @@ }
}
}

func (c *consumer) applyMappers(ctx context.Context, kafkaMsg *sarama.ConsumerMessage) (interface{}, error) {
var content interface{} = kafkaMsg.Value
func (c *consumer) applyMappers(ctx context.Context, kafkaMsg *sarama.ConsumerMessage) (any, error) {
var content any = kafkaMsg.Value
for idx, mapper := range c.mappers {
var err error
if content, err = mapper(ctx, content); err != nil {
@@ -171,7 +178,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
} else {
err = c.handler(ctx, msg)
if err != nil {
c.logger.Error(ctx, "msg", "Failed to persist event", "err", err.Error(), "topic", claim.Topic())
c.logger.Error(ctx, "msg", "Failed to handle event", "err", err.Error(), "topic", claim.Topic())
if msg.abort {
return err
}
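
The mapper pipeline in applyMappers is a left fold over the registered mappers; a self-contained sketch of the same pattern (applyAll and the sample mappers are illustrative, not part of the library):

```
package main

import (
	"context"
	"fmt"
	"strconv"
)

// mapper mirrors the KafkaMessageMapper signature from the diff above.
type mapper func(context.Context, any) (any, error)

// applyAll feeds each mapper the previous mapper's output, starting from the
// raw input, and stops at the first error.
func applyAll(ctx context.Context, in any, mappers []mapper) (any, error) {
	var content = in
	for idx, m := range mappers {
		var err error
		if content, err = m(ctx, content); err != nil {
			return nil, fmt.Errorf("mapper #%d failed: %w", idx, err)
		}
	}
	return content, nil
}

func main() {
	var mappers = []mapper{
		func(ctx context.Context, in any) (any, error) { return string(in.([]byte)), nil },
		func(ctx context.Context, in any) (any, error) { return strconv.Atoi(in.(string)) },
	}
	var out, err = applyAll(context.Background(), []byte("42"), mappers)
	fmt.Println(out, err) // 42 <nil>
}
```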
2 changes: 1 addition & 1 deletion consumer_test.go
@@ -167,7 +167,7 @@ func TestConsumeClaim(t *testing.T) {
var content = 345
fillMessageChannel(messages, strconv.Itoa(content), "invalid")
var handlerError = errors.New("error from handler")
consumer.AddContentMapper(func(ctx context.Context, in interface{}) (interface{}, error) {
consumer.AddContentMapper(func(ctx context.Context, in any) (any, error) {
return strconv.Atoi(string(in.([]byte)))
})
consumer.SetHandler(func(ctx context.Context, msg KafkaMessage) error {
3 changes: 2 additions & 1 deletion mappers/mappers.go
@@ -5,7 +5,8 @@ import (
"encoding/base64"
)

func DecodeBase64Bytes(ctx context.Context, in interface{}) (interface{}, error) {
// DecodeBase64Bytes is a mapper to convert a content from base64
func DecodeBase64Bytes(ctx context.Context, in any) (any, error) {
var bytes = in.([]byte)
return base64.StdEncoding.DecodeString(string(bytes))
}
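
A quick usage sketch of this mapper (the module path github.com/cloudtrust/kafka-client is assumed from the pipeline definition above):

```
package main

import (
	"context"
	"fmt"

	"github.com/cloudtrust/kafka-client/mappers"
)

func main() {
	// "aGVsbG8=" is the Base64 encoding of "hello".
	var out, err = mappers.DecodeBase64Bytes(context.Background(), []byte("aGVsbG8="))
	fmt.Println(string(out.([]byte)), err) // hello <nil>
}
```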
18 changes: 9 additions & 9 deletions misc/noopproducer.go
@@ -18,29 +18,29 @@ func (n *NoopKafkaProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
// Close does noop
func (n *NoopKafkaProducer) Close() error { return nil }

// noop
// TxnStatus does noop
func (n *NoopKafkaProducer) TxnStatus() sarama.ProducerTxnStatusFlag {
return 0
}

// noop
// IsTransactional does noop
func (n *NoopKafkaProducer) IsTransactional() bool { return true }

// noop
// BeginTxn does noop
func (n *NoopKafkaProducer) BeginTxn() error { return nil }

// noop
// CommitTxn does noop
func (n *NoopKafkaProducer) CommitTxn() error { return nil }

// noop
// AbortTxn does noop
func (n *NoopKafkaProducer) AbortTxn() error { return nil }

// noop
func (n *NoopKafkaProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error {
// AddOffsetsToTxn does noop
func (n *NoopKafkaProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupID string) error {
return nil
}

// noop
func (n *NoopKafkaProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error {
// AddMessageToTxn does noop
func (n *NoopKafkaProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupID string, metadata *string) error {
return nil
}
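
The method set above suggests NoopKafkaProducer is a no-op stand-in for sarama.SyncProducer, handy where producing is disabled or in tests; a hedged sketch (that interface relationship and the import alias misc are assumptions):

```
// Assumption: NoopKafkaProducer satisfies sarama.SyncProducer.
var p sarama.SyncProducer = &misc.NoopKafkaProducer{}
_ = p.SendMessages(nil) // returns nil: nothing is actually sent
_ = p.Close()           // also a no-op
```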
5 changes: 3 additions & 2 deletions misc/saramalogger.go
@@ -9,8 +9,9 @@ import (
)

// InfoLogger is a basic logger function
type InfoLogger func(ctx context.Context, keyvals ...interface{})
type InfoLogger func(ctx context.Context, keyvals ...any)

// NewSaramaLogger creates a Sarama logger
func NewSaramaLogger(logger InfoLogger, enabled bool) sarama.StdLogger {
if enabled {
return log.New(&loggerWrapper{logger: logger}, "[Sarama] ", log.LstdFlags)
@@ -19,7 +20,7 @@ func NewSaramaLogger(logger InfoLogger, enabled bool) sarama.StdLogger {
}

type loggerWrapper struct {
logger func(ctx context.Context, keyvals ...interface{})
logger func(ctx context.Context, keyvals ...any)
}

func (c *loggerWrapper) Write(p []byte) (n int, err error) {
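
A sketch of wiring this into Sarama's package-level logger (kafkaLogger is assumed to expose an Info method matching misc.InfoLogger):

```
// Route Sarama's internal logs through your own logger; pass enabled=false
// to silence them.
sarama.Logger = misc.NewSaramaLogger(kafkaLogger.Info, true)
```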
2 changes: 1 addition & 1 deletion misc/saramalogger_test.go
@@ -9,7 +9,7 @@ import (

func TestWrite(t *testing.T) {
var message = "test"
var loggerFunc = func(ctx context.Context, keyvals ...interface{}) {
var loggerFunc = func(ctx context.Context, keyvals ...any) {
assert.NotNil(t, ctx)
assert.Equal(t, "msg", keyvals[0])
assert.Contains(t, keyvals[1], "[Sarama] ")
2 changes: 1 addition & 1 deletion producer.go
@@ -52,7 +52,7 @@ func (p *producer) initialize() error {
}
var err error
if p.producer, err = sarama.NewSyncProducer(p.cluster.brokers, p.cluster.saramaConfig); err != nil {
p.logger.Error(context.Background(), "msg", "Failed to start Kafka producer", "err", err)
p.logger.Error(context.Background(), "msg", "Failed to initialize Kafka producer", "err", err)
return err
}
p.initialized = true
15 changes: 10 additions & 5 deletions universe.go
@@ -15,14 +15,14 @@ type KafkaUniverse struct {

// Logger interface for logging with level
type Logger interface {
Debug(ctx context.Context, keyvals ...interface{})
Info(ctx context.Context, keyvals ...interface{})
Warn(ctx context.Context, keyvals ...interface{})
Error(ctx context.Context, keyvals ...interface{})
Debug(ctx context.Context, keyvals ...any)
Info(ctx context.Context, keyvals ...any)
Warn(ctx context.Context, keyvals ...any)
Error(ctx context.Context, keyvals ...any)
}

// ConfigurationProvider interface
type ConfigurationProvider func(target interface{}) error
type ConfigurationProvider func(target any) error

// NewKafkaUniverse creates a KafkaUniverse from a provided configuration
func NewKafkaUniverse(ctx context.Context, logger Logger, envKeyPrefix string, confUnmarshal ConfigurationProvider) (*KafkaUniverse, error) {
@@ -124,6 +124,11 @@ func (ku *KafkaUniverse) InitializeConsumers(consumerIDs ...string) error {
return nil
}

// GetProducer gets the specified producer
func (ku *KafkaUniverse) GetProducer(producerID string) *producer {
return ku.producers[producerID]
}

// GetConsumer gets the specified consumer
func (ku *KafkaUniverse) GetConsumer(consumerID string) *consumer {
return ku.consumers[consumerID]
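
A sketch of the flow the new accessor enables (my-producer is a hypothetical producer ID taken from the configuration):

```
if err := kafkaUniverse.InitializeProducers("my-producer"); err != nil {
	kafkaLogger.Error(ctx, "msg", "could not initialize producer", "err", err)
	return
}
var producer = kafkaUniverse.GetProducer("my-producer")
if producer == nil {
	// Like GetConsumer, unknown IDs yield nil rather than an error.
}
```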
39 changes: 16 additions & 23 deletions universe_test.go
@@ -19,22 +19,27 @@ func TestKafkaUniverse(t *testing.T) {
var anError = errors.New("any error")

logger.EXPECT().Error(gomock.Any(), gomock.Any()).AnyTimes()
var createDefaultUniverse = func(target any) error {
var conf = target.(*[]KafkaClusterRepresentation)
*conf = append(*conf, createValidKafkaClusterRepresentation())
return nil
}

t.Run("Empty universe", func(t *testing.T) {
var _, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target interface{}) error {
var _, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target any) error {
return anError
})
assert.Equal(t, anError, err)
})
t.Run("Empty universe", func(t *testing.T) {
var universe, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target interface{}) error {
var universe, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target any) error {
return nil
})
assert.NotNil(t, err)
assert.Nil(t, universe)
})
t.Run("Invalid configuration", func(t *testing.T) {
var universe, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target interface{}) error {
var universe, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target any) error {
var conf = target.(*[]KafkaClusterRepresentation)
*conf = append(*conf, KafkaClusterRepresentation{})
return nil
@@ -43,38 +48,26 @@
assert.Nil(t, universe)
})
t.Run("Valid configuration", func(t *testing.T) {
var universe, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target interface{}) error {
var conf = target.(*[]KafkaClusterRepresentation)
*conf = append(*conf, createValidKafkaClusterRepresentation())
return nil
})
var universe, err = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", createDefaultUniverse)
assert.Nil(t, err)
assert.NotNil(t, universe)
})
t.Run("Initialize unknown producer", func(t *testing.T) {
var universe, _ = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target interface{}) error {
var conf = target.(*[]KafkaClusterRepresentation)
*conf = append(*conf, createValidKafkaClusterRepresentation())
return nil
})
var universe, _ = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", createDefaultUniverse)
var err = universe.InitializeProducers("unknown")
assert.NotNil(t, err)
})
t.Run("Initialize unknown consumer", func(t *testing.T) {
var universe, _ = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target interface{}) error {
var conf = target.(*[]KafkaClusterRepresentation)
*conf = append(*conf, createValidKafkaClusterRepresentation())
return nil
})
var universe, _ = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", createDefaultUniverse)
var err = universe.InitializeConsumers("unknown")
assert.NotNil(t, err)
})
t.Run("Get unknown producer", func(t *testing.T) {
var universe, _ = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", createDefaultUniverse)
assert.Nil(t, universe.GetProducer("unknown"))
})
t.Run("Get unknown consumer", func(t *testing.T) {
var universe, _ = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", func(target interface{}) error {
var conf = target.(*[]KafkaClusterRepresentation)
*conf = append(*conf, createValidKafkaClusterRepresentation())
return nil
})
var universe, _ = NewKafkaUniverse(ctx, logger, "CT_KAFKA_CLIENT_SECRET_", createDefaultUniverse)
assert.Nil(t, universe.GetConsumer("unknown"))
})
}
