From e83b6efb90f9912af8b33cf30fa7c853ccd95660 Mon Sep 17 00:00:00 2001 From: zhaomei Date: Thu, 7 Jun 2018 17:02:31 +0800 Subject: [PATCH] findAndModify support writeConcern --- session.go | 44 ++++++++++++++++++++++++++++++++++---------- session_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/session.go b/session.go index 365b7f76b..4f84d2421 100644 --- a/session.go +++ b/session.go @@ -4895,11 +4895,13 @@ type findModifyCmd struct { Collection string `bson:"findAndModify"` Query, Update, Sort, Fields interface{} `bson:",omitempty"` Upsert, Remove, New bool `bson:",omitempty"` + WriteConcern interface{} `bson:",omitempty"` } type valueResult struct { - Value bson.Raw - LastError LastError `bson:"lastErrorObject"` + Value bson.Raw + LastError LastError `bson:"lastErrorObject"` + ConcernError writeConcernError `bson:writeConcernError` } // Apply runs the findAndModify MongoDB command, which allows updating, upserting @@ -4907,6 +4909,8 @@ type valueResult struct { // version (the default) or the new version of the document (when ReturnNew is true). // If no objects are found Apply returns ErrNotFound. // +// If the session is in safe mode, the LastError result will be returned as err. +// // The Sort and Select query methods affect the result of Apply. In case // multiple documents match the query, Sort enables selecting which document to // act upon by ordering it first. Select enables retrieving only a selection @@ -4943,15 +4947,27 @@ func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err dbname := op.collection[:c] cname := op.collection[c+1:] + // https://docs.mongodb.com/manual/reference/command/findAndModify/#dbcmd.findAndModify + session.m.RLock() + safeOp := session.safeOp + session.m.RUnlock() + var writeConcern interface{} + if safeOp == nil { + writeConcern = bson.D{{Name: "w", Value: 0}} + } else { + writeConcern = safeOp.query.(*getLastError) + } + cmd := findModifyCmd{ - Collection: cname, - Update: change.Update, - Upsert: change.Upsert, - Remove: change.Remove, - New: change.ReturnNew, - Query: op.query, - Sort: op.options.OrderBy, - Fields: op.selector, + Collection: cname, + Update: change.Update, + Upsert: change.Upsert, + Remove: change.Remove, + New: change.ReturnNew, + Query: op.query, + Sort: op.options.OrderBy, + Fields: op.selector, + WriteConcern: writeConcern, } session = session.Clone() @@ -4994,6 +5010,14 @@ func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err } else if change.Upsert { info.UpsertedId = lerr.UpsertedId } + if doc.ConcernError.Code != 0 { + var lerr LastError + e := doc.ConcernError + lerr.Code = e.Code + lerr.Err = e.ErrMsg + err = &lerr + return info, err + } return info, nil } diff --git a/session_test.go b/session_test.go index 08f330d91..dc85f5133 100644 --- a/session_test.go +++ b/session_test.go @@ -1370,6 +1370,38 @@ func (s *S) TestFindAndModify(c *C) { c.Assert(info, IsNil) } +func (s *S) TestFindAndModifyWriteConcern(c *C) { + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"fid": 42}) + c.Assert(err, IsNil) + + // Tweak the safety parameters to something unachievable. + session.SetSafe(&mgo.Safe{W: 4, WTimeout: 100}) + + var ret struct { + Id uint64 `bson:"id"` + } + + change := mgo.Change{ + Update: M{"$inc": M{"id": 8}}, + ReturnNew: false, + } + info, err := coll.Find(M{"id": M{"$exists": true}}).Apply(change, &ret) + c.Assert(err, ErrorMatches, "timeout|timed out waiting for slaves|Not enough data-bearing nodes|waiting for replication timed out") + if !s.versionAtLeast(2, 6) { + // 2.6 turned it into a query error. + c.Assert(err.(*mgo.LastError).WTimeout, Equals, true) + } + c.Assert(info.Updated, Equals, 1) + c.Assert(info.Matched, Equals, 1) + c.Assert(info.UpsertedId, NotNil) + c.Assert(ret.Id, Equals, 50) +} + func (s *S) TestFindAndModifyBug997828(c *C) { session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil)