feat: state: Fast migration for v15 #7933

Merged · 10 commits · Jan 12, 2022
242 changes: 242 additions & 0 deletions blockstore/autobatch.go
@@ -0,0 +1,242 @@
package blockstore

import (
"context"
"sync"
"time"

"golang.org/x/xerrors"

block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

// autolog is a logger for the autobatching blockstore. It is subscoped from the
// blockstore logger.
var autolog = log.Named("auto")

// blockBatch holds the same set of blocks twice: once as an ordered list for flushing, and once as a map for fast access.
type blockBatch struct {
blockList []block.Block
blockMap map[cid.Cid]block.Block
}

type AutobatchBlockstore struct {
// TODO: drop if memory consumption is too high
addedCids map[cid.Cid]struct{}

stateLock sync.Mutex
doFlushLock sync.Mutex
bufferedBatch blockBatch

flushingBatch blockBatch
flushErr error
flushWorkerDone bool

flushCh chan struct{}

flushRetryDelay time.Duration
flushCtx context.Context
shutdownCh chan struct{}

backingBs Blockstore

bufferCapacity int
bufferSize int
}

func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore {
bs := &AutobatchBlockstore{
addedCids: make(map[cid.Cid]struct{}),
backingBs: backingBs,
bufferCapacity: bufferCapacity,
flushCtx: ctx,
flushCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
// could be made configurable
flushRetryDelay: time.Millisecond * 100,
flushWorkerDone: false,
}

bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block)

go bs.flushWorker()

return bs
}

func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
bs.stateLock.Lock()
defer bs.stateLock.Unlock()

_, ok := bs.addedCids[blk.Cid()]
if !ok {
bs.addedCids[blk.Cid()] = struct{}{}
Comment on lines +72 to +74

Contributor: What's the hit rate on this?

Contributor Author: Not measured, but I expect it to be quite high -- think of the number of times we'll try to Put the empty deadline object.

Contributor: Yeah, I'm just wondering if it's worth the added memory use (though if it's insignificant, this should be OK to keep even if it doesn't help that much).

Contributor Author: Sounds good -- I do have a TODO to drop it if the memory use is a problem; we'll see in experiments.

bs.bufferedBatch.blockList = append(bs.bufferedBatch.blockList, blk)
bs.bufferedBatch.blockMap[blk.Cid()] = blk
bs.bufferSize += len(blk.RawData())
if bs.bufferSize >= bs.bufferCapacity {
// signal that a flush is appropriate, may be ignored
select {
case bs.flushCh <- struct{}{}:
default:
// do nothing
}
}
}

return nil
}

func (bs *AutobatchBlockstore) flushWorker() {
defer func() {
bs.stateLock.Lock()
bs.flushWorkerDone = true
bs.stateLock.Unlock()
}()
for {
select {
case <-bs.flushCh:
putErr := bs.doFlush(bs.flushCtx)
for putErr != nil {
select {
case <-bs.shutdownCh:
return
case <-time.After(bs.flushRetryDelay):
autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay)
putErr = bs.doFlush(bs.flushCtx)
Contributor: It seems weird to me to retry putting it; what are the possible causes of an error that goes away?

Contributor Author: @Stebalien thinks that could happen if the system is under stress -- with some backoff it may succeed?

Member: Maybe it's not an issue with badger? But my thinking was:

  1. Transient error: retry will help.
  2. Non-transient error: we can't write anything else anyway, so we might as well retry.

}
}
case <-bs.shutdownCh:
return
}
}
}

// caller must NOT hold stateLock
func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error {
bs.doFlushLock.Lock()
defer bs.doFlushLock.Unlock()
if bs.flushErr == nil {
bs.stateLock.Lock()
// We do NOT clear addedCids here, because its purpose is to expedite Puts
bs.flushingBatch = bs.bufferedBatch
bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList))
bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap))
bs.stateLock.Unlock()
}

bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
bs.stateLock.Lock()
bs.flushingBatch = blockBatch{}
bs.stateLock.Unlock()

return bs.flushErr
}

// caller must NOT hold stateLock
func (bs *AutobatchBlockstore) Flush(ctx context.Context) error {
return bs.doFlush(ctx)
}

func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
bs.stateLock.Lock()
flushDone := bs.flushWorkerDone
bs.stateLock.Unlock()
if !flushDone {
// may racily block forever if Shutdown is called in parallel
bs.shutdownCh <- struct{}{}
}

return bs.flushErr
}

func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
// may seem backward to check the backingBs first, but that is the likeliest case
blk, err := bs.backingBs.Get(ctx, c)
if err == nil {
return blk, nil
}
Comment on lines +155 to +159

Contributor: If we check backingBs, isn't this potentially racy with the way we do locks here? If we really want to avoid locking, we probably want to check backingBs once more at the end.

Contributor Author: That's a good point -- I'm not super concerned about it because, for the migration, we should actually never try to Get anything we've Put... but a second check of the bs at the end makes sense.

Member: Avoiding taking the lock likely isn't worth it. If it's a problem, we could always use a read/write lock.

Contributor Author: Will try and report back with perf.

Contributor Author: Looks like this slowed us down -- trying again without to confirm.

Contributor Author: Confirmed: locking in there slows us down from 870 migrations per sec to 350.

Member: You kind of need those locks, but they can be rwlocks.

Member: And you mean the locking in this function? Shouldn't we never hit those locks?

Contributor Author: If we want to lock to avoid raciness, it has to be at the very top of the method (so it always gets hit).


if err != ErrNotFound {
return blk, err
}

v, ok := bs.flushingBatch.blockMap[c]
if ok {
return v, nil
}

v, ok = bs.bufferedBatch.blockMap[c]
if ok {
return v, nil
}

return bs.Get(ctx, c)
}

func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
// if we wanted to support this, we would have to:
// - flush
// - delete from the backingBs (if present)
// - remove from addedCids (if present)
// - if present in addedCids, also walk the ordered lists and remove if present
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
// see note in DeleteBlock()
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, err := bs.Get(ctx, c)
if err == nil {
return true, nil
}
if err == ErrNotFound {
return false, nil
}

return false, err
}

func (bs *AutobatchBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := bs.Get(ctx, c)
if err != nil {
return 0, err
}

return len(blk.RawData()), nil
}

func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) error {
for _, blk := range blks {
if err := bs.Put(ctx, blk); err != nil {
return err
}
}

return nil
}

func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if err := bs.Flush(ctx); err != nil {
return nil, err
}

return bs.backingBs.AllKeysChan(ctx)
}

func (bs *AutobatchBlockstore) HashOnRead(enabled bool) {
bs.backingBs.HashOnRead(enabled)
}

func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
blk, err := bs.Get(ctx, cid)
if err != nil {
return err
}

return callback(blk.RawData())
}
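For orientation, here is a minimal usage sketch of the autobatching blockstore added above. The in-memory backing store, the 100 MiB buffer size, and the sample block are illustrative assumptions; the real migration wiring for v15 lives elsewhere in this PR.

package main

import (
	"context"
	"log"

	block "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/blockstore"
)

func main() {
	ctx := context.Background()

	// Illustrative: wrap an in-memory store; the migration wraps its real backing store.
	backing := blockstore.NewMemory()

	// Buffer roughly 100 MiB of blocks before signalling the background flush worker.
	ab := blockstore.NewAutobatch(ctx, backing, 100<<20)

	blk := block.NewBlock([]byte("hello"))
	if err := ab.Put(ctx, blk); err != nil {
		log.Fatal(err)
	}

	// Get checks the backing store first, then the in-flight batches.
	if _, err := ab.Get(ctx, blk.Cid()); err != nil {
		log.Fatal(err)
	}

	// Flush synchronously, then stop the flush worker.
	if err := ab.Flush(ctx); err != nil {
		log.Fatal(err)
	}
	if err := ab.Shutdown(ctx); err != nil {
		log.Fatal(err)
	}
}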
34 changes: 34 additions & 0 deletions blockstore/autobatch_test.go
@@ -0,0 +1,34 @@
package blockstore

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestAutobatchBlockstore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ab := NewAutobatch(ctx, NewMemory(), len(b0.RawData())+len(b1.RawData())-1)

require.NoError(t, ab.Put(ctx, b0))
require.NoError(t, ab.Put(ctx, b1))
require.NoError(t, ab.Put(ctx, b2))

v0, err := ab.Get(ctx, b0.Cid())
require.NoError(t, err)
require.Equal(t, b0.RawData(), v0.RawData())

v1, err := ab.Get(ctx, b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), v1.RawData())

v2, err := ab.Get(ctx, b2.Cid())
require.NoError(t, err)
require.Equal(t, b2.RawData(), v2.RawData())

require.NoError(t, ab.Flush(ctx))
require.NoError(t, ab.Shutdown(ctx))
}
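The locking thread above (870 vs. 350 migrations per second) concerns guarding the batch maps in Get. Below is a minimal sketch of the reviewer's read/write-lock suggestion; it is not what this PR merges, and the stateRWLock field and getWithRLock name are assumed for illustration.

// Sketch: a Get variant that guards the batch maps with a sync.RWMutex
// (assumed field: stateRWLock sync.RWMutex on AutobatchBlockstore).
func (bs *AutobatchBlockstore) getWithRLock(ctx context.Context, c cid.Cid) (block.Block, error) {
	// Likeliest case first: the block has already been flushed to the backing store.
	blk, err := bs.backingBs.Get(ctx, c)
	if err == nil {
		return blk, nil
	}
	if err != ErrNotFound {
		return blk, err
	}

	// Readers share the lock; doFlush would take the write side when it
	// swaps bufferedBatch into flushingBatch.
	bs.stateRWLock.RLock()
	v, ok := bs.flushingBatch.blockMap[c]
	if !ok {
		v, ok = bs.bufferedBatch.blockMap[c]
	}
	bs.stateRWLock.RUnlock()
	if ok {
		return v, nil
	}

	// Re-check the backing store in case a flush completed while we were looking.
	return bs.backingBs.Get(ctx, c)
}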
4 changes: 4 additions & 0 deletions chain/actors/builtin/paych/message.go.template
@@ -39,7 +39,11 @@ func (m message{{.v}}) Create(to address.Address, initialAmount abi.TokenAmount)

func (m message{{.v}}) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych{{.v}}.UpdateChannelStateParams{
{{if (ge .v 7)}}
Sv: toV{{.v}}SignedVoucher(*sv),
{{else}}
Sv: *sv,
{{end}}
Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message0.go
@@ -39,7 +39,9 @@ func (m message0) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message0) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych0.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message2.go
@@ -39,7 +39,9 @@ func (m message2) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message2) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych2.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message3.go
@@ -39,7 +39,9 @@ func (m message3) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message3) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych3.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message4.go
@@ -39,7 +39,9 @@ func (m message4) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message4) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych4.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message5.go
@@ -39,7 +39,9 @@ func (m message5) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message5) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych5.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message6.go
@@ -39,7 +39,9 @@ func (m message6) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message6) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych6.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message7.go
@@ -39,7 +39,9 @@ func (m message7) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message7) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych7.UpdateChannelStateParams{
Sv: *sv,

Sv: toV7SignedVoucher(*sv),

Secret: secret,
})
if aerr != nil {
18 changes: 18 additions & 0 deletions chain/actors/builtin/paych/state.go.template
@@ -112,3 +112,21 @@ func (ls *laneState{{.v}}) Redeemed() (big.Int, error) {
func (ls *laneState{{.v}}) Nonce() (uint64, error) {
return ls.LaneState.Nonce, nil
}

{{if (ge .v 7)}}
func toV{{.v}}SignedVoucher(sv SignedVoucher) paych{{.v}}.SignedVoucher {
return paych{{.v}}.SignedVoucher{
ChannelAddr: sv.ChannelAddr,
TimeLockMin: sv.TimeLockMin,
TimeLockMax: sv.TimeLockMax,
SecretHash: sv.SecretPreimage,
Extra: sv.Extra,
Lane: sv.Lane,
Nonce: sv.Nonce,
Amount: sv.Amount,
MinSettleHeight: sv.MinSettleHeight,
Merges: sv.Merges,
Signature: sv.Signature,
}
}
{{end}}
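For v7, the template above should expand to a concrete helper roughly like the sketch below (the checked-in file is produced by the actors code generator; paych7 is assumed to be the specs-actors v7 paych package, as imported in message7.go). Mapping SecretHash from sv.SecretPreimage is the reason message7.Update converts the voucher before serializing, while v0-v6 pass it through unchanged.

func toV7SignedVoucher(sv SignedVoucher) paych7.SignedVoucher {
	return paych7.SignedVoucher{
		ChannelAddr:     sv.ChannelAddr,
		TimeLockMin:     sv.TimeLockMin,
		TimeLockMax:     sv.TimeLockMax,
		SecretHash:      sv.SecretPreimage, // the v7 type renames SecretPreimage to SecretHash
		Extra:           sv.Extra,
		Lane:            sv.Lane,
		Nonce:           sv.Nonce,
		Amount:          sv.Amount,
		MinSettleHeight: sv.MinSettleHeight,
		Merges:          sv.Merges,
		Signature:       sv.Signature,
	}
}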