diff --git a/CHANGELOG.md b/CHANGELOG.md index a7ea912..5c5a27d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- [#182](https://github.com/deviceinsight/kafkactl/issues/182) Make isolationLevel configurable and change default to 'readCommitted' - [#184](https://github.com/deviceinsight/kafkactl/pull/184) Added option to show default configs when describing topics - [#183](https://github.com/deviceinsight/kafkactl/issues/183) Add command `delete records` to delete records from topic diff --git a/README.adoc b/README.adoc index 66409d9..c46f065 100644 --- a/README.adoc +++ b/README.adoc @@ -159,6 +159,10 @@ contexts: # optional: maximum permitted size of a message (defaults to 1000000) maxMessageBytes: 1000000 + consumer: + # optional: isolationLevel (defaults to ReadCommitted) + isolationLevel: ReadUncommitted + current-context: default ---- diff --git a/cmd/consume/consume.go b/cmd/consume/consume.go index 846a79e..c2bd9a1 100644 --- a/cmd/consume/consume.go +++ b/cmd/consume/consume.go @@ -50,6 +50,7 @@ func NewConsumeCmd() *cobra.Command { cmdConsume.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description") cmdConsume.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type") cmdConsume.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type") + cmdConsume.Flags().StringVarP(&flags.IsolationLevel, "isolation-level", "i", "", "isolationLevel to use. One of: ReadUncommitted|ReadCommitted") if err := cmdConsume.RegisterFlagCompletionFunc("group", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { return consumergroups.CompleteConsumerGroups(cmd, args, toComplete) diff --git a/internal/common-operation.go b/internal/common-operation.go index 56c0ac4..8d8c929 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -50,6 +50,10 @@ type K8sConfig struct { ImagePullSecret string } +type ConsumerConfig struct { + IsolationLevel string +} + type ProducerConfig struct { Partitioner string RequiredAcks string @@ -69,6 +73,7 @@ type ClientContext struct { AvroJSONCodec avro.JSONCodec Protobuf protobuf.SearchContext Producer ProducerConfig + Consumer ConsumerConfig } type Config struct { @@ -115,6 +120,7 @@ func CreateClientContext() (ClientContext, error) { context.Producer.Partitioner = viper.GetString("contexts." + context.Name + ".producer.partitioner") context.Producer.RequiredAcks = viper.GetString("contexts." + context.Name + ".producer.requiredAcks") context.Producer.MaxMessageBytes = viper.GetInt("contexts." + context.Name + ".producer.maxMessageBytes") + context.Consumer.IsolationLevel = viper.GetString("contexts." + context.Name + ".consumer.isolationLevel") context.Sasl.Enabled = viper.GetBool("contexts." + context.Name + ".sasl.enabled") context.Sasl.Username = viper.GetString("contexts." + context.Name + ".sasl.username") context.Sasl.Password = viper.GetString("contexts." + context.Name + ".sasl.password") diff --git a/internal/consume/consume-operation.go b/internal/consume/consume-operation.go index 5b282f9..239a1f6 100644 --- a/internal/consume/consume-operation.go +++ b/internal/consume/consume-operation.go @@ -3,6 +3,7 @@ package consume import ( "context" "sort" + "strings" "time" "github.com/deviceinsight/kafkactl/internal/helpers" @@ -39,6 +40,7 @@ type Flags struct { ProtosetFiles []string KeyProtoType string ValueProtoType string + IsolationLevel string } type ConsumedMessage struct { @@ -65,7 +67,16 @@ func (operation *Operation) Consume(topic string, flags Flags) error { return err } - if client, err = internal.CreateClient(&clientContext); err != nil { + config, err := internal.CreateClientConfig(&clientContext) + if err != nil { + return err + } + + if err = applyConsumerConfigs(config, clientContext, flags); err != nil { + return err + } + + if client, err = sarama.NewClient(clientContext.Brokers, config); err != nil { return errors.Wrap(err, "failed to create client") } @@ -166,6 +177,37 @@ func (operation *Operation) Consume(topic string, flags Flags) error { return nil } +func applyConsumerConfigs(config *sarama.Config, clientContext internal.ClientContext, flags Flags) error { + + var err error + + isolationLevel := clientContext.Consumer.IsolationLevel + if flags.IsolationLevel != "" { + isolationLevel = flags.IsolationLevel + } + + if config.Consumer.IsolationLevel, err = parseIsolationLevel(isolationLevel); err != nil { + return err + } + + output.Debugf("using isolationLevel=%v", config.Consumer.IsolationLevel) + + return nil +} + +func parseIsolationLevel(isolationLevel string) (sarama.IsolationLevel, error) { + switch strings.ToLower(isolationLevel) { + case "": + return sarama.ReadCommitted, nil + case "readcommitted": + return sarama.ReadCommitted, nil + case "readuncommitted": + return sarama.ReadUncommitted, nil + default: + return sarama.ReadCommitted, errors.Errorf("isolationLevel=%s not supported", isolationLevel) + } +} + func deserializeMessages(ctx context.Context, flags Flags, messages <-chan *sarama.ConsumerMessage, stopConsumers chan<- bool, deserializers MessageDeserializerChain) *errgroup.Group { errorGroup, _ := errgroup.WithContext(ctx)