diff --git a/auth.go b/auth.go index ed3baad15..75d2ebc36 100644 --- a/auth.go +++ b/auth.go @@ -170,6 +170,12 @@ func (socket *mongoSocket) Login(cred Credential) error { return nil } } + if socket.dropLogout(cred) { + debugf("Socket %p to %s: login: db=%q user=%q (cached)", socket, socket.addr, cred.Source, cred.Username) + socket.creds = append(socket.creds, cred) + socket.Unlock() + return nil + } socket.Unlock() debugf("Socket %p to %s: login: db=%q user=%q", socket, socket.addr, cred.Source, cred.Username) @@ -406,50 +412,36 @@ func (socket *mongoSocket) Logout(db string) { cred, found := socket.dropAuth(db) if found { debugf("Socket %p to %s: logout: db=%q (flagged)", socket, socket.addr, db) - socket.Unlock() - err := socket.flushLogout(cred) - if err != nil { - debugf("fail to logout for cred %v; error: %v", cred, err) - } - } else { - socket.Unlock() + socket.logout = append(socket.logout, cred) } + socket.Unlock() } func (socket *mongoSocket) LogoutAll() { socket.Lock() if l := len(socket.creds); l > 0 { - credCopy := make([]Credential, l) - copy(credCopy, socket.creds) - socket.creds = socket.creds[0:0] - socket.Unlock() debugf("Socket %p to %s: logout all (flagged %d)", socket, socket.addr, l) - err := socket.flushLogout(credCopy...) - if err != nil { - debugf("fail to logout for cred %v, error: %v", credCopy, err) - } - } else { - socket.Unlock() + socket.logout = append(socket.logout, socket.creds...) + socket.creds = socket.creds[0:0] } + socket.Unlock() } -func (socket *mongoSocket) flushLogout(cred ...Credential) error { - if l := len(cred); l > 0 { +func (socket *mongoSocket) flushLogout() (ops []interface{}) { + socket.Lock() + if l := len(socket.logout); l > 0 { debugf("Socket %p to %s: logout all (flushing %d)", socket, socket.addr, l) - ops := make([]interface{}, l) for i := 0; i != l; i++ { op := queryOp{} op.query = &logoutCmd{1} - op.collection = cred[i].Source + ".$cmd" + op.collection = socket.logout[i].Source + ".$cmd" op.limit = -1 - ops[i] = &op - } - err := socket.Query(ops...) - if err != nil { - return fmt.Errorf("fail to logout: %v", err) + ops = append(ops, &op) } + socket.logout = socket.logout[0:0] } - return nil + socket.Unlock() + return } func (socket *mongoSocket) dropAuth(db string) (cred Credential, found bool) { @@ -462,3 +454,14 @@ func (socket *mongoSocket) dropAuth(db string) (cred Credential, found bool) { } return cred, false } + +func (socket *mongoSocket) dropLogout(cred Credential) (found bool) { + for i, sockCred := range socket.logout { + if sockCred == cred { + copy(socket.logout[i:], socket.logout[i+1:]) + socket.logout = socket.logout[:len(socket.logout)-1] + return true + } + } + return false +} diff --git a/auth_test.go b/auth_test.go index b7298786f..689812477 100644 --- a/auth_test.go +++ b/auth_test.go @@ -46,7 +46,7 @@ import ( func (s *S) TestAuthLoginDatabase(c *C) { // Test both with a normal database and with an authenticated shard. for _, addr := range []string{"localhost:40002", "localhost:40203"} { - session, err := mgo.Dial(addr + expFeaturesString) + session, err := mgo.Dial(addr) c.Assert(err, IsNil) defer session.Close() @@ -70,7 +70,7 @@ func (s *S) TestAuthLoginDatabase(c *C) { func (s *S) TestAuthLoginSession(c *C) { // Test both with a normal database and with an authenticated shard. for _, addr := range []string{"localhost:40002", "localhost:40203"} { - session, err := mgo.Dial(addr + expFeaturesString) + session, err := mgo.Dial(addr) c.Assert(err, IsNil) defer session.Close() @@ -123,7 +123,7 @@ func (s *S) TestAuthLoginLogout(c *C) { } func (s *S) TestAuthLoginLogoutAll(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -171,7 +171,7 @@ func (s *S) TestAuthUpsertUser(c *C) { if !s.versionAtLeast(2, 4) { c.Skip("UpsertUser only works on 2.4+") } - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -252,7 +252,7 @@ func (s *S) TestAuthUpsertUserOtherDBRoles(c *C) { if !s.versionAtLeast(2, 4) { c.Skip("UpsertUser only works on 2.4+") } - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -285,7 +285,7 @@ func (s *S) TestAuthUpsertUserUpdates(c *C) { if !s.versionAtLeast(2, 4) { c.Skip("UpsertUser only works on 2.4+") } - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -313,7 +313,7 @@ func (s *S) TestAuthUpsertUserUpdates(c *C) { c.Assert(err, IsNil) // Login with the new user. - usession, err := mgo.Dial("myruser:mynewpass@localhost:40002/mydb" + expFeaturesString) + usession, err := mgo.Dial("myruser:mynewpass@localhost:40002/mydb") c.Assert(err, IsNil) defer usession.Close() @@ -332,7 +332,7 @@ func (s *S) TestAuthUpsertUserUpdates(c *C) { c.Assert(err, IsNil) // Dial again to ensure the password hasn't changed. - usession, err = mgo.Dial("myruser:mynewpass@localhost:40002/mydb" + expFeaturesString) + usession, err = mgo.Dial("myruser:mynewpass@localhost:40002/mydb") c.Assert(err, IsNil) defer usession.Close() @@ -342,7 +342,7 @@ func (s *S) TestAuthUpsertUserUpdates(c *C) { } func (s *S) TestAuthAddUser(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -373,7 +373,7 @@ func (s *S) TestAuthAddUser(c *C) { } func (s *S) TestAuthAddUserReplaces(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -400,7 +400,7 @@ func (s *S) TestAuthAddUserReplaces(c *C) { } func (s *S) TestAuthRemoveUser(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -421,7 +421,7 @@ func (s *S) TestAuthRemoveUser(c *C) { } func (s *S) TestAuthLoginTwiceDoesNothing(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -439,7 +439,7 @@ func (s *S) TestAuthLoginTwiceDoesNothing(c *C) { } func (s *S) TestAuthLoginLogoutLoginDoesNothing(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -449,17 +449,16 @@ func (s *S) TestAuthLoginLogoutLoginDoesNothing(c *C) { oldStats := mgo.GetStats() - admindb.Logout() // 1 op - err = admindb.Login("root", "rapadura") // 3 op + admindb.Logout() + err = admindb.Login("root", "rapadura") c.Assert(err, IsNil) newStats := mgo.GetStats() - // Logout is flush directly - c.Assert(newStats.SentOps, Equals, oldStats.SentOps+4) + c.Assert(newStats.SentOps, Equals, oldStats.SentOps) } func (s *S) TestAuthLoginSwitchUser(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -486,7 +485,7 @@ func (s *S) TestAuthLoginSwitchUser(c *C) { } func (s *S) TestAuthLoginChangePassword(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -515,7 +514,7 @@ func (s *S) TestAuthLoginChangePassword(c *C) { } func (s *S) TestAuthLoginCachingWithSessionRefresh(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -531,7 +530,7 @@ func (s *S) TestAuthLoginCachingWithSessionRefresh(c *C) { } func (s *S) TestAuthLoginCachingWithSessionCopy(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -548,7 +547,7 @@ func (s *S) TestAuthLoginCachingWithSessionCopy(c *C) { } func (s *S) TestAuthLoginCachingWithSessionClone(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -565,7 +564,7 @@ func (s *S) TestAuthLoginCachingWithSessionClone(c *C) { } func (s *S) TestAuthLoginCachingWithNewSession(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -585,7 +584,7 @@ func (s *S) TestAuthLoginCachingAcrossPool(c *C) { // Logins are cached even when the connection goes back // into the pool. - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -618,9 +617,9 @@ func (s *S) TestAuthLoginCachingAcrossPool(c *C) { err = other.DB("mydb").Login("myuser", "mypass") c.Assert(err, IsNil) - // No more caching, logout is flush directly + // Both logins were cached, so no ops. newStats := mgo.GetStats() - c.Assert(newStats.SentOps, Equals, oldStats.SentOps+6) + c.Assert(newStats.SentOps, Equals, oldStats.SentOps) // And they actually worked. err = other.DB("mydb").C("mycoll").Insert(M{"n": 1}) @@ -636,7 +635,7 @@ func (s *S) TestAuthLoginCachingAcrossPoolWithLogout(c *C) { // Now verify that logouts are properly flushed if they // are not revalidated after leaving the pool. - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -665,12 +664,12 @@ func (s *S) TestAuthLoginCachingAcrossPoolWithLogout(c *C) { oldStats := mgo.GetStats() - err = other.DB("mydb").Login("myuser", "mypass") // 3 op + err = other.DB("mydb").Login("myuser", "mypass") c.Assert(err, IsNil) - // No more caching, logout is flush directly + // Login was cached, so no ops. newStats := mgo.GetStats() - c.Assert(newStats.SentOps, Equals, oldStats.SentOps+3) + c.Assert(newStats.SentOps, Equals, oldStats.SentOps) // Can't write, since root has been implicitly logged out // when the collection went into the pool, and not revalidated. @@ -687,7 +686,7 @@ func (s *S) TestAuthLoginCachingAcrossPoolWithLogout(c *C) { func (s *S) TestAuthEventual(c *C) { // Eventual sessions don't keep sockets around, so they are // an interesting test case. - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -723,7 +722,7 @@ func (s *S) TestAuthEventual(c *C) { } func (s *S) TestAuthURL(c *C) { - session, err := mgo.Dial("mongodb://root:rapadura@localhost:40002/" + expFeaturesString) + session, err := mgo.Dial("mongodb://root:rapadura@localhost:40002/") c.Assert(err, IsNil) defer session.Close() @@ -732,7 +731,7 @@ func (s *S) TestAuthURL(c *C) { } func (s *S) TestAuthURLWrongCredentials(c *C) { - session, err := mgo.Dial("mongodb://root:wrong@localhost:40002/" + expFeaturesString) + session, err := mgo.Dial("mongodb://root:wrong@localhost:40002/") if session != nil { session.Close() } @@ -743,7 +742,7 @@ func (s *S) TestAuthURLWrongCredentials(c *C) { func (s *S) TestAuthURLWithNewSession(c *C) { // When authentication is in the URL, the new session will // actually carry it on as well, even if logged out explicitly. - session, err := mgo.Dial("mongodb://root:rapadura@localhost:40002/" + expFeaturesString) + session, err := mgo.Dial("mongodb://root:rapadura@localhost:40002/") c.Assert(err, IsNil) defer session.Close() @@ -760,7 +759,7 @@ func (s *S) TestAuthURLWithNewSession(c *C) { } func (s *S) TestAuthURLWithDatabase(c *C) { - session, err := mgo.Dial("mongodb://root:rapadura@localhost:40002" + expFeaturesString) + session, err := mgo.Dial("mongodb://root:rapadura@localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -772,9 +771,9 @@ func (s *S) TestAuthURLWithDatabase(c *C) { for i := 0; i < 2; i++ { var url string if i == 0 { - url = "mongodb://myruser:mypass@localhost:40002/mydb" + expFeaturesString + url = "mongodb://myruser:mypass@localhost:40002/mydb" } else { - url = "mongodb://myruser:mypass@localhost:40002/admin?authSource=mydb" + "&" + string(expFeaturesString[1:]) + url = "mongodb://myruser:mypass@localhost:40002/admin?authSource=mydb" } usession, err := mgo.Dial(url) c.Assert(err, IsNil) @@ -798,7 +797,7 @@ func (s *S) TestDefaultDatabase(c *C) { } for _, test := range tests { - session, err := mgo.Dial(test.url + expFeaturesString) + session, err := mgo.Dial(test.url) c.Assert(err, IsNil) defer session.Close() @@ -815,7 +814,7 @@ func (s *S) TestAuthDirect(c *C) { // Direct connections must work to the master and slaves. for _, port := range []string{"40031", "40032", "40033"} { url := fmt.Sprintf("mongodb://root:rapadura@localhost:%s/?connect=direct", port) - session, err := mgo.Dial(url + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial(url) c.Assert(err, IsNil) defer session.Close() @@ -831,7 +830,7 @@ func (s *S) TestAuthDirectWithLogin(c *C) { // Direct connections must work to the master and slaves. for _, port := range []string{"40031", "40032", "40033"} { url := fmt.Sprintf("mongodb://localhost:%s/?connect=direct", port) - session, err := mgo.Dial(url + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial(url) c.Assert(err, IsNil) defer session.Close() @@ -859,7 +858,7 @@ func (s *S) TestAuthScramSha1Cred(c *C) { } host := "localhost:40002" c.Logf("Connecting to %s...", host) - session, err := mgo.Dial(host + expFeaturesString) + session, err := mgo.Dial(host) c.Assert(err, IsNil) defer session.Close() @@ -885,7 +884,7 @@ func (s *S) TestAuthScramSha1URL(c *C) { } host := "localhost:40002" c.Logf("Connecting to %s...", host) - session, err := mgo.Dial(fmt.Sprintf("root:rapadura@%s?authMechanism=SCRAM-SHA-1&%s", host, string(expFeaturesString[1:]))) + session, err := mgo.Dial(fmt.Sprintf("root:rapadura@%s?authMechanism=SCRAM-SHA-1", host)) c.Assert(err, IsNil) defer session.Close() @@ -897,7 +896,7 @@ func (s *S) TestAuthScramSha1URL(c *C) { } func (s *S) TestAuthX509Cred(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() binfo, err := session.BuildInfo() @@ -922,9 +921,6 @@ func (s *S) TestAuthX509Cred(c *C) { c.Logf("Connecting to %s...", host) session, err = mgo.DialWithInfo(&mgo.DialInfo{ Addrs: []string{host}, - ExperimentalFeatures: map[string]bool{ - "opmsg": true, - }, DialServer: func(addr *mgo.ServerAddr) (net.Conn, error) { return tls.Dial("tcp", addr.String(), tlsConfig) }, @@ -969,7 +965,7 @@ func (s *S) TestAuthX509Cred(c *C) { } func (s *S) TestAuthX509CredRDNConstruction(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() binfo, err := session.BuildInfo() @@ -996,9 +992,6 @@ func (s *S) TestAuthX509CredRDNConstruction(c *C) { c.Logf("Connecting to %s...", host) session, err = mgo.DialWithInfo(&mgo.DialInfo{ Addrs: []string{host}, - ExperimentalFeatures: map[string]bool{ - "opmsg": true, - }, DialServer: func(addr *mgo.ServerAddr) (net.Conn, error) { return tls.Dial("tcp", addr.String(), tlsConfig) }, diff --git a/bulk_test.go b/bulk_test.go index cc552c614..fa91dc44c 100644 --- a/bulk_test.go +++ b/bulk_test.go @@ -32,7 +32,7 @@ import ( ) func (s *S) TestBulkInsert(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -52,7 +52,7 @@ func (s *S) TestBulkInsert(c *C) { } func (s *S) TestBulkInsertError(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -73,7 +73,7 @@ func (s *S) TestBulkInsertError(c *C) { } func (s *S) TestBulkInsertErrorUnordered(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -100,7 +100,7 @@ func (s *S) TestBulkInsertErrorUnorderedSplitBatch(c *C) { // into the proper size and delivers them one by one. This test ensures that // the behavior of unordered (that is, continue on error) remains correct // when errors happen and there are batches left. - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -132,7 +132,7 @@ func (s *S) TestBulkInsertErrorUnorderedSplitBatch(c *C) { } func (s *S) TestBulkErrorString(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -180,7 +180,7 @@ func (s *S) TestBulkErrorCases_2_6(c *C) { if !s.versionAtLeast(2, 6) { c.Skip("2.4- has poor bulk reporting") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -219,7 +219,7 @@ func (s *S) TestBulkErrorCases_2_4(c *C) { if s.versionAtLeast(2, 6) { c.Skip("2.6+ has better reporting") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -255,7 +255,7 @@ func (s *S) TestBulkErrorCases_2_4(c *C) { } func (s *S) TestBulkErrorCasesOrdered(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -289,7 +289,7 @@ func (s *S) TestBulkErrorCasesOrdered(c *C) { } func (s *S) TestBulkUpdate(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -318,7 +318,7 @@ func (s *S) TestBulkUpdate(c *C) { } func (s *S) TestBulkUpdateOver1000(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -340,7 +340,7 @@ func (s *S) TestBulkUpdateOver1000(c *C) { } func (s *S) TestBulkUpdateError(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -367,7 +367,7 @@ func (s *S) TestBulkUpdateError(c *C) { } func (s *S) TestBulkUpdateErrorUnordered(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -395,7 +395,7 @@ func (s *S) TestBulkUpdateErrorUnordered(c *C) { } func (s *S) TestBulkUpdateAll(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -424,7 +424,7 @@ func (s *S) TestBulkUpdateAll(c *C) { } func (s *S) TestBulkMixedUnordered(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -454,7 +454,7 @@ func (s *S) TestBulkMixedUnordered(c *C) { } func (s *S) TestBulkUpsert(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -478,7 +478,7 @@ func (s *S) TestBulkUpsert(c *C) { } func (s *S) TestBulkRemove(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -502,7 +502,7 @@ func (s *S) TestBulkRemove(c *C) { } func (s *S) TestBulkRemoveAll(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() diff --git a/cluster.go b/cluster.go index 262e7eafb..7fc639c24 100644 --- a/cluster.go +++ b/cluster.go @@ -132,17 +132,15 @@ func (cluster *mongoCluster) removeServer(server *mongoServer) { } type isMasterResult struct { - IsMaster bool - Secondary bool - Primary string - Hosts []string - Passives []string - Tags bson.D - Msg string - SetName string `bson:"setName"` - MaxWireVersion int `bson:"maxWireVersion"` - MaxWriteBatchSize int `bson:"maxWriteBatchSize"` - MaxMessageSizeBytes int `bson:"maxMessageSizeBytes"` + IsMaster bool + Secondary bool + Primary string + Hosts []string + Passives []string + Tags bson.D + Msg string + SetName string `bson:"setName"` + MaxWireVersion int `bson:"maxWireVersion"` } func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error { @@ -242,13 +240,11 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI } info = &mongoServerInfo{ - Master: result.IsMaster, - Mongos: result.Msg == "isdbgrid", - Tags: result.Tags, - SetName: result.SetName, - MaxWireVersion: result.MaxWireVersion, - MaxWriteBatchSize: result.MaxWriteBatchSize, - MaxMessageSizeBytes: result.MaxMessageSizeBytes, + Master: result.IsMaster, + Mongos: result.Msg == "isdbgrid", + Tags: result.Tags, + SetName: result.SetName, + MaxWireVersion: result.MaxWireVersion, } hosts = make([]string, 0, 1+len(result.Hosts)+len(result.Passives)) diff --git a/cluster_test.go b/cluster_test.go index 14d0db273..8945e0962 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -42,8 +42,7 @@ import ( ) func (s *S) TestNewSession(c *C) { - - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -97,8 +96,7 @@ func (s *S) TestNewSession(c *C) { } func (s *S) TestCloneSession(c *C) { - - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -163,7 +161,7 @@ func (s *S) TestCloneSession(c *C) { } func (s *S) TestModeStrong(c *C) { - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -202,7 +200,7 @@ func (s *S) TestModeStrong(c *C) { func (s *S) TestModeMonotonic(c *C) { // Must necessarily connect to a slave, otherwise the // master connection will be available first. - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -245,7 +243,7 @@ func (s *S) TestModeMonotonicAfterStrong(c *C) { // Test that a strong session shifting to a monotonic // one preserves the socket untouched. - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -282,7 +280,7 @@ func (s *S) TestModeStrongAfterMonotonic(c *C) { // Must necessarily connect to a slave, otherwise the // master connection will be available first. - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -317,7 +315,7 @@ func (s *S) TestModeStrongAfterMonotonic(c *C) { func (s *S) TestModeMonotonicWriteOnIteration(c *C) { // Must necessarily connect to a slave, otherwise the // master connection will be available first. - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -362,7 +360,7 @@ func (s *S) TestModeMonotonicWriteOnIteration(c *C) { func (s *S) TestModeEventual(c *C) { // Must necessarily connect to a slave, otherwise the // master connection will be available first. - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -400,7 +398,7 @@ func (s *S) TestModeEventualAfterStrong(c *C) { // Test that a strong session shifting to an eventual // one preserves the socket untouched. - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -439,7 +437,7 @@ func (s *S) TestModeStrongFallover(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -480,7 +478,7 @@ func (s *S) TestModePrimaryHiccup(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -531,7 +529,7 @@ func (s *S) TestModeMonotonicFallover(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -574,7 +572,7 @@ func (s *S) TestModeMonotonicWithSlaveFallover(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -604,7 +602,7 @@ func (s *S) TestModeMonotonicWithSlaveFallover(c *C) { c.Fatal("Unknown host: ", ssresult.Host) } - session, err = mgo.Dial(addr + expFeaturesString) + session, err = mgo.Dial(addr) c.Assert(err, IsNil) defer session.Close() @@ -653,7 +651,7 @@ func (s *S) TestModeEventualFallover(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -690,7 +688,7 @@ func (s *S) TestModeSecondaryJustPrimary(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -705,7 +703,7 @@ func (s *S) TestModeSecondaryPreferredJustPrimary(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -721,7 +719,7 @@ func (s *S) TestModeSecondaryPreferredFallover(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -780,7 +778,7 @@ func (s *S) TestModePrimaryPreferredFallover(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -823,7 +821,7 @@ func (s *S) TestModePrimaryFallover(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -850,7 +848,7 @@ func (s *S) TestModeSecondary(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -876,7 +874,7 @@ func (s *S) TestPreserveSocketCountOnSync(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -933,7 +931,7 @@ func (s *S) TestPreserveSocketCountOnSync(c *C) { // single connection was established. func (s *S) TestTopologySyncWithSingleMaster(c *C) { // Use hostname here rather than IP, to make things trickier. - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -957,7 +955,7 @@ func (s *S) TestTopologySyncWithSingleMaster(c *C) { func (s *S) TestTopologySyncWithSlaveSeed(c *C) { // That's supposed to be a slave. Must run discovery // and find out master to insert successfully. - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() @@ -990,7 +988,7 @@ func (s *S) TestSyncTimeout(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1017,7 +1015,7 @@ func (s *S) TestDialWithTimeout(c *C) { started := time.Now() // 40009 isn't used by the test servers. - session, err := mgo.DialWithTimeout("localhost:40009"+expFeaturesString, timeout) + session, err := mgo.DialWithTimeout("localhost:40009", timeout) if session != nil { session.Close() } @@ -1032,7 +1030,7 @@ func (s *S) TestSocketTimeout(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1063,7 +1061,7 @@ func (s *S) TestSocketTimeoutOnDial(c *C) { started := time.Now() - session, err := mgo.DialWithTimeout("localhost:40001"+expFeaturesString, timeout) + session, err := mgo.DialWithTimeout("localhost:40001", timeout) c.Assert(err, ErrorMatches, "no reachable servers") c.Assert(session, IsNil) @@ -1076,7 +1074,7 @@ func (s *S) TestSocketTimeoutOnInactiveSocket(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1131,9 +1129,6 @@ func (s *S) TestDialWithReplicaSetName(c *C) { Addrs: seedList, Timeout: 5 * time.Second, ReplicaSetName: "rs1", - ExperimentalFeatures: map[string]bool{ - "opmsg": true, - }, } session, err := mgo.DialWithInfo(&info) @@ -1163,7 +1158,7 @@ func (s *S) TestDialWithReplicaSetName(c *C) { } func (s *S) TestDirect(c *C) { - session, err := mgo.Dial("localhost:40012?connect=direct" + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial("localhost:40012?connect=direct") c.Assert(err, IsNil) defer session.Close() @@ -1207,7 +1202,7 @@ func (s *S) TestDirect(c *C) { } func (s *S) TestDirectToUnknownStateMember(c *C) { - session, err := mgo.Dial("localhost:40041?connect=direct" + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial("localhost:40041?connect=direct") c.Assert(err, IsNil) defer session.Close() @@ -1234,11 +1229,8 @@ func (s *S) TestDirectToUnknownStateMember(c *C) { func (s *S) TestFailFast(c *C) { info := mgo.DialInfo{ - Addrs: []string{"localhost:99999"}, - Timeout: 5 * time.Second, - ExperimentalFeatures: map[string]bool{ - "opmsg": true, - }, + Addrs: []string{"localhost:99999"}, + Timeout: 5 * time.Second, FailFast: true, } @@ -1252,7 +1244,7 @@ func (s *S) TestFailFast(c *C) { func (s *S) countQueries(c *C, server string) (n int) { defer func() { c.Logf("Queries for %q: %d", server, n) }() - session, err := mgo.Dial(server + "?connect=direct" + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial(server + "?connect=direct") c.Assert(err, IsNil) defer session.Close() session.SetMode(mgo.Monotonic, true) @@ -1274,7 +1266,7 @@ func (s *S) countQueries(c *C, server string) (n int) { func (s *S) countCommands(c *C, server, commandName string) (n int) { defer func() { c.Logf("Queries for %q: %d", server, n) }() - session, err := mgo.Dial(server + "?connect=direct" + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial(server + "?connect=direct") c.Assert(err, IsNil) defer session.Close() session.SetMode(mgo.Monotonic, true) @@ -1292,7 +1284,7 @@ func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) { if s.versionAtLeast(3, 4) { c.Skip("fail on 3.4+ ? ") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -1311,7 +1303,7 @@ func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) { s.Stop(":40201") s.StartAll() - mongos, err := mgo.Dial("localhost:40202" + expFeaturesString) + mongos, err := mgo.Dial("localhost:40202") c.Assert(err, IsNil) defer mongos.Close() @@ -1386,7 +1378,7 @@ func (s *S) TestSecondaryModeWithMongos(c *C) { if s.versionAtLeast(3, 4) { c.Skip("fail on 3.4+ ?") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -1405,7 +1397,7 @@ func (s *S) TestSecondaryModeWithMongos(c *C) { s.Stop(":40201") s.StartAll() - mongos, err := mgo.Dial("localhost:40202" + expFeaturesString) + mongos, err := mgo.Dial("localhost:40202") c.Assert(err, IsNil) defer mongos.Close() @@ -1480,7 +1472,7 @@ func (s *S) TestSecondaryModeWithMongosInsert(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40202" + expFeaturesString) + session, err := mgo.Dial("localhost:40202") c.Assert(err, IsNil) defer session.Close() @@ -1501,7 +1493,7 @@ func (s *S) TestRemovalOfClusterMember(c *C) { c.Skip("-fast") } - master, err := mgo.Dial("localhost:40021" + expFeaturesString) + master, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer master.Close() @@ -1582,11 +1574,11 @@ func (s *S) TestPoolLimitSimple(c *C) { var session *mgo.Session var err error if test == 0 { - session, err = mgo.Dial("localhost:40001" + expFeaturesString) + session, err = mgo.Dial("localhost:40001") c.Assert(err, IsNil) session.SetPoolLimit(1) } else { - session, err = mgo.Dial("localhost:40001?maxPoolSize=1" + "&" + string(expFeaturesString[1:])) + session, err = mgo.Dial("localhost:40001?maxPoolSize=1") c.Assert(err, IsNil) } defer session.Close() @@ -1619,7 +1611,7 @@ func (s *S) TestPoolLimitMany(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -1658,7 +1650,7 @@ func (s *S) TestPoolLimitMany(c *C) { } func (s *S) TestSetModeEventualIterBug(c *C) { - session1, err := mgo.Dial("localhost:40011" + expFeaturesString) + session1, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session1.Close() @@ -1682,7 +1674,7 @@ func (s *S) TestSetModeEventualIterBug(c *C) { } } - session2, err := mgo.Dial("localhost:40011" + expFeaturesString) + session2, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session2.Close() @@ -1775,7 +1767,7 @@ func (s *S) TestPrimaryShutdownOnAuthShard(c *C) { } // Dial the shard. - session, err := mgo.Dial("localhost:40203" + expFeaturesString) + session, err := mgo.Dial("localhost:40203") c.Assert(err, IsNil) defer session.Close() @@ -1786,7 +1778,7 @@ func (s *S) TestPrimaryShutdownOnAuthShard(c *C) { c.Assert(err, IsNil) // Dial the replica set to figure the master out. - rs, err := mgo.Dial("root:rapadura@localhost:40031" + expFeaturesString) + rs, err := mgo.Dial("root:rapadura@localhost:40031") c.Assert(err, IsNil) defer rs.Close() @@ -1832,7 +1824,7 @@ func (s *S) TestNearestSecondary(c *C) { rs1c := "127.0.0.1:40013" s.Freeze(rs1b) - session, err := mgo.Dial(rs1a + expFeaturesString) + session, err := mgo.Dial(rs1a) c.Assert(err, IsNil) defer session.Close() @@ -1897,7 +1889,7 @@ func (s *S) TestNearestServer(c *C) { rs1b := "127.0.0.1:40012" rs1c := "127.0.0.1:40013" - session, err := mgo.Dial(rs1a + expFeaturesString) + session, err := mgo.Dial(rs1a) c.Assert(err, IsNil) defer session.Close() @@ -1961,7 +1953,7 @@ func (s *S) TestConnectCloseConcurrency(c *C) { for i := 0; i < n; i++ { go func() { defer wg.Done() - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") if err != nil { c.Fatal(err) } @@ -1977,7 +1969,7 @@ func (s *S) TestSelectServers(c *C) { c.Skip("read preferences introduced in 2.2") } - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -2006,7 +1998,7 @@ func (s *S) TestSelectServersWithMongos(c *C) { c.Skip("fail on 3.4+") } - session, err := mgo.Dial("localhost:40021" + expFeaturesString) + session, err := mgo.Dial("localhost:40021") c.Assert(err, IsNil) defer session.Close() @@ -2037,7 +2029,7 @@ func (s *S) TestSelectServersWithMongos(c *C) { q23a := s.countQueries(c, "localhost:40023") // Do a SlaveOk query through MongoS - mongos, err := mgo.Dial("localhost:40202" + expFeaturesString) + mongos, err := mgo.Dial("localhost:40202") c.Assert(err, IsNil) defer mongos.Close() @@ -2095,7 +2087,7 @@ func (s *S) TestDoNotFallbackToMonotonic(c *C) { c.Skip("failing on 3.2.17+") } - session, err := mgo.Dial("localhost:40012" + expFeaturesString) + session, err := mgo.Dial("localhost:40012") c.Assert(err, IsNil) defer session.Close() diff --git a/gridfs_test.go b/gridfs_test.go index 984ded17c..9fdd0a26f 100644 --- a/gridfs_test.go +++ b/gridfs_test.go @@ -184,7 +184,7 @@ func (s *S) TestGridFSFileDetails(c *C) { } func (s *S) TestGridFSSetUploadDate(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -212,7 +212,7 @@ func (s *S) TestGridFSSetUploadDate(c *C) { } func (s *S) TestGridFSCreateWithChunking(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -293,7 +293,7 @@ func (s *S) TestGridFSCreateWithChunking(c *C) { } func (s *S) TestGridFSAbort(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -330,7 +330,7 @@ func (s *S) TestGridFSAbort(c *C) { } func (s *S) TestGridFSCloseConflict(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -358,7 +358,7 @@ func (s *S) TestGridFSCloseConflict(c *C) { } func (s *S) TestGridFSOpenNotFound(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -375,7 +375,7 @@ func (s *S) TestGridFSOpenNotFound(c *C) { } func (s *S) TestGridFSReadAll(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -412,7 +412,7 @@ func (s *S) TestGridFSReadAll(c *C) { } func (s *S) TestGridFSReadChunking(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -472,7 +472,7 @@ func (s *S) TestGridFSReadChunking(c *C) { } func (s *S) TestGridFSOpen(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -502,7 +502,7 @@ func (s *S) TestGridFSOpen(c *C) { } func (s *S) TestGridFSSeek(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -581,7 +581,7 @@ func (s *S) TestGridFSSeek(c *C) { } func (s *S) TestGridFSRemoveId(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -619,7 +619,7 @@ func (s *S) TestGridFSRemoveId(c *C) { } func (s *S) TestGridFSRemove(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -649,7 +649,7 @@ func (s *S) TestGridFSRemove(c *C) { } func (s *S) TestGridFSOpenNext(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() diff --git a/server.go b/server.go index ce6140941..7ad955255 100644 --- a/server.go +++ b/server.go @@ -36,12 +36,6 @@ import ( "github.com/globalsign/mgo/bson" ) -const ( - // default value for MongoDB 3.6 - defaultWriteBatchSize = 100000 - defaultMaxMessageSizeBytes = 48000000 -) - // --------------------------------------------------------------------------- // Mongo server encapsulation. @@ -73,15 +67,15 @@ func (dial dialer) isSet() bool { } type mongoServerInfo struct { - Master bool - Mongos bool - Tags bson.D - MaxWireVersion int - SetName string - MaxWriteBatchSize int - MaxMessageSizeBytes int + Master bool + Mongos bool + Tags bson.D + MaxWireVersion int + SetName string } +var defaultServerInfo mongoServerInfo + func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer { server := &mongoServer{ Addr: addr, @@ -89,11 +83,8 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) * tcpaddr: tcpaddr, sync: sync, dial: dial, - info: &mongoServerInfo{ - MaxWriteBatchSize: defaultWriteBatchSize, - MaxMessageSizeBytes: defaultMaxMessageSizeBytes, - }, - pingValue: time.Hour, // Push it back before an actual ping. + info: &defaultServerInfo, + pingValue: time.Hour, // Push it back before an actual ping. } go server.pinger(true) return server diff --git a/session.go b/session.go index e31323f92..d18ca0869 100644 --- a/session.go +++ b/session.go @@ -103,7 +103,6 @@ type Session struct { queryConfig query bypassValidation bool slaveOk bool - experimental map[string]bool } // Database holds collections of documents @@ -320,7 +319,6 @@ func ParseURL(url string) (*DialInfo, error) { poolLimit := 0 appName := "" readPreferenceMode := Primary - experimental := map[string]bool{} var readPreferenceTagSets []bson.D for _, opt := range uinfo.options { switch opt.key { @@ -342,13 +340,6 @@ func ParseURL(url string) (*DialInfo, error) { return nil, errors.New("appName too long, must be < 128 bytes: " + opt.value) } appName = opt.value - case "experimental": - switch opt.value { - case "opmsg": - experimental[opt.value] = true - default: - return nil, errors.New("unknow experimental feature: " + opt.value) - } case "readPreference": switch opt.value { case "nearest": @@ -408,8 +399,7 @@ func ParseURL(url string) (*DialInfo, error) { Mode: readPreferenceMode, TagSets: readPreferenceTagSets, }, - ReplicaSetName: setName, - ExperimentalFeatures: experimental, + ReplicaSetName: setName, } return &info, nil } @@ -489,12 +479,6 @@ type DialInfo struct { // WARNING: This field is obsolete. See DialServer above. Dial func(addr net.Addr) (net.Conn, error) - - // List of experimental feature to enable. Set the value to 'true' - // to enable a feature. - // Currently, experimental features are: - // - opmsg - ExperimentalFeatures map[string]bool } // ReadPreference defines the manner in which servers are chosen. @@ -585,12 +569,7 @@ func DialWithInfo(info *DialInfo) (*Session, error) { } else { session.SetMode(Strong, true) } - if len(info.ExperimentalFeatures) > 0 { - session.experimental = make(map[string]bool, 0) - for k, v := range info.ExperimentalFeatures { - session.experimental[k] = v - } - } + return session, nil } @@ -682,14 +661,6 @@ func copySession(session *Session, keepCreds bool) (s *Session) { } else if session.dialCred != nil { creds = []Credential{*session.dialCred} } - - var experimental map[string]bool - if len(session.experimental) > 0 { - experimental = make(map[string]bool, len(session.experimental)) - for k, v := range session.experimental { - experimental[k] = v - } - } scopy := Session{ defaultdb: session.defaultdb, sourcedb: session.sourcedb, @@ -707,7 +678,6 @@ func copySession(session *Session, keepCreds bool) (s *Session) { queryConfig: session.queryConfig, bypassValidation: session.bypassValidation, slaveOk: session.slaveOk, - experimental: experimental, } s = &scopy debugf("New session %p on cluster %p (copy from %p)", s, cluster, session) @@ -4966,15 +4936,13 @@ func (iter *Iter) replyFunc() replyFunc { } type writeCmdResult struct { - Ok bool `bson:"ok"` - N int `bson:"n"` - NModified int `bson:"nModified"` + Ok bool + N int + NModified int `bson:"nModified"` Upserted []struct { Index int Id interface{} `bson:"_id"` } - Code int `bson:"code"` - Errmsg string `bson:"errmsg"` ConcernError writeConcernError `bson:"writeConcernError"` Errors []writeCmdError `bson:"writeErrors"` } @@ -4998,140 +4966,6 @@ func (r *writeCmdResult) BulkErrorCases() []BulkErrorCase { return ecases } -func (c *Collection) writeOpWithOpMsg(socket *mongoSocket, serverInfo *mongoServerInfo, op interface{}, ordered, bypassValidation bool, safeOp *queryOp) (*LastError, error) { - var cmd bson.D - var documents []interface{} - var docSeqID string - canUseOpMsg := true - switch msgOp := op.(type) { - case *insertOp: - cmd = bson.D{ - {Name: "insert", Value: c.Name}, - } - docSeqID = "documents" - documents = msgOp.documents - case bulkUpdateOp: - cmd = bson.D{ - {Name: "update", Value: c.Name}, - } - docSeqID = "updates" - documents = msgOp - case bulkDeleteOp: - cmd = bson.D{ - {Name: "delete", Value: c.Name}, - } - docSeqID = "deletes" - documents = msgOp - default: - canUseOpMsg = false - } - - if canUseOpMsg { - //msg flags, see https://docs.mongodb.com/master/reference/mongodb-wire-protocol/#flag-bits - flags := uint32(0) - - var writeConcern interface{} - if safeOp == nil { - // unacknowledged writes - flags |= opMsgFlagMoreToCome - writeConcern = bson.D{{Name: "w", Value: 0}} - } else { - writeConcern = safeOp.query.(*getLastError) - } - - cmd = append(cmd, bson.DocElem{ - Name: "$db", Value: c.Database.Name, - }, bson.DocElem{ - Name: "ordered", Value: ordered, - }, bson.DocElem{ - Name: "writeConcern", Value: writeConcern, - }, bson.DocElem{ - Name: "bypassDocumentValidation", Value: bypassValidation, - }) - - body := msgSection{ - payloadType: msgPayload0, - data: cmd, - } - - n := 0 - modified := 0 - var errs []BulkErrorCase - var lerr LastError - - l := len(documents) - batchNb := (l / serverInfo.MaxWriteBatchSize) + 1 - if l != 0 && (l%serverInfo.MaxWriteBatchSize) == 0 { - batchNb-- - } - count := 0 - - for count < batchNb { - start := count * serverInfo.MaxWriteBatchSize - length := l - start - if length > serverInfo.MaxWriteBatchSize { - length = serverInfo.MaxWriteBatchSize - } - - docs := msgSection{ - payloadType: msgPayload1, - data: payloadType1{ - identifier: docSeqID, - docs: documents[start : start+length], - }, - } - count++ - - // CRC-32 checksum is not implemented in Mongodb 3.6 but - // will be in future release. It's optional, so no need - // to set it for the moment - newOp := &msgOp{ - flags: flags, - sections: []msgSection{body, docs}, - checksum: 0, - } - result, err := socket.sendMessage(newOp) - if err != nil { - return &lerr, err - } - // for some reason, command result format has changed and - // code|errmsg are sometimes top level fields in writeCommandResult - // TODO need to investigate further - if result.Code != 0 { - return &lerr, errors.New(result.Errmsg) - } - if result.ConcernError.Code != 0 { - return &lerr, errors.New(result.ConcernError.ErrMsg) - } - - n += result.N - modified += result.NModified - - if len(result.Errors) > 0 { - for _, e := range result.Errors { - errs = append(errs, BulkErrorCase{ - e.Index, - &QueryError{ - Code: e.Code, - Message: e.ErrMsg, - }, - }) - } - } - } - lerr = LastError{ - N: n, - modified: modified, - ecases: errs, - } - if len(lerr.ecases) > 0 { - return &lerr, lerr.ecases[0].Err - } - return &lerr, nil - } - return nil, nil -} - // writeOp runs the given modifying operation, potentially followed up // by a getLastError command in case the session is in safe mode. The // LastError result is made available in lerr, and if lerr.Err is set it @@ -5147,20 +4981,9 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err s.m.RLock() safeOp := s.safeOp bypassValidation := s.bypassValidation - enableOpMsg := s.experimental["opmsg"] s.m.RUnlock() - serverInfo := socket.ServerInfo() - - if enableOpMsg && serverInfo.MaxWireVersion >= 6 { - // we can use OP_MSG introduced in Mongodb 3.6 - oPlerr, oPerr := c.writeOpWithOpMsg(socket, serverInfo, op, ordered, bypassValidation, safeOp) - if oPlerr != nil || oPerr != nil { - return oPlerr, oPerr - } - } - - if serverInfo.MaxWireVersion >= 2 { + if socket.ServerInfo().MaxWireVersion >= 2 { // Servers with a more recent write protocol benefit from write commands. if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 { var lerr LastError @@ -5440,7 +5263,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string { var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;") //The elements in the sequence needs to be reversed when converting them for i := len(*RDNElements) - 1; i >= 0; i-- { - var nameAndValueList = make([]string, len((*RDNElements)[i])) + var nameAndValueList = make([]string,len((*RDNElements)[i])) for j, attribute := range (*RDNElements)[i] { var shortAttributeName = rdnOIDToShortName(attribute.Type) if len(shortAttributeName) <= 0 { diff --git a/session_test.go b/session_test.go index 84aa1f8a7..eb2c812b3 100644 --- a/session_test.go +++ b/session_test.go @@ -44,7 +44,7 @@ import ( ) func (s *S) TestRunString(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -55,7 +55,7 @@ func (s *S) TestRunString(c *C) { } func (s *S) TestRunValue(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -66,7 +66,7 @@ func (s *S) TestRunValue(c *C) { } func (s *S) TestPing(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -86,19 +86,19 @@ func (s *S) TestPing(c *C) { } func (s *S) TestDialIPAddress(c *C) { - session, err := mgo.Dial("127.0.0.1:40001" + expFeaturesString) + session, err := mgo.Dial("127.0.0.1:40001") c.Assert(err, IsNil) defer session.Close() if os.Getenv("NOIPV6") != "1" { - session, err = mgo.Dial("[::1%]:40001" + expFeaturesString) + session, err = mgo.Dial("[::1%]:40001") c.Assert(err, IsNil) defer session.Close() } } func (s *S) TestURLSingle(c *C) { - session, err := mgo.Dial("mongodb://localhost:40001/" + expFeaturesString) + session, err := mgo.Dial("mongodb://localhost:40001/") c.Assert(err, IsNil) defer session.Close() @@ -109,7 +109,7 @@ func (s *S) TestURLSingle(c *C) { } func (s *S) TestURLMany(c *C) { - session, err := mgo.Dial("mongodb://localhost:40011,localhost:40012/" + expFeaturesString) + session, err := mgo.Dial("mongodb://localhost:40011,localhost:40012/") c.Assert(err, IsNil) defer session.Close() @@ -125,7 +125,7 @@ func (s *S) TestURLParsing(c *C) { "localhost:40001?foo=1;bar=2", } for _, url := range urls { - session, err := mgo.Dial(url + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial(url) if session != nil { session.Close() } @@ -205,7 +205,7 @@ func (s *S) TestURLWithAppName(c *C) { c.Skip("appName depends on MongoDB 3.4+") } appName := "myAppName" - session, err := mgo.Dial("localhost:40001?appName=" + appName + "&" + string(expFeaturesString[1:])) + session, err := mgo.Dial("localhost:40001?appName=" + appName) c.Assert(err, IsNil) defer session.Close() @@ -240,12 +240,12 @@ func (s *S) TestURLWithAppNameTooLong(c *C) { } appName := "myAppNameWayTooLongmyAppNameWayTooLongmyAppNameWayTooLongmyAppNameWayTooLong" appName += appName - _, err := mgo.Dial("localhost:40001?appName=" + appName + "&" + string(expFeaturesString[1:])) + _, err := mgo.Dial("localhost:40001?appName=" + appName) c.Assert(err, ErrorMatches, "appName too long, must be < 128 bytes: "+appName) } func (s *S) TestInsertFindOne(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -269,7 +269,7 @@ func (s *S) TestInsertFindOne(c *C) { } func (s *S) TestInsertFindOneNil(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -279,7 +279,7 @@ func (s *S) TestInsertFindOneNil(c *C) { } func (s *S) TestInsertFindOneMap(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -294,7 +294,7 @@ func (s *S) TestInsertFindOneMap(c *C) { } func (s *S) TestInsertFindAll(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -340,7 +340,7 @@ func (s *S) TestInsertFindAll(c *C) { } func (s *S) TestFindRef(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -386,7 +386,7 @@ func (s *S) TestFindRef(c *C) { } func (s *S) TestDatabaseAndCollectionNames(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -421,7 +421,7 @@ func (s *S) TestDatabaseAndCollectionNames(c *C) { } func (s *S) TestSelect(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -437,7 +437,7 @@ func (s *S) TestSelect(c *C) { } func (s *S) TestInlineMap(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -468,7 +468,7 @@ func (s *S) TestInlineMap(c *C) { } func (s *S) TestUpdate(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -500,7 +500,7 @@ func (s *S) TestUpdate(c *C) { } func (s *S) TestUpdateId(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -528,7 +528,7 @@ func (s *S) TestUpdateId(c *C) { } func (s *S) TestUpdateNil(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -558,7 +558,7 @@ func (s *S) TestUpdateNil(c *C) { } func (s *S) TestUpsert(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -621,7 +621,7 @@ func (s *S) TestUpsert(c *C) { } func (s *S) TestUpsertId(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -658,7 +658,7 @@ func (s *S) TestUpsertId(c *C) { } func (s *S) TestUpdateAll(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -707,7 +707,7 @@ func (s *S) TestUpdateAll(c *C) { } func (s *S) TestRemove(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -736,7 +736,7 @@ func (s *S) TestRemove(c *C) { } func (s *S) TestRemoveId(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -754,7 +754,7 @@ func (s *S) TestRemoveId(c *C) { } func (s *S) TestRemoveUnsafe(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -774,7 +774,7 @@ func (s *S) TestRemoveUnsafe(c *C) { } func (s *S) TestRemoveAll(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -817,7 +817,7 @@ func (s *S) TestRemoveAll(c *C) { } func (s *S) TestDropDatabase(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -859,7 +859,7 @@ func filterDBs(dbs []string) []string { } func (s *S) TestDropCollection(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -883,7 +883,7 @@ func (s *S) TestDropCollection(c *C) { } func (s *S) TestCreateCollectionCapped(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -909,7 +909,7 @@ func (s *S) TestCreateCollectionCapped(c *C) { } func (s *S) TestCreateCollectionNoIndex(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -929,7 +929,7 @@ func (s *S) TestCreateCollectionNoIndex(c *C) { } func (s *S) TestCreateCollectionForceIndex(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -955,7 +955,7 @@ func (s *S) TestCreateCollectionValidator(c *C) { if !s.versionAtLeast(3, 2) { c.Skip("validation depends on MongoDB 3.2+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1007,7 +1007,7 @@ func (s *S) TestCreateCollectionStorageEngine(c *C) { if !s.versionAtLeast(3, 0) { c.Skip("storageEngine option depends on MongoDB 3.0+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1025,7 +1025,7 @@ func (s *S) TestCreateCollectionWithCollation(c *C) { if !s.versionAtLeast(3, 4) { c.Skip("depends on mongodb 3.4+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1069,7 +1069,7 @@ func (s *S) TestIsDupValues(c *C) { } func (s *S) TestIsDupPrimary(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1083,7 +1083,7 @@ func (s *S) TestIsDupPrimary(c *C) { } func (s *S) TestIsDupUnique(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1105,7 +1105,7 @@ func (s *S) TestIsDupUnique(c *C) { } func (s *S) TestIsDupCapped(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1129,7 +1129,7 @@ func (s *S) TestIsDupCapped(c *C) { } func (s *S) TestIsDupFindAndModify(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1148,7 +1148,7 @@ func (s *S) TestIsDupFindAndModify(c *C) { } func (s *S) TestIsDupRetryUpsert(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1168,7 +1168,7 @@ func (s *S) TestIsDupRetryUpsert(c *C) { } func (s *S) TestFindAndModify(c *C) { - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -1236,7 +1236,7 @@ func (s *S) TestFindAndModify(c *C) { } func (s *S) TestFindAndModifyBug997828(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1258,7 +1258,7 @@ func (s *S) TestFindAndModifyBug997828(c *C) { } func (s *S) TestFindAndModifyErrmsgDoc(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1272,7 +1272,7 @@ func (s *S) TestFindAndModifyErrmsgDoc(c *C) { } func (s *S) TestCountCollection(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1294,7 +1294,7 @@ func (s *S) TestView(c *C) { c.Skip("depends on mongodb 3.4+") } // CreateView has to be run against mongos - session, err := mgo.Dial("localhost:40201" + expFeaturesString) + session, err := mgo.Dial("localhost:40201") c.Assert(err, IsNil) defer session.Close() @@ -1382,12 +1382,16 @@ func (s *S) TestView(c *C) { } func (s *S) TestViewWithCollation(c *C) { - // SERVER-31049 is fixed in 3.4.10 - if !s.versionAtLeast(3, 4, 10) { + // This test is currently failing because of a bug in mongodb. A ticket describing + // the issue is available here: https://jira.mongodb.org/browse/SERVER-31049 + // TODO remove this line when SERVER-31049 is fixed + c.Skip("Fails because of a MongoDB bug as of version 3.4.9, cf https://jira.mongodb.org/browse/SERVER-31049") + + if !s.versionAtLeast(3, 4) { c.Skip("depends on mongodb 3.4+") } // CreateView has to be run against mongos - session, err := mgo.Dial("localhost:40201" + expFeaturesString) + session, err := mgo.Dial("localhost:40201") c.Assert(err, IsNil) defer session.Close() @@ -1416,7 +1420,7 @@ func (s *S) TestViewWithCollation(c *C) { } func (s *S) TestCountQuery(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1434,7 +1438,7 @@ func (s *S) TestCountQuery(c *C) { } func (s *S) TestCountQuerySorted(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1452,7 +1456,7 @@ func (s *S) TestCountQuerySorted(c *C) { } func (s *S) TestCountSkipLimit(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1478,7 +1482,7 @@ func (s *S) TestCountMaxTimeMS(c *C) { c.Skip("SetMaxTime only supported in 2.6+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1501,7 +1505,7 @@ func (s *S) TestCountHint(c *C) { c.Skip("Not implemented until mongo 2.5.5 https://jira.mongodb.org/browse/SERVER-2677") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1517,7 +1521,7 @@ func (s *S) TestCountHint(c *C) { } func (s *S) TestQueryExplain(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1552,7 +1556,7 @@ func (s *S) TestQueryExplain(c *C) { } func (s *S) TestQuerySetMaxScan(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() coll := session.DB("mydb").C("mycoll") @@ -1575,7 +1579,7 @@ func (s *S) TestQuerySetMaxTime(c *C) { c.Skip("SetMaxTime only supported in 2.6+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() coll := session.DB("mydb").C("mycoll") @@ -1594,7 +1598,7 @@ func (s *S) TestQuerySetMaxTime(c *C) { } func (s *S) TestQueryHint(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1617,7 +1621,7 @@ func (s *S) TestQueryHint(c *C) { } func (s *S) TestQueryComment(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1663,7 +1667,7 @@ func (s *S) TestQueryComment(c *C) { } func (s *S) TestFindOneNotFound(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1677,7 +1681,7 @@ func (s *S) TestFindOneNotFound(c *C) { } func (s *S) TestFindIterNotFound(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1691,7 +1695,7 @@ func (s *S) TestFindIterNotFound(c *C) { } func (s *S) TestFindNil(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1707,7 +1711,7 @@ func (s *S) TestFindNil(c *C) { } func (s *S) TestFindId(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1725,7 +1729,7 @@ func (s *S) TestFindId(c *C) { } func (s *S) TestFindIterAll(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1771,7 +1775,7 @@ func (s *S) TestFindIterAll(c *C) { } func (s *S) TestFindIterTwiceWithSameQuery(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1797,7 +1801,7 @@ func (s *S) TestFindIterTwiceWithSameQuery(c *C) { } func (s *S) TestFindIterWithoutResults(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1814,7 +1818,7 @@ func (s *S) TestFindIterWithoutResults(c *C) { } func (s *S) TestFindIterLimit(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1866,7 +1870,7 @@ func (s *S) TestResumeIter(c *C) { } const numDocuments = 10 - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") session.SetBatch(4) c.Assert(err, IsNil) defer session.Close() @@ -1928,7 +1932,7 @@ func (s *S) TestFindIterCursorTimeout(c *C) { if !*cursorTimeout { c.Skip("-cursor-timeout") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1970,7 +1974,7 @@ func (s *S) TestFindIterCursorNoTimeout(c *C) { if !*cursorTimeout { c.Skip("-cursor-timeout") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2021,7 +2025,7 @@ func (s *S) TestTooManyItemsLimitBug(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU())) @@ -2057,7 +2061,7 @@ func (s *S) TestBatchSizeZeroGetMore(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU())) @@ -2103,7 +2107,7 @@ func (s *S) TestFindIterLimitWithMore(c *C) { if s.versionAtLeast(3, 4) { c.Skip("fail on 3.4+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2166,7 +2170,7 @@ func (s *S) TestFindIterLimitWithMore(c *C) { } func (s *S) TestFindIterLimitWithBatch(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2225,7 +2229,7 @@ func (s *S) TestFindIterLimitWithBatch(c *C) { } func (s *S) TestFindIterSortWithBatch(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2292,7 +2296,7 @@ func (s *S) TestFindTailTimeoutWithSleep(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2386,7 +2390,7 @@ func (s *S) TestFindTailTimeoutWithSleep(c *C) { // Test tailable cursors in a situation where Next never gets to sleep once // to respect the timeout requested on Tail. func (s *S) TestFindTailTimeoutNoSleep(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2471,7 +2475,7 @@ func (s *S) TestFindTailNoTimeout(c *C) { c.Skip("-fast") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2556,7 +2560,7 @@ func (s *S) TestFindTailNoTimeout(c *C) { } func (s *S) TestIterNextResetsResult(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2626,7 +2630,7 @@ func (s *S) TestIterNextResetsResult(c *C) { } func (s *S) TestFindForOnIter(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2678,7 +2682,7 @@ func (s *S) TestFindForOnIter(c *C) { } func (s *S) TestFindFor(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2730,7 +2734,7 @@ func (s *S) TestFindFor(c *C) { } func (s *S) TestFindForStopOnError(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2757,7 +2761,7 @@ func (s *S) TestFindForStopOnError(c *C) { } func (s *S) TestFindForResetsResult(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2828,7 +2832,7 @@ func (s *S) TestFindForResetsResult(c *C) { func (s *S) TestFindIterSnapshot(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2872,7 +2876,7 @@ func (s *S) TestFindIterSnapshot(c *C) { } func (s *S) TestSort(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2907,7 +2911,7 @@ func (s *S) TestSort(c *C) { } func (s *S) TestSortWithBadArgs(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2923,7 +2927,7 @@ func (s *S) TestSortWithBadArgs(c *C) { } func (s *S) TestSortScoreText(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -2983,7 +2987,7 @@ func (s *S) TestSortScoreText(c *C) { } func (s *S) TestPrefetching(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3072,8 +3076,7 @@ func (s *S) TestPrefetching(c *C) { } func (s *S) TestSafeSetting(c *C) { - - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3186,7 +3189,7 @@ func (s *S) TestSafeSetting(c *C) { } func (s *S) TestSafeInsert(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3201,16 +3204,16 @@ func (s *S) TestSafeInsert(c *C) { // Session should be safe by default, so inserting it again must fail. err = coll.Insert(M{"_id": 1}) c.Assert(err, ErrorMatches, ".*E11000 duplicate.*") - if lerr, ok := err.(*mgo.LastError); ok { - c.Assert(lerr.Code, Equals, 11000) - } else { - c.Assert(err.(*mgo.QueryError).Code, Equals, 11000) - } + c.Assert(err.(*mgo.LastError).Code, Equals, 11000) - // It must have sent one operation + // It must have sent two operations (INSERT_OP + getLastError QUERY_OP) stats := mgo.GetStats() - c.Assert(stats.SentOps, Equals, 1) + if s.versionAtLeast(2, 6) { + c.Assert(stats.SentOps, Equals, 1) + } else { + c.Assert(stats.SentOps, Equals, 2) + } mgo.ResetStats() @@ -3225,8 +3228,7 @@ func (s *S) TestSafeInsert(c *C) { } func (s *S) TestSafeParameters(c *C) { - - session, err := mgo.Dial("localhost:40011" + expFeaturesString) + session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) defer session.Close() @@ -3243,7 +3245,7 @@ func (s *S) TestSafeParameters(c *C) { } func (s *S) TestQueryErrorOne(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3263,7 +3265,7 @@ func (s *S) TestQueryErrorOne(c *C) { } func (s *S) TestQueryErrorNext(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3467,7 +3469,7 @@ func getIndex34(session *mgo.Session, db, collection, name string) M { } func (s *S) TestEnsureIndex(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3571,7 +3573,7 @@ func (s *S) TestEnsureIndex(c *C) { } func (s *S) TestEnsureIndexWithBadInfo(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3585,7 +3587,7 @@ func (s *S) TestEnsureIndexWithBadInfo(c *C) { } func (s *S) TestEnsureIndexWithUnsafeSession(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3610,7 +3612,7 @@ func (s *S) TestEnsureIndexWithUnsafeSession(c *C) { } func (s *S) TestEnsureIndexKey(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3669,7 +3671,7 @@ func (s *S) TestEnsureIndexKey(c *C) { } func (s *S) TestEnsureIndexDropIndex(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3725,7 +3727,7 @@ func (s *S) TestEnsureIndexDropIndex(c *C) { } func (s *S) TestEnsureIndexDropIndexName(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3780,7 +3782,7 @@ func (s *S) TestEnsureIndexDropIndexName(c *C) { } func (s *S) TestEnsureIndexDropAllIndexes(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3813,7 +3815,7 @@ func (s *S) TestEnsureIndexDropAllIndexes(c *C) { } func (s *S) TestEnsureIndexCaching(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3854,7 +3856,7 @@ func (s *S) TestEnsureIndexCaching(c *C) { } func (s *S) TestEnsureIndexGetIndexes(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3891,7 +3893,7 @@ func (s *S) TestEnsureIndexGetIndexes(c *C) { } func (s *S) TestEnsureIndexNameCaching(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3935,7 +3937,7 @@ func (s *S) TestEnsureIndexNameCaching(c *C) { } func (s *S) TestEnsureIndexEvalGetIndexes(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -3972,7 +3974,7 @@ func (s *S) TestEnsureIndexEvalGetIndexes(c *C) { var testTTL = flag.Bool("test-ttl", false, "test TTL collections (may take 1 minute)") func (s *S) TestEnsureIndexExpireAfter(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4020,7 +4022,7 @@ func (s *S) TestEnsureIndexExpireAfter(c *C) { } func (s *S) TestDistinct(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4038,7 +4040,7 @@ func (s *S) TestDistinct(c *C) { } func (s *S) TestMapReduce(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4073,7 +4075,7 @@ func (s *S) TestMapReduce(c *C) { } func (s *S) TestMapReduceFinalize(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4104,7 +4106,7 @@ func (s *S) TestMapReduceFinalize(c *C) { } func (s *S) TestMapReduceToCollection(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4144,7 +4146,7 @@ func (s *S) TestMapReduceToCollection(c *C) { } func (s *S) TestMapReduceToOtherDb(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4184,7 +4186,7 @@ func (s *S) TestMapReduceToOtherDb(c *C) { } func (s *S) TestMapReduceOutOfOrder(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4207,7 +4209,7 @@ func (s *S) TestMapReduceOutOfOrder(c *C) { } func (s *S) TestMapReduceScope(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4229,7 +4231,7 @@ func (s *S) TestMapReduceScope(c *C) { } func (s *S) TestMapReduceVerbose(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4252,7 +4254,7 @@ func (s *S) TestMapReduceVerbose(c *C) { } func (s *S) TestMapReduceLimit(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4274,7 +4276,7 @@ func (s *S) TestMapReduceLimit(c *C) { } func (s *S) TestBuildInfo(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4315,7 +4317,7 @@ func (s *S) TestBuildInfo(c *C) { } func (s *S) TestZeroTimeRoundtrip(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4333,7 +4335,7 @@ func (s *S) TestZeroTimeRoundtrip(c *C) { } func (s *S) TestFsyncLock(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4361,7 +4363,7 @@ func (s *S) TestFsyncLock(c *C) { } func (s *S) TestFsync(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4380,7 +4382,7 @@ func (s *S) TestRepairCursor(c *C) { c.Skip("fail on 3.2.17+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() session.SetBatch(2) @@ -4427,7 +4429,7 @@ func (s *S) TestPipeIter(c *C) { c.Skip("Pipe only works on 2.1+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4463,7 +4465,7 @@ func (s *S) TestPipeAll(c *C) { c.Skip("Pipe only works on 2.1+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4488,7 +4490,7 @@ func (s *S) TestPipeOne(c *C) { c.Skip("Pipe only works on 2.1+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4513,7 +4515,7 @@ func (s *S) TestPipeExplain(c *C) { c.Skip("Pipe only works on 2.1+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4530,7 +4532,7 @@ func (s *S) TestPipeExplain(c *C) { } func (s *S) TestBatch1Bug(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4553,7 +4555,7 @@ func (s *S) TestBatch1Bug(c *C) { } func (s *S) TestInterfaceIterBug(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4576,7 +4578,7 @@ func (s *S) TestInterfaceIterBug(c *C) { } func (s *S) TestFindIterCloseKillsCursor(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4597,7 +4599,7 @@ func (s *S) TestFindIterCloseKillsCursor(c *C) { } func (s *S) TestFindIterDoneWithBatches(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4626,7 +4628,7 @@ func (s *S) TestFindIterDoneWithBatches(c *C) { } func (s *S) TestFindIterDoneErr(c *C) { - session, err := mgo.Dial("localhost:40002" + expFeaturesString) + session, err := mgo.Dial("localhost:40002") c.Assert(err, IsNil) defer session.Close() @@ -4641,7 +4643,7 @@ func (s *S) TestFindIterDoneErr(c *C) { } func (s *S) TestFindIterDoneNotFound(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4655,7 +4657,7 @@ func (s *S) TestFindIterDoneNotFound(c *C) { } func (s *S) TestLogReplay(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4676,7 +4678,7 @@ func (s *S) TestLogReplay(c *C) { } func (s *S) TestSetCursorTimeout(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4696,7 +4698,7 @@ func (s *S) TestSetCursorTimeout(c *C) { } func (s *S) TestNewIterNoServer(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4718,7 +4720,7 @@ func (s *S) TestNewIterNoServer(c *C) { } func (s *S) TestNewIterNoServerPresetErr(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4743,7 +4745,7 @@ func (s *S) TestBypassValidation(c *C) { if !s.versionAtLeast(3, 2) { c.Skip("validation supported on 3.2+") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4813,7 +4815,7 @@ func (s *S) TestCollationQueries(c *C) { if !s.versionAtLeast(3, 3, 12) { c.Skip("collations being released with 3.4") } - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4855,7 +4857,7 @@ func (s *S) TestCollationQueries(c *C) { // Some benchmarks that require a running database. func (s *S) BenchmarkFindIterRaw(c *C) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -4887,7 +4889,7 @@ func (s *S) BenchmarkFindIterRaw(c *C) { } func BenchmarkInsertSingle(b *testing.B) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") if err != nil { b.Fatal(err) } @@ -4907,7 +4909,7 @@ func BenchmarkInsertSingle(b *testing.B) { } func BenchmarkInsertMultiple(b *testing.B) { - session, err := mgo.Dial("localhost:40001" + expFeaturesString) + session, err := mgo.Dial("localhost:40001") if err != nil { b.Fatal(err) } diff --git a/socket.go b/socket.go index f739baf9c..f6158189c 100644 --- a/socket.go +++ b/socket.go @@ -29,7 +29,6 @@ package mgo import ( "errors" "fmt" - "io" "net" "sync" "time" @@ -39,24 +38,22 @@ import ( type replyFunc func(err error, reply *replyOp, docNum int, docData []byte) -type opMsgReplyFunc func(reply *msgOp, err error) - type mongoSocket struct { sync.Mutex - server *mongoServer // nil when cached - conn net.Conn - timeout time.Duration - addr string // For debugging only. - nextRequestId uint32 - replyFuncs map[uint32]replyFunc - opMsgReplyFuncs map[uint32]opMsgReplyFunc - references int - creds []Credential - cachedNonce string - gotNonce sync.Cond - dead error - serverInfo *mongoServerInfo - closeAfterIdle bool + server *mongoServer // nil when cached + conn net.Conn + timeout time.Duration + addr string // For debugging only. + nextRequestId uint32 + replyFuncs map[uint32]replyFunc + references int + creds []Credential + logout []Credential + cachedNonce string + gotNonce sync.Cond + dead error + serverInfo *mongoServerInfo + closeAfterIdle bool } type queryOpFlags uint32 @@ -68,29 +65,6 @@ const ( flagLogReplay flagNoCursorTimeout flagAwaitData - // section type, as defined here: - // https://docs.mongodb.com/master/reference/mongodb-wire-protocol/#sections - msgPayload0 = uint8(0) - msgPayload1 = uint8(1) - // all possible opCodes, as defined here: - // https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#request-opcodes - opInvalid = 0 - opReply = 1 - dbMsg = 1000 - dbUpdate = 2001 - dbInsert = 2002 - dbQuery = 2004 - dbGetMore = 2005 - dbDelete = 2006 - dbKillCursors = 2007 - dbCommand = 2010 - dbCommandReply = 2011 - dbCompressed = 2012 - dbMessage = 2013 - // opMsg flags - opMsgFlagChecksumPresent = 1 - opMsgFlagMoreToCome = (1 << 1) - opMsgFlagExhaustAllowed = (1 << 16) ) type queryOp struct { @@ -200,29 +174,6 @@ type killCursorsOp struct { cursorIds []int64 } -type msgSection struct { - payloadType uint8 - data interface{} -} - -// op_msg is introduced in mongodb 3.6, see -// https://docs.mongodb.com/master/reference/mongodb-wire-protocol/#op-msg -// for details -type msgOp struct { - flags uint32 - sections []msgSection - checksum uint32 -} - -// PayloadType1 is a container for the OP_MSG payload data of type 1. -// There is no definition of the type 0 payload because that is simply a -// bson document. -type payloadType1 struct { - size int32 - identifier string - docs []interface{} -} - type requestInfo struct { bufferPos int replyFunc replyFunc @@ -230,11 +181,10 @@ type requestInfo struct { func newSocket(server *mongoServer, conn net.Conn, timeout time.Duration) *mongoSocket { socket := &mongoSocket{ - conn: conn, - addr: server.Addr, - server: server, - replyFuncs: make(map[uint32]replyFunc), - opMsgReplyFuncs: make(map[uint32]opMsgReplyFunc), + conn: conn, + addr: server.Addr, + server: server, + replyFuncs: make(map[uint32]replyFunc), } socket.gotNonce.L = &socket.Mutex if err := socket.InitialAcquire(server.Info(), timeout); err != nil { @@ -402,8 +352,6 @@ func (socket *mongoSocket) kill(err error, abend bool) { stats.socketsAlive(-1) replyFuncs := socket.replyFuncs socket.replyFuncs = make(map[uint32]replyFunc) - opMsgReplyFuncs := socket.opMsgReplyFuncs - socket.opMsgReplyFuncs = make(map[uint32]opMsgReplyFunc) server := socket.server socket.server = nil socket.gotNonce.Broadcast() @@ -412,10 +360,6 @@ func (socket *mongoSocket) kill(err error, abend bool) { logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error()) replyFunc(err, nil, -1, nil) } - for _, opMsgReplyFunc := range opMsgReplyFuncs { - logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error()) - opMsgReplyFunc(nil, err) - } if abend { server.AbendSocket(socket) } @@ -459,8 +403,14 @@ var bytesBufferPool = sync.Pool{ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { - buf := getSizedBuffer(0) - defer bytesBufferPool.Put(buf) + if lops := socket.flushLogout(); len(lops) > 0 { + ops = append(lops, ops...) + } + + buf := bytesBufferPool.Get().([]byte) + defer func() { + bytesBufferPool.Put(buf[:0]) + }() // Serialize operations synchronously to avoid interrupting // other goroutines while we can't really be sending data. @@ -481,7 +431,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { switch op := op.(type) { case *updateOp: - buf = addHeader(buf, dbUpdate) + buf = addHeader(buf, 2001) buf = addInt32(buf, 0) // Reserved buf = addCString(buf, op.Collection) buf = addInt32(buf, int32(op.Flags)) @@ -497,7 +447,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { } case *insertOp: - buf = addHeader(buf, dbInsert) + buf = addHeader(buf, 2002) buf = addInt32(buf, int32(op.flags)) buf = addCString(buf, op.collection) for _, doc := range op.documents { @@ -509,7 +459,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { } case *queryOp: - buf = addHeader(buf, dbQuery) + buf = addHeader(buf, 2004) buf = addInt32(buf, int32(op.flags)) buf = addCString(buf, op.collection) buf = addInt32(buf, op.skip) @@ -527,7 +477,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { replyFunc = op.replyFunc case *getMoreOp: - buf = addHeader(buf, dbGetMore) + buf = addHeader(buf, 2005) buf = addInt32(buf, 0) // Reserved buf = addCString(buf, op.collection) buf = addInt32(buf, op.limit) @@ -535,7 +485,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { replyFunc = op.replyFunc case *deleteOp: - buf = addHeader(buf, dbDelete) + buf = addHeader(buf, 2006) buf = addInt32(buf, 0) // Reserved buf = addCString(buf, op.Collection) buf = addInt32(buf, int32(op.Flags)) @@ -546,7 +496,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { } case *killCursorsOp: - buf = addHeader(buf, dbKillCursors) + buf = addHeader(buf, 2007) buf = addInt32(buf, 0) // Reserved buf = addInt32(buf, int32(len(op.cursorIds))) for _, cursorId := range op.cursorIds { @@ -611,245 +561,123 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) { return err } -// sendMessage send data to the database using the OP_MSG wire protocol -// introduced in MongoDB 3.6 (require maxWireVersion >= 6) -func (socket *mongoSocket) sendMessage(op *msgOp) (writeCmdResult, error) { - var wr writeCmdResult - var err error - - buf := getSizedBuffer(0) - defer bytesBufferPool.Put(buf) - - buf = addHeader(buf, dbMessage) - buf = addInt32(buf, int32(op.flags)) - - for _, section := range op.sections { - buf, err = addSection(buf, section) - if err != nil { - return wr, err - } - } - - if len(buf) > socket.ServerInfo().MaxMessageSizeBytes { - return wr, fmt.Errorf("message length to long, should be < %v, but was %v", socket.ServerInfo().MaxMessageSizeBytes, len(buf)) - } - // set the total message size - setInt32(buf, 0, int32(len(buf))) - - var wait sync.Mutex - var reply msgOp - var responseError error - var wcr writeCmdResult - // if no response expected, ie op.flags&opMsgFlagMoreToCome == 1, - // request should have id 0 - var requestID uint32 - // if moreToCome flag is set, we don't want to know the outcome of the message. - // There is no response to a request where moreToCome has been set. - expectReply := (op.flags & opMsgFlagMoreToCome) == 0 - - socket.Lock() - if socket.dead != nil { - dead := socket.dead - socket.Unlock() - debugf("Socket %p to %s: failing query, already closed: %s", socket, socket.addr, socket.dead.Error()) - return wr, dead - } - if expectReply { - // Reserve id 0 for requests which should have no responses. - again: - requestID = socket.nextRequestId + 1 - socket.nextRequestId++ - if requestID == 0 { - goto again - } - wait.Lock() - socket.opMsgReplyFuncs[requestID] = func(msg *msgOp, err error) { - reply = *msg - responseError = err - wait.Unlock() - } - } - socket.Unlock() - - setInt32(buf, 4, int32(requestID)) - stats.sentOps(1) - - socket.updateDeadline(writeDeadline) - _, err = socket.conn.Write(buf) - - if expectReply { - socket.updateDeadline(readDeadline) - wait.Lock() - - if responseError != nil { - return wcr, responseError - } - // for the moment, OP_MSG responses return a body section only, - // cf https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst : - // - // "Similarly, certain commands will reply to messages using this technique when possible - // to avoid the overhead of BSON Arrays. Drivers will be required to allow all command - // replies to use this technique. Drivers will be required to handle Payload Type 1." - // - // so we only return the first section of the response (ie the body) - wcr = reply.sections[0].data.(writeCmdResult) - } - - return wcr, err -} - -// get a slice of byte of `size` length from the pool -func getSizedBuffer(size int) []byte { - b := bytesBufferPool.Get().([]byte) - if len(b) < size { - for i := len(b); i < size; i++ { - b = append(b, byte(0)) - } - return b +func fill(r net.Conn, b []byte) error { + l := len(b) + n, err := r.Read(b) + for n != l && err == nil { + var ni int + ni, err = r.Read(b[n:]) + n += ni } - return b[0:size] + return err } // Estimated minimum cost per socket: 1 goroutine + memory for the largest // document ever seen. func (socket *mongoSocket) readLoop() { - header := make([]byte, 16) // 16 bytes for header - p := make([]byte, 20) // 20 bytes for fixed fields of OP_REPLY + p := make([]byte, 36) // 16 from header + 20 from OP_REPLY fixed fields s := make([]byte, 4) - var r io.Reader = socket.conn // No locking, conn never changes. + conn := socket.conn // No locking, conn never changes. for { - _, err := io.ReadFull(r, header) + err := fill(conn, p) if err != nil { socket.kill(err, true) return } - totalLen := getInt32(header, 0) - responseTo := getInt32(header, 8) - opCode := getInt32(header, 12) + totalLen := getInt32(p, 0) + responseTo := getInt32(p, 8) + opCode := getInt32(p, 12) // Don't use socket.server.Addr here. socket is not // locked and socket.server may go away. debugf("Socket %p to %s: got reply (%d bytes)", socket, socket.addr, totalLen) - stats.receivedOps(1) - switch opCode { - case opReply: - _, err := io.ReadFull(r, p) - if err != nil { - socket.kill(err, true) - return - } - reply := replyOp{ - flags: uint32(getInt32(p, 0)), - cursorId: getInt64(p, 4), - firstDoc: getInt32(p, 12), - replyDocs: getInt32(p, 16), - } - stats.receivedDocs(int(reply.replyDocs)) + _ = totalLen - socket.Lock() - replyFunc, ok := socket.replyFuncs[uint32(responseTo)] - if ok { - delete(socket.replyFuncs, uint32(responseTo)) - } - socket.Unlock() - - if replyFunc != nil && reply.replyDocs == 0 { - replyFunc(nil, &reply, -1, nil) - } else { - for i := 0; i != int(reply.replyDocs); i++ { - _, err := io.ReadFull(r, s) - if err != nil { - if replyFunc != nil { - replyFunc(err, nil, -1, nil) - } - socket.kill(err, true) - return - } - b := getSizedBuffer(int(getInt32(s, 0))) - defer bytesBufferPool.Put(b) - - copy(b[0:4], s) - - _, err = io.ReadFull(r, b[4:]) - if err != nil { - if replyFunc != nil { - replyFunc(err, nil, -1, nil) - } - socket.kill(err, true) - return - } + if opCode != 1 { + socket.kill(errors.New("opcode != 1, corrupted data?"), true) + return + } - if globalDebug && globalLogger != nil { - m := bson.M{} - if err := bson.Unmarshal(b, m); err == nil { - debugf("Socket %p to %s: received document: %#v", socket, socket.addr, m) - } - } + reply := replyOp{ + flags: uint32(getInt32(p, 16)), + cursorId: getInt64(p, 20), + firstDoc: getInt32(p, 28), + replyDocs: getInt32(p, 32), + } + stats.receivedOps(+1) + stats.receivedDocs(int(reply.replyDocs)) + + socket.Lock() + replyFunc, ok := socket.replyFuncs[uint32(responseTo)] + if ok { + delete(socket.replyFuncs, uint32(responseTo)) + } + socket.Unlock() + + if replyFunc != nil && reply.replyDocs == 0 { + replyFunc(nil, &reply, -1, nil) + } else { + for i := 0; i != int(reply.replyDocs); i++ { + err := fill(conn, s) + if err != nil { if replyFunc != nil { - replyFunc(nil, &reply, i, b) + replyFunc(err, nil, -1, nil) } - // XXX Do bound checking against totalLen. + socket.kill(err, true) + return } - } - socket.Lock() - if len(socket.replyFuncs) == 0 { - // Nothing else to read for now. Disable deadline. - socket.conn.SetReadDeadline(time.Time{}) - } else { - socket.updateDeadline(readDeadline) - } - socket.Unlock() + b := make([]byte, int(getInt32(s, 0))) - case dbMessage: - body := getSizedBuffer(int(totalLen) - 16) - defer bytesBufferPool.Put(body) - _, err := io.ReadFull(r, body) - if err != nil { - socket.kill(err, true) - return - } + // copy(b, s) in an efficient way. + b[0] = s[0] + b[1] = s[1] + b[2] = s[2] + b[3] = s[3] - sections, err := getSections(body[4:]) - if err != nil { - socket.kill(err, true) - return - } - // TODO check CRC-32 checksum if checksum byte is set - reply := &msgOp{ - flags: uint32(getInt32(body, 0)), - sections: sections, - } + err = fill(conn, b[4:]) + if err != nil { + if replyFunc != nil { + replyFunc(err, nil, -1, nil) + } + socket.kill(err, true) + return + } - // TODO update this when msgPayload1 section is implemented in MongoDB - stats.receivedDocs(1) - socket.Lock() - opMsgReplyFunc, ok := socket.opMsgReplyFuncs[uint32(responseTo)] - if ok { - delete(socket.opMsgReplyFuncs, uint32(responseTo)) - } - socket.Unlock() + if globalDebug && globalLogger != nil { + m := bson.M{} + if err := bson.Unmarshal(b, m); err == nil { + debugf("Socket %p to %s: received document: %#v", socket, socket.addr, m) + } + } - if opMsgReplyFunc != nil { - opMsgReplyFunc(reply, err) - } else { - socket.kill(fmt.Errorf("couldn't handle response properly"), true) - return + if replyFunc != nil { + replyFunc(nil, &reply, i, b) + } + + // XXX Do bound checking against totalLen. } + } + + socket.Lock() + if len(socket.replyFuncs) == 0 { + // Nothing else to read for now. Disable deadline. socket.conn.SetReadDeadline(time.Time{}) - default: - socket.kill(errors.New("opcode != 1 && opcode != 2013, corrupted data?"), true) - return + } else { + socket.updateDeadline(readDeadline) } + socket.Unlock() + + // XXX Do bound checking against totalLen. } } var emptyHeader = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} -func addHeader(b []byte, opcode int32) []byte { +func addHeader(b []byte, opcode int) []byte { i := len(b) b = append(b, emptyHeader...) // Enough for current opcodes. @@ -873,35 +701,6 @@ func addCString(b []byte, s string) []byte { return b } -// Marshal a section and add it to the provided buffer -// https://docs.mongodb.com/master/reference/mongodb-wire-protocol/#sections -func addSection(b []byte, s msgSection) ([]byte, error) { - var err error - b = append(b, s.payloadType) - switch s.payloadType { - case msgPayload0: - b, err = addBSON(b, s.data) - if err != nil { - return b, err - } - case msgPayload1: - pos := len(b) - b = addInt32(b, 0) - s1 := s.data.(payloadType1) - b = addCString(b, s1.identifier) - for _, doc := range s1.docs { - b, err = bson.MarshalBuffer(doc, b) - if err != nil { - return b, err - } - } - setInt32(b, pos, int32(len(b)-pos)) - default: - return b, fmt.Errorf("invalid section kind in op_msg: %v", s.payloadType) - } - return b, nil -} - func addBSON(b []byte, doc interface{}) ([]byte, error) { if doc == nil { return append(b, 5, 0, 0, 0, 0), nil @@ -937,36 +736,3 @@ func getInt64(b []byte, pos int) int64 { (int64(b[pos+6]) << 48) | (int64(b[pos+7]) << 56) } - -// UnMarshal an array of bytes into a section -// https://docs.mongodb.com/master/reference/mongodb-wire-protocol/#sections -func getSections(b []byte) ([]msgSection, error) { - var sections []msgSection - pos := 0 - for pos != len(b) { - sectionLength := int(getInt32(b, pos+1)) - // first byte is section type - switch b[pos] { - case msgPayload0: - var result writeCmdResult - err := bson.Unmarshal(b[pos+1:pos+sectionLength+1], &result) - if err != nil { - return nil, err - } - sections = append(sections, msgSection{ - payloadType: b[pos], - data: result, - }) - case msgPayload1: - // not implemented yet - // - // b[0:4] size - // b[4:?] docSeqID - // b[?:len(b)] documentSequence - default: - return nil, fmt.Errorf("invalid section type: %v", b[0]) - } - pos += sectionLength + 1 - } - return sections, nil -} diff --git a/suite_test.go b/suite_test.go index e4a149569..624d5a543 100644 --- a/suite_test.go +++ b/suite_test.go @@ -45,8 +45,6 @@ import ( var fast = flag.Bool("fast", false, "Skip slow tests") -var expFeaturesString = "?experimental=opmsg" - type M bson.M type cLogger C