diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index cd4a856..400639b 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -26,23 +26,32 @@ module AMQProxy # Keep a buffer of publish frames # Only send to upstream when the full message is received - @next_publish : AMQ::Protocol::Frame::Basic::Publish? - @next_header : AMQ::Protocol::Frame::Header? - @next_bodies = Array(AMQ::Protocol::Frame::BytesBody).new + @publish_buffers = Hash(UInt16, PublishBuffer).new + + private class PublishBuffer + getter publish : AMQ::Protocol::Frame::Basic::Publish + property! header : AMQ::Protocol::Frame::Header? + getter bodies = Array(AMQ::Protocol::Frame::BytesBody).new + + def initialize(@publish) + end + + def full? + header.body_size == @bodies.sum &.body_size + end + end private def finish_publish(channel) - next_publish = @next_publish.not_nil!("Missing Basic.Publish frame") - next_header = @next_header.not_nil!("Missing Header frame") + buffer = @publish_buffers[channel] if upstream_channel = @channel_map[channel] - upstream_channel.write(next_publish) - upstream_channel.write(next_header) - @next_bodies.each do |body| + upstream_channel.write(buffer.publish) + upstream_channel.write(buffer.header) + buffer.bodies.each do |body| upstream_channel.write(body) end end ensure - @next_publish = @next_header = nil - @next_bodies.clear + @publish_buffers.delete channel end # frames from enduser @@ -69,16 +78,14 @@ module AMQProxy # Server closed channel, CloseOk reply to server is already sent @channel_map.delete(frame.channel) when AMQ::Protocol::Frame::Basic::Publish - @next_publish = frame + @publish_buffers[frame.channel] = PublishBuffer.new(frame) when AMQ::Protocol::Frame::Header - @next_header = frame + @publish_buffers[frame.channel].header = frame finish_publish(frame.channel) if frame.body_size.zero? when AMQ::Protocol::Frame::BytesBody - @next_bodies << frame - if @next_header.not_nil!("Missing Header frame").body_size == - @next_bodies.sum &.body_size - finish_publish(frame.channel) - end + buffer = @publish_buffers[frame.channel] + buffer.bodies << frame + finish_publish(frame.channel) if buffer.full? when frame.channel.zero? Log.error { "Unexpected connection frame: #{frame}" } close_connection(540_u16, "NOT_IMPLEMENTED", frame)