Skip to content

Commit

Permalink
Initial heartbeat support
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Nov 10, 2012
1 parent 172c4ef commit 55d57e3
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 24 deletions.
17 changes: 17 additions & 0 deletions examples/connection/heartbeat.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "bundler"
Bundler.setup

$:.unshift(File.expand_path("../../../lib", __FILE__))

require 'bunny'


b = Bunny.new(:heartbeat_interval => 2)
b.start

c = b.create_channel

sleep 10
59 changes: 59 additions & 0 deletions lib/bunny/heartbeat_sender.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
require "thread"
require "amq/protocol/client"
require "amq/protocol/frame"

module Bunny
class HeartbeatSender

#
# API
#

def initialize(session)
@session = session
@mutex = Mutex.new

@last_activity_time = Time.now
end

def start(period = 30)
@mutex.synchronize do
@period = period

@thread = Thread.new(&method(:run))
end
end

def stop
@mutex.synchronize { @thread.exit }
end

def signal_activity!
@last_activity_time = Time.now
end

protected

def run
begin
loop do
self.beat

sleep (@period / 2)
end
rescue IOError => ioe
# ignored
rescue Exception => e
puts e.message
end
end

def beat
now = Time.now

if now > (@last_activity_time + @period)
@session.send_raw(AMQ::Protocol::HeartbeatFrame.encode)
end
end
end
end
61 changes: 37 additions & 24 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "thread"

require "bunny/socket"
require "bunny/heartbeat_sender"

require "amq/protocol/client"
require "amq/settings"
Expand Down Expand Up @@ -160,6 +161,27 @@ def open?
# Implementation
#

# Writes data to the socket. If read/write timeout was specified, Bunny::ClientTimeout will be raised
# if the operation times out.
#
# @raise [ClientTimeout]
def write(*args)
begin
raise Bunny::ConnectionError, 'No connection - socket has not been created' if !@socket
if @read_write_timeout
Bunny::Timer.timeout(@read_write_timeout, Bunny::ClientTimeout) do
@socket.write(*args)
end
else
@socket.write(*args)
end
rescue Errno::EPIPE, Errno::EAGAIN, Bunny::ClientTimeout, IOError => e
close_socket
raise Bunny::ConnectionError, e.message
end
end
alias send_raw write

def hostname_from(options)
options[:host] || options[:hostname] || DEFAULT_HOST
end
Expand Down Expand Up @@ -248,7 +270,10 @@ def read_ready?(timeout)
def read_next_frame(opts = {})
raise Bunny::ClientTimeout.new("I/O timeout") unless self.read_ready?(opts.fetch(:timeout, @read_write_timeout))

Bunny::Framing::IO::Frame.decode(@socket)
frame = Bunny::Framing::IO::Frame.decode(@socket)
@heartbeat_sender.signal_activity! if @heartbeat_sender

frame
end

def read_fully(*args)
Expand Down Expand Up @@ -330,14 +355,25 @@ def open_connection
connection_open_ok = frame.decode_payload

@status = :open
if @heartbeat && @heartbeat > 0
initialize_heartbeat_sender
end

raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
end

def initialize_heartbeat_sender
@heartbeat_sender = HeartbeatSender.new(self)
@heartbeat_sender.start(@heartbeat)
end

def close_connection
self.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))

method = self.read_next_frame.decode_payload
if @heartbeat_sender
@heartbeat_sender.stop
end
close_socket

method
Expand All @@ -349,33 +385,10 @@ def make_sure_socket_is_initialized
end

# Sends AMQ protocol header (also known as preamble).
#
# @see http://bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2)
def send_preamble
self.send_raw(AMQ::Protocol::PREAMBLE)
end

# Writes data to the socket. If read/write timeout was specified, Bunny::ClientTimeout will be raised
# if the operation times out.
#
# @raise [ClientTimeout]
def write(*args)
begin
raise Bunny::ConnectionError, 'No connection - socket has not been created' if !@socket
if @read_write_timeout
Bunny::Timer.timeout(@read_write_timeout, Bunny::ClientTimeout) do
@socket.write(*args)
end
else
@socket.write(*args)
end
rescue Errno::EPIPE, Errno::EAGAIN, Bunny::ClientTimeout, IOError => e
close_socket
raise Bunny::ConnectionError, e.message
end
end
alias send_raw write


# Reads data from the socket. If read/write timeout was specified, Bunny::ClientTimeout will be raised
# if the operation times out.
Expand Down
37 changes: 37 additions & 0 deletions spec/higher_level_api/integration/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,43 @@



context "initialized with :host => 127.0.0.1 and non-default credentials (take 2)" do
after :each do
subject.close if subject.open?
end

let(:host) { "127.0.0.1" }
# see ./bin/ci/before_build.sh
let(:username) { "bunny_gem" }
let(:password) { "bunny_password" }
let(:vhost) { "bunny_testbed" }
let(:interval) { 1 }

subject do
described_class.new(:hostname => host, :user => username, :pass => password, :vhost => vhost, :heartbeat_interval => interval)
end

it "successfully connects" do
subject.start
subject.should be_connected

subject.server_properties.should_not be_nil
subject.server_capabilities.should_not be_nil

props = subject.server_properties

props["product"].should_not be_nil
props["platform"].should_not be_nil
props["version"].should_not be_nil
end

it "uses provided heartbeat interval" do
subject.heartbeat.should == interval
end
end



context "initialized with :host => 127.0.0.1 and INVALID credentials" do
let(:host) { "127.0.0.1" }
# see ./bin/ci/before_build.sh
Expand Down

0 comments on commit 55d57e3

Please sign in to comment.