-
Notifications
You must be signed in to change notification settings - Fork 230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MongoDB 3.6: implement the new wire protocol #61
Conversation
Hi @feliixx This is a fantastic PR - we all truly appreciate the work you've put in to this fork. Saying thanks doesn't seem like enough! I've added some reviewers but a few are away at the moment - we'll review ASAP and then coordinate any work that needs doing - we should be able to help get this completed. Dom |
Hi @domodwyer Just noticed that mongodb already implemented this in a private fork of mgo: llmgo, see this commit for the implementation. Don't know why they didn't add it to their previous fork Also looks like they are developing a new driver called yamgo, see TOOLS-1833, but couldn't find more infos about it... It may be better to use their implementation, or just wait for the new driver ? |
@@ -2083,8 +2098,8 @@ func (s *S) TestDoNotFallbackToMonotonic(c *C) { | |||
if !s.versionAtLeast(3, 0) { | |||
c.Skip("command-counting logic depends on 3.0+") | |||
} | |||
if s.versionAtLeast(3, 4) { | |||
c.Skip("failing on 3.4+") | |||
if s.versionAtLeast(3, 2, 17) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe best to move all version checks to the top of the method, before any mgo.Dial() calls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's already at the top of the method ( TestDoNotFallbackToMonotonic
) but I agree that it's not clear in the git diff
harness/mongojs/dropall.js
Outdated
var ok = admin.auth("root", "rapadura") | ||
if (!ok) { | ||
print("failed to auth for port " + port) | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An early break here would remove the nesting of the (ok == true) branch.
lerr, _ := err.(*mgo.LastError) | ||
c.Assert(lerr, NotNil, Commentf("err: %#v", err)) | ||
c.Assert(lerr.Code, Equals, 10140) | ||
c.Assert(qerr.Code, Equals, 16837) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know the original mgo driver didn't do this, but it's worthwhile constifying these.
Hi @feliixx I've just had a quick look over the llmgo changes (sorry, busy week!) - I quite like how it's been structured (sections, payload types) and it's got partial support for checksums etc but it's not actually used within the driver itself - it just provides the ability to handle What do you think to combining your implementation with theirs? i.e. bolt their types into your implementation of The spec says we MUST NOT exceed As for yamgo we'd happily welcome a maintained driver from MongoDB but we'll be keeping ours around too as it's fairly stable, and there's always going to be a need for a drop-in |
Hi @domodwyer Ok I've updated the code to use types as close as possible as theirs, it's more readable now ! Test should now pass on 3.6 but sometimes it fails because it's taking too much time to finish, don't really know why yet |
This is starting to look great! So we had a chat internally - we think to give the best driver stability we should release the OP_MSG support in three stages:
What do you think? This is just to see how the changes (ours and mongos!) behave in the wild before committing ourselves. Dom |
Ok sure, totally understand the need for stability here But instead of setting this flag in
that the user would call just once at the beginning of his program with Actually, this method has to exist anyway ( otherwise enabling OP_MSG in the tests will be a real pain) so the question is more wether it should be exported or not? |
- use last minor version for each major serie of mongodb - use travis 'go' param instead of eval "$(gimme $GO)" as it fails to install correctly last minor version ( 1.8.x notation for example)
also re-enable TestFindIterSnapshot as it was fixed a long time ago
fix TestAuthX509CredRDNConstruction test: need to create an user with {"username": subject} before trying to login
Make sure that "rs3/127.0.0.1/40031" is elected at primary. Create user before running 'addShard' command as it requires root access Also add a retry mechanism to make sure that shard are correctly added cf https://docs.mongodb.com/manual/tutorial/deploy-shard-cluster/
Hi @feliixx You're right about the What do you think about adding a Dom |
Hi @domodwyer what about making this more generic, with a top level method called The thing is, if you want to keep this feature flag mechanism, in a few month type DialInfo struct {
// Addrs holds the addresses for the seed servers.
Addrs []string
...
// Deprecated
EnableOpMsg bool
// Deprecated
EnableCompressedMessage
// Deprecated
EnableSomeOtherFeature
...
} Second thing, if we pass it in the dial url, we'll need to update the ~300 occurences of |
Fully agree with the If I'm honest the driving point behind having it as a dial parameter means the behaviour is switchable without having to recompile the binary. This means the ops team can switch the behaviour from the service config, without requiring a deployment should it be needed in the worst case. How do you feel about this: have a Dom |
Hi @domodwyer Ok for the |
Hey @feliixx Looking great, thanks for all the hard work! I need to do some things internally before I get this merged - should be today or tomorrow I hope. Sorry for the delay! Dom |
if socket.ServerInfo().MaxWireVersion >= 2 { | ||
serverInfo := socket.ServerInfo() | ||
|
||
if enableOpMsg && serverInfo.MaxWireVersion >= 6 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is way too long. Could you please split it into
writeOpWithOpMsg
and writeOpLegacy
or something similiar ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or it could be split into separate methods for
insertOp, bulkUpdateOp, bulkDeleteOp?
Maybe this way it would be possible to reuse some code. Didn't try it, so can't tell for sure.
My point is that this code was already not very readable. And it is getting worse. It is not a dig @feliixx of course, we all work with the tools available to us.
This might impede future merges from other mgo forks though :(. Opinions are welcome.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally agree with you, this part of the code is quite a mess... I didn't wanted to rewrite the whole thing, and tried to left the old code untouched to avoid introduce bugs. Plus, I wanted to get rid of the old workflow to have something more readable (and avoid some useless locking):
old way:
writeOp() -> writeOpCommand() -> run() -> SimpleQuery() -> Query() -> CheckQueryError()
new way:
writeOp() -> sendMessage()
this result in some code duplication, but I think it's totally worth it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, modifying this code can be scary. I am fine with some code duplication, and simplifying the flow would be worth it.
session.go
Outdated
var documents []interface{} | ||
var docSeqID string | ||
canUseOpMsg := false | ||
if op, ok := op.(*insertOp); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be replaced by a type switch, no ?
canUseOpMsg := true
switch op := op.(type) {
case *insertOp:
[...]
default:
canUseOpMsg = false
}
server.go
Outdated
@@ -36,6 +36,11 @@ import ( | |||
"github.com/globalsign/mgo/bson" | |||
) | |||
|
|||
const ( | |||
defaultWriteBatchSize = 1000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is 100000 in version 3.6.
https://docs.mongodb.com/master/reference/limits/#Write-Command-Batch-Limit-Size
This should be safe to change, as defaultWriteBatchSize is used only for the OP_MSG right now.
server.go
Outdated
@@ -36,6 +36,11 @@ import ( | |||
"github.com/globalsign/mgo/bson" | |||
) | |||
|
|||
const ( | |||
defaultWriteBatchSize = 1000 | |||
defaultMaxMessageSizeBytes = 48000000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The max message size is not verified anywhere in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in sendMessage()
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Damn. I think I have missed a part of the PR.
hmm, looks like I forgot to implement experimental flags in |
require maxWireVersion >= 6 on server side, and `experimental=opmsg` in the connection string - get `MaxMessageSizeBytes` and `MaxWriteBatchSize` from handshake - allow unacknowledged writes with `moreToCome` flag - split bulk operations in batch of `maxWriteBatchSize` - add 'experimental' param in URL. To enable an experimental feature, add `experimental=featureName` in the connection URL flush logout directly Previously, `flushLogout()` was called at the beginning of each Query to the database. To avoid these useless calls, we flush logout directly when `Logout()` or `LogoutAll()` is called
SERVER-31049 is fixed in 3.4.10, so re-enable it
Ok it should work fine now! |
@@ -170,12 +170,6 @@ func (socket *mongoSocket) Login(cred Credential) error { | |||
return nil | |||
} | |||
} | |||
if socket.dropLogout(cred) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was an weird idea originally? Nice fix
start := count * serverInfo.MaxWriteBatchSize | ||
length := l - start | ||
if length > serverInfo.MaxWriteBatchSize { | ||
length = serverInfo.MaxWriteBatchSize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this should never happen, but I think we should stop and return an error if it does - silently dropping data would be surprising to the caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, data is not silently dropped here: if we have to insert/update/delete more than MaxWriteBatchSize
documents, the driver split the list of documents in batch of MaxWriteBatchSize
and send them to the database one by one (same behavior in the old wire protocol)
if enableOpMsg && serverInfo.MaxWireVersion >= 6 { | ||
// we can use OP_MSG introduced in Mongodb 3.6 | ||
oPlerr, oPerr := c.writeOpWithOpMsg(socket, serverInfo, op, ordered, bypassValidation, safeOp) | ||
if oPlerr != nil || oPerr != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just drop the conditional and do:
return c.writeOpWithOpMsg(socket, serverInfo, op, ordered, bypassValidation, safeOp)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't do this here, because some command in MongoDB 3.6 still use the old wire protocol. Without the conditional, the method would sometimes return before actually sending the command to the database
// https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#request-opcodes | ||
opInvalid = 0 | ||
opReply = 1 | ||
dbMsg = 1000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These make the the huge switch in Query()
way more readable - thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And thanks for including the reference URL!
return err | ||
|
||
if len(buf) > socket.ServerInfo().MaxMessageSizeBytes { | ||
return wr, fmt.Errorf("message length to long, should be < %v, but was %v", socket.ServerInfo().MaxMessageSizeBytes, len(buf)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this is possible to trigger by adding too many documents to the query batch?
We should probably either limit the user when building the batch (i.e. return an error to the caller of Bulk.Insert()
) to avoid this fairly confusing error (we know what it is, but most won't).
I'm not massively familiar with the bulk ops though, any ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Insert()
currently doesn't return an error, so this would be a drastic change in the way Bulk operations works. Instead, I think the driver should handle this internally:
for example, if someone tries to insert 100000 docs of 480+ bytes, the driver should split the operation in several smaller ops
socket.go
Outdated
socket.updateDeadline(writeDeadline) | ||
_, err = socket.conn.Write(buf) | ||
|
||
// TODO optimize this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bytesBufferPool
is used elsewhere so you'd be surprised by what's in the buffer!
The append()
calls on a pre-allocated slice are fast enough to not worry about :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was intended to use the same pool for old and new wire protocol. In MongoDB 3.6, some calls use the new OP_MSG
wire protocol, some other don't. The pool only contains empty slice of bytes, so it shouldn't be an issue ?
socket.go
Outdated
socket.kill(fmt.Errorf("couldn't handle response properly"), true) | ||
return | ||
} | ||
responsePool.Put(body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defer
this on line 821 - currently this leaks the buffer on lines 824, 830, and 851.
I know some people dislike defer
because it used to be slow - the cost is ~0.00004ms these days (~40ns - I benchmarked recently on 1.9.2). The maintenance cost is well worth the 40ns in my opinion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically though I would recommend calling pool.Get()
in the same func as pool.Put()
and passing the slice into getEmptyBody()
:
var buffer = pool.Get().([]byte)[:0]
var body = getEmptyBody(buffer)
defer pool.Put(body)
Where getEmptyBody()
calls append()
on buffer, and returns the appended slice (which is backed by the same array):
func getEmptyBody(buffer) {
var newBuf = append(buffer, something)
// etc...
return newBuf
}
This keeps all the pooling in the same code block and easy to read, making a leak less likely when refactoring. I also always reslice on the Get()
call so you're sure you've never got a dirty buffer, but I know the existing code does it on Put()
- I'll sort it all one day!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
totally agree with you, this is an excellent idea ! I've just pushed a commit implementing this
Hey @feliixx A couple comments but nothing serious. You've really covered a lot of corner cases here - it's fantastic work! Do you want a hand on any of the above? Dom |
use the same pool for send and received messages. Slices are returned to the pool without being resized. Default allocation size might need to be updated (currently 256, no benchmarks available yet)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is truly great work, thanks for contributing @feliixx - supporting OP_MSG is a great feature and there's no way we would have had the time currently!
I'm happy to merge this and open an issue for the MaxMessageSizeBytes
splitting - this is a beta/experimental feature anyway.
Again, thanks!
socket.go
Outdated
func getEmptyBody(size int) []byte { | ||
b := responsePool.Get().([]byte) | ||
// get a slice of byte of `size` length from the pool | ||
func getSizedBuffer(size int) []byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great idea - makes buffer pooling nice and clean!
Just noticed that MongoDB 3.6.0 is out, so I updated the travis config! |
Great work! |
Since the credential handling refactor in #61, acquired session credentials are dropped after every query. This issue became apparent when testing using SCAM-SHA1 authentication for a mgo release.
* Revert "MongoDB 3.6: implement the new wire protocol (#61)" This reverts commit 90c056c. * test against 3.6 * update go and mongodb version - use last minor version for each major serie of mongodb - use travis 'go' param instead of eval "$(gimme $GO)" as it fails to install correctly last minor version ( 1.8.x notation for example) * test fixes on 3.2.17 also re-enable TestFindIterSnapshot as it was fixed a long time ago * fix X509 test fix TestAuthX509CredRDNConstruction test: need to create an user with {"username": subject} before trying to login * Fix auth test on 3.6-rc3 Make sure that "rs3/127.0.0.1/40031" is elected at primary. Create user before running 'addShard' command as it requires root access Also add a retry mechanism to make sure that shard are correctly added cf https://docs.mongodb.com/manual/tutorial/deploy-shard-cluster/ * update to 3.6.0 stable * tests: cherry pick missing 3.6+ support changes
Mongodb 3.6: implement the new wire protocol
WIP - Do not merge
Goal
This PR is an attempt to implement the new wire protocol introduced in Mongodb 3.6. This protocol add a new message format: OP_MSG
Tests currently run against MongoDB 3.6-rc3 (first 3.6 stable version should be released next month)
I would like to have some feedbacks on this before going any further !
What's in this PR
this PR contains several commits, first 6 are small update to make test pass against last version of each major serie, and the last one is the new wire protocol implementation
commit c1fc70d3d748fb4dd0ebb851b1a8d9a908e053fa
test against 3.6-rc3
commit a594e559227a1301f8aeecb5895cd00127cff502
update mongodb version
use last minor version for each major serie
commit f6a8010534991d83d812e235a6fc91f5bb801699
test fixes on 3.2.17
commit 8242b9c0ab4762bbbb3e6865c91c7ed888a9ae7f
fix X509 test
fix TestAuthX509CredRDNConstruction test: need to create an user with {"username": subject}
before trying to login
commit 3365de912f1ba8ebbe2372b8d8646c097d927105
Fix auth test on 3.6-rc3
Make sure that "rs3/127.0.0.1/40031" is elected at primary.
Create user before running 'addShard' command as it requires root access
Also add a retry mechanism to make sure that shard are correctly added
cf https://docs.mongodb.com/manual/tutorial/deploy-shard-cluster/
commit 17833465551a225e567eb2250c2759354260c96d
re-enable TestViewWithCollation
SERVER-31049 was fixed in 3.4.10, so re-enable it in tests
commit 6a815bbddf6a20ab1e1ddc981cebeb5045e59946
start implementing new wire protocol
TODO
writeConcern