diff --git a/lib/mosql/cli.rb b/lib/mosql/cli.rb index 4f537ec..efe9126 100644 --- a/lib/mosql/cli.rb +++ b/lib/mosql/cli.rb @@ -121,8 +121,8 @@ def parse_args end def connect_mongo - @mongo = Mongo::MongoClient.from_uri(options[:mongo]) - config = @mongo['admin'].command(:ismaster => 1) + @mongo = Mongo::Client.new(options[:mongo]) + config = @mongo.use('admin').command(:ismaster => 1).documents.first if !config['setName'] && !options[:skip_tail] log.warn("`#{options[:mongo]}' is not a replset.") log.warn("Will run the initial import, then stop.") diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 7e0f119..7bfea03 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -191,12 +191,12 @@ def transform_primitive(v, type=nil) v.to_s when BSON::Binary if type.downcase == 'uuid' - v.to_s.unpack("H*").first + v.data.to_s.unpack("H*").first else - Sequel::SQL::Blob.new(v.to_s) + Sequel::SQL::Blob.new(v.data.to_s) end - when BSON::DBRef - v.object_id.to_s + when Mongo::DBRef + v.id.to_s else v end @@ -209,7 +209,7 @@ def transform(ns, obj, schema=nil) # Do a deep clone, because we're potentially going to be # mutating embedded objects. - obj = BSON.deserialize(BSON.serialize(obj)) + obj = Marshal.load(Marshal.dump(obj)) row = [] schema[:columns].each do |col| @@ -259,7 +259,7 @@ def sanitize(value) when Array value.map {|v| sanitize(v)} when BSON::Binary - Base64.encode64(value.to_s) + Base64.encode64(value.data.to_s) when Float # NaN is illegal in JSON. Translate into null. value.nan? ? nil : value diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index d630e96..b7b38e2 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -31,7 +31,7 @@ def import def collection_for_ns(ns) dbname, collection = ns.split(".", 2) - @mongo.db(dbname).collection(collection) + @mongo.use(dbname)[collection] end def unsafe_handle_exceptions(ns, obj) @@ -114,7 +114,7 @@ def initial_import end log.info("Importing for Mongo DB #{dbname}...") - db = @mongo.db(dbname) + db = @mongo.use(dbname) collections = db.collections.select { |c| spec.key?(c.name) } collections.each do |collection| @@ -141,21 +141,19 @@ def import_collection(ns, collection, filter) start = Time.now sql_time = 0 - collection.find(filter, :batch_size => BATCH) do |cursor| - with_retries do - cursor.each do |obj| - batch << @schema.transform(ns, obj) - count += 1 - - if batch.length >= BATCH - sql_time += track_time do - bulk_upsert(table, ns, batch) - end - elapsed = Time.now - start - log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...") - batch.clear - exit(0) if @done + with_retries do + collection.find(filter, :batch_size => BATCH).each do |obj| + batch << @schema.transform(ns, obj) + count += 1 + + if batch.length >= BATCH + sql_time += track_time do + bulk_upsert(table, ns, batch) end + elapsed = Time.now - start + log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...") + batch.clear + exit(0) if @done end end end @@ -179,7 +177,7 @@ def optail end def sync_object(ns, selector) - obj = collection_for_ns(ns).find_one(selector) + obj = collection_for_ns(ns).find(selector).limit(1).first if obj unsafe_handle_exceptions(ns, obj) do @sql.upsert_ns(ns, obj) diff --git a/lib/mosql/version.rb b/lib/mosql/version.rb index f4449b1..daa119c 100644 --- a/lib/mosql/version.rb +++ b/lib/mosql/version.rb @@ -1,3 +1,3 @@ module MoSQL - VERSION = "0.4.3" + VERSION = "0.5.0" end diff --git a/mosql.gemspec b/mosql.gemspec index 8089b30..992ffe4 100644 --- a/mosql.gemspec +++ b/mosql.gemspec @@ -22,11 +22,11 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "log4r" gem.add_runtime_dependency "json" - gem.add_runtime_dependency "mongoriver", "0.4" + gem.add_runtime_dependency "mongoriver", "0.5" - gem.add_runtime_dependency "mongo", "~> 1.10" - gem.add_runtime_dependency "bson", "~> 1.10" - gem.add_runtime_dependency "bson_ext", "~> 1.10" + gem.add_runtime_dependency "mongo", "~> 2.0" + gem.add_runtime_dependency "bson", "~> 4.0" + gem.add_runtime_dependency "bson_ext" gem.add_development_dependency "minitest" gem.add_development_dependency "mocha" diff --git a/test/functional/_lib.rb b/test/functional/_lib.rb index 523cec8..c24c99a 100644 --- a/test/functional/_lib.rb +++ b/test/functional/_lib.rb @@ -37,8 +37,8 @@ def connect_sql def connect_mongo begin - Mongo::Connection.from_uri(mongo_test_uri) - rescue Mongo::ConnectionFailure, Mongo::ConnectionError + mongo = Mongo::Client.new(mongo_test_uri, { database: mongo_test_dbname } ) + rescue Mongo::Error $stderr.puts < 100), - :w => 1) + mongo.use('mosql_test')['collection'].insert_one(o.merge('var' => 100), + :w => 1) @streamer.handle_op({ 'ns' => 'mosql_test.collection', 'op' => 'u', @@ -172,8 +172,10 @@ def build_streamer # $set's are currently a bit of a hack where we read the object # from the db, so make sure the new object exists in mongo - connect_mongo['mosql_test']['renameid'].insert(o.merge('goats' => 0), - :w => 1) + #connect_mongo['mosql_test'].insert(o.merge('goats' => 0), + #mongo['mosql_test'].insert_one(o.merge('goats' => 0), + mongo.use('mosql_test')['renameid'].insert_one(o.merge('goats' => 0), + :w => 1) @streamer.handle_op({ 'ns' => 'mosql_test.renameid', 'op' => 'u', @@ -197,9 +199,9 @@ def build_streamer it 'filters unwanted records' do data = [{:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 1)), :var => 2}, {:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 2)), :var => 3}] - collection = mongo["filter_test"]["collection"] + collection = mongo.use('filter_test')['collection'] collection.drop - data.map { |rec| collection.insert(rec)} + data.map { |rec| collection.insert_one(rec)} @streamer.options[:skip_tail] = true @streamer.initial_import @@ -214,14 +216,14 @@ def build_streamer it 'handles "u" ops with a compsite key' do date = Time.utc(2014, 7, 1) o = {'_id' => {'s' => 'asdf', 't' => date}, 'var' => 'data'} - collection = mongo["composite_key_test"]["collection"] + collection = mongo.use('composite_key_test')['collection'] collection.drop - collection.insert(o) + collection.insert_one(o) @streamer.options[:skip_tail] = true @streamer.initial_import - collection.update({ '_id' => { 's' => 'asdf', 't' => date}}, { '$set' => { 'var' => 'new_data'}}) + collection.update_one({ '_id' => { 's' => 'asdf', 't' => date}}, { '$set' => { 'var' => 'new_data'}}) @streamer.handle_op({'ns' => 'composite_key_test.collection', 'op' => 'u', 'o2' => { '_id' => { 's' => 'asdf', 't' => date}}, @@ -234,9 +236,9 @@ def build_streamer it 'handles composite keys' do o = {'_id' => {'s' => 'asdf', 't' => Time.new}, 'var' => 'data'} - collection = mongo["composite_key_test"]["collection"] + collection = mongo.use('composite_key_test')['collection'] collection.drop - collection.insert(o) + collection.insert_one(o) @streamer.options[:skip_tail] = true @streamer.initial_import @@ -333,9 +335,9 @@ def build_streamer it 'imports from all dbs' do ids = (1.upto(4)).map { BSON::ObjectId.new } ids.each_with_index do |_id, i| - collection = mongo["test_#{i}"]['collection'] + collection = mongo.use("test_#{i}")['collection'] collection.drop - collection.insert({:_id => _id, :var => i}, :w => 1) + collection.insert_one({:_id => _id, :var => i}, :w => 1) end @streamer.options[:skip_tail] = true @@ -361,7 +363,7 @@ def build_streamer @map = MoSQL::Schema.new(YAML.load(TIMESTAMP_MAP)) @adapter = MoSQL::SQLAdapter.new(@map, sql_test_uri) - mongo['db']['has_timestamp'].drop + mongo.use('db')['has_timestamp'].drop @sequel.drop_table?(:has_timestamp) @map.create_schema(@sequel) @@ -370,7 +372,7 @@ def build_streamer it 'preserves milliseconds on import' do ts = Time.utc(2014, 8, 7, 6, 54, 32, 123000) - mongo['db']['has_timestamp'].insert({ts: ts}) + mongo.use('db')['has_timestamp'].insert_one({ts: ts}) @streamer.options[:skip_tail] = true @streamer.initial_import @@ -382,7 +384,7 @@ def build_streamer it 'preserves milliseconds on tailing' do ts = Time.utc(2006,01,02, 15,04,05,678000) - id = mongo['db']['has_timestamp'].insert({ts: ts}) + id = mongo.use('db')['has_timestamp'].insert_one({ts: ts}).inserted_id @streamer.handle_op( { "ts" => {"t" => 1408647630, "i" => 4}, @@ -390,11 +392,11 @@ def build_streamer "v" => 2, "op" => "i", "ns" => "db.has_timestamp", - "o" => mongo['db']['has_timestamp'].find_one({_id: id}) + "o" => mongo.use('db')['has_timestamp'].find({_id: id}).first }) got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts] assert_equal(ts.to_i, got.to_i) assert_equal(ts.tv_usec, got.tv_usec) end - end + end end diff --git a/test/functional/transform.rb b/test/functional/transform.rb index 234b97a..7187d59 100644 --- a/test/functional/transform.rb +++ b/test/functional/transform.rb @@ -19,7 +19,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional 'stringy' ], [ - BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')), + Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id, 'TEXT', '5405fae77c584947fc000001' ], @@ -41,29 +41,27 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional ], [ [ - BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')), - BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')) + Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id, + Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')).id ], 'TEXT ARRAY', ['5405fae77c584947fc000001', '5405fae77c584947fc000002'] ], [ [ - BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')), - BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')) + Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id, + Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')).id ], 'TEXT', ['5405fae77c584947fc000001', '5405fae77c584947fc000002'].to_json ], [ - BSON::Binary.new(["2d931510d99f494a8c6787feb05e1594"].pack("H*"), - BSON::Binary::SUBTYPE_UUID), + BSON::Binary.new(["2d931510d99f494a8c6787feb05e1594"].pack("H*"), :uuid), 'UUID', "2d931510-d99f-494a-8c67-87feb05e1594" ], [ - BSON::Binary.new(["deadbeefcafebabe"].pack("H*"), - BSON::Binary::SUBTYPE_SIMPLE), + BSON::Binary.new(["deadbeefcafebabe"].pack("H*"), :generic), 'BYTEA', ["deadbeefcafebabe"].pack("H*") ] @@ -84,7 +82,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional schema = MoSQL::Schema.new(map) adapter = MoSQL::SQLAdapter.new(schema, sql_test_uri) @sequel.drop_table?(:test_transform) - collection = @mongo['test']['test_transform'] + collection = @mongo.use('test')['test_transform'] collection.drop schema.create_schema(@sequel) @@ -96,7 +94,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional # Test initial import id = 'imported' - collection.insert({_id: id, value: mongo}) + collection.insert_one({_id: id, value: mongo}) streamer.initial_import got = @sequel[:test_transform].where(_id: id).to_a @@ -104,7 +102,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional # Test streaming an insert id = 'inserted' - collection.insert({_id: id, value: mongo}) + collection.insert_one({_id: id, value: mongo}) streamer.handle_op( { "ts" => {"t" => 1408647630, "i" => 4}, @@ -112,7 +110,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional "v" => 2, "op" => "i", "ns" => "test.test_transform", - "o" => collection.find_one(_id: id) + "o" => collection.find(_id: id).first }) got = @sequel[:test_transform].where(_id: id).to_a diff --git a/test/unit/lib/mosql/schema.rb b/test/unit/lib/mosql/schema.rb index 399008e..81ab768 100644 --- a/test/unit/lib/mosql/schema.rb +++ b/test/unit/lib/mosql/schema.rb @@ -198,13 +198,13 @@ class MoSQL::Test::SchemaTest < MoSQL::Test it 'extracts object ids from a DBRef' do oid = BSON::ObjectId.new out = @map.transform('db.collection', {'_id' => "row 1", - 'str' => BSON::DBRef.new('db.otherns', oid)}) + 'str' => Mongo::DBRef.new('db.otherns', oid)}) assert_equal(["row 1", nil, oid.to_s, nil], out) end it 'converts DBRef to object id in arrays' do oid = [ BSON::ObjectId.new, BSON::ObjectId.new] - o = {'_id' => "row 1", "str" => [ BSON::DBRef.new('db.otherns', oid[0]), BSON::DBRef.new('db.otherns', oid[1]) ] } + o = {'_id' => "row 1", "str" => [ Mongo::DBRef.new('db.otherns', oid[0]), Mongo::DBRef.new('db.otherns', oid[1]) ] } out = @map.transform('db.collection', o) assert_equal(["row 1", nil, JSON.dump(oid.map! {|o| o.to_s}), nil ], out) end @@ -219,8 +219,8 @@ class MoSQL::Test::SchemaTest < MoSQL::Test it 'base64-encodes BSON::Binary blobs in extra_props' do out = @map.transform('db.with_extra_props', {'_id' => 7, - 'blob' => BSON::Binary.new("\x00\x00\x00"), - 'embedded' => {'thing' => BSON::Binary.new("\x00\x00\x00")}}) + 'blob' => BSON::Binary.new("\x00\x00\x00", :generic), + 'embedded' => {'thing' => BSON::Binary.new("\x00\x00\x00", :generic)}}) extra = JSON.parse(out[1]) assert(extra.key?('blob')) assert_equal('AAAA', extra['blob'].strip)