diff --git a/internal/trie/node/branch_encode.go b/internal/trie/node/branch_encode.go index badd3556f0..10ed8a04a0 100644 --- a/internal/trie/node/branch_encode.go +++ b/internal/trie/node/branch_encode.go @@ -8,6 +8,7 @@ import ( "fmt" "hash" "io" + "runtime" "github.com/ChainSafe/gossamer/internal/trie/codec" "github.com/ChainSafe/gossamer/internal/trie/pools" @@ -113,12 +114,7 @@ func (b *Branch) Encode(buffer Buffer) (err error) { } } - const parallel = false // TODO Done in pull request #2081 - if parallel { - err = encodeChildrenInParallel(b.Children, buffer) - } else { - err = encodeChildrenSequentially(b.Children, buffer) - } + err = encodeChildrenOpportunisticParallel(b.Children, buffer) if err != nil { return fmt.Errorf("cannot encode children of branch: %w", err) } @@ -126,30 +122,64 @@ func (b *Branch) Encode(buffer Buffer) (err error) { return nil } -func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) { - type result struct { - index int - buffer *bytes.Buffer - err error +type encodingAsyncResult struct { + index int + buffer *bytes.Buffer + err error +} + +func runEncodeChild(child Node, index int, + results chan<- encodingAsyncResult, rateLimit <-chan struct{}) { + buffer := pools.EncodingBuffers.Get().(*bytes.Buffer) + buffer.Reset() + // buffer is put back in the pool after processing its + // data in the select block below. + + err := encodeChild(child, buffer) + + results <- encodingAsyncResult{ + index: index, + buffer: buffer, + err: err, + } + if rateLimit != nil { + // Only run if runEncodeChild is launched + // in its own goroutine. + <-rateLimit } +} + +var parallelLimit = runtime.NumCPU() + +var parallelEncodingRateLimit = make(chan struct{}, parallelLimit) - resultsCh := make(chan result) +// encodeChildrenOpportunisticParallel encodes children in parallel eventually. +// Leaves are encoded in a blocking way, and branches are encoded in separate +// goroutines IF they are less than the parallelLimit number of goroutines already +// running. This is designed to limit the total number of goroutines in order to +// avoid using too much memory on the stack. +func encodeChildrenOpportunisticParallel(children [16]Node, buffer io.Writer) (err error) { + // Buffered channels since children might be encoded in this + // goroutine or another one. + resultsCh := make(chan encodingAsyncResult, len(children)) for i, child := range children { - go func(index int, child Node) { - buffer := pools.EncodingBuffers.Get().(*bytes.Buffer) - buffer.Reset() - // buffer is put back in the pool after processing its - // data in the select block below. - - err := encodeChild(child, buffer) - - resultsCh <- result{ - index: index, - buffer: buffer, - err: err, - } - }(i, child) + if isNodeNil(child) || child.Type() == LeafType { + runEncodeChild(child, i, resultsCh, nil) + continue + } + + // Branch child + select { + case parallelEncodingRateLimit <- struct{}{}: + // We have a goroutine available to encode + // the branch in parallel. + go runEncodeChild(child, i, resultsCh, parallelEncodingRateLimit) + default: + // we reached the maximum parallel goroutines + // so encode this branch in this goroutine + runEncodeChild(child, i, resultsCh, nil) + } } currentIndex := 0 @@ -166,7 +196,7 @@ func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) { for currentIndex < len(children) && resultBuffers[currentIndex] != nil { bufferSlice := resultBuffers[currentIndex].Bytes() - if len(bufferSlice) > 0 { + if err == nil && len(bufferSlice) > 0 { // note buffer.Write copies the byte slice given as argument _, writeErr := buffer.Write(bufferSlice) if writeErr != nil && err == nil { @@ -203,17 +233,20 @@ func encodeChildrenSequentially(children [16]Node, buffer io.Writer) (err error) return nil } -func encodeChild(child Node, buffer io.Writer) (err error) { - var isNil bool - switch impl := child.(type) { +func isNodeNil(n Node) (isNil bool) { + switch impl := n.(type) { case *Branch: isNil = impl == nil case *Leaf: isNil = impl == nil default: - isNil = child == nil + isNil = n == nil } - if isNil { + return isNil +} + +func encodeChild(child Node, buffer io.Writer) (err error) { + if isNodeNil(child) { return nil } diff --git a/internal/trie/node/branch_encode_test.go b/internal/trie/node/branch_encode_test.go index 9c1fc50703..8b4d51c75d 100644 --- a/internal/trie/node/branch_encode_test.go +++ b/internal/trie/node/branch_encode_test.go @@ -4,6 +4,8 @@ package node import ( + "bytes" + "io" "testing" "github.com/golang/mock/gomock" @@ -251,7 +253,7 @@ func Test_Branch_Encode(t *testing.T) { wrappedErr: errTest, errMessage: "cannot write encoded value to buffer: test error", }, - "buffer write error for children encoded sequentially": { + "buffer write error for children encoding": { branch: &Branch{ Key: []byte{1, 2, 3}, Value: []byte{100}, @@ -280,10 +282,10 @@ func Test_Branch_Encode(t *testing.T) { }, wrappedErr: errTest, errMessage: "cannot encode children of branch: " + - "cannot encode child at index 3: " + - "failed to write child to buffer: test error", + "cannot write encoding of child at index 3: " + + "test error", }, - "success with sequential children encoding": { + "success with children encoding": { branch: &Branch{ Key: []byte{1, 2, 3}, Value: []byte{100}, @@ -346,7 +348,46 @@ func Test_Branch_Encode(t *testing.T) { } } -func Test_encodeChildrenInParallel(t *testing.T) { +// Opportunistic parallel: 13781602 ns/op 14419488 B/op 323575 allocs/op +// Sequentially: 24269268 ns/op 20126525 B/op 327668 allocs/op +func Benchmark_encodeChildrenOpportunisticParallel(b *testing.B) { + const valueBytesSize = 10 + const depth = 3 // do not raise above 4 + + children := populateChildren(valueBytesSize, depth) + + b.Run("", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = encodeChildrenOpportunisticParallel(children, io.Discard) + } + }) +} + +func populateChildren(valueSize, depth int) (children [16]Node) { + someValue := make([]byte, valueSize) + + if depth == 0 { + for i := range children { + children[i] = &Leaf{ + Key: someValue, + Value: someValue, + } + } + return children + } + + for i := range children { + children[i] = &Branch{ + Key: someValue, + Value: someValue, + Children: populateChildren(valueSize, depth-1), + } + } + + return children +} + +func Test_encodeChildrenOpportunisticParallel(t *testing.T) { t.Parallel() testCases := map[string]struct { @@ -393,7 +434,7 @@ func Test_encodeChildrenInParallel(t *testing.T) { }, }, }, - "encoding error": { + "leaf encoding error": { children: [16]Node{ nil, nil, nil, nil, nil, nil, nil, nil, @@ -413,6 +454,23 @@ func Test_encodeChildrenInParallel(t *testing.T) { errMessage: "cannot write encoding of child at index 11: " + "test error", }, + "branch encoding": { + // Note this may run in parallel or not depending on other tests + // running in parallel. + children: [16]Node{ + &Branch{ + Key: []byte{1}, + Children: [16]Node{ + &Leaf{Key: []byte{1}}, + }, + }, + }, + writes: []writeCall{ + { + written: []byte{32, 129, 1, 1, 0, 12, 65, 1, 0}, + }, + }, + }, } for name, testCase := range testCases { @@ -434,7 +492,7 @@ func Test_encodeChildrenInParallel(t *testing.T) { previousCall = call } - err := encodeChildrenInParallel(testCase.children, buffer) + err := encodeChildrenOpportunisticParallel(testCase.children, buffer) if testCase.wrappedErr != nil { assert.ErrorIs(t, err, testCase.wrappedErr) @@ -444,6 +502,31 @@ func Test_encodeChildrenInParallel(t *testing.T) { } }) } + + t.Run("opportunist parallel branch encoding", func(t *testing.T) { + t.Parallel() + + var children [16]Node + for i := range children { + children[i] = &Branch{} + } + + buffer := bytes.NewBuffer(nil) + + err := encodeChildrenOpportunisticParallel(children, buffer) + + require.NoError(t, err) + expectedBytes := []byte{ + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0, + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0, + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0, + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0, + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0, + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0, + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0, + 0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0} + assert.Equal(t, expectedBytes, buffer.Bytes()) + }) } func Test_encodeChildrenSequentially(t *testing.T) {