Skip to content

Commit

Permalink
Merge pull request #7933 from filecoin-project/asr/migration-autobatch
Browse files Browse the repository at this point in the history
feat: state: Fast migration for v15
  • Loading branch information
arajasek authored Jan 12, 2022
2 parents f87d8d0 + 3464dc2 commit b161f56
Show file tree
Hide file tree
Showing 22 changed files with 512 additions and 40 deletions.
242 changes: 242 additions & 0 deletions blockstore/autobatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package blockstore

import (
"context"
"sync"
"time"

"golang.org/x/xerrors"

block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

// autolog is a logger for the autobatching blockstore. It is subscoped from the
// blockstore logger.
var autolog = log.Named("auto")

// contains the same set of blocks twice, once as an ordered list for flushing, and as a map for fast access
type blockBatch struct {
blockList []block.Block
blockMap map[cid.Cid]block.Block
}

type AutobatchBlockstore struct {
// TODO: drop if memory consumption is too high
addedCids map[cid.Cid]struct{}

stateLock sync.Mutex
doFlushLock sync.Mutex
bufferedBatch blockBatch

flushingBatch blockBatch
flushErr error
flushWorkerDone bool

flushCh chan struct{}

flushRetryDelay time.Duration
flushCtx context.Context
shutdownCh chan struct{}

backingBs Blockstore

bufferCapacity int
bufferSize int
}

func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore {
bs := &AutobatchBlockstore{
addedCids: make(map[cid.Cid]struct{}),
backingBs: backingBs,
bufferCapacity: bufferCapacity,
flushCtx: ctx,
flushCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
// could be made configable
flushRetryDelay: time.Millisecond * 100,
flushWorkerDone: false,
}

bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block)

go bs.flushWorker()

return bs
}

func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
bs.stateLock.Lock()
defer bs.stateLock.Unlock()

_, ok := bs.addedCids[blk.Cid()]
if !ok {
bs.addedCids[blk.Cid()] = struct{}{}
bs.bufferedBatch.blockList = append(bs.bufferedBatch.blockList, blk)
bs.bufferedBatch.blockMap[blk.Cid()] = blk
bs.bufferSize += len(blk.RawData())
if bs.bufferSize >= bs.bufferCapacity {
// signal that a flush is appropriate, may be ignored
select {
case bs.flushCh <- struct{}{}:
default:
// do nothing
}
}
}

return nil
}

func (bs *AutobatchBlockstore) flushWorker() {
defer func() {
bs.stateLock.Lock()
bs.flushWorkerDone = true
bs.stateLock.Unlock()
}()
for {
select {
case <-bs.flushCh:
putErr := bs.doFlush(bs.flushCtx)
for putErr != nil {
select {
case <-bs.shutdownCh:
return
case <-time.After(bs.flushRetryDelay):
autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay)
putErr = bs.doFlush(bs.flushCtx)
}
}
case <-bs.shutdownCh:
return
}
}
}

// caller must NOT hold stateLock
func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error {
bs.doFlushLock.Lock()
defer bs.doFlushLock.Unlock()
if bs.flushErr == nil {
bs.stateLock.Lock()
// We do NOT clear addedCids here, because its purpose is to expedite Puts
bs.flushingBatch = bs.bufferedBatch
bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList))
bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap))
bs.stateLock.Unlock()
}

bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
bs.stateLock.Lock()
bs.flushingBatch = blockBatch{}
bs.stateLock.Unlock()

return bs.flushErr
}

// caller must NOT hold stateLock
func (bs *AutobatchBlockstore) Flush(ctx context.Context) error {
return bs.doFlush(ctx)
}

func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
bs.stateLock.Lock()
flushDone := bs.flushWorkerDone
bs.stateLock.Unlock()
if !flushDone {
// may racily block forever if Shutdown is called in parallel
bs.shutdownCh <- struct{}{}
}

return bs.flushErr
}

func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
// may seem backward to check the backingBs first, but that is the likeliest case
blk, err := bs.backingBs.Get(ctx, c)
if err == nil {
return blk, nil
}

if err != ErrNotFound {
return blk, err
}

v, ok := bs.flushingBatch.blockMap[c]
if ok {
return v, nil
}

v, ok = bs.bufferedBatch.blockMap[c]
if ok {
return v, nil
}

return bs.Get(ctx, c)
}

