Skip to content

Commit

Permalink
Merge "[FAB-6380] fix race condition in kafka chain Halt"
Browse files Browse the repository at this point in the history
  • Loading branch information
kchristidis authored and Gerrit Code Review committed Oct 12, 2017
2 parents 5a93a68 + e76b396 commit 8a52d63
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 56 deletions.
20 changes: 15 additions & 5 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type chainImpl struct {
// channel never re-opens when closed. Its closing triggers the exit of the
// processMessagesToBlock loop.
haltChan chan struct{}
// notification that the chain has stopped processing messages into blocks
doneProcessingMessagesToBlocks chan struct{}
// Close when the retriable steps in Start have completed.
startChan chan struct{}
// timer controls the batch timeout of cutting pending messages into block
Expand Down Expand Up @@ -111,8 +113,12 @@ func (chain *chainImpl) Halt() {
logger.Warningf("[channel: %s] Halting of chain requested again", chain.ChainID())
default:
logger.Criticalf("[channel: %s] Halting of chain requested", chain.ChainID())
// stat shutdown of chain
close(chain.haltChan)
chain.closeKafkaObjects() // Also close the producer and the consumer
// wait for processing of messages to blocks to finish shutting down
<-chain.doneProcessingMessagesToBlocks
// close the kafka producer and the consumer
chain.closeKafkaObjects()
logger.Debugf("[channel: %s] Closed the haltChan", chain.ChainID())
}
default:
Expand Down Expand Up @@ -206,6 +212,8 @@ func startThread(chain *chainImpl) {
}
logger.Infof("[channel: %s] Channel consumer set up successfully", chain.channel.topic())

chain.doneProcessingMessagesToBlocks = make(chan struct{})

close(chain.startChan) // Broadcast requests will now go through
chain.errorChan = make(chan struct{}) // Deliver requests will also go through

Expand All @@ -221,6 +229,11 @@ func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
counts := make([]uint64, 11) // For metrics and tests
msg := new(ab.KafkaMessage)

defer func() {
// notify that we are not processing messages to blocks
close(chain.doneProcessingMessagesToBlocks)
}()

defer func() { // When Halt() is called
select {
case <-chain.errorChan: // If already closed, don't do anything
Expand All @@ -240,10 +253,7 @@ func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
logger.Warningf("[channel: %s] Consenter for channel exiting", chain.ChainID())
counts[indexExitChanPass]++
return counts, nil
case kafkaErr, ok := <-chain.channelConsumer.Errors():
if !ok {
continue // chain is halting
}
case kafkaErr := <-chain.channelConsumer.Errors():
logger.Errorf("[channel: %s] Error during consumption: %s", chain.ChainID(), kafkaErr)
counts[indexRecvError]++
select {
Expand Down
126 changes: 75 additions & 51 deletions orderer/consensus/kafka/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

// We need the mock blockcutter to deliver a non-empty batch
Expand Down Expand Up @@ -856,8 +857,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -905,8 +907,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -954,8 +957,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -998,8 +1002,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
channel: mockChannel,
ConsenterSupport: mockSupport,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1041,8 +1046,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
channel: mockChannel,
ConsenterSupport: mockSupport,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1094,8 +1100,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1149,9 +1156,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
}
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{})}

var counts []uint64
done := make(chan struct{})
Expand Down Expand Up @@ -1212,8 +1219,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1295,8 +1303,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1350,8 +1359,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1404,8 +1414,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1454,8 +1465,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1515,8 +1527,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1572,8 +1585,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1647,8 +1661,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1715,8 +1730,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1798,8 +1814,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1858,8 +1875,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1919,8 +1937,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -1979,8 +1998,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -2058,8 +2078,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -2119,8 +2140,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
ConsenterSupport: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -2184,8 +2206,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
channel: mockChannel,
ConsenterSupport: mockSupport,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down Expand Up @@ -2251,8 +2274,9 @@ func TestProcessMessagesToBlocks(t *testing.T) {
channel: mockChannel,
ConsenterSupport: mockSupport,

errorChan: errorChan,
haltChan: haltChan,
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
}

var counts []uint64
Expand Down

0 comments on commit 8a52d63

Please sign in to comment.