Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add default mime type encode as json for amqp protocol #120

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions lib/event_source/operations/mime_decode.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MimeDecode
# @return [Dry::Monads::Success<String>] if decoding is successful
# @return [Dry::Monads::Failure<String>] if an error occurs (e.g., invalid MIME type, decoding failure)
def call(mime_type, payload)
valid_payload = yield validate_payload(payload, mime_type)
valid_payload, mime_type = yield validate_payload(payload, mime_type.to_s)
decoded_data = yield decode(valid_payload, mime_type)

Success(decoded_data)
Expand All @@ -38,15 +38,18 @@ def call(mime_type, payload)
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
def validate_payload(payload, mime_type)
unless MIME_TYPES.include?(mime_type.to_s)
unless MIME_TYPES.include?(mime_type)
return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.")
end

if mime_type.to_s == 'application/zlib' && !binary_payload?(payload)
return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.")
# Allow JSON string payloads to pass validation to avoid processing failures
# for existing JSON messages in the queue. These messages may have been queued
# with the wrong MIME type ('application/zlib') but are still valid JSON.
if mime_type == 'application/zlib'
return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") unless binary_payload?(payload) || valid_json_string?(payload)
end

Success(payload)
Success([payload, mime_type])
end

# Decodes the payload using the specified MIME type.
Expand All @@ -58,7 +61,7 @@ def validate_payload(payload, mime_type)
# @return [Dry::Monads::Success<String>] if decoding is successful
# @return [Dry::Monads::Failure<String>] if decoding fails
def decode(payload, mime_type)
decoded_data = Zlib.inflate(payload) if mime_type.to_s == 'application/zlib'
decoded_data = Zlib.inflate(payload) if mime_type == 'application/zlib' && binary_payload?(payload)

Success(decoded_data || payload)
rescue Zlib::Error => e
Expand All @@ -75,6 +78,10 @@ def binary_payload?(payload)

payload.encoding == Encoding::BINARY
end

def valid_json_string?(data)
data.is_a?(String) && JSON.parse(data) rescue false
end
end
end
end
51 changes: 30 additions & 21 deletions lib/event_source/operations/mime_encode.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,60 +18,69 @@ class MimeEncode
# For example, compresses the payload using Zlib for 'application/zlib'.
#
# @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json')
# @param payload [String, Hash] the payload to encode; must be a Hash or String
# @param payload [Any] the payload to encode;
#
# @return [Dry::Monads::Success<String>] if encoding is successful
# @return [Dry::Monads::Failure<String>] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure)
def call(mime_type, payload)
json_payload = yield validate_payload(payload, mime_type)
encoded_data = yield encode(json_payload, mime_type)
mime_type = yield validate(mime_type)
encoded_data = yield encode(mime_type, payload)

Success(encoded_data)
end

private

# Validates the payload and MIME type before encoding.
# Ensures the MIME type is supported and the payload is either a Hash or a String.
# Validates theMIME type before encoding.
# Ensures the MIME type is supported
#
# @param payload [String, Hash] the payload to validate
# @param mime_type [String] the MIME type for encoding
#
# @return [Dry::Monads::Success<String>] if the payload and MIME type are valid
# @return [Dry::Monads::Failure<String>] if the MIME type is unsupported or the payload is invalid
def validate_payload(payload, mime_type)
def validate(mime_type)
unless MIME_TYPES.include?(mime_type.to_s)
return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.")
end

unless payload.is_a?(Hash) || payload.is_a?(String)
return Failure("Invalid payload type. Expected a Hash or String, but received #{payload.class}.")
end

Success(payload.is_a?(Hash) ? payload.to_json : payload)
Success(mime_type.to_s)
end

# Encodes the payload based on the MIME type.
# For 'application/zlib', compresses the payload using Zlib.
# Logs the original and encoded payload sizes for debugging.
#
# @param json_payload [String] the JSON stringified payload to encode
# @param data [String] the JSON stringified payload to encode
# @param mime_type [String] the MIME type for encoding
#
# @return [Dry::Monads::Success<String>] if encoding is successful
# @return [Dry::Monads::Failure<String>] if encoding fails
def encode(json_payload, mime_type)
encoded_data = Zlib.deflate(json_payload) if mime_type.to_s == 'application/zlib'
def encode(mime_type, payload)
case mime_type
when 'application/zlib'
json_payload = payload.to_json
encoded_data = Zlib.deflate(json_payload)
log_encoding_details(mime_type, json_payload, encoded_data)
when 'application/json'
encoded_data = payload.to_json
end

Success(encoded_data || payload)
rescue JSON::GeneratorError => e
Failure("Failed to encode payload to JSON: #{e.message}")
rescue Zlib::Error => e
Failure("Failed to compress payload using Zlib: #{e.message}")
rescue StandardError => e
Failure("Unexpected error during encoding: #{e.message}")
end

# Logs details of the encoding process.
def log_encoding_details(mime_type, payload, encoded_data)
logger.debug "*" * 80
logger.debug "Starting payload encoding for MIME type: '#{mime_type}'"
logger.debug "Original payload size: #{data_size_in_kb(json_payload)} KB"
logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" if encoded_data
logger.debug "Original payload size: #{data_size_in_kb(payload)} KB"
logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB"
logger.debug "*" * 80

Success(encoded_data || json_payload)
rescue Zlib::Error => e
Failure("Failed to compress payload using Zlib: #{e.message}")
end

