Skip to content

Commit

Permalink
Revert "try to reuse the info.Queue conversion has a negative perform…
Browse files Browse the repository at this point in the history
…ance effect"

This reverts commit 2ecd4fc.
  • Loading branch information
jameinel committed Jun 15, 2017
1 parent a3e83d6 commit b5ff827
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 49 deletions.
60 changes: 17 additions & 43 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,41 +159,33 @@ const preloadBatchSize = 100

func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error {
seen[t.Id] = t
// we shouldn't need this one anymore because we are processing it now
delete(preloaded, t.Id)
err := f.advance(t, nil, false)
if err != errPreReqs {
return err
}
toPreload := make([]bson.ObjectId, 0)
for _, dkey := range t.docKeys() {
queue := f.queue[dkey]
remaining := make([]bson.ObjectId, 0, len(queue))
remaining := make([]bson.ObjectId, 0, len(f.queue[dkey]))
toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey]))
for _, dtt := range f.queue[dkey] {
id := dtt.id()
if seen[id] != nil {
if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil {
continue
}
toPreload[id] = struct{}{}
remaining = append(remaining, id)
}

// done with this map
toPreload = nil
for len(remaining) > 0 {
toPreload = toPreload[:0]
batchSize := preloadBatchSize
if batchSize > len(remaining) {
batchSize = len(remaining)
}
batch := remaining[:batchSize]
remaining = remaining[batchSize:]
for _, id := range batch {
if preloaded[id] == nil {
toPreload = append(toPreload, id)
}
batch := remaining
if len(batch) > preloadBatchSize {
batch = remaining[:preloadBatchSize]
}
if len(toPreload) > 0 {
if err := f.loadMulti(toPreload, preloaded); err != nil {
return err
}
remaining = remaining[len(batch):]
err := f.loadMulti(batch, preloaded)
if err != nil {
return err
}
for _, id := range batch {
if seen[id] != nil {
Expand Down Expand Up @@ -310,8 +302,6 @@ NextDoc:
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
// We updated the Q, so this should force refresh
// TODO: We could *just* add the new txn-queue entry/reuse existing tokens
f.queue[dkey] = tokensWithIds(info.Queue)
f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
continue NextDoc
Expand Down Expand Up @@ -373,14 +363,8 @@ NextDoc:
} else {
f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
}
existRevno, rok := revno[dkey]
existQ, qok := f.queue[dkey]
if rok && qok && existRevno == info.Revno && len(existQ) == len(info.Queue) {
// We've already loaded this doc, no need to load it again
} else {
revno[dkey] = info.Revno
f.queue[dkey] = tokensWithIds(info.Queue)
}
revno[dkey] = info.Revno
f.queue[dkey] = tokensWithIds(info.Queue)
continue NextDoc
}
}
Expand Down Expand Up @@ -514,25 +498,15 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error)
goto RetryDoc
}
revno[dkey] = info.Revno
// TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2)
// if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different
// structure (map) to do faster lookups to see if the tokens are already present.

found := false
for _, id := range info.Queue {
if id == tt {
found = true
break
}
}
// f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue)
existQ, qok := f.queue[dkey]
if qok && len(existQ) == len(info.Queue) {
// we could check that info.Q matches existQ.tt
} else {
if len(existQ) != len(info.Queue) {
}
f.queue[dkey] = tokensWithIds(info.Queue)
}
f.queue[dkey] = tokensWithIds(info.Queue)
if !found {
// Rescanned transaction id was not in the queue. This could mean one
// of three things:
Expand Down
9 changes: 3 additions & 6 deletions txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,8 @@ func newNonce() string {

type token string

func (tt token) id() bson.ObjectId {
return bson.ObjectIdHex(string(tt[:24]))
}
func (tt token) nonce() string { return string(tt[25:]) }
func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) }
func (tt token) nonce() string { return string(tt[25:]) }

// Op represents an operation to a single document that may be
// applied as part of a transaction with other operations.
Expand Down Expand Up @@ -332,8 +330,6 @@ func (r *Runner) ResumeAll() (err error) {
panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
}
}
// TODO(jam): 2017-06-04 This is not calling iter.Close() and dealing with
// any error it might encounter (db connection closed, etc.)
return nil
}

Expand Down Expand Up @@ -482,6 +478,7 @@ func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*tra
return nil
}


type typeNature int

const (
Expand Down

0 comments on commit b5ff827

Please sign in to comment.