func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
// if we wanted to support this, we would have to:
// - flush
// - delete from the backingBs (if present)
// - remove from addedCids (if present)
// - if present in addedCids, also walk the ordered lists and remove if present
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
// see note in DeleteBlock()
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, err := bs.Get(ctx, c)
if err == nil {
return true, nil
}
if err == ErrNotFound {
return false, nil
}

return false, err
}

func (bs *AutobatchBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := bs.Get(ctx, c)
if err != nil {
return 0, err
}

return len(blk.RawData()), nil
}

func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) error {
for _, blk := range blks {
if err := bs.Put(ctx, blk); err != nil {
return err
}
}

return nil
}

func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if err := bs.Flush(ctx); err != nil {
return nil, err
}

return bs.backingBs.AllKeysChan(ctx)
}

func (bs *AutobatchBlockstore) HashOnRead(enabled bool) {
bs.backingBs.HashOnRead(enabled)
}

func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
blk, err := bs.Get(ctx, cid)
if err != nil {
return err
}

return callback(blk.RawData())
}
34 changes: 34 additions & 0 deletions blockstore/autobatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package blockstore

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestAutobatchBlockstore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ab := NewAutobatch(ctx, NewMemory(), len(b0.RawData())+len(b1.RawData())-1)

require.NoError(t, ab.Put(ctx, b0))
require.NoError(t, ab.Put(ctx, b1))
require.NoError(t, ab.Put(ctx, b2))

v0, err := ab.Get(ctx, b0.Cid())
require.NoError(t, err)
require.Equal(t, b0.RawData(), v0.RawData())

v1, err := ab.Get(ctx, b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), v1.RawData())

v2, err := ab.Get(ctx, b2.Cid())
require.NoError(t, err)
require.Equal(t, b2.RawData(), v2.RawData())

require.NoError(t, ab.Flush(ctx))
require.NoError(t, ab.Shutdown(ctx))
}
4 changes: 4 additions & 0 deletions chain/actors/builtin/paych/message.go.template
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func (m message{{.v}}) Create(to address.Address, initialAmount abi.TokenAmount)

func (m message{{.v}}) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych{{.v}}.UpdateChannelStateParams{
{{if (ge .v 7)}}
Sv: toV{{.v}}SignedVoucher(*sv),
{{else}}
Sv: *sv,
{{end}}
Secret: secret,
})
if aerr != nil {
Expand Down
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message0.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (m message0) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message0) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych0.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
Expand Down
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message2.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (m message2) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message2) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych2.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
Expand Down
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message3.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (m message3) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message3) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych3.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
Expand Down
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message4.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (m message4) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message4) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych4.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
Expand Down
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message5.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (m message5) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message5) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych5.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
Expand Down
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message6.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (m message6) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message6) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych6.UpdateChannelStateParams{
Sv: *sv,

Sv: *sv,

Secret: secret,
})
if aerr != nil {
Expand Down
4 changes: 3 additions & 1 deletion chain/actors/builtin/paych/message7.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (m message7) Create(to address.Address, initialAmount abi.TokenAmount) (*ty

func (m message7) Update(paych address.Address, sv *SignedVoucher, secret []byte) (*types.Message, error) {
params, aerr := actors.SerializeParams(&paych7.UpdateChannelStateParams{
Sv: *sv,

Sv: toV7SignedVoucher(*sv),

Secret: secret,
})
if aerr != nil {
Expand Down
18 changes: 18 additions & 0 deletions chain/actors/builtin/paych/state.go.template
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,21 @@ func (ls *laneState{{.v}}) Redeemed() (big.Int, error) {
func (ls *laneState{{.v}}) Nonce() (uint64, error) {
return ls.LaneState.Nonce, nil
}

{{if (ge .v 7)}}
func toV{{.v}}SignedVoucher(sv SignedVoucher) paych{{.v}}.SignedVoucher {
return paych{{.v}}.SignedVoucher{
ChannelAddr: sv.ChannelAddr,
TimeLockMin: sv.TimeLockMin,
TimeLockMax: sv.TimeLockMax,
SecretHash: sv.SecretPreimage,
Extra: sv.Extra,
Lane: sv.Lane,
Nonce: sv.Nonce,
Amount: sv.Amount,
MinSettleHeight: sv.MinSettleHeight,
Merges: sv.Merges,
Signature: sv.Signature,
}
}
{{end}}
Loading

0 comments on commit b161f56

Please sign in to comment.