Skip to content

Commit

Permalink
large refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Feb 17, 2024
1 parent df1b5bd commit a57afd2
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 158 deletions.
10 changes: 3 additions & 7 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ version: 2.0
shards:
ameba:
git: https://github.com/crystal-ameba/ameba.git
version: 1.5.0
version: 1.6.1

amq-protocol: # Overridden
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.11+git.commit.eddbea6ddb19be0b1750c2760aa5c5ab0eeafed8
version: 1.1.13+git.commit.0c5f766bbacfffc2e3d0cad6f579c073810a6df2

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.1.0

logger:
git: https://github.com/84codes/logger.cr.git
version: 1.0.2
version: 1.2.1

3 changes: 1 addition & 2 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ targets:
dependencies:
amq-protocol:
github: cloudamqp/amq-protocol.cr
logger:
github: 84codes/logger.cr
branch: buffered_from_io

development_dependencies:
amqp-client:
Expand Down
12 changes: 6 additions & 6 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ require "./spec_helper"

describe AMQProxy::Server do
it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -23,7 +23,7 @@ describe AMQProxy::Server do
end

it "publish and consume works" do
server = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
server = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { server.listen("127.0.0.1", 5673) }
Fiber.yield
Expand Down Expand Up @@ -61,7 +61,7 @@ describe AMQProxy::Server do
end

it "a client can open all channels" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -83,7 +83,7 @@ describe AMQProxy::Server do
end

it "can reconnect if upstream closes" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -107,7 +107,7 @@ describe AMQProxy::Server do

it "responds to upstream heartbeats" do
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -126,7 +126,7 @@ describe AMQProxy::Server do
it "supports waiting for client connections on graceful shutdown" do
started = Time.utc.to_unix

s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5)
s = AMQProxy::Server.new("127.0.0.1", 5672, false, 5)
wait_for_channel = Channel(Int32).new # channel used to wait for certain calls, to test certain behaviour
spawn do
s.listen("127.0.0.1", 5673)
Expand Down
13 changes: 7 additions & 6 deletions src/amqproxy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ require "./amqproxy/server"
require "option_parser"
require "uri"
require "ini"
require "logger"
require "log"

class AMQProxy::CLI
@listen_address = ENV["LISTEN_ADDRESS"]? || "localhost"
@listen_port = ENV["LISTEN_PORT"]? || 5673
@log_level : Logger::Severity = Logger::INFO
@log_level : Log::Severity = Log::Severity::Info
@idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i
@upstream = ENV["AMQP_URL"]?

Expand All @@ -19,7 +19,7 @@ class AMQProxy::CLI
section.each do |key, value|
case key
when "upstream" then @upstream = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
Expand All @@ -29,7 +29,7 @@ class AMQProxy::CLI
case key
when "port" then @listen_port = value
when "bind", "address" then @listen_address = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
else raise "Unsupported config #{name}/#{key}"
end
end
Expand All @@ -50,7 +50,7 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = Logger::DEBUG }
parser.on("-d", "--debug", "Verbose logging") { @log_level = Log::Severity::Debug }
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
Expand All @@ -71,7 +71,8 @@ class AMQProxy::CLI
port = u.port || default_port
tls = u.scheme == "amqps"

server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout)
Log.setup_from_env(default_level: @log_level)
server = AMQProxy::Server.new(u.host || "", port, tls, @idle_connection_timeout)

first_shutdown = true
shutdown = ->(_s : Signal) do
Expand Down
95 changes: 42 additions & 53 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
@@ -1,66 +1,57 @@
require "openssl"
require "logger"
require "./credentials"
require "log"
require "./records"
require "./upstream"

module AMQProxy
class ChannelPool
@tls_ctx : OpenSSL::SSL::Context::Client?
@log : Logger
Log = ::Log.for(self)
@lock = Mutex.new
@upstreams = Hash(Credentials, Deque(Upstream)).new do |h, k|
h[k] = Deque(Upstream).new
end
@upstreams = Deque(Upstream).new

def initialize(@host : String, @port : Int32, tls : Bool, @log, @idle_connection_timeout : Int32)
@tls_ctx = OpenSSL::SSL::Context::Client.new if tls
@upstream_channel_channel = Hash(Credentials, Channel(Tuple(Upstream, UInt16))).new do |h, credentials|
chan = Channel(Tuple(Upstream, UInt16)).new(128)
spawn pool_loop(credentials, chan)
h[credentials] = chan
end
def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32)
end

# Open a new connection, open new channels and push a Channel
private def pool_loop(credentials, upstream_channel_channel)
loop do
upstream = Upstream.new(@host, @port, @tls_ctx, @log, credentials)
@upstreams[credentials] << upstream
spawn(name: "upstream read loop #{@host}:#{@port}") do
begin
upstream.read_loop # blocks until upstream closes connection
ensure
@upstreams[credentials].delete(upstream)
def get(downstream_channel : DownstreamChannel) : UpstreamChannel
at_channel_max = 0
@lock.synchronize do
loop do
if upstream = @upstreams.shift?
next if upstream.closed?
begin
upstream_channel = upstream.open_channel_for(downstream_channel)
@upstreams.unshift(upstream)
return upstream_channel
rescue Upstream::ChannelMaxReached
@upstreams.push(upstream)
at_channel_max += 1
add_upstream if at_channel_max == @upstreams.size
end
else
add_upstream
end
end
upstream.channel_loop(upstream_channel_channel)
rescue ex
next
end
end

