Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error writing to upstream #161

Merged
merged 10 commits into from
May 9, 2024
2 changes: 1 addition & 1 deletion src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module AMQProxy
@lock.synchronize do
(@upstreams.size - 1).times do # leave at least one connection
u = @upstreams.pop
if u.active_channels.zero?
if u.channels.zero?
begin
u.close "Pooled connection closed due to inactivity"
rescue ex
Expand Down
37 changes: 22 additions & 15 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module AMQProxy
class Client
Log = ::Log.for(self)
getter credentials : Credentials
@channel_map = Hash(UInt16, UpstreamChannel).new
@channel_map = Hash(UInt16, UpstreamChannel?).new
@outgoing_frames = Channel(AMQ::Protocol::Frame).new(128)
@frame_max : UInt32
@channel_max : UInt16
Expand All @@ -28,6 +28,7 @@ module AMQProxy
def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: socket.remote_address.to_s)
Log.debug { "Connected" }
i = 0u64
socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
Expand All @@ -49,22 +50,32 @@ module AMQProxy
end
write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::CloseOk
# CloseOk already sent in Upstream#read_loop
# CloseOk reply to server is already sent in Upstream#read_loop
@channel_map.delete(frame.channel)
when frame.channel.zero?
Log.error { "Unexpected connection frame: #{frame}" }
close_connection(540_u16, "NOT_IMPLEMENTED", frame)
else
src_channel = frame.channel
begin
upstream_channel = @channel_map[frame.channel]
upstream_channel.write(frame)
if upstream_channel = @channel_map[frame.channel]
upstream_channel.write(frame)
else
# Channel::Close is sent, waiting for CloseOk
end
rescue ex : Upstream::WriteError
close_channel(src_channel)
rescue KeyError
close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
end
end
Fiber.yield if (i &+= 1) % 4096 == 0
rescue ex : Upstream::AccessError
Log.error { "Access refused, reason: #{ex.message}" }
close_connection(403_u16, ex.message || "ACCESS_REFUSED")
rescue ex : Upstream::Error
Log.error(exception: ex) { "Upstream error" }
close_connection(503_u16, "UPSTREAM_ERROR - #{ex.message}")
rescue IO::TimeoutError
time_since_last_heartbeat = (Time.monotonic - @last_heartbeat).total_seconds.to_i # ignore subsecond latency
if time_since_last_heartbeat <= 1 + @heartbeat # add 1s grace because of rounding
Expand All @@ -75,16 +86,8 @@ module AMQProxy
return
end
end
rescue ex : IO::EOFError
Log.debug { "Disconnected" }
rescue ex : IO::Error
Log.error(exception: ex) { "IO error" } unless socket.closed?
rescue ex : Upstream::AccessError
Log.error { "Access refused, reason: #{ex.message}" }
close_connection(403_u16, ex.message || "ACCESS_REFUSED")
rescue ex : Upstream::Error
Log.error(exception: ex) { "Upstream error" }
close_connection(503_u16, "UPSTREAM_ERROR - #{ex.message}")
Log.debug { "Disconnected #{ex.inspect}" }
else
Log.debug { "Disconnected" }
ensure
Expand All @@ -100,7 +103,7 @@ module AMQProxy
break if frame.is_a? AMQ::Protocol::Frame::Connection::CloseOk
end
rescue ex : IO::Error
raise ex unless socket.closed?
# Client closed connection, suppress error
ensure
@outgoing_frames.close
socket.close rescue nil
Expand All @@ -110,6 +113,8 @@ module AMQProxy
# Send frame to client, channel id should already be remapped by the caller
def write(frame : AMQ::Protocol::Frame)
@outgoing_frames.send frame
rescue Channel::ClosedError
# do nothing
end

def close_connection(code, text, frame = nil)
Expand All @@ -123,12 +128,14 @@ module AMQProxy

def close_channel(id)
write AMQ::Protocol::Frame::Channel::Close.new(id, 500_u16, "UPSTREAM_DISCONNECTED", 0_u16, 0_u16)
@channel_map[id] = nil
end

private def close_all_upstream_channels
@channel_map.each_value do |upstream_channel|
upstream_channel.unassign
upstream_channel.try &.unassign
rescue Upstream::WriteError
Log.debug { "Upstream write error while closing client's channels" }
next # Nothing to do
end
@channel_map.clear
Expand Down
3 changes: 2 additions & 1 deletion src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ module AMQProxy
end
rescue ex # only raise from constructor, when negotating
Log.debug { "Client connection failure (#{remote_address}) #{ex.inspect}" }
socket.close
ensure
socket.close rescue nil
end

