From 44186736165368c9839fb7958335bbf66ab2a6eb Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Thu, 5 Jul 2018 17:31:41 +0900 Subject: [PATCH] in_http: Allow users to send payload in compressed form This adds support for the 'Content-Encoding' header, which allows HTTP clients to transfer payload more efficiently by applying compression. You can test it using the following command: $ echo 'json={"foo":"bar"}' | gzip > json.gz $ curl --data-binary @json.gz -H "Content-Encoding: gzip" \ http://localhost:9880/debug.log This feature should be useful especially for users who handle large amounts of data. Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/in_http.rb | 19 +++++++++++++ test/plugin/test_in_http.rb | 55 ++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index efff85bbb9..07d81c531c 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -316,6 +316,7 @@ def on_headers_complete(headers) end @env = {} @content_type = "" + @content_encoding = "" headers.each_pair {|k,v| @env["HTTP_#{k.gsub('-','_').upcase}"] = v case k @@ -325,6 +326,8 @@ def on_headers_complete(headers) size = v.to_i when /Content-Type/i @content_type = v + when /Content-Encoding/i + @content_encoding = v when /Connection/i if v =~ /close/i @keep_alive = false @@ -376,6 +379,22 @@ def on_message_complete end end + # Content Encoding + # ================= + # Decode payload according to the "Content-Encoding" header. + # For now, we only support 'gzip' and 'deflate'. + begin + if @content_encoding == 'gzip' + @body = Zlib::GzipReader.new(StringIO.new(@body)).read + elsif @content_encoding == 'deflate' + @body = Zlib::Inflate.inflate(@body) + end + rescue + @log.warn 'fails to decode payload', error: $!.to_s + send_response_and_close("400 Bad Request", {}, "") + return + end + @env['REMOTE_ADDR'] = @remote_addr if @remote_addr uri = URI.parse(@parser.request_url) diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 25b8819a70..1ce51aeb35 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -600,6 +600,54 @@ def test_cors_allowed assert_equal_event_time time, d.events[1][1] end + def test_content_encoding_gzip + d = create_driver + + time = event_time("2011-01-02 13:14:15 UTC") + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_headers = [] + + d.run do + events.each do |tag, time, record| + header = {'Content-Type'=>'application/json', 'Content-Encoding'=>'gzip'} + res = post("/#{tag}?time=#{time}", compress_gzip(record.to_json), header) + res_codes << res.code + end + end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] + end + + def test_content_encoding_deflate + d = create_driver + + time = event_time("2011-01-02 13:14:15 UTC") + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_headers = [] + + d.run do + events.each do |tag, time, record| + header = {'Content-Type'=>'application/msgpack', 'Content-Encoding'=>'deflate'} + res = post("/#{tag}?time=#{time}", Zlib.deflate(record.to_msgpack), header) + res_codes << res.code + end + end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] + end + def test_cors_disallowed d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins @@ -690,6 +738,13 @@ def post(path, params, header = {}, &block) http.request(req) end + def compress_gzip(data) + io = StringIO.new + io.binmode + Zlib::GzipWriter.wrap(io) { |gz| gz.write data } + return io.string + end + def include_http_header?(record) record.keys.find { |header| header.start_with?('HTTP_') } end