Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from EverFi/fc-682-allow-read-uncommitted
Browse files Browse the repository at this point in the history
FC-682: allow consumers to read uncommitted messages from kafka brokers
  • Loading branch information
jwilger authored Feb 10, 2021
2 parents d3796da + 984cd6e commit 016286e
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 15 deletions.
2 changes: 1 addition & 1 deletion lib/kafka/protocol/fetch_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def encode(encoder)
encoder.write_int32(@max_wait_time)
encoder.write_int32(@min_bytes)
encoder.write_int32(@max_bytes)
encoder.write_int8(ISOLATION_READ_COMMITTED)
encoder.write_int8(ISOLATION_READ_UNCOMMITTED)

encoder.write_array(@topics) do |topic, partitions|
encoder.write_string(topic)
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka/protocol/list_offset_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def response_class

def encode(encoder)
encoder.write_int32(@replica_id)
encoder.write_int8(ISOLATION_READ_COMMITTED)
encoder.write_int8(ISOLATION_READ_UNCOMMITTED)

encoder.write_array(@topics) do |topic, partitions|
encoder.write_string(topic)
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Kafka
VERSION = "1.3.0"
VERSION = "1.3.0.everfi.1"
end
19 changes: 9 additions & 10 deletions spec/datadog_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,11 @@
require "fake_datadog_agent"

describe Kafka::Datadog do
let(:agent) { FakeDatadogAgent.new }

before do
agent.start
end

after do
agent.stop
end

context "when host and port are specified" do
it "emits metrics to the Datadog agent" do
agent = FakeDatadogAgent.new
agent.start
Kafka::Datadog.socket_path = nil
Kafka::Datadog.host = agent.host
Kafka::Datadog.port = agent.port

Expand All @@ -30,12 +23,17 @@
metric = agent.metrics.first

expect(metric).to eq "ruby_kafka.greetings"
agent.stop
end
end

context "when socket_path is specified" do
it "emits metrics to the Datadog agent" do
agent = FakeDatadogAgent.new
agent.start
Kafka::Datadog.socket_path = agent.socket_path
Kafka::Datadog.host = nil
Kafka::Datadog.port = nil

client = Kafka::Datadog.statsd

Expand All @@ -48,6 +46,7 @@
metric = agent.metrics.first

expect(metric).to eq "ruby_kafka.greetings"
agent.stop
end
end
end
4 changes: 3 additions & 1 deletion spec/socket_with_timeout_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

expect {
Kafka::SocketWithTimeout.new(host, port, connect_timeout: timeout, timeout: 1)
}.to raise_exception(Errno::ETIMEDOUT)
}.to raise_exception(SystemCallError) { |exception|
expect([Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::ENETUNREACH]).to include(exception.class)
}

finish = Time.now

Expand Down
4 changes: 3 additions & 1 deletion spec/ssl_socket_with_timeout_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

expect {
Kafka::SSLSocketWithTimeout.new(host, port, connect_timeout: timeout, timeout: 1, ssl_context: OpenSSL::SSL::SSLContext.new)
}.to raise_exception(Errno::ETIMEDOUT)
}.to raise_exception(SystemCallError) { |exception|
expect([Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::ENETUNREACH]).to include(exception.class)
}

finish = Time.now

Expand Down

0 comments on commit 016286e

Please sign in to comment.