diff --git a/posting/list.go b/posting/list.go index 8e34065bf50..4dbd32d5296 100644 --- a/posting/list.go +++ b/posting/list.go @@ -65,7 +65,7 @@ const ( // Metadata Bit which is stored to find out whether the stored value is pl or byte slice. bitUidPostings byte = 0x01 - maxUidsLen = 2 + maxUidsLen = 64000 ) func init() { @@ -620,15 +620,26 @@ func TypeID(edge *protos.DirectedEdge) types.TypeID { // anything to disk. Some other background routine will be responsible for merging // changes in mutation layers to RocksDB. Returns whether any mutation happens. func (l *List) AddMutation(ctx context.Context, t *protos.DirectedEdge) (bool, error) { + var index uint64 + if rv, ok := ctx.Value("raft").(x.RaftValue); ok { + index = rv.Index + } + l.RLock() + sync := len(l.pending) > 0 && index > l.pending[0]+4000 + l.RUnlock() + if sync { + l.SyncIfDirty(false) + } + hasMutated, err := l.addMutationHelper(ctx, t, false) if err != nil { return false, err } if hasMutated { - if rv, ok := ctx.Value("raft").(x.RaftValue); ok { - l.water.Ch <- x.Mark{Index: rv.Index} + if index != 0 { + l.water.Ch <- x.Mark{Index: index} l.Lock() - l.pending = append(l.pending, rv.Index) + l.pending = append(l.pending, index) l.Unlock() } if dirtyChan != nil { @@ -640,14 +651,22 @@ func (l *List) AddMutation(ctx context.Context, t *protos.DirectedEdge) (bool, e func (l *List) addMutation(ctx context.Context, t *protos.DirectedEdge) (bool, error) { l.AssertLock() + var index uint64 + if rv, ok := ctx.Value("raft").(x.RaftValue); ok { + index = rv.Index + } + if len(l.pending) > 0 && index > l.pending[0]+4000 { + l.syncIfDirty(false) + } + hasMutated, err := l.addMutationHelper(ctx, t, true) if err != nil { return false, err } if hasMutated { - if rv, ok := ctx.Value("raft").(x.RaftValue); ok { - l.water.Ch <- x.Mark{Index: rv.Index} - l.pending = append(l.pending, rv.Index) + if index != 0 { + l.water.Ch <- x.Mark{Index: index} + l.pending = append(l.pending, index) } if dirtyChan != nil { dirtyChan <- l.key @@ -873,6 +892,11 @@ func (l *List) Length(afterUid uint64) int { func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) { l.Lock() defer l.Unlock() + return l.syncIfDirty(delFromCache) +} + +func (l *List) syncIfDirty(delFromCache bool) (committed bool, err error) { + l.AssertLock() // deleteAll is used to differentiate when we don't have any updates, v/s // when we have explicitly deleted everything. wb := make([]*badger.Entry, 0, 1)