diff --git a/gossip/state/payloads_buffer.go b/gossip/state/payloads_buffer.go index 94bff5ea1b7..9b74b6b53c8 100644 --- a/gossip/state/payloads_buffer.go +++ b/gossip/state/payloads_buffer.go @@ -7,14 +7,12 @@ SPDX-License-Identifier: Apache-2.0 package state import ( - "strconv" "sync" "sync/atomic" "github.com/hyperledger/fabric/gossip/util" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/op/go-logging" - "github.com/pkg/errors" ) // PayloadsBuffer is used to store payloads into which used to @@ -23,7 +21,7 @@ import ( // to signal whenever expected block has arrived. type PayloadsBuffer interface { // Adds new block into the buffer - Push(payload *proto.Payload) error + Push(payload *proto.Payload) // Returns next expected sequence number Next() uint64 @@ -75,15 +73,15 @@ func (b *PayloadsBufferImpl) Ready() chan struct{} { // Push new payload into the buffer structure in case new arrived payload // sequence number is below the expected next block number payload will be // thrown away and error will be returned. -func (b *PayloadsBufferImpl) Push(payload *proto.Payload) error { +func (b *PayloadsBufferImpl) Push(payload *proto.Payload) { b.mutex.Lock() defer b.mutex.Unlock() seqNum := payload.SeqNum if seqNum < b.next || b.buf[seqNum] != nil { - return errors.Errorf("Payload with sequence number = %s has been already processed", - strconv.FormatUint(payload.SeqNum, 10)) + logger.Debugf("Payload with sequence number = %d has been already processed", payload.SeqNum) + return } b.buf[seqNum] = payload @@ -95,7 +93,6 @@ func (b *PayloadsBufferImpl) Push(payload *proto.Payload) error { b.readyChan <- struct{}{} }() } - return nil } // Next function provides the number of the next expected block diff --git a/gossip/state/payloads_buffer_test.go b/gossip/state/payloads_buffer_test.go index a8263caf926..97304399bec 100644 --- a/gossip/state/payloads_buffer_test.go +++ b/gossip/state/payloads_buffer_test.go @@ -122,8 +122,6 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) { payload, err := randomPayloadWithSeqNum(nextSeqNum) assert.NoError(t, err) - var errors []error - ready := int32(0) readyWG := sync.WaitGroup{} readyWG.Add(1) @@ -136,26 +134,16 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) { for i := 0; i < concurrency; i++ { go func() { + buffer.Push(payload) startWG.Wait() - errors = append(errors, buffer.Push(payload)) finishWG.Done() }() } startWG.Done() finishWG.Wait() - success := 0 - - // Only one push attempt expected to succeed - for _, err := range errors { - if err == nil { - success++ - } - } - readyWG.Wait() assert.Equal(t, int32(1), atomic.LoadInt32(&ready)) - assert.Equal(t, 1, success) // Buffer size has to be only one assert.Equal(t, 1, buffer.Size()) } diff --git a/gossip/state/state.go b/gossip/state/state.go index a2fc22d1de3..19c9489ec0b 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -765,7 +765,8 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod time.Sleep(enqueueRetryInterval) } - return s.payloads.Push(payload) + s.payloads.Push(payload) + return nil } func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {