in_forward: Add source_hostname_key parameter. fix #804 (#807)

Merged (2 commits) on Aug 10, 2016
46 changes: 32 additions & 14 deletions lib/fluent/plugin/in_forward.rb
@@ -49,6 +49,8 @@ def initialize
config_param :chunk_size_limit, :size, default: nil
desc 'Skip an event if incoming event is invalid.'
config_param :skip_invalid_event, :bool, default: false
desc "The field name of the client's hostname."
Member: IMO it's better to have a note about the performance degradation.
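A note on where that cost would come from (an illustration only, not part of this change; the msgpack gem usage, field name, and hostname below are assumptions for the sketch): without source_hostname_key a PackedForward chunk can be emitted as raw msgpack bytes, but with the option set every entry has to be unpacked and rewritten, roughly like this:

require 'msgpack'

packed = ''
[[1_469_000_000, { 'a' => 1 }], [1_469_000_001, { 'a' => 2 }]].each do |time, record|
  packed << [time, record].to_msgpack
end

# Without source_hostname_key: the packed chunk passes through untouched.
chunk_as_is = packed

# With source_hostname_key: every record is unpacked, modified, and re-emitted.
rewritten = []
unpacker = MessagePack::Unpacker.new
unpacker.feed_each(packed) do |time, record|
  record['source'] = 'client-01.example.com' # hypothetical key and hostname
  rewritten << [time, record]
end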

config_param :source_hostname_key, :string, default: nil

def configure(conf)
super
@@ -145,40 +147,41 @@ def run
# 3: object record
# 4: object option (optional)
# }
def on_message(msg, chunk_size, source)
def on_message(msg, chunk_size, peeraddr)
if msg.nil?
# for future TCP heartbeat_request
return
end

# TODO: raise an exception if broken chunk is generated by recoverable situation
unless msg.is_a?(Array)
log.warn "incoming chunk is broken:", source: source, msg: msg
log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg
return
end

tag = msg[0]
entries = msg[1]

if @chunk_size_limit && (chunk_size > @chunk_size_limit)
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source, limit: @chunk_size_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size
return
elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source, limit: @chunk_size_warn_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size
end

if entries.class == String
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es = MessagePackEventStream.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, source) if @skip_invalid_event
es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
router.emit_stream(tag, es)

elsif entries.class == Array
# Forward
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, source)
check_and_skip_invalid_event(tag, entries, peeraddr)
else
es = MultiEventStream.new
entries.each { |e|
@@ -190,6 +193,7 @@ def on_message(msg, chunk_size, source)
}
es
end
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
router.emit_stream(tag, es)
option = msg[2]

@@ -198,11 +202,12 @@ def invalid_event?(tag, time, record)
time = msg[1]
record = msg[2]
if @skip_invalid_event && invalid_event?(tag, time, record)
log.warn "got invalid event and drop it:", source: source, tag: tag, time: time, record: record
log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Engine.now if time.to_i == 0
record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
router.emit(tag, time, record)
option = msg[3]
end
@@ -215,28 +220,41 @@ def invalid_event?(tag, time, record)
!((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String))
end

def check_and_skip_invalid_event(tag, es, source)
def check_and_skip_invalid_event(tag, es, peeraddr)
new_es = MultiEventStream.new
es.each { |time, record|
if invalid_event?(tag, time, record)
log.warn "skip invalid event:", source: source, tag: tag, time: time, record: record
log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record
next
end
new_es.add(time, record)
}
new_es
end

def add_source_host(es, host)
new_es = MultiEventStream.new
es.each { |time, record|
record[@source_hostname_key] = host
new_es.add(time, record)
}
new_es
end

def source_message(peeraddr)
_, port, host, addr = peeraddr
"host: #{host}, addr: #{addr}, port: #{port}"
end

class Handler < Coolio::Socket
PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"]

def initialize(io, linger_timeout, log, on_message)
super(io)

@peeraddr = nil
if io.is_a?(TCPSocket) # for unix domain socket support in the future
_proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED )
@source = "host: #{host}, addr: #{addr}, port: #{port}"

@peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
end
@@ -265,7 +283,7 @@ def on_read(data)
@serializer = :to_json.to_proc
@y = Yajl::Parser.new
@y.on_parse_complete = lambda { |obj|
option = @on_message.call(obj, @chunk_counter, @source)
option = @on_message.call(obj, @chunk_counter, @peeraddr)
respond option
@chunk_counter = 0
}
@@ -293,7 +311,7 @@ def on_read_json(data)
def on_read_msgpack(data)
@chunk_counter += data.bytesize
@u.feed_each(data) do |obj|
option = @on_message.call(obj, @chunk_counter, @source)
option = @on_message.call(obj, @chunk_counter, @peeraddr)
respond option if option
@chunk_counter = 0
end
60 changes: 54 additions & 6 deletions test/plugin/test_in_forward.rb
@@ -29,6 +29,7 @@ def setup
port #{PORT}
bind 127.0.0.1
]
PEERADDR = ['?', '0000', '127.0.0.1', '127.0.0.1']

def create_driver(conf=CONFIG)
Fluent::Test::InputTestDriver.new(Fluent::ForwardInput).configure(conf)
@@ -260,7 +261,7 @@ def test_set_size_to_option

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
option = d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
option = d.instance.send(:on_message, obj, chunk.size, PEERADDR)
assert_equal option['size'], events.length
end
end
@@ -282,7 +283,7 @@ def test_send_large_chunk_warning

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
d.instance.send(:on_message, obj, chunk.size, PEERADDR)
end
end

@@ -311,7 +312,7 @@ def test_send_large_chunk_only_warning

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
d.instance.send(:on_message, obj, chunk.size, PEERADDR)
end
end

@@ -338,7 +339,7 @@ def test_send_large_chunk_limit
# d.run => send_data
d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
d.instance.send(:on_message, obj, chunk.size, PEERADDR)
end
end

@@ -360,7 +361,7 @@ def test_send_broken_chunk(data)

# d.run => send_data
d.run do
d.instance.send(:on_message, data, 1000000000, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
d.instance.send(:on_message, data, 1000000000, PEERADDR)
end

# check emitted data
@@ -476,7 +477,6 @@ def test_respond_to_message_json_requiring_ack

assert_equal events, d.emits
assert_equal expected_acks, @responses.map { |res| JSON.parse(res)['ack'] }

end

def test_not_respond_to_message_not_requiring_ack
@@ -583,5 +583,53 @@ def send_data(data, try_to_receive_response=false, response_timeout=1)
@responses << res if try_to_receive_response
end

# TODO: Use sub_test_case. Currently Errno::EADDRINUSE happens inside sub_test_case
test 'message protocol with source_hostname_key' do
execute_test { |events|
events.each { |tag, time, record|
send_data [tag, time, record].to_msgpack
}
}
end

test 'forward protocol with source_hostname_key' do
execute_test { |events|
entries = []
events.each {|tag,time,record|
entries << [time, record]
}
send_data ['tag1', entries].to_msgpack
Member: I cannot understand why this code works well... time should be an ext-typed value, but there is no info about it.

Member: The correct way to do it is to use Fluent::MessagePackFactory.

Member Author: EventTime is serialized to an int via to_msgpack, and this is why this test works.

Member Author: Yeah, using Fluent::MessagePackFactory is better; the "packed forward protocol with source_hostname_key" case does it.

Member: It looks like incorrect behavior when simulating out_forward... but it's not so serious for this test.
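To make the two serialization paths in this thread concrete (a sketch only, assuming the Fluentd test environment is loaded; nothing here is part of the diff): a plain #to_msgpack call degrades Fluent::EventTime to its integer seconds, while the factory packer writes it as an ext-typed value, which is what the "packed forward protocol with source_hostname_key" test below uses.

require 'fluent/time'
require 'fluent/engine'

time = Fluent::EventTime.now

# Plain Array#to_msgpack: EventTime falls back to an integer of seconds, as noted above.
plain_bytes = ['tag1', [[time, { 'a' => 1 }]]].to_msgpack

# Factory packer: EventTime is kept as a msgpack ext-typed value.
ext_bytes = Fluent::Engine.msgpack_factory.packer.write([time, { 'a' => 1 }]).to_s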

}
end

test 'packed forward protocol with source_hostname_key' do
execute_test { |events|
entries = ''
events.each { |tag, time, record|
Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush
}
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
}
end

def execute_test(&block)
d = create_driver(CONFIG + 'source_hostname_key source')

time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
events = [
["tag1", time, {"a"=>1}],
["tag1", time, {"a"=>2}]
]
d.expected_emits_length = events.length

d.run do
block.call(events)
end

d.emits.each { |tag, _time, record|
Member: Does d.emits really have 2 events? This test code doesn't wait for or confirm receiving the data via the TCP socket. I suspect that d.emits is empty here.

Member Author: The test calls d.expected_emits_length = events.length beforehand, so d.run finishes only after it has received 2 events.

assert_true record.has_key?('source')
}
end
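
A condensed sketch of the wait behaviour described in the thread above (it reuses CONFIG, create_driver, and send_data from this test file; the records are arbitrary): because expected_emits_length is set before run, run returns only after both events have been emitted, so the assertions afterwards see a populated d.emits.

d = create_driver(CONFIG + 'source_hostname_key source')
d.expected_emits_length = 2

d.run do
  time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
  send_data ['tag1', time, { 'a' => 1 }].to_msgpack
  send_data ['tag1', time, { 'a' => 2 }].to_msgpack
end

d.emits.size # => 2, and each emitted record now carries the 'source' key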

# TODO heartbeat
end