def get(client : Client, client_channel : UInt16) : Tuple(Upstream, UInt16)
upstream, channel = @upstream_channel_channel[client.credentials].receive
upstream.assign_channel(channel, client, client_channel)
{upstream, channel}
private def add_upstream
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials)
@upstreams.unshift upstream
end

def connections
@upstreams.each_value.sum &.size
@upstreams.size
end

def close
@lock.synchronize do
@upstreams.each_value do |q|
while u = q.shift?
begin
u.close "AMQProxy shutdown"
rescue ex
@log.error "Problem closing upstream: #{ex.inspect}"
end
while u = @upstreams.shift?
begin
u.close "AMQProxy shutdown"
rescue ex
Log.error { "Problem closing upstream: #{ex.inspect}" }
end
end
@upstreams.clear
end
end

Expand All @@ -69,20 +60,18 @@ module AMQProxy
sleep 5.seconds
@lock.synchronize do
max_connection_age = Time.monotonic - @idle_connection_timeout.seconds
@upstreams.each_value do |q|
q.size.times do
u = q.shift
if u.last_used < max_connection_age
begin
u.close "Pooled connection closed due to inactivity"
rescue ex
@log.error "Problem closing upstream: #{ex.inspect}"
end
elsif u.closed?
@log.error "Removing closed upstream connection from pool"
else
q.push u
@upstreams.size.times do
u = @upstreams.shift
if u.last_used < max_connection_age
begin
u.close "Pooled connection closed due to inactivity"
rescue ex
Log.error { "Problem closing upstream: #{ex.inspect}" }
end
elsif u.closed?
Log.error { "Removing closed upstream connection from pool" }
else
@upstreams.push u
end
end
end
Expand Down
26 changes: 13 additions & 13 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ require "socket"
require "amq-protocol"
require "./version"
require "./upstream"
require "./credentials"
require "./records"

# Be able to overwrite channel id
module AMQ
Expand All @@ -17,13 +17,13 @@ module AMQProxy
class Client
getter credentials : Credentials
@lock = Mutex.new
@channel_map = Hash(UInt16, Tuple(Upstream, UInt16)).new
@channel_map = Hash(UInt16, UpstreamChannel).new
@outgoing_frames = Channel(AMQ::Protocol::Frame).new(128)
@frame_max : UInt32
@channel_max : UInt16
@heartbeat : UInt16

def initialize(@socket : TCPSocket, @channel_pool : ChannelPool)
def initialize(@socket : TCPSocket)
tune_ok, @credentials = negotiate(@socket)
@frame_max = tune_ok.frame_max
@channel_max = tune_ok.channel_max
Expand All @@ -32,7 +32,7 @@ module AMQProxy
end

# frames from enduser
def read_loop(socket = @socket)
def read_loop(channel_pool, socket = @socket)
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat
Expand All @@ -44,13 +44,12 @@ module AMQProxy
return
when AMQ::Protocol::Frame::Channel::Open
raise "Channel already opened" if @channel_map.has_key? frame.channel
upstream, upstream_channel = @channel_pool.get(self, frame.channel)
@channel_map[frame.channel] = {upstream, upstream_channel}
upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel))
@channel_map[frame.channel] = upstream_channel
write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::Close
if value = @channel_map.delete(frame.channel)
upstream, upstream_channel = value
upstream.unassign_channel(upstream_channel)
if upstream_channel = @channel_map.delete(frame.channel)
upstream_channel.unassign
end
write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::CloseOk
Expand All @@ -60,8 +59,8 @@ module AMQProxy
else
src_channel = frame.channel
begin
upstream, frame.channel = @channel_map[frame.channel]
upstream.write frame
upstream_channel = @channel_map[frame.channel]
upstream_channel.write(frame)
rescue ex : Upstream::WriteError
close_channel(src_channel)
rescue KeyError
Expand Down Expand Up @@ -101,8 +100,8 @@ module AMQProxy
end

private def close_all_upstream_channels
@channel_map.each_value do |upstream, upstream_channel|
upstream.unassign_channel(upstream_channel)
@channel_map.each_value do |upstream_channel|
upstream_channel.unassign
rescue Upstream::WriteError
next # Nothing to do
end
Expand All @@ -122,6 +121,7 @@ module AMQProxy
write AMQ::Protocol::Frame::Connection::Close.new(0_u16,
"AMQProxy shutdown",
0_u16, 0_u16)
# @socket.read_timeout = 1.seconds
end

# Close the outgoing frames channel which will let write_loop close the socket
Expand Down
3 changes: 0 additions & 3 deletions src/amqproxy/credentials.cr

This file was deleted.

28 changes: 28 additions & 0 deletions src/amqproxy/records.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require "./upstream"
require "./client"

module AMQProxy
record UpstreamChannel, upstream : Upstream, channel : UInt16 do
def write(frame)
frame.channel = @channel
@upstream.write frame
end

def unassign
@upstream.unassign_channel(@channel)
end
end

record DownstreamChannel, client : Client, channel : UInt16 do
def write(frame)
frame.channel = @channel
@client.write(frame)
end

def close
@client.close_channel(@channel)
end
end

record Credentials, user : String, password : String, vhost : String
end
Loading

0 comments on commit a57afd2

Please sign in to comment.