Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

1.5.0 fix retries operations #316

Open
wants to merge 9 commits into
base: 1.5.0-stable
Choose a base branch
from
20 changes: 15 additions & 5 deletions lib/moped/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,21 @@ def write(operations)
#
# @since 1.2.9
def read_data(socket, length)
data = socket.read(length)
unless data
raise Errors::ConnectionFailure.new(
"Attempted to read #{length} bytes from the socket but nothing was returned."
)
# Block on data to read for op_timeout seconds
# using the suggested implementation of http://www.ruby-doc.org/core-2.1.3/Kernel.html#method-i-select
# to work with SSL connections
time_left = op_timeout = @options[:op_timeout] || timeout
begin
raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") if (time_left -= 0.1) <= 0
data = socket.read_nonblock(length)
rescue IO::WaitReadable
Kernel::select([socket], nil, [socket], 0.1)
retry
rescue IO::WaitWritable
Kernel::select(nil, [socket], [socket], 0.1)
retry
rescue SystemCallError, IOError => e
raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
end
if data.length < length
data << read_data(socket, length - data.length)
Expand Down
3 changes: 3 additions & 0 deletions lib/moped/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ module Errors
# Generic error class for exceptions related to connection failures.
class ConnectionFailure < StandardError; end

# Generic error class for exceptions related to read timeout failures.
class OperationTimeout < StandardError; end

# Raised when a database name is invalid.
class InvalidDatabaseName < StandardError; end

Expand Down
10 changes: 2 additions & 8 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def command(database, cmd, options = {})
if reply.command_failure?
if reply.unauthorized? && auth.has_key?(database)
login(database, *auth[database])
result = command(database, cmd, options)
raise Errors::ReplicaSetReconfigured.new(operation, result)
else
raise Errors::OperationFailure.new(operation, result)
end
Expand Down Expand Up @@ -372,14 +372,8 @@ def query(database, collection, selector, options = {})
process(operation) do |reply|
if reply.query_failed?
if reply.unauthorized? && auth.has_key?(database)
# If we got here, most likely this is the case of Moped
# authenticating successfully against the node originally, but the
# node has been reset or gone down and come back up. The most
# common case here is a rs.stepDown() which will reinitialize the
# connection. In this case we need to requthenticate and try again,
# otherwise we'll just raise the error to the user.
login(database, *auth[database])
reply = query(database, collection, selector, options)
raise Errors::ReplicaSetReconfigured.new(operation, reply.documents.first)
else
raise Errors::QueryFailure.new(operation, reply.documents.first)
end
Expand Down
198 changes: 197 additions & 1 deletion spec/moped/cluster_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@
end

let(:node) do
stub
double
end

context "when a node has no peers" do
Expand Down Expand Up @@ -459,3 +459,199 @@
end
end
end

describe Moped::Cluster, "after a reconfiguration" do
let(:options) do
{
max_retries: 30,
retry_interval: 1,
timeout: 5,
database: 'test_db',
consistency: :strong,
safe: {w: 'majority'}
}
end

let(:replica_set_name) { 'dev' }

let(:session) do
Moped::Session.new([ "127.0.0.1:31100", "127.0.0.1:31101", "127.0.0.1:31102" ], options)
end

def servers_status
auth = has_user_admin? ? "-u admin -p admin_pwd --authenticationDatabase admin" : ""
`echo 'rs.status().members[0].stateStr + "|" + rs.status().members[1].stateStr + "|" + rs.status().members[2].stateStr' | mongo --quiet --port 31100 #{auth} 2>/dev/null`.chomp.split("|")
end

def has_user_admin?
auth = with_authentication? ? "-u admin -p admin_pwd --authenticationDatabase admin" : ""
`echo 'db.getSisterDB("admin").getUser("admin").user' | mongo --quiet --port 31100 #{auth} 2>/dev/null`.chomp == "admin"
end

