diff --git a/txn/flusher.go b/txn/flusher.go index eb9449003..d668da8d7 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -159,41 +159,33 @@ const preloadBatchSize = 100 func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t - // we shouldn't need this one anymore because we are processing it now delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } - toPreload := make([]bson.ObjectId, 0) for _, dkey := range t.docKeys() { - queue := f.queue[dkey] - remaining := make([]bson.ObjectId, 0, len(queue)) + remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) + toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) for _, dtt := range f.queue[dkey] { id := dtt.id() - if seen[id] != nil { + if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } + toPreload[id] = struct{}{} remaining = append(remaining, id) } - + // done with this map + toPreload = nil for len(remaining) > 0 { - toPreload = toPreload[:0] - batchSize := preloadBatchSize - if batchSize > len(remaining) { - batchSize = len(remaining) - } - batch := remaining[:batchSize] - remaining = remaining[batchSize:] - for _, id := range batch { - if preloaded[id] == nil { - toPreload = append(toPreload, id) - } + batch := remaining + if len(batch) > preloadBatchSize { + batch = remaining[:preloadBatchSize] } - if len(toPreload) > 0 { - if err := f.loadMulti(toPreload, preloaded); err != nil { - return err - } + remaining = remaining[len(batch):] + err := f.loadMulti(batch, preloaded) + if err != nil { + return err } for _, id := range batch { if seen[id] != nil { @@ -310,8 +302,6 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno - // We updated the Q, so this should force refresh - // TODO: We could *just* add the new txn-queue entry/reuse existing tokens f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc @@ -373,14 +363,8 @@ NextDoc: } else { f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } - existRevno, rok := revno[dkey] - existQ, qok := f.queue[dkey] - if rok && qok && existRevno == info.Revno && len(existQ) == len(info.Queue) { - // We've already loaded this doc, no need to load it again - } else { - revno[dkey] = info.Revno - f.queue[dkey] = tokensWithIds(info.Queue) - } + revno[dkey] = info.Revno + f.queue[dkey] = tokensWithIds(info.Queue) continue NextDoc } } @@ -514,9 +498,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) goto RetryDoc } revno[dkey] = info.Revno - // TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2) - // if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different - // structure (map) to do faster lookups to see if the tokens are already present. + found := false for _, id := range info.Queue { if id == tt { @@ -524,15 +506,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - // f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue) - existQ, qok := f.queue[dkey] - if qok && len(existQ) == len(info.Queue) { - // we could check that info.Q matches existQ.tt - } else { - if len(existQ) != len(info.Queue) { - } - f.queue[dkey] = tokensWithIds(info.Queue) - } + f.queue[dkey] = tokensWithIds(info.Queue) if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: diff --git a/txn/txn.go b/txn/txn.go index 9aefdeb7a..3bc6e640f 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -136,10 +136,8 @@ func newNonce() string { type token string -func (tt token) id() bson.ObjectId { - return bson.ObjectIdHex(string(tt[:24])) -} -func (tt token) nonce() string { return string(tt[25:]) } +func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) } +func (tt token) nonce() string { return string(tt[25:]) } // Op represents an operation to a single document that may be // applied as part of a transaction with other operations. @@ -332,8 +330,6 @@ func (r *Runner) ResumeAll() (err error) { panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) } } - // TODO(jam): 2017-06-04 This is not calling iter.Close() and dealing with - // any error it might encounter (db connection closed, etc.) return nil } @@ -482,6 +478,7 @@ func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*tra return nil } + type typeNature int const (