Skip to content

Commit

Permalink
Channel pool
Browse files Browse the repository at this point in the history
Instead of giving each client connection an upstream connection, each
client channel is given an upstream channel upon request, which means that multiple
client connections can share a single upstream connection, massivly
reducing resources required of the upstream server.
  • Loading branch information
carlhoerberg committed Feb 17, 2024
1 parent 8de1405 commit 928e3c9
Show file tree
Hide file tree
Showing 13 changed files with 468 additions and 377 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
*.tar.gz
*.swp
/builds/
shard.override.yml
13 changes: 5 additions & 8 deletions shard.lock
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
# NOTICE: This lockfile contains some overrides from shard.override.yml
version: 2.0
shards:
ameba:
git: https://github.com/crystal-ameba/ameba.git
version: 1.5.0
version: 1.6.1

amq-protocol:
amq-protocol: # Overridden
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.4
version: 1.1.13+git.commit.0c5f766bbacfffc2e3d0cad6f579c073810a6df2

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.0.11

logger:
git: https://github.com/84codes/logger.cr.git
version: 1.0.2
version: 1.2.1

3 changes: 1 addition & 2 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ targets:
dependencies:
amq-protocol:
github: cloudamqp/amq-protocol.cr
logger:
github: 84codes/logger.cr
branch: buffered_from_io

development_dependencies:
amqp-client:
Expand Down
42 changes: 33 additions & 9 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ require "./spec_helper"

describe AMQProxy::Server do
it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
10.times do
AMQP::Client.start("amqp://localhost:5673") do |conn|
conn.channel
ch = conn.channel
ch.basic_publish "foobar", "amq.fanout", ""
s.client_connections.should eq 1
s.upstream_connections.should eq 1
end
sleep 0.1
# sleep 0.1
end
s.client_connections.should eq 0
s.upstream_connections.should eq 1
Expand All @@ -22,7 +23,7 @@ describe AMQProxy::Server do
end

it "publish and consume works" do
server = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
server = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { server.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -38,18 +39,19 @@ describe AMQProxy::Server do
queue = channel.queue(queue_name)
queue.publish_confirm(message_payload)
end
sleep 0.1
end
sleep 0.1

AMQP::Client.start("amqp://localhost:5673") do |conn|
channel = conn.channel
channel.basic_consume(queue_name, block: true, tag: "AMQProxy specs") do |msg|
channel.basic_consume(queue_name, no_ack: false, tag: "AMQProxy specs") do |msg|
body = msg.body_io.to_s
if body == message_payload
channel.basic_ack(msg.delivery_tag)
num_received_messages += 1
end
end
sleep 0.1
end

num_received_messages.should eq num_messages_to_publish
Expand All @@ -58,8 +60,30 @@ describe AMQProxy::Server do
end
end

it "a client can open all channels" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
max = 4000
AMQP::Client.start("amqp://localhost:5673?channel_max=#{max}") do |conn|
conn.channel_max.should eq max
conn.channel_max.times do
conn.channel
end
s.client_connections.should eq 1
s.upstream_connections.should eq 2
end
sleep 0.1
s.client_connections.should eq 0
s.upstream_connections.should eq 2
ensure
s.stop_accepting_clients
end
end

it "can reconnect if upstream closes" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -83,7 +107,7 @@ describe AMQProxy::Server do

it "responds to upstream heartbeats" do
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -102,7 +126,7 @@ describe AMQProxy::Server do
it "supports waiting for client connections on graceful shutdown" do
started = Time.utc.to_unix

s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5)
s = AMQProxy::Server.new("127.0.0.1", 5672, false, 5)
wait_for_channel = Channel(Int32).new # channel used to wait for certain calls, to test certain behaviour
spawn do
s.listen("127.0.0.1", 5673)
Expand Down
48 changes: 24 additions & 24 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,27 @@ MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo "

private SPEC_TIMEOUT = 15.seconds

Spec.around_each do |example|
done = Channel(Exception?).new

spawn(same_thread: true) do
begin
example.run
rescue e
done.send(e)
else
done.send(nil)
end
end

