diff --git a/orderer/common/blockcutter/blockcutter.go b/orderer/common/blockcutter/blockcutter.go index 31b0bcc981f..f838001e006 100644 --- a/orderer/common/blockcutter/blockcutter.go +++ b/orderer/common/blockcutter/blockcutter.go @@ -30,11 +30,11 @@ var logger = logging.MustGetLogger("orderer/common/blockcutter") type Receiver interface { // Ordered should be invoked sequentially as messages are ordered // If the current message valid, and no batches need to be cut: - // - Ordered will return nil, nil, and true (indicating ok). + // - Ordered will return nil, nil, and true (indicating valid Tx). // If the current message valid, and batches need to be cut: - // - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok). + // - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating valid Tx). // If the current message is invalid: - // - Ordered will return nil, nil, and false (to indicate not ok). + // - Ordered will return nil, nil, and false (to indicate invalid Tx). // // Given a valid message, if the current message needs to be isolated (as determined during filtering). // - Ordered will return: @@ -45,7 +45,9 @@ type Receiver interface { // - The current message needs to be isolated (as determined during filtering). // - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes. // - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount. - Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) + // + // In any case, `pending` is set to true if there are still messages pending in the receiver after cutting the block. + Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committers [][]filter.Committer, validTx bool, pending bool) // Cut returns the current batch and starts a new one Cut() ([]*cb.Envelope, []filter.Committer) @@ -69,29 +71,34 @@ func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet // Ordered should be invoked sequentially as messages are ordered // If the current message valid, and no batches need to be cut: -// - Ordered will return nil, nil, and true (indicating ok). +// - Ordered will return nil, nil, true (indicating valid tx) and true (indicating there are pending messages). // If the current message valid, and batches need to be cut: -// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok). +// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating valid tx). // If the current message is invalid: -// - Ordered will return nil, nil, and false (to indicate not ok). +// - Ordered will return nil, nil, and false (to indicate invalid tx). // // Given a valid message, if the current message needs to be isolated (as determined during filtering). // - Ordered will return: // * The pending batch of (if not empty), and a second batch containing only the isolated message. // * The corresponding batches of committers. -// * true (indicating ok). +// * true (indicating valid tx). // Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if: // - The current message needs to be isolated (as determined during filtering). // - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes. 
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount. -func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) { +// +// In any case, `pending` is set to true if there are still messages pending in the receiver after cutting the block. +func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committerBatches [][]filter.Committer, validTx bool, pending bool) { // The messages must be filtered a second time in case configuration has changed since the message was received committer, err := r.filters.Apply(msg) if err != nil { logger.Debugf("Rejecting message: %s", err) - return nil, nil, false + return // We don't bother to determine `pending` here as it's not processed in error case } + // message is valid + validTx = true + messageSizeBytes := messageSizeBytes(msg) if committer.Isolated() || messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes { @@ -102,9 +109,6 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, r.sharedConfigManager.BatchSize().PreferredMaxBytes) } - messageBatches := [][]*cb.Envelope{} - committerBatches := [][]filter.Committer{} - // cut pending batch, if it has any messages if len(r.pendingBatch) > 0 { messageBatch, committerBatch := r.Cut() @@ -116,12 +120,9 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi messageBatches = append(messageBatches, []*cb.Envelope{msg}) committerBatches = append(committerBatches, []filter.Committer{committer}) - return messageBatches, committerBatches, true + return } - messageBatches := [][]*cb.Envelope{} - committerBatches := [][]filter.Committer{} - messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes if messageWillOverflowBatchSizeBytes { @@ -136,21 +137,17 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi r.pendingBatch = append(r.pendingBatch, msg) r.pendingBatchSizeBytes += messageSizeBytes r.pendingCommitters = append(r.pendingCommitters, committer) + pending = true if uint32(len(r.pendingBatch)) >= r.sharedConfigManager.BatchSize().MaxMessageCount { logger.Debugf("Batch size met, cutting batch") messageBatch, committerBatch := r.Cut() messageBatches = append(messageBatches, messageBatch) committerBatches = append(committerBatches, committerBatch) + pending = false } - // return nils instead of empty slices - if len(messageBatches) == 0 { - return nil, nil, true - } - - return messageBatches, committerBatches, true - + return } // Cut returns the current batch and starts a new one diff --git a/orderer/common/blockcutter/blockcutter_test.go b/orderer/common/blockcutter/blockcutter_test.go index 53834edc987..1604a587984 100644 --- a/orderer/common/blockcutter/blockcutter_test.go +++ b/orderer/common/blockcutter/blockcutter_test.go @@ -25,6 +25,7 @@ import ( cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" logging "github.com/op/go-logging" + "github.com/stretchr/testify/assert" ) func init() { @@ -85,26 +86,19 @@ func TestNormalBatch(t *testing.T) { preferredMaxBytes := uint32(100) r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, 
PreferredMaxBytes: preferredMaxBytes}}, filters) - batches, committers, ok := r.Ordered(goodTx) + batches, committers, ok, pending := r.Ordered(goodTx) - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } - - if !ok { - t.Fatalf("Should have enqueued message into batch") - } + assert.Nil(t, batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.True(t, ok, "Should have enqueued message into batch") + assert.True(t, pending, "Should have pending messages") - batches, committers, ok = r.Ordered(goodTx) - - if batches == nil || committers == nil { - t.Fatalf("Should have created batch") - } - - if !ok { - t.Fatalf("Should have enqueued second message into batch") - } + batches, committers, ok, pending = r.Ordered(goodTx) + assert.Len(t, batches, 1, "Should have created 1 message batch, got %d", len(batches)) + assert.Len(t, committers, 1, "Should have created 1 committer batch, got %d", len(committers)) + assert.True(t, ok, "Should have enqueued message into batch") + assert.False(t, pending, "Should not have pending messages") } func TestBadMessageInBatch(t *testing.T) { @@ -114,35 +108,24 @@ func TestBadMessageInBatch(t *testing.T) { preferredMaxBytes := uint32(100) r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - batches, committers, ok := r.Ordered(badTx) + batches, committers, ok, _ := r.Ordered(badTx) - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } + assert.Nil(t, batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.False(t, ok, "Should not have enqueued bad message into batch") - if ok { - t.Fatalf("Should not have enqueued bad message into batch") - } + batches, committers, ok, pending := r.Ordered(goodTx) - batches, committers, ok = r.Ordered(goodTx) + assert.Nil(t, batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.True(t, ok, "Should have enqueued good message into batch") + assert.True(t, pending, "Should have pending messages") - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } - - if !ok { - t.Fatalf("Should have enqueued good message into batch") - } - - batches, committers, ok = r.Ordered(badTx) + batches, committers, ok, _ = r.Ordered(badTx) - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } - - if ok { - t.Fatalf("Should not have enqueued second bad message into batch") - } + assert.Nil(t, batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.False(t, ok, "Should not have enqueued second bad message into batch") } func TestUnmatchedMessageInBatch(t *testing.T) { @@ -152,35 +135,24 @@ func TestUnmatchedMessageInBatch(t *testing.T) { preferredMaxBytes := uint32(100) r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - batches, committers, ok := r.Ordered(unmatchedTx) - - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } + batches, committers, ok, _ := r.Ordered(unmatchedTx) - if ok { - t.Fatalf("Should not have enqueued unmatched message into batch") - } + assert.Nil(t, 
batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.False(t, ok, "Should not have enqueued unmatched message into batch") - batches, committers, ok = r.Ordered(goodTx) + batches, committers, ok, pending := r.Ordered(goodTx) - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } - - if !ok { - t.Fatalf("Should have enqueued good message into batch") - } + assert.Nil(t, batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.True(t, ok, "Should have enqueued good message into batch") + assert.True(t, pending, "Should have pending messages") - batches, committers, ok = r.Ordered(unmatchedTx) + batches, committers, ok, _ = r.Ordered(unmatchedTx) - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch from unmatched message") - } - - if ok { - t.Fatalf("Should not have enqueued second bad message into batch") - } + assert.Nil(t, batches, "Should not have created batch from unmatched message") + assert.Nil(t, committers, "Should not have created batch") + assert.False(t, ok, "Should not have enqueued second unmatched message into batch") } func TestIsolatedEmptyBatch(t *testing.T) { @@ -190,23 +162,15 @@ func TestIsolatedEmptyBatch(t *testing.T) { preferredMaxBytes := uint32(100) r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - batches, committers, ok := r.Ordered(isolatedTx) + batches, committers, ok, pending := r.Ordered(isolatedTx) - if !ok { - t.Fatalf("Should have enqueued isolated message") - } - - if len(batches) != 1 || len(committers) != 1 { - t.Fatalf("Should created new batch, got %d and %d", len(batches), len(committers)) - } - - if len(batches[0]) != 1 || len(committers[0]) != 1 { - t.Fatalf("Should have had one isolatedTx in the second batch got %d and %d", len(batches[1]), len(committers[0])) - } - - if !bytes.Equal(batches[0][0].Payload, isolatedTx.Payload) { - t.Fatalf("Should have had the isolated tx in the first batch") - } + assert.Len(t, batches, 1, "Should created 1 new message batch, got %d", len(batches)) + assert.Len(t, batches[0], 1, "Should have had one isolatedTx in the message batch, got %d", len(batches[0])) + assert.Len(t, committers, 1, "Should created 1 new committer batch, got %d", len(committers)) + assert.Len(t, committers[0], 1, "Should have had one isolatedTx in the committer batch, got %d", len(committers[0])) + assert.True(t, ok, "Should have enqueued isolated message into batch") + assert.False(t, pending, "Should not have pending messages") + assert.Equal(t, isolatedTx.Payload, batches[0][0].Payload, "Should have had the isolated tx in the first batch") } func TestIsolatedPartialBatch(t *testing.T) { @@ -216,41 +180,25 @@ func TestIsolatedPartialBatch(t *testing.T) { preferredMaxBytes := uint32(100) r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - batches, committers, ok := r.Ordered(goodTx) - - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } - - if !ok { - t.Fatalf("Should have enqueued good message into batch") - } - - batches, committers, ok = r.Ordered(isolatedTx) - - if !ok { - t.Fatalf("Should have enqueued isolated message") - } - - if len(batches) != 2 
|| len(committers) != 2 { - t.Fatalf("Should have created two batches, got %d and %d", len(batches), len(committers)) - } - - if len(batches[0]) != 1 || len(committers[0]) != 1 { - t.Fatalf("Should have had one normal tx in the first batch got %d and %d committers", len(batches[0]), len(committers[0])) - } - - if !bytes.Equal(batches[0][0].Payload, goodTx.Payload) { - t.Fatalf("Should have had the normal tx in the first batch") - } - - if len(batches[1]) != 1 || len(committers[1]) != 1 { - t.Fatalf("Should have had one isolated tx in the second batch got %d and %d committers", len(batches[1]), len(committers[1])) - } - - if !bytes.Equal(batches[1][0].Payload, isolatedTx.Payload) { - t.Fatalf("Should have had the isolated tx in the second batch") - } + batches, committers, ok, pending := r.Ordered(goodTx) + + assert.Nil(t, batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.True(t, ok, "Should have enqueued message into batch") + assert.True(t, pending, "Should have pending messages") + + batches, committers, ok, pending = r.Ordered(isolatedTx) + + assert.Len(t, batches, 2, "Should created 2 new message batch, got %d", len(batches)) + assert.Len(t, batches[0], 1, "Should have had one goodTx in the first message batch, got %d", len(batches[0])) + assert.Len(t, batches[1], 1, "Should have had one isolatedTx in the second message batch, got %d", len(batches[1])) + assert.Len(t, committers, 2, "Should created 2 new committer batch, got %d", len(committers)) + assert.Len(t, committers[0], 1, "Should have had 1 committer in the first committer batch, got %d", len(committers[0])) + assert.Len(t, committers[1], 1, "Should have had 1 committer in the second committer batch, got %d", len(committers[1])) + assert.True(t, ok, "Should have enqueued isolated message into batch") + assert.False(t, pending, "Should not have pending messages") + assert.Equal(t, goodTx.Payload, batches[0][0].Payload, "Should have had the good tx in the first batch") + assert.Equal(t, isolatedTx.Payload, batches[1][0].Payload, "Should have had the isolated tx in the second batch") } func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) { @@ -268,44 +216,31 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) { // enqueue 9 messages for i := 0; i < 9; i++ { - batches, committers, ok := r.Ordered(goodTx) - if batches != nil || committers != nil { - t.Fatalf("Should not have created batch") - } - if !ok { - t.Fatalf("Should have enqueued message into batch") - } - } - - // next message should create batch - batches, committers, ok := r.Ordered(goodTx) + batches, committers, ok, pending := r.Ordered(goodTx) - if batches == nil || committers == nil { - t.Fatalf("Should have created batch") + assert.Nil(t, batches, "Should not have created batch") + assert.Nil(t, committers, "Should not have created batch") + assert.True(t, ok, "Should have enqueued message into batch") + assert.True(t, pending, "Should have pending messages") } - if len(batches) != 1 || len(committers) != 1 { - t.Fatalf("Should have created one batch, got %d and %d", len(batches), len(committers)) - } + // next message should create batch + batches, committers, ok, pending := r.Ordered(goodTx) - if len(batches[0]) != 9 || len(committers[0]) != 9 { - t.Fatalf("Should have had nine normal tx in the batch got %d and %d committers", len(batches[0]), len(committers[0])) - } - if !ok { - t.Fatalf("Should have enqueued the tenth message into batch") - } + assert.Len(t, batches, 1, "Should have 
created 1 message batch, got %d", len(batches)) + assert.Len(t, batches[0], 9, "Should have had nine normal tx in the message batch, got %d", len(batches[0])) + assert.Len(t, committers, 1, "Should have created 1 committer batch, got %d", len(committers)) + assert.Len(t, committers[0], 9, "Should have had nine committers in the committer batch, got %d", len(committers[0])) + assert.True(t, ok, "Should have enqueued message into batch") + assert.True(t, pending, "Should still have pending messages") // force a batch cut messageBatch, committerBatch := r.Cut() - if messageBatch == nil || committerBatch == nil { - t.Fatalf("Should have created batch") - } - - if len(messageBatch) != 1 || len(committerBatch) != 1 { - t.Fatalf("Should have had one tx in the batch, got %d and %d", len(batches), len(committers)) - } - + assert.NotNil(t, messageBatch, "Should have created message batch") + assert.Len(t, messageBatch, 1, "Should have had 1 tx in the batch, got %d", len(messageBatch)) + assert.NotNil(t, committerBatch, "Should have created committer batch") + assert.Len(t, committerBatch, 1, "Should have had 1 committer in the committer batch, got %d", len(committerBatch)) } func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) { @@ -322,21 +257,12 @@ func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) { r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}}, filters) // submit large message - batches, committers, ok := r.Ordered(goodTxLarge) - - if batches == nil || committers == nil { - t.Fatalf("Should have created batch") - } - - if len(batches) != 1 || len(committers) != 1 { - t.Fatalf("Should have created one batch, got %d and %d", len(batches), len(committers)) - } - - if len(batches[0]) != 1 || len(committers[0]) != 1 { - t.Fatalf("Should have had one normal tx in the batch got %d and %d committers", len(batches[0]), len(committers[0])) - } - if !ok { - t.Fatalf("Should have enqueued the message into batch") - } - + batches, committers, ok, pending := r.Ordered(goodTxLarge) + + assert.Len(t, batches, 1, "Should have created 1 message batch, got %d", len(batches)) + assert.Len(t, batches[0], 1, "Should have had 1 normal tx in the message batch, got %d", len(batches[0])) + assert.Len(t, committers, 1, "Should have created 1 committer batch, got %d", len(committers)) + assert.Len(t, committers[0], 1, "Should have had 1 committer in the committer batch, got %d", len(committers[0])) + assert.True(t, ok, "Should have enqueued message into batch") + assert.False(t, pending, "Should not have pending messages") } diff --git a/orderer/kafka/chain.go b/orderer/kafka/chain.go index 15526d3c024..755b86f7369 100644 --- a/orderer/kafka/chain.go +++ b/orderer/kafka/chain.go @@ -371,25 +371,31 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.C // This shouldn't happen, it should be filtered at ingress return fmt.Errorf("unmarshal/%s", err) } - batches, committers, ok := support.BlockCutter().Ordered(env) - logger.Debugf("[channel: %s] Ordering results: items in batch = %d, ok = %v", support.ChainID(), len(batches), ok) + batches, committers, ok, pending := support.BlockCutter().Ordered(env) + logger.Debugf("[channel: %s] Ordering results: items in batch = %d, ok = %v, pending = %v", support.ChainID(), len(batches), ok, pending) if ok && len(batches) == 0 && *timer == nil { *timer = 
time.After(support.SharedConfig().BatchTimeout()) logger.Debugf("[channel: %s] Just began %s batch timer", support.ChainID(), support.SharedConfig().BatchTimeout().String()) return nil } + + offset := receivedOffset + if pending || len(batches) == 2 { + // If the newest envelope is not encapsulated into the first batch, + // the LastOffsetPersisted of first block should be receivedOffset-1. + offset-- + } + // If !ok, batches == nil, so this will be skipped for i, batch := range batches { - // If more than one batch is produced, exactly 2 batches are produced. - // The receivedOffset for the first batch is one less than the supplied - // offset to this function. - offset := receivedOffset - int64(len(batches)-i-1) block := support.CreateNextBlock(batch) encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: offset}) support.WriteBlock(block, committers[i], encodedLastOffsetPersisted) *lastCutBlockNumber++ logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", support.ChainID(), *lastCutBlockNumber, offset) + offset++ } + if len(batches) > 0 { *timer = nil } diff --git a/orderer/kafka/chain_test.go b/orderer/kafka/chain_test.go index cb816d5604e..5ac13eb001e 100644 --- a/orderer/kafka/chain_test.go +++ b/orderer/kafka/chain_test.go @@ -659,8 +659,6 @@ func TestSendTimeToCut(t *testing.T) { } func TestProcessMessagesToBlocks(t *testing.T) { - subtestIndex := -1 // Used to calculate the right offset at each subtest - mockBroker := sarama.NewMockBroker(t, 0) defer func() { mockBroker.Close() }() @@ -684,8 +682,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") t.Run("ReceiveConnect", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -727,8 +723,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveRegularWithError", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -770,8 +764,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveRegularAndQueue", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -827,8 +819,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveRegularAndCutBlock", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -887,8 +877,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveTwoRegularAndCutTwoBlocks", func(t *testing.T) { - subtestIndex++ - if testing.Short() { t.Skip("Skipping test in short mode") } @@ -933,6 +921,7 @@ func TestProcessMessagesToBlocks(t *testing.T) { var block1, block2 *cb.Block // This is the first wrappedMessage that the for-loop will process + block1Offset := mpc.HighWaterMarkOffset() mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return logger.Debugf("Mock blockcutter's Ordered call has returned") @@ -940,6 +929,7 @@ func TestProcessMessagesToBlocks(t *testing.T) { mockSupport.BlockCutterVal.IsolatedTx = true // This is the first wrappedMessage that the for-loop will process + block2Offset := mpc.HighWaterMarkOffset() 
mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) mockSupport.BlockCutterVal.Block <- struct{}{} logger.Debugf("Mock blockcutter's Ordered call has returned for the second time") @@ -961,19 +951,100 @@ func TestProcessMessagesToBlocks(t *testing.T) { logger.Debug("haltChan closed") <-done - expectedOffset := newestOffset + int64(subtestIndex) // TODO Hacky, revise eventually - assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") assert.Equal(t, uint64(2), counts[indexRecvPass], "Expected 2 messages received and unmarshaled") assert.Equal(t, uint64(2), counts[indexProcessRegularPass], "Expected 2 REGULAR messages processed") assert.Equal(t, lastCutBlockNumber+2, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by two") - assert.Equal(t, expectedOffset+1, extractEncodedOffset(block1.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", newestOffset+1) - assert.Equal(t, expectedOffset+2, extractEncodedOffset(block2.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", newestOffset+2) + assert.Equal(t, block1Offset, extractEncodedOffset(block1.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", block1Offset) + assert.Equal(t, block2Offset, extractEncodedOffset(block2.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", block2Offset) }) - t.Run("ReceiveRegularAndSendTimeToCut", func(t *testing.T) { - subtestIndex++ + t.Run("SecondTxOverflows", func(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test in short mode") + } + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) + + lastCutBlockNumber := uint64(3) + + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + SharedConfigVal: &mockconfig.Orderer{ + BatchTimeoutVal: longTimeout, + }, + } + defer close(mockSupport.BlockCutterVal.Block) + + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, + + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, + + errorChan: errorChan, + haltChan: haltChan, + } + + var counts []uint64 + done := make(chan struct{}) + + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() + + var block1, block2 *cb.Block + + block1LastOffset := mpc.HighWaterMarkOffset() + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) + mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return + + // Set CutAncestors to true so that second message overflows receiver batch + mockSupport.BlockCutterVal.CutAncestors = true + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) + mockSupport.BlockCutterVal.Block <- struct{}{} + + select { + case block1 = <-mockSupport.Blocks: // Let the `mockConsenterSupport.WriteBlock` proceed + case <-time.After(shortTimeout): + logger.Fatalf("Did not receive a block from the blockcutter as expected") + } + + // 
Set CutNext to true to flush all pending messages + mockSupport.BlockCutterVal.CutAncestors = false + mockSupport.BlockCutterVal.CutNext = true + block2LastOffset := mpc.HighWaterMarkOffset() + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) + mockSupport.BlockCutterVal.Block <- struct{}{} + + select { + case block2 = <-mockSupport.Blocks: + case <-time.After(shortTimeout): + logger.Fatalf("Did not receive a block from the blockcutter as expected") + } + + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done + + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(3), counts[indexRecvPass], "Expected 3 messages received and unmarshaled") + assert.Equal(t, uint64(3), counts[indexProcessRegularPass], "Expected 3 REGULAR messages processed") + assert.Equal(t, lastCutBlockNumber+2, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by two") + assert.Equal(t, block1LastOffset, extractEncodedOffset(block1.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", block1LastOffset) + assert.Equal(t, block2LastOffset, extractEncodedOffset(block2.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in second block to be %d", block2LastOffset) + }) + + t.Run("ReceiveRegularAndSendTimeToCut", func(t *testing.T) { t.Skip("Skipping test as it introduces a race condition") // NB We haven't set a handlermap for the mock broker so we need to set @@ -1048,8 +1119,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { // - Consumer.Retry.Backoff // - Metadata.Retry.Max - subtestIndex++ - t.Skip("Skipping test as it introduces a race condition") // Exact same test as ReceiveRegularAndSendTimeToCut.
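The two subtests above pin down the LastOffsetPersisted arithmetic introduced in processRegular: whenever the newest envelope is not part of the first batch that gets cut (either because a second batch follows it, or because it is still pending in the receiver), the first block's offset is receivedOffset-1. A minimal, self-contained sketch of that arithmetic (a toy helper, not the orderer API):

```go
package main

import "fmt"

// lastOffsetsPersisted restates the offset bookkeeping added to processRegular:
// if the newest envelope is not in the first batch (pending == true, or two
// batches were cut), the first block only covers offsets up to receivedOffset-1.
func lastOffsetsPersisted(receivedOffset int64, numBatches int, pending bool) []int64 {
	offset := receivedOffset
	if pending || numBatches == 2 {
		offset--
	}
	offsets := make([]int64, 0, numBatches)
	for i := 0; i < numBatches; i++ {
		offsets = append(offsets, offset)
		offset++
	}
	return offsets
}

func main() {
	fmt.Println(lastOffsetsPersisted(5, 2, false)) // isolated tx cuts two batches: [4 5]
	fmt.Println(lastOffsetsPersisted(5, 1, true))  // overflow, newest tx still pending: [4]
	fmt.Println(lastOffsetsPersisted(5, 1, false)) // batch filled exactly: [5]
}
```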
@@ -1120,8 +1189,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveTimeToCutProper", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -1181,8 +1248,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveTimeToCutZeroBatch", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -1232,8 +1297,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveTimeToCutLargerThanExpected", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -1283,8 +1346,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveTimeToCutStale", func(t *testing.T) { - subtestIndex++ - errorChan := make(chan struct{}) close(errorChan) haltChan := make(chan struct{}) @@ -1334,8 +1395,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveKafkaErrorAndCloseErrorChan", func(t *testing.T) { - subtestIndex++ - // If we set up the mock broker so that it returns a response, if the // test finishes before the sendConnectMessage goroutine has received // this response, we will get a failure ("not all expectations were @@ -1400,8 +1459,6 @@ func TestProcessMessagesToBlocks(t *testing.T) { }) t.Run("ReceiveKafkaErrorAndThenReceiveRegularMessage", func(t *testing.T) { - subtestIndex++ - t.Skip("Skipping test as it introduces a race condition") // If we set up the mock broker so that it returns a response, if the diff --git a/orderer/mocks/blockcutter/blockcutter.go b/orderer/mocks/blockcutter/blockcutter.go index 4435053d479..45fe498b2e7 100644 --- a/orderer/mocks/blockcutter/blockcutter.go +++ b/orderer/mocks/blockcutter/blockcutter.go @@ -29,13 +29,13 @@ var logger = logging.MustGetLogger("orderer/mocks/blockcutter") // Receiver mocks the blockcutter.Receiver interface type Receiver struct { - // QueueNext causes Ordered returns nil false when not set to true - QueueNext bool - - // IsolatedTx causes Ordered returns [][]{curBatch, []{newTx}}, true when set to true + // IsolatedTx causes Ordered returns [][]{curBatch, []{newTx}}, true, false when set to true IsolatedTx bool - // CutNext causes Ordered returns [][]{append(curBatch, newTx)}, true when set to true + // CutAncestors causes Ordered returns [][]{curBatch, []{newTx}}, true, true when set to true + CutAncestors bool + + // CutNext causes Ordered returns [][]{append(curBatch, newTx)}, true, false when set to true CutNext bool // CurBatch is the currently outstanding messages in the batch @@ -49,10 +49,10 @@ type Receiver struct { // NewReceiver returns the mock blockcutter.Receiver implemenation func NewReceiver() *Receiver { return &Receiver{ - QueueNext: true, - IsolatedTx: false, - CutNext: false, - Block: make(chan struct{}), + IsolatedTx: false, + CutAncestors: false, + CutNext: false, + Block: make(chan struct{}), } } @@ -65,21 +65,23 @@ func noopCommitters(size int) []filter.Committer { } // Ordered will add or cut the batch according to the state of Receiver, it blocks reading from Block on return -func (mbc *Receiver) Ordered(env *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) { +func (mbc *Receiver) Ordered(env *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool, bool) { defer func() { <-mbc.Block }() - if !mbc.QueueNext { - logger.Debugf("Not queueing message") - return nil, nil, false - } - if mbc.IsolatedTx { logger.Debugf("Receiver: Returning 
dual batch") res := [][]*cb.Envelope{mbc.CurBatch, []*cb.Envelope{env}} mbc.CurBatch = nil - return res, [][]filter.Committer{noopCommitters(len(res[0])), noopCommitters(len(res[1]))}, true + return res, [][]filter.Committer{noopCommitters(len(res[0])), noopCommitters(len(res[1]))}, true, false + } + + if mbc.CutAncestors { + logger.Debugf("Receiver: Returning current batch and appending newest env") + res := [][]*cb.Envelope{mbc.CurBatch} + mbc.CurBatch = []*cb.Envelope{env} + return res, [][]filter.Committer{noopCommitters(len(res))}, true, true } mbc.CurBatch = append(mbc.CurBatch, env) @@ -88,11 +90,11 @@ func (mbc *Receiver) Ordered(env *cb.Envelope) ([][]*cb.Envelope, [][]filter.Com logger.Debugf("Returning regular batch") res := [][]*cb.Envelope{mbc.CurBatch} mbc.CurBatch = nil - return res, [][]filter.Committer{noopCommitters(len(res))}, true + return res, [][]filter.Committer{noopCommitters(len(res))}, true, false } logger.Debugf("Appending to batch") - return nil, nil, true + return nil, nil, true, true } // Cut terminates the current batch, returning it diff --git a/orderer/multichain/util_test.go b/orderer/multichain/util_test.go index d585ae3931c..1d54cd0839f 100644 --- a/orderer/multichain/util_test.go +++ b/orderer/multichain/util_test.go @@ -65,7 +65,7 @@ func (mch *mockChain) Start() { if !ok { return } - batches, committers, _ := mch.cutter.Ordered(msg) + batches, committers, _, _ := mch.cutter.Ordered(msg) for i, batch := range batches { block := mch.support.CreateNextBlock(batch) mch.support.WriteBlock(block, committers[i], nil) diff --git a/orderer/solo/consensus.go b/orderer/solo/consensus.go index 1cb76722707..2c3f7c421ab 100644 --- a/orderer/solo/consensus.go +++ b/orderer/solo/consensus.go @@ -88,7 +88,7 @@ func (ch *chain) main() { for { select { case msg := <-ch.sendChan: - batches, committers, ok := ch.support.BlockCutter().Ordered(msg) + batches, committers, ok, _ := ch.support.BlockCutter().Ordered(msg) if ok && len(batches) == 0 && timer == nil { timer = time.After(ch.support.SharedConfig().BatchTimeout()) continue