correctness fixes for the autobatch blockstore #7940

Merged: 1 commit, Jan 13, 2022
92 changes: 55 additions & 37 deletions blockstore/autobatch.go
@@ -26,18 +26,17 @@ type AutobatchBlockstore struct {
 	addedCids map[cid.Cid]struct{}
 
 	stateLock     sync.Mutex
-	doFlushLock   sync.Mutex
 	bufferedBatch blockBatch
 
-	flushingBatch   blockBatch
-	flushErr        error
-	flushWorkerDone bool
+	flushingBatch blockBatch
+	flushErr      error
 
 	flushCh chan struct{}
 
+	doFlushLock     sync.Mutex
 	flushRetryDelay time.Duration
-	flushCtx        context.Context
-	shutdownCh      chan struct{}
+	doneCh          chan struct{}
+	shutdown        context.CancelFunc
 
 	backingBs Blockstore
 
@@ -46,21 +45,21 @@ type AutobatchBlockstore struct {
 }
 
 func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore {
+	ctx, cancel := context.WithCancel(ctx)
 	bs := &AutobatchBlockstore{
 		addedCids:      make(map[cid.Cid]struct{}),
 		backingBs:      backingBs,
 		bufferCapacity: bufferCapacity,
-		flushCtx:       ctx,
 		flushCh:        make(chan struct{}, 1),
-		shutdownCh:     make(chan struct{}),
+		doneCh:         make(chan struct{}),
 		// could be made configable
 		flushRetryDelay: time.Millisecond * 100,
-		flushWorkerDone: false,
+		shutdown:        cancel,
 	}
 
 	bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block)
 
-	go bs.flushWorker()
+	go bs.flushWorker(ctx)
 
 	return bs
 }
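The constructor change is the heart of the fix: the hand-rolled shutdownCh/flushWorkerDone pair, whose one-shot send could block forever under concurrent Shutdown calls, is replaced by a cancellable context (its CancelFunc stored as bs.shutdown) plus a doneCh that the worker closes on exit. A minimal, self-contained sketch of that shutdown pattern; the names Service, New, and worker are illustrative only, not lotus APIs:

package main

import (
	"context"
	"fmt"
	"time"
)

// Service demonstrates the cancel-func/done-channel pattern adopted above.
type Service struct {
	shutdown context.CancelFunc // cancels the worker's context
	doneCh   chan struct{}      // closed by the worker when it exits
}

func New(ctx context.Context) *Service {
	ctx, cancel := context.WithCancel(ctx)
	s := &Service{shutdown: cancel, doneCh: make(chan struct{})}
	go s.worker(ctx)
	return s
}

func (s *Service) worker(ctx context.Context) {
	defer close(s.doneCh) // runs no matter how the worker exits
	for {
		select {
		case <-ctx.Done():
			return // final cleanup (e.g. one last flush) would go here
		case <-time.After(50 * time.Millisecond):
			// periodic work
		}
	}
}

// Shutdown is safe to call concurrently: cancel is idempotent, and a
// closed channel unblocks every waiter, unlike a one-shot channel send.
func (s *Service) Shutdown(ctx context.Context) error {
	s.shutdown()
	select {
	case <-s.doneCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	s := New(context.Background())
	fmt.Println(s.Shutdown(context.Background())) // prints <nil>
}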
@@ -88,66 +87,85 @@ func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
 	return nil
 }
 
-func (bs *AutobatchBlockstore) flushWorker() {
-	defer func() {
-		bs.stateLock.Lock()
-		bs.flushWorkerDone = true
-		bs.stateLock.Unlock()
-	}()
+func (bs *AutobatchBlockstore) flushWorker(ctx context.Context) {
+	defer close(bs.doneCh)
 	for {
 		select {
 		case <-bs.flushCh:
-			putErr := bs.doFlush(bs.flushCtx)
+			// TODO: check if we _should_ actually flush. We could get a spurious wakeup
+			// here.
+			putErr := bs.doFlush(ctx, false)
 			for putErr != nil {
 				select {
-				case <-bs.shutdownCh:
+				case <-ctx.Done():
 					return
 				case <-time.After(bs.flushRetryDelay):
 					autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay)
-					putErr = bs.doFlush(bs.flushCtx)
+					putErr = bs.doFlush(ctx, true)
 				}
 			}
-		case <-bs.shutdownCh:
+		case <-ctx.Done():
+			// Do one last flush.
+			_ = bs.doFlush(ctx, false)
			return
 		}
 	}
 }
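Note that flushCh is created with capacity 1 in NewAutobatch, which suggests the producer side signals it with a non-blocking send so that any number of Puts collapse into at most one pending wakeup. The Put side is not shown in this diff, so the sketch below is an assumption about that idiom (requestFlush is a hypothetical name); it also explains why the TODO above is accurate, since a queued wakeup can outlive the flush it was asking for:

// Coalescing wakeup over a buffered channel of capacity 1,
// i.e. flushCh := make(chan struct{}, 1) as in NewAutobatch.
func requestFlush(flushCh chan<- struct{}) {
	select {
	case flushCh <- struct{}{}: // queue exactly one pending wakeup
	default: // one is already queued; the worker will see current state
	}
}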

 // caller must NOT hold stateLock
-func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error {
+// set retryOnly to true to only retry a failed flush and not flush anything new.
+func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) error {
 	bs.doFlushLock.Lock()
 	defer bs.doFlushLock.Unlock()
-	if bs.flushErr == nil {
-		bs.stateLock.Lock()
-		// We do NOT clear addedCids here, because its purpose is to expedite Puts
-		bs.flushingBatch = bs.bufferedBatch
-		bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList))
-		bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap))
-		bs.stateLock.Unlock()
-
-		bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
-
-		bs.stateLock.Lock()
-		bs.flushingBatch = blockBatch{}
-		bs.stateLock.Unlock()
-	}
+
+	// If we failed to flush last time, try flushing again.
+	if bs.flushErr != nil {
+		bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
+	}
+
+	// If we failed, or we're _only_ retrying, bail.
+	if retryOnly || bs.flushErr != nil {
+		return bs.flushErr
+	}
+
+	// Then take the current batch...
+	bs.stateLock.Lock()
+	// We do NOT clear addedCids here, because its purpose is to expedite Puts
+	bs.flushingBatch = bs.bufferedBatch
+	bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList))
+	bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap))
+	bs.stateLock.Unlock()
+
+	// And try to flush it.
+	bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
+
+	// If we succeeded, reset the batch. Otherwise, we'll try again next time.
+	if bs.flushErr == nil {
+		bs.stateLock.Lock()
+		bs.flushingBatch = blockBatch{}
+		bs.stateLock.Unlock()
+	}
 
 	return bs.flushErr
 }
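The reworked doFlush runs in two phases under doFlushLock: first it re-attempts a previously failed batch, if any (flushingBatch still holds it, because a batch is only cleared after PutMany succeeds); then, unless retryOnly is set, it rotates bufferedBatch into flushingBatch and attempts that. A condensed sketch of the same control flow over plain slices; doFlushSketch and putMany are stand-ins, not lotus code:

// doFlushSketch mirrors the two-phase logic above: pending is the
// buffered batch, failed is the batch a previous flush left behind.
func doFlushSketch(retryOnly bool, pending, failed *[]string,
	putMany func([]string) error) error {
	// Phase 1: retry a batch that failed last time, keeping it on error.
	if len(*failed) > 0 {
		if err := putMany(*failed); err != nil {
			return err
		}
		*failed = nil
	}
	if retryOnly {
		return nil // the retry loop in flushWorker stops here
	}
	// Phase 2: rotate the buffered batch in and try to flush it.
	*failed, *pending = *pending, nil
	if err := putMany(*failed); err != nil {
		return err // *failed survives for the next attempt
	}
	*failed = nil
	return nil
}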

 // caller must NOT hold stateLock
 func (bs *AutobatchBlockstore) Flush(ctx context.Context) error {
-	return bs.doFlush(ctx)
+	return bs.doFlush(ctx, false)
 }
 
 func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
-	bs.stateLock.Lock()
-	flushDone := bs.flushWorkerDone
-	bs.stateLock.Unlock()
-	if !flushDone {
-		// may racily block forever if Shutdown is called in parallel
-		bs.shutdownCh <- struct{}{}
+	// TODO: Prevent puts after we call this to avoid losing data.
+	bs.shutdown()
+	select {
+	case <-bs.doneCh:
+	case <-ctx.Done():
+		return ctx.Err()
 	}
 
+	bs.doFlushLock.Lock()
+	defer bs.doFlushLock.Unlock()
+
 	return bs.flushErr
 }

Review thread on the added bs.doFlushLock.Lock() line:

Contributor: Is this just to make sure that any earlier claimers of the lock finish before we return?

Member (author): Concurrent Flush and Shutdown.
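Since Shutdown now selects on the caller's context as well as doneCh, callers can bound how long they wait for the final flush. A hypothetical usage sketch (stopBlockstore and the 30-second budget are illustrative, not lotus code):

// stopBlockstore shows how a caller can bound the shutdown wait
// now that Shutdown honors its context.
func stopBlockstore(bs *AutobatchBlockstore) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// Returns ctx.Err() if the worker doesn't exit in time, otherwise
	// whatever flushErr the final flush left behind.
	return bs.Shutdown(ctx)
}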
