Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(fluentd): resolve rubocop failures #6359

Merged
merged 1 commit into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clients/cmd/fluentd/.rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require: rubocop-rspec

AllCops:
TargetRubyVersion: 2.7
NewCops: disable
Exclude:
- 'bin/**'
Expand Down
2 changes: 1 addition & 1 deletion clients/cmd/fluentd/docker/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

source 'https://rubygems.org'

gem 'fluent-plugin-multi-format-parser', '~>1.0.0'
gem 'fluentd', '1.9.0'
gem 'fluent-plugin-multi-format-parser', '~>1.0.0'
4 changes: 3 additions & 1 deletion clients/cmd/fluentd/fluent-plugin-grafana-loki.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ Gem::Specification.new do |spec|
spec.homepage = 'https://github.com/grafana/loki/'
spec.license = 'Apache-2.0'

spec.required_ruby_version = '~> 2.7'

# test_files, files = `git ls-files -z`.split("\x0").partition do |f|
# f.match(%r{^(test|spec|features)/})
# end
spec.files = Dir.glob('{bin,lib}/**/*') + %w[LICENSE README.md]
spec.files = Dir['{bin,lib}/**/*'] + %w[LICENSE README.md]
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ['lib']

Expand Down
49 changes: 24 additions & 25 deletions clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ class LogPostError < StandardError; end
config_set_default :chunk_keys, []
end

def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def configure(conf)
compat_parameters_convert(conf, :buffer)
super
@uri = URI.parse(@url + '/loki/api/v1/push')
@uri = URI.parse("#{@url}/loki/api/v1/push")
unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
end
Expand All @@ -108,25 +109,24 @@ def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
validate_client_cert_key
end

raise "bearer_token_file #{@bearer_token_file} not found" if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file)
if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file)
raise "bearer_token_file #{@bearer_token_file} not found"
end

@auth_token_bearer = nil
if !@bearer_token_file.nil?
if !File.exist?(@bearer_token_file)
raise "bearer_token_file #{@bearer_token_file} not found"
end
unless @bearer_token_file.nil?
raise "bearer_token_file #{@bearer_token_file} not found" unless File.exist?(@bearer_token_file)

# Read the file once, assume long-lived authentication token.
@auth_token_bearer = File.read(@bearer_token_file)
if @auth_token_bearer.empty?
raise "bearer_token_file #{@bearer_token_file} is empty"
end
raise "bearer_token_file #{@bearer_token_file} is empty" if @auth_token_bearer.empty?

log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
end


raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end
# rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

def client_cert_configured?
!@key.nil? && !@cert.nil?
Expand Down Expand Up @@ -204,8 +204,7 @@ def http_request_opts(uri)
def generic_to_loki(chunk)
# log.debug("GenericToLoki: converting #{chunk}")
streams = chunk_to_loki(chunk)
payload = payload_builder(streams)
payload
payload_builder(streams)
end

private
Expand All @@ -215,7 +214,7 @@ def loki_http_request(body, tenant)
@uri.request_uri
)
req.add_field('Content-Type', 'application/json')
req.add_field('Authorization', "Bearer #{@auth_token_bearer}") if !@auth_token_bearer.nil?
req.add_field('Authorization', "Bearer #{@auth_token_bearer}") unless @auth_token_bearer.nil?
req.add_field('X-Scope-OrgID', tenant) if tenant
req.body = Yajl.dump(body)
req.basic_auth(@username, @password) if @username
Expand All @@ -241,7 +240,7 @@ def format_labels(data_labels)
data_labels = {} if data_labels.nil?
data_labels = data_labels.merge(@extra_labels)
# sanitize label values
data_labels.each { |k, v| formatted_labels[k] = v.gsub('"', '\\"') if v && v&.is_a?(String) }
data_labels.each { |k, v| formatted_labels[k] = v.gsub('"', '\\"') if v.is_a?(String) }
formatted_labels
end

Expand Down Expand Up @@ -270,6 +269,7 @@ def to_nano(time)
end
end

# rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def record_to_line(record)
line = ''
if @drop_single_key && record.keys.length == 1
Expand All @@ -282,11 +282,9 @@ def record_to_line(record)
formatted_labels = []
record.each do |k, v|
# Remove non UTF-8 characters by force-encoding the string
if v.is_a?(String)
v = v.encode('utf-8', invalid: :replace, undef: :replace, replace: '?')
end
v = v.encode('utf-8', invalid: :replace, undef: :replace, replace: '?') if v.is_a?(String)
# Escape double quotes and backslashes by prefixing them with a backslash
v = v.to_s.gsub(%r{(["\\])}, '\\\\\1')
v = v.to_s.gsub(/(["\\])/, '\\\\\1')
if v.include?(' ') || v.include?('=')
formatted_labels.push(%(#{k}="#{v}"))
else
Expand All @@ -298,25 +296,25 @@ def record_to_line(record)
end
line
end
# rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

# convert a line to loki line with labels
# rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def line_to_loki(record)
chunk_labels = {}
line = ''
if record.is_a?(Hash)
@record_accessors&.each do |name, accessor|
new_key = name.gsub(%r{[.\-\/]}, '_')
new_key = name.gsub(%r{[.\-/]}, '_')
chunk_labels[new_key] = accessor.call(record)
accessor.delete(record)
end

if @extract_kubernetes_labels && record.key?('kubernetes')
kubernetes_labels = record['kubernetes']['labels']
if !kubernetes_labels.nil?
kubernetes_labels.each_key do |l|
new_key = l.gsub(%r{[.\-\/]}, '_')
chunk_labels[new_key] = kubernetes_labels[l]
end
kubernetes_labels&.each_key do |l|
new_key = l.gsub(%r{[.\-/]}, '_')
chunk_labels[new_key] = kubernetes_labels[l]
end
end

Expand Down Expand Up @@ -345,6 +343,7 @@ def line_to_loki(record)
labels: chunk_labels
}
end
# rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

# iterate through each chunk and create a loki stream entry
def chunk_to_loki(chunk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/non_utf8.log')[0]
chunk = [Time.at(1_546_270_458), {'message'=>content, 'number': 1.2345, 'stream'=>'stdout'}]
chunk = [Time.at(1_546_270_458), { 'message' => content, 'number': 1.2345, 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq "1546270458000000000"
expect(payload[0]['values'][0][1]).to eq "message=\"? rest of line\" number=1.2345 stream=stdout"
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq 'message="? rest of line" number=1.2345 stream=stdout'
end

it 'handle non utf-8 characters from log lines in json format' do
Expand All @@ -134,12 +134,14 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/non_utf8.log')[0]
chunk = [Time.at(1_546_270_458), {'message'=>content, 'number': 1.2345, 'stream'=>'stdout'}]
chunk = [Time.at(1_546_270_458), { 'message' => content, 'number': 1.2345, 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq "1546270458000000000"
expect(payload[0]['values'][0][1]).to eq "{\"message\":\"\xC1 rest of line\",\"number\":1.2345,\"stream\":\"stdout\"}"
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq(
"{\"message\":\"\xC1 rest of line\",\"number\":1.2345,\"stream\":\"stdout\"}"
)
end

it 'formats record hash as key_value' do
Expand All @@ -155,7 +157,7 @@
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq 'message="' + content[0] + '" stream=stdout'
expect(body[:streams][0]['values'][0][1]).to eq "message=\"#{content[0]}\" stream=stdout"
end

it 'formats record hash as json' do
Expand Down