Skip to content

Commit

Permalink
make consumer isolationLevel configurable (fixes #182)
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Jan 18, 2024
1 parent c23f0e8 commit c0254ad
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- [#182](https://github.com/deviceinsight/kafkactl/issues/182) Make isolationLevel configurable and change default to 'readCommitted'
- [#184](https://github.com/deviceinsight/kafkactl/pull/184) Added option to show default configs when describing topics
- [#183](https://github.com/deviceinsight/kafkactl/issues/183) Add command `delete records` to delete records from topic

Expand Down
4 changes: 4 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ contexts:
# optional: maximum permitted size of a message (defaults to 1000000)
maxMessageBytes: 1000000
consumer:
# optional: isolationLevel (defaults to ReadCommitted)
isolationLevel: ReadUncommitted
current-context: default
----

Expand Down
1 change: 1 addition & 0 deletions cmd/consume/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func NewConsumeCmd() *cobra.Command {
cmdConsume.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description")
cmdConsume.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type")
cmdConsume.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type")
cmdConsume.Flags().StringVarP(&flags.IsolationLevel, "isolation-level", "i", "", "isolationLevel to use. One of: ReadUncommitted|ReadCommitted")

if err := cmdConsume.RegisterFlagCompletionFunc("group", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return consumergroups.CompleteConsumerGroups(cmd, args, toComplete)
Expand Down
6 changes: 6 additions & 0 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type K8sConfig struct {
ImagePullSecret string
}

type ConsumerConfig struct {
IsolationLevel string
}

type ProducerConfig struct {
Partitioner string
RequiredAcks string
Expand All @@ -69,6 +73,7 @@ type ClientContext struct {
AvroJSONCodec avro.JSONCodec
Protobuf protobuf.SearchContext
Producer ProducerConfig
Consumer ConsumerConfig
}

type Config struct {
Expand Down Expand Up @@ -115,6 +120,7 @@ func CreateClientContext() (ClientContext, error) {
context.Producer.Partitioner = viper.GetString("contexts." + context.Name + ".producer.partitioner")
context.Producer.RequiredAcks = viper.GetString("contexts." + context.Name + ".producer.requiredAcks")
context.Producer.MaxMessageBytes = viper.GetInt("contexts." + context.Name + ".producer.maxMessageBytes")
context.Consumer.IsolationLevel = viper.GetString("contexts." + context.Name + ".consumer.isolationLevel")
context.Sasl.Enabled = viper.GetBool("contexts." + context.Name + ".sasl.enabled")
context.Sasl.Username = viper.GetString("contexts." + context.Name + ".sasl.username")
context.Sasl.Password = viper.GetString("contexts." + context.Name + ".sasl.password")
Expand Down
44 changes: 43 additions & 1 deletion internal/consume/consume-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consume
import (
"context"
"sort"
"strings"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers"
Expand Down Expand Up @@ -39,6 +40,7 @@ type Flags struct {
ProtosetFiles []string
KeyProtoType string
ValueProtoType string
IsolationLevel string
}

type ConsumedMessage struct {
Expand All @@ -65,7 +67,16 @@ func (operation *Operation) Consume(topic string, flags Flags) error {
return err
}

if client, err = internal.CreateClient(&clientContext); err != nil {
config, err := internal.CreateClientConfig(&clientContext)
if err != nil {
return err
}

if err = applyConsumerConfigs(config, clientContext, flags); err != nil {
return err
}

if client, err = sarama.NewClient(clientContext.Brokers, config); err != nil {
return errors.Wrap(err, "failed to create client")
}

Expand Down Expand Up @@ -166,6 +177,37 @@ func (operation *Operation) Consume(topic string, flags Flags) error {
return nil
}

func applyConsumerConfigs(config *sarama.Config, clientContext internal.ClientContext, flags Flags) error {

var err error

isolationLevel := clientContext.Consumer.IsolationLevel
if flags.IsolationLevel != "" {
isolationLevel = flags.IsolationLevel
}

if config.Consumer.IsolationLevel, err = parseIsolationLevel(isolationLevel); err != nil {
return err
}

output.Debugf("using isolationLevel=%v", config.Consumer.IsolationLevel)

return nil
}

func parseIsolationLevel(isolationLevel string) (sarama.IsolationLevel, error) {
switch strings.ToLower(isolationLevel) {
case "":
return sarama.ReadCommitted, nil
case "readcommitted":
return sarama.ReadCommitted, nil
case "readuncommitted":
return sarama.ReadUncommitted, nil
default:
return sarama.ReadCommitted, errors.Errorf("isolationLevel=%s not supported", isolationLevel)
}
}

func deserializeMessages(ctx context.Context, flags Flags, messages <-chan *sarama.ConsumerMessage, stopConsumers chan<- bool, deserializers MessageDeserializerChain) *errgroup.Group {

errorGroup, _ := errgroup.WithContext(ctx)
Expand Down

0 comments on commit c0254ad

Please sign in to comment.