Skip to content

Commit

Permalink
sync to disk once pending proposals reach 4k
Browse files Browse the repository at this point in the history
  • Loading branch information
Janardhan Reddy committed Aug 1, 2017
1 parent c873db8 commit 80c72d3
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const (

// Metadata Bit which is stored to find out whether the stored value is pl or byte slice.
bitUidPostings byte = 0x01
maxUidsLen = 2
maxUidsLen = 64000
)

func init() {
Expand Down Expand Up @@ -620,15 +620,26 @@ func TypeID(edge *protos.DirectedEdge) types.TypeID {
// anything to disk. Some other background routine will be responsible for merging
// changes in mutation layers to RocksDB. Returns whether any mutation happens.
func (l *List) AddMutation(ctx context.Context, t *protos.DirectedEdge) (bool, error) {
var index uint64
if rv, ok := ctx.Value("raft").(x.RaftValue); ok {
index = rv.Index
}
l.RLock()
sync := len(l.pending) > 0 && index > l.pending[0]+4000
l.RUnlock()
if sync {
l.SyncIfDirty(false)
}

hasMutated, err := l.addMutationHelper(ctx, t, false)
if err != nil {
return false, err
}
if hasMutated {
if rv, ok := ctx.Value("raft").(x.RaftValue); ok {
l.water.Ch <- x.Mark{Index: rv.Index}
if index != 0 {
l.water.Ch <- x.Mark{Index: index}
l.Lock()
l.pending = append(l.pending, rv.Index)
l.pending = append(l.pending, index)
l.Unlock()
}
if dirtyChan != nil {
Expand All @@ -640,14 +651,22 @@ func (l *List) AddMutation(ctx context.Context, t *protos.DirectedEdge) (bool, e

func (l *List) addMutation(ctx context.Context, t *protos.DirectedEdge) (bool, error) {
l.AssertLock()
var index uint64
if rv, ok := ctx.Value("raft").(x.RaftValue); ok {
index = rv.Index
}
if len(l.pending) > 0 && index > l.pending[0]+4000 {
l.syncIfDirty(false)
}

hasMutated, err := l.addMutationHelper(ctx, t, true)
if err != nil {
return false, err
}
if hasMutated {
if rv, ok := ctx.Value("raft").(x.RaftValue); ok {
l.water.Ch <- x.Mark{Index: rv.Index}
l.pending = append(l.pending, rv.Index)
if index != 0 {
l.water.Ch <- x.Mark{Index: index}
l.pending = append(l.pending, index)
}
if dirtyChan != nil {
dirtyChan <- l.key
Expand Down Expand Up @@ -873,6 +892,11 @@ func (l *List) Length(afterUid uint64) int {
func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
l.Lock()
defer l.Unlock()
return l.syncIfDirty(delFromCache)
}

func (l *List) syncIfDirty(delFromCache bool) (committed bool, err error) {
l.AssertLock()
// deleteAll is used to differentiate when we don't have any updates, v/s
// when we have explicitly deleted everything.
wb := make([]*badger.Entry, 0, 1)
Expand Down

0 comments on commit 80c72d3

Please sign in to comment.