diff --git a/lib/event_source/operations/mime_decode.rb b/lib/event_source/operations/mime_decode.rb index f147182..1f5bc8e 100644 --- a/lib/event_source/operations/mime_decode.rb +++ b/lib/event_source/operations/mime_decode.rb @@ -21,7 +21,7 @@ class MimeDecode # @return [Dry::Monads::Success] if decoding is successful # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, decoding failure) def call(mime_type, payload) - valid_payload = yield validate_payload(payload, mime_type) + valid_payload, mime_type = yield validate_payload(payload, mime_type.to_s) decoded_data = yield decode(valid_payload, mime_type) Success(decoded_data) @@ -38,15 +38,18 @@ def call(mime_type, payload) # @return [Dry::Monads::Success] if the payload is valid # @return [Dry::Monads::Failure] if the payload is invalid def validate_payload(payload, mime_type) - unless MIME_TYPES.include?(mime_type.to_s) + unless MIME_TYPES.include?(mime_type) return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") end - if mime_type.to_s == 'application/zlib' && !binary_payload?(payload) - return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") + # Allow JSON string payloads to pass validation to avoid processing failures + # for existing JSON messages in the queue. These messages may have been queued + # with the wrong MIME type ('application/zlib') but are still valid JSON. + if mime_type == 'application/zlib' + return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") unless binary_payload?(payload) || valid_json_string?(payload) end - Success(payload) + Success([payload, mime_type]) end # Decodes the payload using the specified MIME type. @@ -58,7 +61,7 @@ def validate_payload(payload, mime_type) # @return [Dry::Monads::Success] if decoding is successful # @return [Dry::Monads::Failure] if decoding fails def decode(payload, mime_type) - decoded_data = Zlib.inflate(payload) if mime_type.to_s == 'application/zlib' + decoded_data = Zlib.inflate(payload) if mime_type == 'application/zlib' && binary_payload?(payload) Success(decoded_data || payload) rescue Zlib::Error => e @@ -75,6 +78,13 @@ def binary_payload?(payload) payload.encoding == Encoding::BINARY end + + def valid_json_string?(data) + data.is_a?(String) && JSON.parse(data) + true + rescue JSON::ParserError + false + end end end end diff --git a/lib/event_source/operations/mime_encode.rb b/lib/event_source/operations/mime_encode.rb index c5b6f8e..4bd551c 100644 --- a/lib/event_source/operations/mime_encode.rb +++ b/lib/event_source/operations/mime_encode.rb @@ -18,60 +18,69 @@ class MimeEncode # For example, compresses the payload using Zlib for 'application/zlib'. # # @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json') - # @param payload [String, Hash] the payload to encode; must be a Hash or String + # @param payload [Any] the payload to encode; # # @return [Dry::Monads::Success] if encoding is successful # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure) def call(mime_type, payload) - json_payload = yield validate_payload(payload, mime_type) - encoded_data = yield encode(json_payload, mime_type) + mime_type = yield validate(mime_type) + encoded_data = yield encode(mime_type, payload) Success(encoded_data) end private - # Validates the payload and MIME type before encoding. - # Ensures the MIME type is supported and the payload is either a Hash or a String. + # Validates theMIME type before encoding. + # Ensures the MIME type is supported # - # @param payload [String, Hash] the payload to validate # @param mime_type [String] the MIME type for encoding # # @return [Dry::Monads::Success] if the payload and MIME type are valid # @return [Dry::Monads::Failure] if the MIME type is unsupported or the payload is invalid - def validate_payload(payload, mime_type) + def validate(mime_type) unless MIME_TYPES.include?(mime_type.to_s) return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") end - unless payload.is_a?(Hash) || payload.is_a?(String) - return Failure("Invalid payload type. Expected a Hash or String, but received #{payload.class}.") - end - - Success(payload.is_a?(Hash) ? payload.to_json : payload) + Success(mime_type.to_s) end # Encodes the payload based on the MIME type. # For 'application/zlib', compresses the payload using Zlib. # Logs the original and encoded payload sizes for debugging. # - # @param json_payload [String] the JSON stringified payload to encode + # @param data [String] the JSON stringified payload to encode # @param mime_type [String] the MIME type for encoding # # @return [Dry::Monads::Success] if encoding is successful # @return [Dry::Monads::Failure] if encoding fails - def encode(json_payload, mime_type) - encoded_data = Zlib.deflate(json_payload) if mime_type.to_s == 'application/zlib' + def encode(mime_type, payload) + case mime_type + when 'application/zlib' + json_payload = payload.to_json + encoded_data = Zlib.deflate(json_payload) + log_encoding_details(mime_type, json_payload, encoded_data) + when 'application/json' + encoded_data = payload.to_json + end + + Success(encoded_data || payload) + rescue JSON::GeneratorError => e + Failure("Failed to encode payload to JSON: #{e.message}") + rescue Zlib::Error => e + Failure("Failed to compress payload using Zlib: #{e.message}") + rescue StandardError => e + Failure("Unexpected error during encoding: #{e.message}") + end + # Logs details of the encoding process. + def log_encoding_details(mime_type, payload, encoded_data) logger.debug "*" * 80 logger.debug "Starting payload encoding for MIME type: '#{mime_type}'" - logger.debug "Original payload size: #{data_size_in_kb(json_payload)} KB" - logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" if encoded_data + logger.debug "Original payload size: #{data_size_in_kb(payload)} KB" + logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" logger.debug "*" * 80 - - Success(encoded_data || json_payload) - rescue Zlib::Error => e - Failure("Failed to compress payload using Zlib: #{e.message}") end # Calculates the size of the data in kilobytes (KB). diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 4dd1b2c..a73e6d3 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -15,6 +15,8 @@ class BunnyExchangeProxy # @attr_reader [EventSource::Protcols::Amqp::BunnyChannelProxy] channel_proxy the channel_proxy used to create this exchange attr_reader :subject, :channel_proxy + DefaultMimeType = 'application/json'.freeze + # @param [EventSource::AsyncApi::Channel] channel_proxy instance on which to open this Exchange # @param [Hash] exchange_bindings instance with configuration for this Exchange def initialize(channel_proxy, exchange_bindings) @@ -50,7 +52,6 @@ def publish(payload:, publish_bindings:, headers: {}) logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}" - payload = payload.to_json unless is_binary?(payload) @subject.publish(payload, bunny_publish_bindings) logger.debug "BunnyExchange#publish published message: #{payload}" @@ -70,12 +71,6 @@ def message_id SecureRandom.uuid end - def is_binary?(payload) - return false unless payload.respond_to?(:encoding) - - payload.encoding == Encoding::BINARY - end - # Filtering and renaming AsyncAPI Operation bindings to Bunny/RabitMQ # bindings # diff --git a/lib/event_source/publish_operation.rb b/lib/event_source/publish_operation.rb index ddb7fb9..13f1211 100644 --- a/lib/event_source/publish_operation.rb +++ b/lib/event_source/publish_operation.rb @@ -49,10 +49,7 @@ def call(payload, options = {}) # @return [String] The encoded payload, or the original payload if no encoding is specified. # @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails. def encode_payload(payload) - return payload unless @async_api_publish_operation.message - - message_bindings = @async_api_publish_operation.message['bindings'] - encoding = message_bindings.first[1]['contentEncoding'] if message_bindings + encoding = determine_encoding return payload unless encoding output = EventSource::Operations::MimeEncode.new.call(encoding, payload) @@ -63,5 +60,19 @@ def encode_payload(payload) raise EventSource::Error::PayloadEncodeError, output.failure end end + + # Determines the encoding for the payload based on message bindings or protocol defaults. + # - If message bindings are present, uses the 'contentEncoding' value from the bindings. + # - If no message bindings are present and the protocol is AMQP, uses the default encoding for the AMQP protocol. Other protocols return nil. + def determine_encoding + message_bindings = @async_api_publish_operation.message&.dig('bindings') + return message_bindings.first[1]['contentEncoding'] if message_bindings.present? + + amqp_protocol? ? "#{subject.class}::DefaultMimeType".constantize : nil + end + + def amqp_protocol? + subject.is_a?(EventSource::Protocols::Amqp::BunnyExchangeProxy) + end end end diff --git a/spec/event_source/operations/mime_decode_spec.rb b/spec/event_source/operations/mime_decode_spec.rb index 7045177..17ddf62 100644 --- a/spec/event_source/operations/mime_decode_spec.rb +++ b/spec/event_source/operations/mime_decode_spec.rb @@ -52,5 +52,47 @@ expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") end end + + context "when the mime_type is 'application/zlib'" do + context "and the payload is a JSON string but not binary" do + let(:json_string) { "Invalid compressed data".to_json } + let(:mime_type) { "application/zlib" } + + it "passes validation" do + result = subject.call(mime_type, json_string) + + expect(result).to be_success + expect(result.value!).to eq(json_string) + end + end + + context "and the payload is neither binary nor valid JSON" do + let(:non_json_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + it "returns a failure with a validation error message" do + result = subject.call(mime_type, non_json_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + + context "and the payload is not binary and raises an error when parsed as JSON" do + let(:corrupted_json_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + before do + allow(JSON).to receive(:parse).with(corrupted_json_payload).and_raise(JSON::ParserError) + end + + it "returns a failure with a validation error message" do + result = subject.call(mime_type, corrupted_json_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + end end end diff --git a/spec/event_source/operations/mime_encode_spec.rb b/spec/event_source/operations/mime_encode_spec.rb index ca91814..b2475d0 100644 --- a/spec/event_source/operations/mime_encode_spec.rb +++ b/spec/event_source/operations/mime_encode_spec.rb @@ -4,11 +4,15 @@ subject { described_class.new } describe "#call" do - context "when the payload and mime type are valid" do + + let(:valid_payload) { { key: 'value' } } + let(:invalid_payload) { -> {} } + + context "when MIME type is application/zlib" do let(:payload) { { message: "Hello, World!" } } let(:mime_type) { "application/zlib" } - it "successfully encodes the payload" do + it "compresses the payload using Zlib" do result = subject.call(mime_type, payload) expect(result).to be_success @@ -16,7 +20,7 @@ end end - context "when the payload is a string and mime type is valid" do + context "when MIME type is application/json" do let(:payload) { "Hello, World!" } let(:mime_type) { "application/json" } @@ -24,7 +28,7 @@ result = subject.call(mime_type, payload) expect(result).to be_success - expect(result.value!).to eq(payload) + expect(result.value!).to eq(payload.to_json) end end @@ -40,15 +44,42 @@ end end - context "when the payload is invalid" do - let(:payload) { 1000 } - let(:mime_type) { "application/json" } + context 'when payload cannot be converted to JSON' do + before do + allow(invalid_payload).to receive(:to_json).and_raise(JSON::GeneratorError) + end - it "returns a failure" do - result = subject.call(mime_type, payload) + it 'returns a failure with JSON::GeneratorError' do + result = subject.call('application/json', invalid_payload) + + expect(result).to be_failure + expect(result.failure).to match(/Failed to encode payload to JSON:/) + end + end + + context 'when Zlib compression fails' do + before do + allow(Zlib).to receive(:deflate).and_raise(Zlib::Error, 'Compression failed') + end + + it 'returns a failure with Zlib::Error' do + result = subject.call('application/zlib', valid_payload) + + expect(result).to be_failure + expect(result.failure).to eq('Failed to compress payload using Zlib: Compression failed') + end + end + + context 'when an unexpected error occurs' do + before do + allow(valid_payload).to receive(:to_json).and_raise(StandardError, 'something went wrong') + end + + it 'returns a failure with StandardError' do + result = subject.call('application/json', valid_payload) expect(result).to be_failure - expect(result.failure).to eq("Invalid payload type. Expected a Hash or String, but received Integer.") + expect(result.failure).to eq('Unexpected error during encoding: something went wrong') end end end diff --git a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb index d9a2015..5cee63b 100644 --- a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb @@ -28,7 +28,7 @@ describe '#publish' do it 'publishes the payload with the correct bindings and headers' do - subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers) + subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers) expect(bunny_exchange).to have_received(:publish).with(payload.to_json, { correlation_id: '12345', @@ -41,7 +41,7 @@ expect(subject.logger).to receive(:debug).with(/published message:/) expect(subject.logger).to receive(:debug).with(/published message to exchange:/) - subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers) + subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers) end context 'when the payload is binary' do