private def active_client(client, &)
Expand Down
55 changes: 13 additions & 42 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ module AMQProxy
class Upstream
Log = ::Log.for(self)
@socket : IO
@unsafe_channels = Set(UInt16).new
@channels = Hash(UInt16, DownstreamChannel?).new
@channels = Hash(UInt16, DownstreamChannel).new
@channels_lock = Mutex.new
@channel_max : UInt16
@lock = Mutex.new
Expand Down Expand Up @@ -38,14 +37,7 @@ module AMQProxy
def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel
@channels_lock.synchronize do
1_u16.upto(@channel_max) do |i|
if @channels.has_key?(i)
if @channels[i].nil?
@channels[i] = downstream_channel
return UpstreamChannel.new(self, i) # reuse
else
next # in use
end
else
unless @channels.has_key?(i)
@channels[i] = downstream_channel
send AMQ::Protocol::Frame::Channel::Open.new(i)
return UpstreamChannel.new(self, i)
Expand All @@ -56,27 +48,21 @@ module AMQProxy
end

def unassign_channel(channel : UInt16)
@channels_lock.synchronize do
if @unsafe_channels.delete channel
send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16)
@channels.delete channel
elsif @channels.has_key? channel
@channels[channel] = nil # keep for reuse
end
if @channels_lock.synchronize { @channels.delete(channel) }
send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16)
end
rescue ex : IO::Error | OpenSSL::SSL::Error
Log.debug(exception: ex) { "Error while closing upstream channel #{channel}" }
end

def channels
@channels.size
end

def active_channels
@channels.count { |_, v| !v.nil? }
end

# Frames from upstream (to client)
def read_loop(socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: @remote_address)
i = 0u64
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat then send frame
Expand All @@ -95,7 +81,6 @@ module AMQProxy
when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close
when AMQ::Protocol::Frame::Channel::Close
send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
@unsafe_channels.delete(frame.channel)
if downstream_channel = @channels.delete(frame.channel)
downstream_channel.write(frame)
end
Expand All @@ -108,9 +93,10 @@ module AMQProxy
"DOWNSTREAM_DISCONNECTED", 0_u16, 0_u16)
end
end
Fiber.yield if (i &+= 1) % 4096 == 0
end
rescue ex : IO::Error | OpenSSL::SSL::Error
Log.error(exception: ex) { "Error reading from upstream" } unless socket.closed?
Log.info { "Connection error #{ex.inspect}" } unless socket.closed?
ensure
socket.close rescue nil
close_all_client_channels
Expand All @@ -121,16 +107,12 @@ module AMQProxy
end

private def close_all_client_channels
Log.debug { "Closing all client channels for closed upstream" }
@channels_lock.synchronize do
cnt = 0
return if @channels.empty?
Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" }
@channels.each_value do |downstream_channel|
if dch = downstream_channel
dch.close
cnt += 1
end
downstream_channel.close
end
Log.debug { "Upstream connection closed, closing #{cnt} client channels" } unless cnt.zero?
@channels.clear
end
end
Expand All @@ -140,9 +122,7 @@ module AMQProxy
clients = Set(Client).new
@channels_lock.synchronize do
@channels.each_value do |downstream_channel|
if dc = downstream_channel
clients << dc.client
end
clients << downstream_channel.client
end
end
clients.each do |client|
Expand All @@ -153,19 +133,10 @@ module AMQProxy
# Forward frames from client to upstream
def write(frame : AMQ::Protocol::Frame) : Nil
case frame
when AMQ::Protocol::Frame::Basic::Publish,
AMQ::Protocol::Frame::Basic::Qos
when AMQ::Protocol::Frame::Basic::Get
@unsafe_channels.add(frame.channel) unless frame.no_ack
when AMQ::Protocol::Frame::Basic,
AMQ::Protocol::Frame::Confirm,
AMQ::Protocol::Frame::Tx
@unsafe_channels.add(frame.channel)
when AMQ::Protocol::Frame::Connection
raise "Connection frames should not be sent through here: #{frame}"
when AMQ::Protocol::Frame::Channel::CloseOk # when upstream server requested a channel close and client confirmed
@channels_lock.synchronize do
@unsafe_channels.delete(frame.channel)
@channels.delete(frame.channel)
end
when AMQ::Protocol::Frame::Channel
Expand Down