From 4f6e4e653b85ec3fc0b528341597feb5027967df Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Thu, 22 Jun 2017 14:33:50 -0400 Subject: [PATCH] [FAB-4619] enable specifying orderer kafka version Kafka version can now be set in orderer.yaml, eg: Kakfa: Version: 0.10.0.0 Default value is '0.9.0.1'. Change-Id: Ieb69b003b400bff9c7918743e99ce76c4ad4363c Signed-off-by: Luis Sanchez --- common/viperutil/config_test.go | 58 ++++++++++++++++++++++++++++ common/viperutil/config_util.go | 30 ++++++++++++++ docs/source/kafka.rst | 25 +++++------- examples/cluster/config/orderer.yaml | 5 ++- orderer/localconfig/config.go | 7 ++-- sampleconfig/orderer.yaml | 5 ++- 6 files changed, 109 insertions(+), 21 deletions(-) diff --git a/common/viperutil/config_test.go b/common/viperutil/config_test.go index 24cdee75dd3..f867209800d 100644 --- a/common/viperutil/config_test.go +++ b/common/viperutil/config_test.go @@ -25,6 +25,7 @@ import ( "strings" "testing" + "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/mocks/util" "github.com/spf13/viper" ) @@ -70,6 +71,63 @@ func TestEnvSlice(t *testing.T) { } } +func TestKafkaVersionDecode(t *testing.T) { + + type testKafkaVersion struct { + Inner struct { + Version sarama.KafkaVersion + } + } + + config := viper.New() + config.SetConfigType("yaml") + + testCases := []struct { + data string + expected sarama.KafkaVersion + errExpected bool + }{ + {"0.8.2.0", sarama.V0_8_2_0, false}, + {"0.8.2.1", sarama.V0_8_2_1, false}, + {"0.8.2.2", sarama.V0_8_2_2, false}, + {"0.9.0.0", sarama.V0_9_0_0, false}, + {"0.9.0.1", sarama.V0_9_0_1, false}, + {"0.10.0.0", sarama.V0_10_0_0, false}, + {"0.10.0.1", sarama.V0_10_0_1, false}, + {"0.10.1.0", sarama.V0_10_1_0, false}, + {"Unsupported", sarama.KafkaVersion{}, true}, + } + + for _, tc := range testCases { + t.Run(tc.data, func(t *testing.T) { + + data := fmt.Sprintf("---\nInner:\n Version: %s", tc.data) + err := config.ReadConfig(bytes.NewReader([]byte(data))) + if err != nil { + t.Fatalf("Error reading config: %s", err) + } + + var uconf testKafkaVersion + err = EnhancedExactUnmarshal(config, &uconf) + + if tc.errExpected { + if err == nil { + t.Fatalf("Should have failed to unmarshal") + } + } else { + if err != nil { + t.Fatalf("Failed to unmarshal with: %s", err) + } + if uconf.Inner.Version != tc.expected { + t.Fatalf("Did not get back the right kafka version, expected: %v got %v", tc.expected, uconf.Inner.Version) + } + } + + }) + } + +} + type testByteSize struct { Inner struct { ByteSize uint32 diff --git a/common/viperutil/config_util.go b/common/viperutil/config_util.go index c6ddf5fd158..75c7e600343 100644 --- a/common/viperutil/config_util.go +++ b/common/viperutil/config_util.go @@ -29,6 +29,7 @@ import ( "encoding/json" "encoding/pem" + "github.com/Shopify/sarama" "github.com/hyperledger/fabric/common/flogging" "github.com/mitchellh/mapstructure" "github.com/spf13/viper" @@ -254,6 +255,34 @@ func pemBlocksFromFileDecodeHook() mapstructure.DecodeHookFunc { } } +func kafkaVersionDecodeHook() mapstructure.DecodeHookFunc { + return func(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + if f.Kind() != reflect.String || t != reflect.TypeOf(sarama.KafkaVersion{}) { + return data, nil + } + switch data { + case "0.8.2.0": + return sarama.V0_8_2_0, nil + case "0.8.2.1": + return sarama.V0_8_2_1, nil + case "0.8.2.2": + return sarama.V0_8_2_2, nil + case "0.9.0.0": + return sarama.V0_9_0_0, nil + case "0.9.0.1": + return sarama.V0_9_0_1, nil + case "0.10.0.0": + return sarama.V0_10_0_0, nil + case "0.10.0.1": + return sarama.V0_10_0_1, nil + case "0.10.1.0": + return sarama.V0_10_1_0, nil + default: + return nil, fmt.Errorf("Unsupported Kafka version: '%s'", data) + } + } +} + // EnhancedExactUnmarshal is intended to unmarshal a config file into a structure // producing error when extraneous variables are introduced and supporting // the time.Duration type @@ -274,6 +303,7 @@ func EnhancedExactUnmarshal(v *viper.Viper, output interface{}) error { byteSizeDecodeHook(), stringFromFileDecodeHook(), pemBlocksFromFileDecodeHook(), + kafkaVersionDecodeHook(), ), } diff --git a/docs/source/kafka.rst b/docs/source/kafka.rst index da029dc4d83..e0b51a6f17b 100644 --- a/docs/source/kafka.rst +++ b/docs/source/kafka.rst @@ -185,22 +185,15 @@ Supported Kafka versions for v1 are ``0.9`` and ``0.10``. (Fabric uses the version of it that supports Kafka 0.9 and 0.10.) Out of the box the Kafka version defaults to ``0.9.0.1``. If you wish to use a -different supported version, you will have to edit the source code (modify the -``Version`` field of the ``defaults`` struct in -``orderer/localconfig/config.go``) and rebuild the ``orderer`` binary. For -example, if you wish to run the ordering service in a Kafka cluster running -0.10.0.1, you would edit the file like so: - -:: - - ... - Verbose: false, - Version: sarama.V0_10_0_1, - TLS: TLS{ - ... - -And then rebuild the binary. (This process will be improved with -`FAB-4619 `_.) +different supported version, specify a supported version using the +``Kafka.Version`` key in ``orderer.yaml``. + +The current supported Kafka versions are: + +* ``Version: 0.9.0.1`` +* ``Version: 0.10.0.0`` +* ``Version: 0.10.0.1`` +* ``Version: 0.10.1.0`` Debugging --------- diff --git a/examples/cluster/config/orderer.yaml b/examples/cluster/config/orderer.yaml index afaf80c555d..c6e0ec1f91a 100644 --- a/examples/cluster/config/orderer.yaml +++ b/examples/cluster/config/orderer.yaml @@ -67,7 +67,7 @@ General: # LocalMSPID is the identity to register the local MSP material with the MSP # manager. IMPORTANT: The local MSP ID of an orderer needs to match the MSP - # ID of one of the organizations defined in the orderer system channel's + # ID of one of the organizations defined in the orderer system channel's # /Channel/Orderer configuration. The sample organization defined in the # sample configuration provided has an MSP ID of "DEFAULT". LocalMSPID: OrdererMSP @@ -221,3 +221,6 @@ Kafka: # following "File" key and specify the file name from which to load the # value of RootCAs. #File: path/to/RootCAs + + # Kafka version of the Kafka cluster brokers (defaults to 0.9.0.1) + Version: diff --git a/orderer/localconfig/config.go b/orderer/localconfig/config.go index d5115b00e8c..0ee07367beb 100644 --- a/orderer/localconfig/config.go +++ b/orderer/localconfig/config.go @@ -340,10 +340,11 @@ func (c *TopLevel) completeInitialization(configDir string) { logger.Infof("Kafka.Retry.Consumer.RetryBackoff unset, setting to %v", defaults.Kafka.Retry.Consumer.RetryBackoff) c.Kafka.Retry.Consumer.RetryBackoff = defaults.Kafka.Retry.Consumer.RetryBackoff - default: - // A bit hacky, but its type makes it impossible to test for a nil value. - // This may be overwritten by the Kafka orderer upon instantiation. + case c.Kafka.Version == sarama.KafkaVersion{}: + logger.Infof("Kafka.Version unset, setting to %v", defaults.Kafka.Version) c.Kafka.Version = defaults.Kafka.Version + + default: return } } diff --git a/sampleconfig/orderer.yaml b/sampleconfig/orderer.yaml index 323f55285ec..a149c89f4ea 100644 --- a/sampleconfig/orderer.yaml +++ b/sampleconfig/orderer.yaml @@ -67,7 +67,7 @@ General: # LocalMSPID is the identity to register the local MSP material with the MSP # manager. IMPORTANT: The local MSP ID of an orderer needs to match the MSP - # ID of one of the organizations defined in the orderer system channel's + # ID of one of the organizations defined in the orderer system channel's # /Channel/Orderer configuration. The sample organization defined in the # sample configuration provided has an MSP ID of "DEFAULT". LocalMSPID: DEFAULT @@ -221,3 +221,6 @@ Kafka: # following "File" key and specify the file name from which to load the # value of RootCAs. #File: path/to/RootCAs + + # Kafka version of the Kafka cluster brokers (defaults to 0.9.0.1) + Version: