feat: state: Fast migration for v15 #7933

Merged
merged 10 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions blockstore/autobatch.go
@@ -0,0 +1,208 @@
package blockstore

import (
"context"
"sync"
"time"

"golang.org/x/xerrors"

block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

// autolog is a logger for the autobatching blockstore. It is subscoped from the
// blockstore logger.
var autolog = log.Named("auto")

type AutobatchBlockstore struct {
// TODO: drop if memory consumption is too high
addedCids map[cid.Cid]struct{}

bufferedLk sync.Mutex
bufferedBlksOrdered []block.Block
bufferedBlksMap map[cid.Cid]block.Block

flushingLk sync.Mutex
flushingBlksMap map[cid.Cid]block.Block

flushCh chan struct{}
flushErr error
flushRetryDelay time.Duration
flushCtx context.Context
shutdownCh chan struct{}

backingBs Blockstore

bufferCapacity int
bufferSize int
}

func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore {
bs := &AutobatchBlockstore{
addedCids: make(map[cid.Cid]struct{}),
backingBs: backingBs,
bufferCapacity: bufferCapacity,
bufferedBlksMap: make(map[cid.Cid]block.Block),
flushingBlksMap: make(map[cid.Cid]block.Block),
flushCtx: ctx,
flushCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
// could be made configurable
flushRetryDelay: time.Second * 5,
}

go bs.flushWorker()

return bs
}

func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
bs.bufferedLk.Lock()
_, ok := bs.addedCids[blk.Cid()]
if !ok {
bs.addedCids[blk.Cid()] = struct{}{}
Comment on lines +72 to +74

Contributor: What's the hit rate on this?

Contributor Author: Not measured, but I expect it to be quite high -- think of the number of times we'll try to Put the empty deadline object.

Contributor: Yeah, I'm just wondering if it's worth the added memory use (tho if it's insignificant, this should be ok to keep even if it doesn't help that much).

Contributor Author: Sounds good -- i do have a TODO to drop if the memory use is a problem, we'll see in experiments.

bs.bufferedBlksOrdered = append(bs.bufferedBlksOrdered, blk)
bs.bufferedBlksMap[blk.Cid()] = blk
bs.bufferSize += len(blk.RawData())
if bs.bufferSize >= bs.bufferCapacity {
// signal that a flush is appropriate, may be ignored
select {
case bs.flushCh <- struct{}{}:
default:
// do nothing
}
}
}
bs.bufferedLk.Unlock()
return nil
}

func (bs *AutobatchBlockstore) flushWorker() {
for {
select {
case <-bs.flushCh:
putErr := bs.doFlush(bs.flushCtx)
for putErr != nil {
select {
case <-bs.shutdownCh:
bs.flushErr = putErr
return
default:
autolog.Errorf("FLUSH ERRORED: %s, retrying in %v", putErr, bs.flushRetryDelay)
time.Sleep(bs.flushRetryDelay)
putErr = bs.doFlush(bs.flushCtx)
Contributor: It seems weird to me to retry putting it, what are the possible causes of an error that goes away?

Contributor Author: @Stebalien thinks that could happen if the system is under stress -- with some backoff it may succeed?

Member: Maybe it's not an issue with badger? But my thinking was:

  1. Transient error: retry will help.
  2. Non-transient error: we can't write anything else anyways so we might as well retry.

}
}
case <-bs.shutdownCh:
return
}
}
}
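
For reference, the fixed 5-second retry discussed in the thread above could also be written with exponential backoff. A minimal sketch (not part of this PR; the helper name flushWithBackoff is made up for illustration):

// Sketch only: an exponential-backoff alternative to the fixed flushRetryDelay
// retry loop above. Stops when the flush succeeds or the context is cancelled,
// and caps the delay at one minute.
func (bs *AutobatchBlockstore) flushWithBackoff(ctx context.Context) error {
	delay := bs.flushRetryDelay
	for {
		err := bs.doFlush(ctx)
		if err == nil {
			return nil
		}
		autolog.Errorf("flush failed: %s, retrying in %v", err, delay)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
		if delay > time.Minute {
			delay = time.Minute
		}
	}
}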

func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error {
bs.bufferedLk.Lock()
bs.flushingLk.Lock()
// We do NOT clear addedCids here, because its purpose is to expedite Puts
flushingBlksOrdered := bs.bufferedBlksOrdered
bs.flushingBlksMap = bs.bufferedBlksMap
bs.bufferedBlksOrdered = []block.Block{}
bs.bufferedBlksMap = make(map[cid.Cid]block.Block)
bs.bufferedLk.Unlock()
bs.flushingLk.Unlock()
return bs.backingBs.PutMany(ctx, flushingBlksOrdered)
}

func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
// request one last flush of the worker
bs.flushCh <- struct{}{}
// shutdown the flush worker
bs.shutdownCh <- struct{}{}
Member: Hm. It's unfortunate that this will block indefinitely if we shutdown twice.

// if it ever errored, this method fails
if bs.flushErr != nil {
return xerrors.Errorf("flushWorker errored: %w", bs.flushErr)
}

// one last flush in case it's needed
return bs.doFlush(ctx)
}
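
On the double-shutdown concern raised above: one way to make a second call safe (a sketch, not something this PR adds; the wrapper type onceShutdown is hypothetical) is to funnel Shutdown through a sync.Once:

// Sketch only: a hypothetical wrapper that makes Shutdown idempotent. The
// first call runs the real Shutdown; later calls return the stored result
// instead of blocking on the worker channels again.
type onceShutdown struct {
	once sync.Once
	bs   *AutobatchBlockstore
	err  error
}

func (s *onceShutdown) Shutdown(ctx context.Context) error {
	s.once.Do(func() {
		s.err = s.bs.Shutdown(ctx)
	})
	return s.err
}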

func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
// may seem backward to check the backingBs first, but that is the likeliest case
blk, err := bs.backingBs.Get(ctx, c)
if err == nil {
return blk, nil
}
Comment on lines +155 to +159

Contributor: If we check backingBs isn't this potentially racy with the way we do locks here? If we really want to avoid locking, we probably want to check backingBs once more at the end.

Contributor Author: that's a good point -- i'm not super concerned about it because for the migration, we should actually never try to Get anything we've Put...but a second check of the bs at the end makes sense

Member: Avoiding taking the lock likely isn't worth it. If it's a problem, we could always use a read/write lock.

Contributor Author: Will try and report back with perf

Contributor Author: Looks like this slowed us down -- trying again without to confirm

Contributor Author: Confirmed locking in there slows us down from 870 migrations per sec to 350

Member: You kind of need those locks, but they can be rwlocks.

Member: And you mean the locking in this function? Shouldn't we never hit those locks?

Contributor Author: if we want to lock to avoid raciness, it has to be at the very top of the method (so always gets hit)


if err != ErrNotFound {
return blk, err
}

bs.flushingLk.Lock()
v, ok := bs.flushingBlksMap[c]
bs.flushingLk.Unlock()
if ok {
return v, nil
}

bs.bufferedLk.Lock()
v, ok = bs.bufferedBlksMap[c]
bs.bufferedLk.Unlock()
if ok {
return v, nil
}

return nil, ErrNotFound
}
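
To make the rwlock suggestion in the thread above concrete: a rough sketch of what Get could look like if bufferedLk and flushingLk were changed to sync.RWMutex. This is an illustration of the reviewers' idea (with a made-up method name), not the code this PR ships:

// Sketch only: assumes bufferedLk and flushingLk are sync.RWMutex rather than
// sync.Mutex, so concurrent reads do not serialize against each other, only
// against Put and doFlush.
func (bs *AutobatchBlockstore) getWithRLocks(ctx context.Context, c cid.Cid) (block.Block, error) {
	// the backing store remains the likeliest hit, so it is still checked first
	blk, err := bs.backingBs.Get(ctx, c)
	if err == nil {
		return blk, nil
	}
	if err != ErrNotFound {
		return nil, err
	}

	bs.flushingLk.RLock()
	v, ok := bs.flushingBlksMap[c]
	bs.flushingLk.RUnlock()
	if ok {
		return v, nil
	}

	bs.bufferedLk.RLock()
	v, ok = bs.bufferedBlksMap[c]
	bs.bufferedLk.RUnlock()
	if ok {
		return v, nil
	}

	// a flush may have completed between the checks above, so look at the
	// backing store once more before reporting not-found
	return bs.backingBs.Get(ctx, c)
}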

func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
// see note in DeleteBlock()
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, err := bs.Get(ctx, c)
if err == nil {
return true, nil
}
if err == ErrNotFound {
return false, nil
}

return false, err
}

func (bs *AutobatchBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := bs.Get(ctx, c)
if err != nil {
return 0, err
}

return len(blk.RawData()), nil
}

func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) error {
for _, blk := range blks {
if err := bs.Put(ctx, blk); err != nil {
return err
}
}

return nil
}

func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.New("unsupported")
}

func (bs *AutobatchBlockstore) HashOnRead(enabled bool) {
bs.backingBs.HashOnRead(enabled)
}

func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
return xerrors.New("unsupported")
Member: I'd implement this, even if we just call get under the covers.

Contributor Author: I might just drop the methods -- we don't actually need this type to implement Lotus's blockstore interface

}
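
As suggested in the thread above, View could simply be layered on Get. A sketch (the method name viewViaGet is illustrative only; the author was leaning toward dropping the method instead):

// Sketch only: View implemented on top of Get, per the review suggestion above.
func (bs *AutobatchBlockstore) viewViaGet(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
	blk, err := bs.Get(ctx, c)
	if err != nil {
		return err
	}
	// hand the raw block bytes to the caller's callback
	return callback(blk.RawData())
}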
33 changes: 33 additions & 0 deletions blockstore/autobatch_test.go
@@ -0,0 +1,33 @@
package blockstore

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestAutobatchBlockstore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ab := NewAutobatch(ctx, NewMemory(), len(b0.RawData())+len(b1.RawData())-1)

require.NoError(t, ab.Put(ctx, b0))
require.NoError(t, ab.Put(ctx, b1))
require.NoError(t, ab.Put(ctx, b2))

ab.Flush(ctx)
Contributor: Flush is not implemented in autobatch

Contributor Author: Yeah, this will get updated. Thanks!


v0, err := ab.Get(ctx, b0.Cid())
require.NoError(t, err)
require.Equal(t, b0.RawData(), v0.RawData())

v1, err := ab.Get(ctx, b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), v1.RawData())

v2, err := ab.Get(ctx, b2.Cid())
require.NoError(t, err)
require.Equal(t, b2.RawData(), v2.RawData())
}
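
The test above calls ab.Flush, which the autobatch.go shown in this revision does not define yet (the author noted it "will get updated"). A minimal synchronous version could be as simple as the following sketch; the method the PR actually adds later may differ:

// Sketch only: a synchronous Flush that drains the current buffer into the
// backing store by calling doFlush directly.
func (bs *AutobatchBlockstore) Flush(ctx context.Context) error {
	return bs.doFlush(ctx)
}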
4 changes: 4 additions & 0 deletions chain/actors/builtin/paych/message.go.template
@@ -39,7 +39,11 @@ func (m message{{.v}}) Create(to address.Address, initialAmount abi.TokenAmount)

func (m message{{.v}}) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych{{.v}}.UpdateChannelStateParams{
{{if (ge .v 7)}}
Sv: toV{{.v}}SignedVoucher(*sv),
{{else}}
Sv: *sv,
{{end}}
Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message0.go
@@ -39,7 +39,9 @@ func (m message0) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message0) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych0.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message2.go
@@ -39,7 +39,9 @@ func (m message2) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message2) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych2.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message3.go
@@ -39,7 +39,9 @@ func (m message3) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message3) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych3.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message4.go
@@ -39,7 +39,9 @@ func (m message4) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message4) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych4.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message5.go
@@ -39,7 +39,9 @@ func (m message5) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message5) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych5.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message6.go
@@ -39,7 +39,9 @@ func (m message6) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message6) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych6.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message7.go
@@ -39,7 +39,9 @@ func (m message7) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message7) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych7.UpdateChannelStateParams{
Sv: *sv,

Sv: toV7SignedVoucher(*sv),

Secret: secret,
})
if aerr != nil {
18 changes: 18 additions & 0 deletions chain/actors/builtin/paych/state.go.template
@@ -112,3 +112,21 @@ func (ls *laneState{{.v}}) Redeemed() (big.Int, error) {
func (ls *laneState{{.v}}) Nonce() (uint64, error) {
return ls.LaneState.Nonce, nil
}

{{if (ge .v 7)}}
func toV{{.v}}SignedVoucher(sv SignedVoucher) paych{{.v}}.SignedVoucher {
return paych{{.v}}.SignedVoucher{
ChannelAddr: sv.ChannelAddr,
TimeLockMin: sv.TimeLockMin,
TimeLockMax: sv.TimeLockMax,
SecretHash: sv.SecretPreimage,
Extra: sv.Extra,
Lane: sv.Lane,
Nonce: sv.Nonce,
Amount: sv.Amount,
MinSettleHeight: sv.MinSettleHeight,
Merges: sv.Merges,
Signature: sv.Signature,
}
}
{{end}}
16 changes: 16 additions & 0 deletions chain/actors/builtin/paych/v7.go
@@ -112,3 +112,19 @@ func (ls *laneState7) Redeemed() (big.Int, error) {
func (ls *laneState7) Nonce() (uint64, error) {
return ls.LaneState.Nonce, nil
}

func toV7SignedVoucher(sv SignedVoucher) paych7.SignedVoucher {
return paych7.SignedVoucher{
ChannelAddr: sv.ChannelAddr,
TimeLockMin: sv.TimeLockMin,
TimeLockMax: sv.TimeLockMax,
SecretHash: sv.SecretPreimage,
Extra: sv.Extra,
Lane: sv.Lane,
Nonce: sv.Nonce,
Amount: sv.Amount,
MinSettleHeight: sv.MinSettleHeight,
Merges: sv.Merges,
Signature: sv.Signature,
}
}