Skip to content

Commit

Permalink
[FAB-8570] Reduce log severity if existing block added
Browse files Browse the repository at this point in the history
Gossip enqueues blocks into the payload buffer via different mechanisms:

    State transfer anti-entropy
    Pull
    Push

Whenever an attempt to add a block to the payload buffer which was
already added before by some other pathway - a warning is logged.

I think we should make that warning a debug, since it's not something
that users should be worried about.

Change-Id: Ia735473e33ddff3f8787c121f39a6d15e54baa04
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Feb 28, 2018
1 parent 318bff3 commit 2f401c6
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 21 deletions.
11 changes: 4 additions & 7 deletions gossip/state/payloads_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ SPDX-License-Identifier: Apache-2.0
package state

import (
"strconv"
"sync"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/pkg/errors"
)

// PayloadsBuffer is used to store payloads into which used to
Expand All @@ -23,7 +21,7 @@ import (
// to signal whenever expected block has arrived.
type PayloadsBuffer interface {
// Adds new block into the buffer
Push(payload *proto.Payload) error
Push(payload *proto.Payload)

// Returns next expected sequence number
Next() uint64
Expand Down Expand Up @@ -75,15 +73,15 @@ func (b *PayloadsBufferImpl) Ready() chan struct{} {
// Push new payload into the buffer structure in case new arrived payload
// sequence number is below the expected next block number payload will be
// thrown away and error will be returned.
func (b *PayloadsBufferImpl) Push(payload *proto.Payload) error {
func (b *PayloadsBufferImpl) Push(payload *proto.Payload) {
b.mutex.Lock()
defer b.mutex.Unlock()

seqNum := payload.SeqNum

if seqNum < b.next || b.buf[seqNum] != nil {
return errors.Errorf("Payload with sequence number = %s has been already processed",
strconv.FormatUint(payload.SeqNum, 10))
logger.Debugf("Payload with sequence number = %d has been already processed", payload.SeqNum)
return
}

b.buf[seqNum] = payload
Expand All @@ -95,7 +93,6 @@ func (b *PayloadsBufferImpl) Push(payload *proto.Payload) error {
b.readyChan <- struct{}{}
}()
}
return nil
}

// Next function provides the number of the next expected block
Expand Down
14 changes: 1 addition & 13 deletions gossip/state/payloads_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
payload, err := randomPayloadWithSeqNum(nextSeqNum)
assert.NoError(t, err)

var errors []error

ready := int32(0)
readyWG := sync.WaitGroup{}
readyWG.Add(1)
Expand All @@ -136,26 +134,16 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {

for i := 0; i < concurrency; i++ {
go func() {
buffer.Push(payload)
startWG.Wait()
errors = append(errors, buffer.Push(payload))
finishWG.Done()
}()
}
startWG.Done()
finishWG.Wait()

success := 0

// Only one push attempt expected to succeed
for _, err := range errors {
if err == nil {
success++
}
}

readyWG.Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&ready))
assert.Equal(t, 1, success)
// Buffer size has to be only one
assert.Equal(t, 1, buffer.Size())
}
3 changes: 2 additions & 1 deletion gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,8 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod
time.Sleep(enqueueRetryInterval)
}

return s.payloads.Push(payload)
s.payloads.Push(payload)
return nil
}

func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {
Expand Down

0 comments on commit 2f401c6

Please sign in to comment.