def step_down_servers
step_down_file = File.join(Dir.tmpdir, with_authentication? ? "step_down_with_authentication.js" : "step_down_without_authentication.js")
unless File.exists?(step_down_file)
File.open(step_down_file, "w") do |file|
user_data = with_authentication? ? ", 'admin', 'admin_pwd'" : ""
file.puts %{
function stepDown(dbs) {
for (i in dbs) {
dbs[i].adminCommand({replSetFreeze:5});
try { dbs[i].adminCommand({replSetStepDown:5}); } catch(e) { print(e) };
}
};

var db1 = connect('localhost:31100/admin'#{user_data});
var db2 = connect('localhost:31101/admin'#{user_data});
var db3 = connect('localhost:31102/admin'#{user_data});

var dbs = [db1, db2, db3];
stepDown(dbs);

while (db1.adminCommand({ismaster:1}).ismaster || db2.adminCommand({ismaster:1}).ismaster || db2.adminCommand({ismaster:1}).ismaster) {
stepDown(dbs);
}
}
end
end
system "mongo --nodb #{step_down_file} 2>&1 > /dev/null"
end

shared_examples_for "recover the session" do
it "should execute commands normally before the stepDown" do
time = Benchmark.realtime do
session[:foo].find().remove_all()
session[:foo].find().to_a.count.should eql(0)
session[:foo].insert({ name: "bar 1" })
session[:foo].find().to_a.count.should eql(1)
expect {
session[:foo].insert({ name: "bar 1" })
}.to raise_exception
end
time.should be < 2
end

it "should recover and execute a find" do
session[:foo].find().remove_all()
session[:foo].insert({ name: "bar 1" })
step_down_servers
time = Benchmark.realtime do
session[:foo].find().to_a.count.should eql(1)
end
time.should be > 5
time.should be < 29
end

it "should recover and execute an insert" do
session[:foo].find().remove_all()
session[:foo].insert({ name: "bar 1" })
step_down_servers
time = Benchmark.realtime do
session[:foo].insert({ name: "bar 2" })
session[:foo].find().to_a.count.should eql(2)
end
time.should be > 5
time.should be < 29

session[:foo].insert({ name: "bar 3" })
session[:foo].find().to_a.count.should eql(3)
end

it "should recover and try an insert which hit a constraint" do
session[:foo].find().remove_all()
session[:foo].insert({ name: "bar 1" })
step_down_servers
time = Benchmark.realtime do
expect {
session[:foo].insert({ name: "bar 1" })
}.to raise_exception
end
time.should be > 5
time.should be < 29

session[:foo].find().to_a.count.should eql(1)

session[:foo].insert({ name: "bar 2" })
session[:foo].find().to_a.count.should eql(2)
end
end

describe "with authentication off" do
before do
unless servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"} && !has_user_admin?
start_mongo_server(31100, "--replSet #{replica_set_name}")
start_mongo_server(31101, "--replSet #{replica_set_name}")
start_mongo_server(31102, "--replSet #{replica_set_name}")

`echo "rs.initiate({_id : '#{replica_set_name}', 'members' : [{_id:0, host:'localhost:31100'},{_id:1, host:'localhost:31101'},{_id:2, host:'localhost:31102'}]})" | mongo --port 31100`
sleep 0.1 while !servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"}

master = `echo 'db.isMaster().primary' | mongo --quiet --port 31100`.chomp

`echo "
use test_db;
db.foo.ensureIndex({name:1}, {unique:1});
" | mongo #{master}`
end
end

let(:with_authentication?) { false }

it_should_behave_like "recover the session"
end

describe "with authentication on" do
before do
unless servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"} && has_user_admin?
keyfile = File.join(Dir.tmpdir, "31000", "keyfile")
FileUtils.mkdir_p(File.dirname(keyfile))
File.open(keyfile, "w") do |f| f.puts "SyrfEmAevWPEbgRZoZx9qZcZtJAAfd269da+kzi0H/7OuowGLxM3yGGUHhD379qP
nw4X8TT2T6ecx6aqJgxG+biJYVOpNK3HHU9Dp5q6Jd0bWGHGGbgFHV32/z2FFiti
EFLimW/vfn2DcJwTW29nQWhz2wN+xfMuwA6hVxFczlQlz5hIY0+a+bQChKw8wDZk
rW1OjTQ//csqPbVA8fwB49ghLGp+o84VujhRxLJ+0sbs8dKoIgmVlX2kLeHGQSf0
KmF9b8kAWRLwLneOR3ESovXpEoK0qpQb2ym6BNqP32JKyPA6Svb/smVONhjUI71f
/zQ2ETX7ylpxIzw2SMv/zOWcVHBqIbdP9Llrxb3X0EsB6J8PeI8qLjpS94FyEddw
ACMcAxbP+6BaLjXyJ2WsrEeqThAyUC3uF5YN/oQ9XiATqP7pDOTrmfn8LvryyzcB
ByrLRTPOicBaG7y13ATcCbBdrYH3BE4EeLkTUZOg7VzvRnATvDpt0wOkSnbqXow8
GQ6iMUgd2XvUCuknQLD6gWyoUyHiPADKrLsgnd3Qo9BPxYJ9VWSKB4phK3N7Bic+
BwxlcpDFzGI285GR4IjcJbRRjjywHq5XHOxrJfN+QrZ/6wy6yu2+4NTPj+BPC5iX
/dNllTEyn7V+pr6FiRv8rv8RcxJgf3nfn/Xz0t2zW2olcalEFxwKKmR20pZxPnSv
Kr6sVHEzh0mtA21LoK5G8bztXsgFgWU7hh9z8UUo7KQQnDfyPb6k4xroeeQtWBNo
TZF1pI5joLytNSEtT+BYA5wQSYm4WCbhG+j7ipcPIJw6Un4ZtAZs0aixDfVE0zo0
w2FWrYH2dmmCMbz7cEXeqvQiHh9IU/hkTrKGY95STszGGFFjhtS2TbHAn2rRoFI0
VwNxMJCC+9ZijTWBeGyQOuEupuI4C9IzA5Gz72048tpZ0qMJ9mOiH3lZFtNTg/5P
28Td2xzaujtXjRnP3aZ9z2lKytlr
"
end

File.chmod(0600, keyfile)

start_mongo_server(31100, "--replSet #{replica_set_name} --keyFile #{keyfile} --auth")
start_mongo_server(31101, "--replSet #{replica_set_name} --keyFile #{keyfile} --auth")
start_mongo_server(31102, "--replSet #{replica_set_name} --keyFile #{keyfile} --auth")

`echo "rs.initiate({_id : '#{replica_set_name}', 'members' : [{_id:0, host:'localhost:31100'},{_id:1, host:'localhost:31101'},{_id:2, host:'localhost:31102'}]})" | mongo --port 31100`
sleep 0.1 while !servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"}

master = `echo 'db.isMaster().primary' | mongo --quiet --port 31100`.chomp

`echo "
use admin;
db.addUser('admin', 'admin_pwd');
" | mongo #{master}`

`echo "
use test_db;
db.addUser('common', 'common_pwd');
db.foo.ensureIndex({name:1}, {unique:1});
" | mongo #{master} -u admin -p admin_pwd --authenticationDatabase admin`
end

session.login('common', 'common_pwd')
end

let(:with_authentication?) { true }

it_should_behave_like "recover the session"
end
end
38 changes: 37 additions & 1 deletion spec/moped/query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@
it "raises an error when hinting an invalid index" do
expect {
users.find(scope: scope).hint(scope: 1).to_a
}.to raise_error(Moped::Errors::QueryFailure, %r{failed with error 10113: "bad hint"})
}.to raise_error(Moped::Errors::QueryFailure, %r{bad hint})
end
end

Expand Down Expand Up @@ -919,6 +919,42 @@
end
end

context "with test commands enabled" do

let(:session) do
Moped::Session.new([ "127.0.0.1:#{port}" ], database: "moped_test")
end

let(:users) do
session.with(safe: true)[:users]
end

describe "when a query take too long" do
let(:port) { 31100 }

before do
start_mongo_server(port, "--setParameter enableTestCommands=1")
Process.detach(spawn("echo 'db.adminCommand({sleep: 1, w: true, secs: 10})' | mongo localhost:#{port} 2>&1 > /dev/null"))
sleep(1) # to sleep command on mongodb begins work
end

after do
stop_mongo_server(port)
end

it "raises a operation timeout exception" do
time = Benchmark.realtime do
expect {
Timeout::timeout(7) do
users.find("age" => { "$gte" => 65 }).first
end
}.to raise_exception("Took more than 5 seconds to receive data.")
end
expect(time).to be < 5.5
end
end
end

context "with a remote connection", mongohq: :auth do

before(:all) do
Expand Down
26 changes: 26 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

$:.unshift((Pathname(__FILE__).dirname.parent + "lib").to_s)

require "benchmark"
require "fileutils"
require "tmpdir"
require "tempfile"
require "moped"
require "support/examples"
require "support/mongohq"
Expand All @@ -35,7 +39,29 @@
return true if value == :auth && !Support::MongoHQ.auth_node_configured?
end

config.after(:suite) do
stop_mongo_server(31100)
stop_mongo_server(31101)
stop_mongo_server(31102)
end

unless Support::MongoHQ.replica_set_configured? || Support::MongoHQ.auth_node_configured?
$stderr.puts Support::MongoHQ.message
end
end

def start_mongo_server(port, extra_options=nil)
stop_mongo_server(port)
dbpath = File.join(Dir.tmpdir, port.to_s)
FileUtils.mkdir_p(dbpath)
`mongod --oplogSize 40 --noprealloc --smallfiles --port #{port} --dbpath #{dbpath} --logpath #{dbpath}/log --pidfilepath #{dbpath}/pid --fork #{extra_options}`

sleep 0.1 while `echo 'db.runCommand({ping:1}).ok' | mongo --quiet --port #{port}`.chomp != "1"
end

def stop_mongo_server(port)
dbpath = File.join(Dir.tmpdir, port.to_s)
pidfile = File.join(dbpath, "pid")
`kill #{File.read(pidfile).chomp}` if File.exists?(pidfile)
FileUtils.rm_rf(dbpath)
end
8 changes: 7 additions & 1 deletion spec/support/replica_set_simulator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,13 @@ def next_client

servers.each do |server|
Moped.logger.debug "replica_set: accepting new client for #{server.port}"
@clients << server.accept
begin
@clients << server.accept
rescue IOError, Errno::EBADF, TypeError
# Looks like we hit a bad file descriptor or closed connection.
Moped.logger.debug "replica_set: io error, retrying"
retry
end
end

Moped.logger.debug "replica_set: closing dead clients"
Expand Down