Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: Fix missing error class and test for out_forward #942

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/fluent/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,13 @@ def initialize
super
end

def format_stream(tag, es)
es.to_msgpack_stream
end

def emit(tag, es, chain)
@emit_count += 1
data = es.to_msgpack_stream
data = format_stream(tag, es)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracting routine to format_stream is for @time_as_integer so this diff itself is not needed for this patch?
We want to reduce the change without no reason.

key = tag
if @buffer.emit(key, data, chain)
submit_flush
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class ForwardOutputError < StandardError
class ForwardOutputResponseError < ForwardOutputError
end

class ForwardOutputConnectionClosedError < ForwardOutputError
end

class ForwardOutputACKTimeoutError < ForwardOutputResponseError
end

Expand Down
18 changes: 11 additions & 7 deletions lib/fluent/test/output_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,24 @@ def expect_format(str)
def run(num_waits = 10, &block)
result = nil
super(num_waits) {
block.call if block

es = ArrayEventStream.new(@entries)
buffer = @instance.format_stream(@tag, es)

block.call if block

if @expected_buffer
assert_equal(@expected_buffer, buffer)
end

key = ''
if @instance.respond_to?(:time_slicer)
case
when @instance.is_a?(Fluent::ObjectBufferedOutput)
key = @tag
when @instance.respond_to?(:time_slicer)
# this block is only for test_out_file
time, record = @entries.first
time, _record = @entries.first
key = @instance.time_slicer.call(time)
else
key = ''
end
chunk = @instance.buffer.new_chunk(key)
chunk << buffer
Expand Down Expand Up @@ -128,15 +132,15 @@ def expect_format(str)
def run(&block)
result = []
super {
block.call if block

buffer = ''
@entries.keys.each {|key|
es = ArrayEventStream.new(@entries[key])
@instance.emit(@tag, es, NullOutputChain.instance)
buffer << @instance.format_stream(@tag, es)
}

block.call if block

if @expected_buffer
assert_equal(@expected_buffer, buffer)
end
Expand Down
40 changes: 22 additions & 18 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def setup
]

def create_driver(conf=CONFIG)
Fluent::Test::OutputTestDriver.new(Fluent::ForwardOutput) {
Fluent::Test::BufferedOutputTestDriver.new(Fluent::ForwardOutput) {
attr_reader :responses, :exceptions

def initialize
Expand Down Expand Up @@ -231,10 +231,12 @@ def test_require_a_node_not_supporting_responses_to_respond_with_ack
end
d.run_timeout = 2

target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
assert_raise Fluent::ForwardOutputACKTimeoutError do
target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end
end
Expand Down Expand Up @@ -272,10 +274,12 @@ def test_require_a_node_not_supporting_responses_2_to_respond_with_ack
end
d.run_timeout = 2

target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
assert_raise Fluent::ForwardOutputConnectionClosedError do
target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end
end
Expand Down Expand Up @@ -318,7 +322,10 @@ def write(data)
end

def close
@sock.close
unless @sock.closed?
@sock.close_write
@sock.close
end
end
}

Expand All @@ -332,14 +339,18 @@ def close
handler.on_read(raw_data)
# chunk_counter is reset to zero only after all the data have been received and successfully deserialized.
break if handler.chunk_counter == 0
break if sock.closed?
end
if disconnect
handler.close
sock = nil
end
sleep # wait for connection to be closed by client
ensure
sock.close if sock
if sock && !sock.closed?
sock.close_write
sock.close
end
end
end
end
Expand Down Expand Up @@ -370,10 +381,6 @@ class DummyEngineDriver < Fluent::Test::TestDriver
def initialize(klass, &block)
super(klass, &block)
@engine = DummyEngineClass.new
@klass = klass
# To avoid accessing Fluent::Engine, set Engine as a plugin's class constant (Fluent::SomePlugin::Engine).
# But this makes it impossible to run tests concurrently by threading in a process.
@klass.const_set(:Engine, @engine)
end

def inject_router
Expand All @@ -383,9 +390,6 @@ def inject_router

def run(&block)
super(&block)
@klass.class_eval do
remove_const(:Engine)
end
end

def emits
Expand Down