Skip to content

Commit

Permalink
make the write lock scope limited within a function
Browse files Browse the repository at this point in the history
Also avoid removing the writing set if there was an error while writing.
  • Loading branch information
vyzo committed Aug 10, 2021
1 parent a9403b4 commit 79f348a
Showing 1 changed file with 43 additions and 19 deletions.
62 changes: 43 additions & 19 deletions blockstore/splitstore/markset_badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type BadgerMarkSet struct {
cond sync.Cond
pend map[string]struct{}
writing map[int]map[string]struct{}
writers int
seqno int
version int

Expand Down Expand Up @@ -81,13 +82,19 @@ func (e *BadgerMarkSetEnv) Close() error {

func (s *BadgerMarkSet) Mark(c cid.Cid) error {
s.mx.Lock()

if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}

return s.put(string(c.Hash()))
write, seqno := s.put(string(c.Hash()))
s.mx.Unlock()

if write {
return s.write(seqno)
}

return nil
}

func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
Expand Down Expand Up @@ -129,7 +136,6 @@ func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
s.mx.RUnlock()

s.mx.Lock()

// we have to do the check dance again
has, err = s.tryPending(pendKey)
if has || err != nil {
Expand All @@ -146,11 +152,14 @@ func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
}
}

if err := s.put(pendKey); err != nil {
return false, err
write, seqno := s.put(pendKey)
s.mx.Unlock()

if write {
err = s.write(seqno)
}

return true, nil
return true, err
}

// reader holds the (r)lock
Expand Down Expand Up @@ -190,28 +199,43 @@ func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
}
}

// writer holds the exclusive lock; put releases it
func (s *BadgerMarkSet) put(key string) error {
// writer holds the exclusive lock
func (s *BadgerMarkSet) put(key string) (write bool, seqno int) {
s.pend[key] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
s.mx.Unlock()
return nil
return false, 0
}

pend := s.pend
seqno := s.seqno
seqno = s.seqno
s.seqno++
s.writing[seqno] = pend
s.writing[seqno] = s.pend
s.pend = make(map[string]struct{})

return true, seqno
}

func (s *BadgerMarkSet) write(seqno int) (err error) {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}

pend := s.writing[seqno]
s.writers++
s.mx.Unlock()

defer func() {
s.mx.Lock()
defer s.mx.Unlock()

s.version++
delete(s.writing, seqno)
if len(s.writing) == 0 {
if err == nil {
delete(s.writing, seqno)
s.version++
}

s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
Expand All @@ -222,12 +246,12 @@ func (s *BadgerMarkSet) put(key string) error {
defer batch.Cancel()

for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
if err = batch.Set([]byte(k), empty); err != nil {
return xerrors.Errorf("error setting batch: %w", err)
}
}

err := batch.Flush()
err = batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
Expand All @@ -243,7 +267,7 @@ func (s *BadgerMarkSet) Close() error {
return nil
}

for len(s.writing) > 0 {
for s.writers > 0 {
s.cond.Wait()
}

Expand Down

0 comments on commit 79f348a

Please sign in to comment.