# Calculates the size of the data in kilobytes (KB).
Expand Down
9 changes: 2 additions & 7 deletions lib/event_source/protocols/amqp/bunny_exchange_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class BunnyExchangeProxy
# @attr_reader [EventSource::Protcols::Amqp::BunnyChannelProxy] channel_proxy the channel_proxy used to create this exchange
attr_reader :subject, :channel_proxy

DefaultMimeType = 'application/json'.freeze

# @param [EventSource::AsyncApi::Channel] channel_proxy instance on which to open this Exchange
# @param [Hash<EventSource::AsyncApi::Exchange>] exchange_bindings instance with configuration for this Exchange
def initialize(channel_proxy, exchange_bindings)
Expand Down Expand Up @@ -50,7 +52,6 @@ def publish(payload:, publish_bindings:, headers: {})

logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}"

payload = payload.to_json unless is_binary?(payload)
@subject.publish(payload, bunny_publish_bindings)

logger.debug "BunnyExchange#publish published message: #{payload}"
Expand All @@ -70,12 +71,6 @@ def message_id
SecureRandom.uuid
end

def is_binary?(payload)
return false unless payload.respond_to?(:encoding)

payload.encoding == Encoding::BINARY
end

# Filtering and renaming AsyncAPI Operation bindings to Bunny/RabitMQ
# bindings
#
Expand Down
19 changes: 15 additions & 4 deletions lib/event_source/publish_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ def call(payload, options = {})
# @return [String] The encoded payload, or the original payload if no encoding is specified.
# @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails.
def encode_payload(payload)
return payload unless @async_api_publish_operation.message

message_bindings = @async_api_publish_operation.message['bindings']
encoding = message_bindings.first[1]['contentEncoding'] if message_bindings
encoding = determine_encoding
return payload unless encoding

output = EventSource::Operations::MimeEncode.new.call(encoding, payload)
Expand All @@ -63,5 +60,19 @@ def encode_payload(payload)
raise EventSource::Error::PayloadEncodeError, output.failure
end
end

# Determines the encoding for the payload based on message bindings or protocol defaults.
# - If message bindings are present, uses the 'contentEncoding' value from the bindings.
# - If no message bindings are present and the protocol is AMQP, uses the default encoding for the AMQP protocol. Other protocols return nil.
def determine_encoding
message_bindings = @async_api_publish_operation.message&.dig('bindings')
return message_bindings.first[1]['contentEncoding'] if message_bindings.present?

amqp_protocol? ? "#{subject.class}::DefaultMimeType".constantize : nil
end

def amqp_protocol?
subject.is_a?(EventSource::Protocols::Amqp::BunnyExchangeProxy)
end
end
end
51 changes: 41 additions & 10 deletions spec/event_source/operations/mime_encode_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,31 @@
subject { described_class.new }

describe "#call" do
context "when the payload and mime type are valid" do

let(:valid_payload) { { key: 'value' } }
let(:invalid_payload) { -> {} }

context "when MIME type is application/zlib" do
let(:payload) { { message: "Hello, World!" } }
let(:mime_type) { "application/zlib" }

it "successfully encodes the payload" do
it "compresses the payload using Zlib" do
result = subject.call(mime_type, payload)

expect(result).to be_success
expect(Zlib.inflate(result.value!)).to eq(payload.to_json)
end
end

context "when the payload is a string and mime type is valid" do
context "when MIME type is application/json" do
let(:payload) { "Hello, World!" }
let(:mime_type) { "application/json" }

it "returns the payload as JSON" do
result = subject.call(mime_type, payload)

expect(result).to be_success
expect(result.value!).to eq(payload)
expect(result.value!).to eq(payload.to_json)
end
end

Expand All @@ -40,15 +44,42 @@
end
end

context "when the payload is invalid" do
let(:payload) { 1000 }
let(:mime_type) { "application/json" }
context 'when payload cannot be converted to JSON' do
before do
allow(invalid_payload).to receive(:to_json).and_raise(JSON::GeneratorError)
end

it "returns a failure" do
result = subject.call(mime_type, payload)
it 'returns a failure with JSON::GeneratorError' do
result = subject.call('application/json', invalid_payload)

expect(result).to be_failure
expect(result.failure).to match(/Failed to encode payload to JSON:/)
end
end

context 'when Zlib compression fails' do
before do
allow(Zlib).to receive(:deflate).and_raise(Zlib::Error, 'Compression failed')
end

it 'returns a failure with Zlib::Error' do
result = subject.call('application/zlib', valid_payload)

expect(result).to be_failure
expect(result.failure).to eq('Failed to compress payload using Zlib: Compression failed')
end
end

context 'when an unexpected error occurs' do
before do
allow(valid_payload).to receive(:to_json).and_raise(StandardError, 'something went wrong')
end

it 'returns a failure with StandardError' do
result = subject.call('application/json', valid_payload)

expect(result).to be_failure
expect(result.failure).to eq("Invalid payload type. Expected a Hash or String, but received Integer.")
expect(result.failure).to eq('Unexpected error during encoding: something went wrong')
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

describe '#publish' do
it 'publishes the payload with the correct bindings and headers' do
subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers)
subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers)

expect(bunny_exchange).to have_received(:publish).with(payload.to_json, {
correlation_id: '12345',
Expand All @@ -41,7 +41,7 @@
expect(subject.logger).to receive(:debug).with(/published message:/)
expect(subject.logger).to receive(:debug).with(/published message to exchange:/)

subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers)
subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers)
end

context 'when the payload is binary' do
Expand Down
Loading