timeout = SPEC_TIMEOUT

select
when res = done.receive
raise res if res
when timeout(timeout)
_it = example.example
ex = Spec::AssertionFailed.new("spec timed out after #{timeout}", _it.file, _it.line)
_it.parent.report(:fail, _it.description, _it.file, _it.line, timeout, ex)
end
end
# Spec.around_each do |example|
# done = Channel(Exception?).new
#
# spawn(same_thread: true) do
# begin
# example.run
# rescue e
# done.send(e)
# else
# done.send(nil)
# end
# end
#
# timeout = SPEC_TIMEOUT
#
# select
# when res = done.receive
# raise res if res
# when timeout(timeout)
# _it = example.example
# ex = Spec::AssertionFailed.new("spec timed out after #{timeout}", _it.file, _it.line)
# _it.parent.report(:fail, _it.description, _it.file, _it.line, timeout, ex)
# end
# end
13 changes: 7 additions & 6 deletions src/amqproxy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ require "./amqproxy/server"
require "option_parser"
require "uri"
require "ini"
require "logger"
require "log"

class AMQProxy::CLI
@listen_address = ENV["LISTEN_ADDRESS"]? || "localhost"
@listen_port = ENV["LISTEN_PORT"]? || 5673
@log_level : Logger::Severity = Logger::INFO
@log_level : Log::Severity = Log::Severity::Info
@idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i
@upstream = ENV["AMQP_URL"]?

Expand All @@ -19,7 +19,7 @@ class AMQProxy::CLI
section.each do |key, value|
case key
when "upstream" then @upstream = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
Expand All @@ -29,7 +29,7 @@ class AMQProxy::CLI
case key
when "port" then @listen_port = value
when "bind", "address" then @listen_address = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
else raise "Unsupported config #{name}/#{key}"
end
end
Expand All @@ -50,7 +50,7 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = Logger::DEBUG }
parser.on("-d", "--debug", "Verbose logging") { @log_level = Log::Severity::Debug }
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
Expand All @@ -71,7 +71,8 @@ class AMQProxy::CLI
port = u.port || default_port
tls = u.scheme == "amqps"

server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout)
Log.setup_from_env(default_level: @log_level)
server = AMQProxy::Server.new(u.host || "", port, tls, @idle_connection_timeout)

first_shutdown = true
shutdown = ->(_s : Signal) do
Expand Down
81 changes: 81 additions & 0 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
require "openssl"
require "log"
require "./records"
require "./upstream"

module AMQProxy
class ChannelPool
Log = ::Log.for(self)
@lock = Mutex.new
@upstreams = Deque(Upstream).new

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32)
end

def get(downstream_channel : DownstreamChannel) : UpstreamChannel
at_channel_max = 0
@lock.synchronize do
loop do
if upstream = @upstreams.shift?
next if upstream.closed?
begin
upstream_channel = upstream.open_channel_for(downstream_channel)
@upstreams.unshift(upstream)
return upstream_channel
rescue Upstream::ChannelMaxReached
@upstreams.push(upstream)
at_channel_max += 1
add_upstream if at_channel_max == @upstreams.size
end
else
add_upstream
end
end
end
end

private def add_upstream
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials)
@upstreams.unshift upstream
end

def connections
@upstreams.size
end

def close
@lock.synchronize do
while u = @upstreams.shift?
begin
u.close "AMQProxy shutdown"
rescue ex
Log.error { "Problem closing upstream: #{ex.inspect}" }
end
end
end
end

private def shrink_pool_loop
loop do
sleep 5.seconds
@lock.synchronize do
max_connection_age = Time.monotonic - @idle_connection_timeout.seconds
@upstreams.size.times do
u = @upstreams.shift
if u.last_used < max_connection_age
begin
u.close "Pooled connection closed due to inactivity"
rescue ex
Log.error { "Problem closing upstream: #{ex.inspect}" }
end
elsif u.closed?
Log.error { "Removing closed upstream connection from pool" }
else
@upstreams.push u
end
end
end
end
end
end
end
Loading

0 comments on commit 928e3c9

Please sign in to comment.