Skip to content

Commit

Permalink
Merge pull request #2347 from fluent/add-source-address-key
Browse files Browse the repository at this point in the history
in_tcp/in_udp: Add source_address_key parameter
  • Loading branch information
repeatedly authored Mar 28, 2019
2 parents 3566901 + 59855bf commit 8ec88a9
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 0 deletions.
3 changes: 3 additions & 0 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class TcpInput < Input
config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead."
desc "The field name of the client's hostname."
config_param :source_hostname_key, :string, default: nil
desc "The field name of the client's address."
config_param :source_address_key, :string, default: nil

config_param :blocking_timeout, :time, default: 0.5

Expand Down Expand Up @@ -76,6 +78,7 @@ def start
tag = extract_tag_from_record(record)
tag ||= @tag
time ||= extract_time_from_record(record) || Fluent::EventTime.now
record[@source_address_key] = conn.remote_addr if @source_address_key
record[@source_hostname_key] = conn.remote_host if @source_hostname_key
router.emit(tag, time, record)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class UdpInput < Input
config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead."
desc "The field name of the client's hostname."
config_param :source_hostname_key, :string, default: nil
desc "The field name of the client's address."
config_param :source_address_key, :string, default: nil

desc "Deprecated parameter. Use message_length_limit instead"
config_param :body_size_limit, :size, default: nil, deprecated: "use message_length_limit instead."
Expand Down Expand Up @@ -79,6 +81,7 @@ def start
tag = extract_tag_from_record(record)
tag ||= @tag
time ||= extract_time_from_record(record) || Fluent::EventTime.now
record[@source_address_key] = sock.remote_addr if @source_address_key
record[@source_hostname_key] = sock.remote_host if @source_hostname_key
router.emit(tag, time, record)
end
Expand Down
20 changes: 20 additions & 0 deletions test/plugin/test_in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,24 @@ def create_tcp_socket(host, port, &block)
assert event[1].is_a?(Fluent::EventTime)
assert_equal hostname, event[2]['host']
end

test 'source_address_key' do
d = create_driver(BASE_CONFIG + %!
format none
source_address_key addr
!)
address = nil
d.run(expect_records: 1) do
create_tcp_socket('127.0.0.1', PORT) do |sock|
address = sock.peeraddr[3]
sock.send("test\n", 0)
end
end

assert_equal 1, d.events.size
event = d.events[0]
assert_equal "tcp", event[0]
assert event[1].is_a?(Fluent::EventTime)
assert_equal address, event[2]['addr']
end
end
20 changes: 20 additions & 0 deletions test/plugin/test_in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,26 @@ def create_udp_socket(host, port)
assert_equal hostname, d.events[0][2]['host']
end

test 'source_address_key' do
d = create_driver(BASE_CONFIG + %!
format none
source_address_key addr
!)
address = nil
d.run(expect_records: 1) do
create_udp_socket('127.0.0.1', PORT) do |u|
u.send("test", 0)
address = u.peeraddr[3]
end
end

expected = {'message' => 'test'}
assert_equal 1, d.events.size
assert_equal "udp", d.events[0][0]
assert d.events[0][1].is_a?(Fluent::EventTime)
assert_equal address, d.events[0][2]['addr']
end

test 'receive_buffer_size' do
# doesn't check exact value because it depends on platform and condition

Expand Down

0 comments on commit 8ec88a9

Please sign in to comment.