From 95094cd4fdd58fb92c9c6b721504a484ad7cb334 Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Thu, 8 Dec 2016 14:41:36 -0500 Subject: [PATCH] [FAB-1359] Drop custom flags for Kafka orderer https://jira.hyperledger.org/browse/FAB-1359 As we are slowly moving to a setup where the behavior of the orderer is controlled by the config options captured in the genesis block and the orderer YAML file (and their overrides via ENV vars), it's time to drop the flag support that the Kafka orderer provided. This changeset then: 1. Removes all flags from the Kafka orderer. 2. Adds a "verbose" option to the YAML file to control logging for the package that we use to interact with the Kafka cluster (sarama). Additionally it: 3. Prefixes all test-related variables with "test" so as to make tests more legible and remove ambiguity. 4. Updates the test config object to match the keys of the actual config object. 5. Adds a helper test envelope constructor function, and moves some test-related functions around in an effort to consolidate files. Change-Id: Id749d0b88f62a4212854e18b8c469d90fe2f6877 Signed-off-by: Kostas Christidis --- bddtests/docker-compose-orderer-kafka.yml | 2 +- .../orderer-1-kafka-1/docker-compose.yml | 2 +- .../orderer-1-kafka-3/docker-compose.yml | 2 +- .../orderer-n-kafka-n/docker-compose.yml | 1 - orderer/kafka/broadcast_test.go | 20 +++--- orderer/kafka/broker_mock_test.go | 10 +-- orderer/kafka/broker_test.go | 4 +- orderer/kafka/client_deliver_test.go | 18 ++--- orderer/kafka/common_test.go | 61 ----------------- orderer/kafka/config_test.go | 68 ++++++++++++++++--- orderer/kafka/consumer_mock_test.go | 4 +- orderer/kafka/consumer_test.go | 8 +-- orderer/kafka/deliver_test.go | 2 +- orderer/kafka/orderer_mock_test.go | 2 +- orderer/kafka/producer_mock_test.go | 2 +- orderer/kafka/producer_test.go | 2 +- orderer/localconfig/config.go | 5 +- orderer/main.go | 13 +--- orderer/orderer.yaml | 20 +++--- 19 files changed, 114 insertions(+), 132 deletions(-) delete mode 100644 orderer/kafka/common_test.go diff --git a/bddtests/docker-compose-orderer-kafka.yml b/bddtests/docker-compose-orderer-kafka.yml index 7847e32b794..6e540a5f6f7 100644 --- a/bddtests/docker-compose-orderer-kafka.yml +++ b/bddtests/docker-compose-orderer-kafka.yml @@ -12,4 +12,4 @@ orderer0: - ORDERER_KAFKA_BROKERS=[kafka0:9092] links: - kafka0 - command: orderer -loglevel debug -verbose true + command: orderer diff --git a/bddtests/environments/orderer-1-kafka-1/docker-compose.yml b/bddtests/environments/orderer-1-kafka-1/docker-compose.yml index dc32fe69485..586e3b72571 100644 --- a/bddtests/environments/orderer-1-kafka-1/docker-compose.yml +++ b/bddtests/environments/orderer-1-kafka-1/docker-compose.yml @@ -16,7 +16,7 @@ services: - ORDERER_GENERAL_ORDERERTYPE=kafka - ORDERER_KAFKA_BROKERS=[kafka0:9092] working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer - command: orderer -loglevel debug -verbose true + command: orderer depends_on: - kafka0 diff --git a/bddtests/environments/orderer-1-kafka-3/docker-compose.yml b/bddtests/environments/orderer-1-kafka-3/docker-compose.yml index 68e6864e129..8e281fc38ac 100644 --- a/bddtests/environments/orderer-1-kafka-3/docker-compose.yml +++ b/bddtests/environments/orderer-1-kafka-3/docker-compose.yml @@ -16,7 +16,7 @@ services: - ORDERER_GENERAL_ORDERERTYPE=kafka - ORDERER_KAFKA_BROKERS=[kafka0:9092,kafka1:9092,kafka2:9092] working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer - command: orderer -loglevel debug -verbose true + command: orderer depends_on: - kafka0 - kafka1 diff --git a/bddtests/environments/orderer-n-kafka-n/docker-compose.yml b/bddtests/environments/orderer-n-kafka-n/docker-compose.yml index c52fc1e1bc8..7d6d991e7c0 100644 --- a/bddtests/environments/orderer-n-kafka-n/docker-compose.yml +++ b/bddtests/environments/orderer-n-kafka-n/docker-compose.yml @@ -11,7 +11,6 @@ services: depends_on: - zookeeper - kafka - command: -loglevel debug -verbose true kafka: build: ../kafka diff --git a/orderer/kafka/broadcast_test.go b/orderer/kafka/broadcast_test.go index 86140ba8919..569249c15c5 100644 --- a/orderer/kafka/broadcast_test.go +++ b/orderer/kafka/broadcast_test.go @@ -29,7 +29,7 @@ import ( func TestBroadcastResponse(t *testing.T) { disk := make(chan []byte) - mb := mockNewBroadcaster(t, testConf, oldestOffset, disk) + mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) defer testClose(t, mb) mbs := newMockBroadcastStream(t) @@ -60,7 +60,7 @@ func TestBroadcastResponse(t *testing.T) { func TestBroadcastBatch(t *testing.T) { disk := make(chan []byte) - mb := mockNewBroadcaster(t, testConf, oldestOffset, disk) + mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) defer testClose(t, mb) mbs := newMockBroadcastStream(t) @@ -113,7 +113,7 @@ func TestBroadcastBatch(t *testing.T) { disk := make(chan []byte) - mb := mockNewBroadcaster(t, testConf, oldestOffset, disk) + mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) defer testClose(t, mb) mbs := newMockBroadcastStream(t) @@ -139,7 +139,7 @@ loop: select { case <-mbs.outgoing: t.Fatal("Client shouldn't have received anything from the orderer") - case <-time.After(testConf.General.BatchTimeout + timePadding): + case <-time.After(testConf.General.BatchTimeout + testTimePadding): break loop // This is the success path } } @@ -154,7 +154,7 @@ func TestBroadcastIncompleteBatch(t *testing.T) { disk := make(chan []byte) - mb := mockNewBroadcaster(t, testConf, oldestOffset, disk) + mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) defer testClose(t, mb) mbs := newMockBroadcastStream(t) @@ -189,7 +189,7 @@ func TestBroadcastIncompleteBatch(t *testing.T) { t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data)) } return - case <-time.After(testConf.General.BatchTimeout + timePadding): + case <-time.After(testConf.General.BatchTimeout + testTimePadding): t.Fatal("Should have received a block by now") } } @@ -206,7 +206,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { disk := make(chan []byte) - mb := mockNewBroadcaster(t, testConf, oldestOffset, disk) + mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) defer testClose(t, mb) mbs := newMockBroadcastStream(t) @@ -247,7 +247,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data)) } return - case <-time.After(testConf.General.BatchTimeout + timePadding): + case <-time.After(testConf.General.BatchTimeout + testTimePadding): t.Fatal("Should have received a block by now") } } @@ -256,7 +256,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { func TestBroadcastBatchAndQuitEarly(t *testing.T) { disk := make(chan []byte) - mb := mockNewBroadcaster(t, testConf, oldestOffset, disk) + mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) defer testClose(t, mb) mbs := newMockBroadcastStream(t) @@ -301,7 +301,7 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) { func TestBroadcastClose(t *testing.T) { errChan := make(chan error) - mb := mockNewBroadcaster(t, testConf, oldestOffset, make(chan []byte)) + mb := mockNewBroadcaster(t, testConf, testOldestOffset, make(chan []byte)) mbs := newMockBroadcastStream(t) go func() { if err := mb.Broadcast(mbs); err != nil { diff --git a/orderer/kafka/broker_mock_test.go b/orderer/kafka/broker_mock_test.go index 6f713caf180..bea5ac76640 100644 --- a/orderer/kafka/broker_mock_test.go +++ b/orderer/kafka/broker_mock_test.go @@ -31,18 +31,18 @@ type mockBrockerImpl struct { } func mockNewBroker(t *testing.T, conf *config.TopLevel) Broker { - mockBroker := sarama.NewMockBroker(t, brokerID) + mockBroker := sarama.NewMockBroker(t, testBrokerID) handlerMap := make(map[string]sarama.MockResponse) // The sarama mock package doesn't allow us to return an error // for invalid offset requests, so we return an offset of -1. // Note that the mock offset responses below imply a broker with - // newestOffset-1 blocks available. Therefore, if you are using this + // testNewestOffset-1 blocks available. Therefore, if you are using this // broker as part of a bigger test where you intend to consume blocks, // make sure that the mockConsumer has been initialized accordingly - // (Set the 'seek' parameter to newestOffset-1.) + // (Set the 'seek' parameter to testNewestOffset-1.) handlerMap["OffsetRequest"] = sarama.NewMockOffsetResponse(t). - SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, oldestOffset). - SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, newestOffset) + SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, testOldestOffset). + SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, testNewestOffset) mockBroker.SetHandlerByMap(handlerMap) broker := sarama.NewBroker(mockBroker.Addr()) diff --git a/orderer/kafka/broker_test.go b/orderer/kafka/broker_test.go index a29dfc3a320..8717ee8b65a 100644 --- a/orderer/kafka/broker_test.go +++ b/orderer/kafka/broker_test.go @@ -23,8 +23,8 @@ import ( ) func TestBrokerGetOffset(t *testing.T) { - t.Run("oldest", testBrokerGetOffsetFunc(sarama.OffsetOldest, oldestOffset)) - t.Run("newest", testBrokerGetOffsetFunc(sarama.OffsetNewest, newestOffset)) + t.Run("oldest", testBrokerGetOffsetFunc(sarama.OffsetOldest, testOldestOffset)) + t.Run("newest", testBrokerGetOffsetFunc(sarama.OffsetNewest, testNewestOffset)) } func testBrokerGetOffsetFunc(given, expected int64) func(t *testing.T) { diff --git a/orderer/kafka/client_deliver_test.go b/orderer/kafka/client_deliver_test.go index aa7da7b69bd..31d5b017985 100644 --- a/orderer/kafka/client_deliver_test.go +++ b/orderer/kafka/client_deliver_test.go @@ -24,10 +24,10 @@ import ( ) func TestClientDeliverSeekWrong(t *testing.T) { - t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(oldestOffset)-1, 10)) - t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(newestOffset), 10)) - t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(oldestOffset), 0)) - t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(oldestOffset), uint64(testConf.General.MaxWindowSize+1))) + t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset)-1, 10)) + t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(testNewestOffset), 10)) + t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), 0)) + t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), uint64(testConf.General.MaxWindowSize+1))) } func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) { @@ -65,7 +65,7 @@ func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) { func TestClientDeliverSeek(t *testing.T) { t.Run("oldest", testClientDeliverSeekFunc("oldest", 0, 10, 10)) - t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(middleOffset), 10, 10)) + t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(testMiddleOffset), 10, 10)) t.Run("newest", testClientDeliverSeekFunc("newest", 0, 10, 1)) } @@ -104,8 +104,8 @@ func testClientDeliverSeekFunc(label string, seek, window uint64, expected int) } func TestClientDeliverAckWrong(t *testing.T) { - t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(middleOffset)-2)) - t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(newestOffset))) + t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(testMiddleOffset)-2)) + t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(testNewestOffset))) } func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) { @@ -123,7 +123,7 @@ func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) { } }() - mds.incoming <- testNewSeekMessage("specific", uint64(middleOffset), 10) + mds.incoming <- testNewSeekMessage("specific", uint64(testMiddleOffset), 10) mds.incoming <- testNewAckMessage(ack) for { select { @@ -141,7 +141,7 @@ func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) { } func TestClientDeliverAck(t *testing.T) { - t.Run("in-between", testClientDeliverAckFunc("specific", uint64(middleOffset), 10, 10, 2*10)) + t.Run("in-between", testClientDeliverAckFunc("specific", uint64(testMiddleOffset), 10, 10, 2*10)) t.Run("newest", testClientDeliverAckFunc("newest", 0, 10, 1, 1)) } diff --git a/orderer/kafka/common_test.go b/orderer/kafka/common_test.go deleted file mode 100644 index 894077a38d5..00000000000 --- a/orderer/kafka/common_test.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - - ab "github.com/hyperledger/fabric/protos/orderer" -) - -func testClose(t *testing.T, x Closeable) { - if err := x.Close(); err != nil { - t.Fatal("Cannot close mock resource:", err) - } -} - -func testNewSeekMessage(startLabel string, seekNo, windowNo uint64) *ab.DeliverUpdate { - var startVal ab.SeekInfo_StartType - switch startLabel { - case "oldest": - startVal = ab.SeekInfo_OLDEST - case "newest": - startVal = ab.SeekInfo_NEWEST - default: - startVal = ab.SeekInfo_SPECIFIED - - } - return &ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Seek{ - Seek: &ab.SeekInfo{ - Start: startVal, - SpecifiedNumber: seekNo, - WindowSize: windowNo, - }, - }, - } -} - -func testNewAckMessage(ackNo uint64) *ab.DeliverUpdate { - return &ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Acknowledgement{ - Acknowledgement: &ab.Acknowledgement{ - Number: ackNo, - }, - }, - } -} diff --git a/orderer/kafka/config_test.go b/orderer/kafka/config_test.go index 0faf8861261..6da59bd841e 100644 --- a/orderer/kafka/config_test.go +++ b/orderer/kafka/config_test.go @@ -17,38 +17,88 @@ limitations under the License. package kafka import ( + "testing" "time" "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/localconfig" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" ) var ( - brokerID = int32(0) - oldestOffset = int64(100) // The oldest block available on the broker - newestOffset = int64(1100) // The offset that will be assigned to the next block - middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle + testBrokerID = int32(0) + testOldestOffset = int64(100) // The oldest block available on the broker + testNewestOffset = int64(1100) // The offset that will be assigned to the next block + testMiddleOffset = (testOldestOffset + testNewestOffset - 1) / 2 // Just an offset in the middle // Amount of time to wait for block processing when doing time-based tests // We generally want this value to be as small as possible so as to make tests execute faster // But this may have to be bumped up in slower machines - timePadding = 200 * time.Millisecond + testTimePadding = 200 * time.Millisecond ) var testConf = &config.TopLevel{ General: config.General{ OrdererType: "kafka", + LedgerType: "ram", BatchTimeout: 500 * time.Millisecond, BatchSize: 100, QueueSize: 100, MaxWindowSize: 100, ListenAddress: "127.0.0.1", ListenPort: 7050, + GenesisMethod: "static", }, Kafka: config.Kafka{ - Brokers: []string{"127.0.0.1:9092"}, - Topic: "test", - PartitionID: 0, - Version: sarama.V0_9_0_1, + Brokers: []string{"127.0.0.1:9092"}, + Retry: config.Retry{ + Period: 3 * time.Second, + Stop: 60 * time.Second, + }, + Verbose: false, + Version: sarama.V0_9_0_1, }, } + +func testClose(t *testing.T, x Closeable) { + if err := x.Close(); err != nil { + t.Fatal("Cannot close mock resource:", err) + } +} + +func newTestEnvelope(content string) *cb.Envelope { + return &cb.Envelope{Payload: []byte(content)} +} + +func testNewSeekMessage(startLabel string, seekNo, windowNo uint64) *ab.DeliverUpdate { + var startVal ab.SeekInfo_StartType + switch startLabel { + case "oldest": + startVal = ab.SeekInfo_OLDEST + case "newest": + startVal = ab.SeekInfo_NEWEST + default: + startVal = ab.SeekInfo_SPECIFIED + + } + return &ab.DeliverUpdate{ + Type: &ab.DeliverUpdate_Seek{ + Seek: &ab.SeekInfo{ + Start: startVal, + SpecifiedNumber: seekNo, + WindowSize: windowNo, + }, + }, + } +} + +func testNewAckMessage(ackNo uint64) *ab.DeliverUpdate { + return &ab.DeliverUpdate{ + Type: &ab.DeliverUpdate_Acknowledgement{ + Acknowledgement: &ab.Acknowledgement{ + Number: ackNo, + }, + }, + } +} diff --git a/orderer/kafka/consumer_mock_test.go b/orderer/kafka/consumer_mock_test.go index 866783ab5c3..1e1207f2ecc 100644 --- a/orderer/kafka/consumer_mock_test.go +++ b/orderer/kafka/consumer_mock_test.go @@ -64,7 +64,7 @@ func mockNewConsumer(t *testing.T, conf *config.TopLevel, seek int64) (Consumer, t: t, } // Stop-gap hack until #745 is resolved: - if seek >= oldestOffset && seek <= (newestOffset-1) { + if seek >= testOldestOffset && seek <= (testNewestOffset-1) { mc.testFillWithBlocks(seek - 1) // Prepare the consumer so that the next Recv gives you block "seek" } else { err = fmt.Errorf("Out of range seek number given to consumer") @@ -73,7 +73,7 @@ func mockNewConsumer(t *testing.T, conf *config.TopLevel, seek int64) (Consumer, } func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage { - if mc.consumedOffset >= newestOffset-1 { + if mc.consumedOffset >= testNewestOffset-1 { return nil } mc.consumedOffset++ diff --git a/orderer/kafka/consumer_test.go b/orderer/kafka/consumer_test.go index 269b40ae813..737a5c3bb4d 100644 --- a/orderer/kafka/consumer_test.go +++ b/orderer/kafka/consumer_test.go @@ -19,7 +19,7 @@ package kafka import "testing" func TestConsumerInitWrong(t *testing.T) { - cases := []int64{oldestOffset - 1, newestOffset} + cases := []int64{testOldestOffset - 1, testNewestOffset} for _, seek := range cases { mc, err := mockNewConsumer(t, testConf, seek) @@ -31,9 +31,9 @@ func TestConsumerInitWrong(t *testing.T) { } func TestConsumerRecv(t *testing.T) { - t.Run("oldest", testConsumerRecvFunc(oldestOffset, oldestOffset)) - t.Run("in-between", testConsumerRecvFunc(middleOffset, middleOffset)) - t.Run("newest", testConsumerRecvFunc(newestOffset-1, newestOffset-1)) + t.Run("oldest", testConsumerRecvFunc(testOldestOffset, testOldestOffset)) + t.Run("in-between", testConsumerRecvFunc(testMiddleOffset, testMiddleOffset)) + t.Run("newest", testConsumerRecvFunc(testNewestOffset-1, testNewestOffset-1)) } func testConsumerRecvFunc(given, expected int64) func(t *testing.T) { diff --git a/orderer/kafka/deliver_test.go b/orderer/kafka/deliver_test.go index 77fa74e7d27..34b84d437f9 100644 --- a/orderer/kafka/deliver_test.go +++ b/orderer/kafka/deliver_test.go @@ -27,7 +27,7 @@ func TestDeliverMultipleClients(t *testing.T) { start string seek, window uint64 }{ - {"oldest", 0, 10}, {"newest", 0, 10}, {"specific", uint64(middleOffset), 10}, + {"oldest", 0, 10}, {"newest", 0, 10}, {"specific", uint64(testMiddleOffset), 10}, } expected := 21 // 10 + 1 + 10 diff --git a/orderer/kafka/orderer_mock_test.go b/orderer/kafka/orderer_mock_test.go index a0da54a08c4..129cec0caaf 100644 --- a/orderer/kafka/orderer_mock_test.go +++ b/orderer/kafka/orderer_mock_test.go @@ -27,7 +27,7 @@ import ( func mockNew(t *testing.T, conf *config.TopLevel, disk chan []byte) Orderer { return &serverImpl{ - broadcaster: mockNewBroadcaster(t, conf, oldestOffset, disk), + broadcaster: mockNewBroadcaster(t, conf, testOldestOffset, disk), deliverer: mockNewDeliverer(t, conf), } } diff --git a/orderer/kafka/producer_mock_test.go b/orderer/kafka/producer_mock_test.go index dbdb0a15df0..b567661cb57 100644 --- a/orderer/kafka/producer_mock_test.go +++ b/orderer/kafka/producer_mock_test.go @@ -43,7 +43,7 @@ func mockNewProducer(t *testing.T, conf *config.TopLevel, seek int64, disk chan producedOffset: 0, t: t, } - if seek >= oldestOffset && seek <= (newestOffset-1) { + if seek >= testOldestOffset && seek <= (testNewestOffset-1) { mp.testFillWithBlocks(seek - 1) // Prepare the producer so that the next Send gives you block "seek" } else { panic(fmt.Errorf("Out of range seek number given to producer")) diff --git a/orderer/kafka/producer_test.go b/orderer/kafka/producer_test.go index c14e40498ca..2ec33cbbd8f 100644 --- a/orderer/kafka/producer_test.go +++ b/orderer/kafka/producer_test.go @@ -19,6 +19,6 @@ package kafka import "testing" func TestProducer(t *testing.T) { - mp := mockNewProducer(t, testConf, middleOffset, make(chan []byte)) + mp := mockNewProducer(t, testConf, testMiddleOffset, make(chan []byte)) defer testClose(t, mp) } diff --git a/orderer/localconfig/config.go b/orderer/localconfig/config.go index ec3d17ea18f..470b0b3ea41 100644 --- a/orderer/localconfig/config.go +++ b/orderer/localconfig/config.go @@ -70,11 +70,12 @@ type FileLedger struct { // Kafka contains config for the Kafka orderer type Kafka struct { - Brokers []string + Brokers []string // TODO This should be deprecated and this information should be stored in the config block Topic string PartitionID int32 Retry Retry - Version sarama.KafkaVersion // TODO For now set this in code + Verbose bool + Version sarama.KafkaVersion } // Retry contains config for the reconnection attempts to the Kafka brokers diff --git a/orderer/main.go b/orderer/main.go index 4f113ad2e06..4470ced7f68 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -17,7 +17,6 @@ limitations under the License. package main import ( - "flag" "fmt" "io/ioutil" "log" @@ -136,17 +135,7 @@ func launchKafka(conf *config.TopLevel) { var kafkaVersion = sarama.V0_9_0_1 // TODO Ideally we'd set this in the YAML file but its type makes this impossible conf.Kafka.Version = kafkaVersion - var loglevel string - var verbose bool - - flag.StringVar(&loglevel, "loglevel", "info", - "Set the logging level for the orderer. (Suggested values: info, debug)") - flag.BoolVar(&verbose, "verbose", false, - "Turn on logging for the Kafka library. (Default: \"false\")") - flag.Parse() - - kafka.SetLogLevel(loglevel) - if verbose { + if conf.Kafka.Verbose { sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile) } diff --git a/orderer/orderer.yaml b/orderer/orderer.yaml index 0a89c9eff05..103756c325f 100644 --- a/orderer/orderer.yaml +++ b/orderer/orderer.yaml @@ -24,12 +24,12 @@ General: # Batch Size: The maximum number of messages to permit in a batch BatchSize: 10 - # Queue Size: The maximum number of messages to allow pending from a gRPC client - # This option is currently ignored for the Kafka OrdererType. + # Queue Size: The maximum number of messages to allow pending from a gRPC + # client. This option is currently ignored for the Kafka OrdererType. QueueSize: 10 # Max Window Size: The maximum number of messages to for the orderer Deliver - # to allow before acknowledgement must be received from the client + # to allow before acknowledgement must be received from the client. MaxWindowSize: 1000 # Listen address: The IP on which to bind to listen @@ -70,7 +70,7 @@ RAMLedger: FileLedger: # Location: The directory to store the blocks in - # NOTE: if this unset, a temporary location will be chosen using + # NOTE: If this is unset, a temporary location will be chosen using # the prefix specified by Prefix Location: @@ -95,12 +95,16 @@ Kafka: # Topic: The Kafka topic the orderer writes to/reads from Topic: test - # Partition ID: The partition of the Kafka topic the orderer writes to/reads from + # Partition ID: The partition of the Kafka topic the orderer writes to/reads + # from PartitionID: 0 - # Retry: What to do if none of the Kafka brokers are available. + # Retry: What to do if none of the Kafka brokers are available Retry: - # The producer should attempt to reconnect every . + # The producer should attempt to reconnect every Period: 3s - # Panic if has elapsed and no connection has been established. + # Panic if has elapsed and no connection has been established Stop: 60s + + # Verbose: Turn on logging for the Kafka library + Verbose: false