Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plugin helpers for parser and formatter #1023

Merged
merged 3 commits into from
Jun 6, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
require 'fluent/plugin_helper/timer'
require 'fluent/plugin_helper/child_process'
require 'fluent/plugin_helper/storage'
require 'fluent/plugin_helper/parser'
require 'fluent/plugin_helper/formatter'
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/compat_parameters'

Expand Down
141 changes: 141 additions & 0 deletions lib/fluent/plugin_helper/formatter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'monitor'
require 'forwardable'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these `require needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed to remove these.


require 'fluent/plugin'
require 'fluent/plugin/formatter'
require 'fluent/config/element'

module Fluent
module PluginHelper
module Formatter
def formatter_create(usage: '', type: nil, conf: nil)
formatter = @_formatters[usage]
return formatter if formatter

if !type
raise ArgumentError, "BUG: both type and conf are not specified" unless conf
raise Fluent::ConfigError, "@type is not specified for <format>" unless conf['@type']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me, @type is required in <format> is clear than this message but not critical.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's understandable for me too.

type = conf['@type']
end
formatter = Fluent::Plugin.new_formatter(type, parent: self)
config = case conf
when Fluent::Config::Element
conf
when Hash
# in code, programmer may use symbols as keys, but Element needs strings
conf = Hash[conf.map{|k,v| [k.to_s, v]}]
Fluent::Config::Element.new('format', usage, conf, [])
when nil
Fluent::Config::Element.new('format', usage, {}, [])
else
raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
end
formatter.configure(config)
if @_formatters_started
formatter.start
end

@_formatters[usage] = formatter
formatter
end

def self.included(mod)
mod.instance_eval do
# minimum section definition to instantiate formatter plugin instances
config_section :format, required: false, multi: true, param_name: :formatter_configs do
config_argument :usage, :string, default: ''
config_param :@type, :string
end
end
end

attr_reader :_formatters # for tests

def initialize
super
@_formatters_started = false
@_formatters = {} # usage => formatter
end

def configure(conf)
super

@formatter_configs.each do |section|
if @_formatters[section.usage]
raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}"
end
formatter = Plugin.new_formatter(section[:@type], parent: self)
formatter.configure(section.corresponding_config_element)
@_formatters[section.usage] = formatter
end
end

def start
super
@_formatters_started = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should be moved to end of this fucntion?
Formatters don't start yet in this position.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It's needed to set the variable to true here.
If any formatter calls formatter_create, that instance will not be started if we set it true at the end of this method.

@_formatters.each_pair do |usage, formatter|
formatter.start
end
end

def formatter_operate(method_name, &block)
@_formatters.each_pair do |usage, formatter|
begin
formatter.send(method_name)
block.call(formatter) if block_given?
rescue => e
log.error "unexpected error while #{method_name}", usage: usage, formatter: formatter, error: e
end
end
end

def stop
super
# timer stops automatically in super
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, it should be removed.

formatter_operate(:stop)
end

def before_shutdown
formatter_operate(:before_shutdown)
super
end

def shutdown
formatter_operate(:shutdown)
super
end

def after_shutdown
formatter_operate(:after_shutdown)
super
end

def close
formatter_operate(:close)
super
end

def terminate
formatter_operate(:terminate)
@_formatters = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is @_formatters_started = false needed for complete clean up?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

super
end
end
end
end
141 changes: 141 additions & 0 deletions lib/fluent/plugin_helper/parser.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'monitor'
require 'forwardable'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


require 'fluent/plugin'
require 'fluent/plugin/parser'
require 'fluent/config/element'

module Fluent
module PluginHelper
module Parser
def parser_create(usage: '', type: nil, conf: nil)
parser = @_parsers[usage]
return parser if parser

if !type
raise ArgumentError, "BUG: both type and conf are not specified" unless conf
raise Fluent::ConfigError, "@type is not specified for <parse>" unless conf['@type']
type = conf['@type']
end
parser = Fluent::Plugin.new_parser(type, parent: self)
config = case conf
when Fluent::Config::Element
conf
when Hash
# in code, programmer may use symbols as keys, but Element needs strings
conf = Hash[conf.map{|k,v| [k.to_s, v]}]
Fluent::Config::Element.new('parse', usage, conf, [])
when nil
Fluent::Config::Element.new('parse', usage, {}, [])
else
raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
end
parser.configure(config)
if @_parsers_started
parser.start
end

@_parsers[usage] = parser
parser
end

def self.included(mod)
mod.instance_eval do
# minimum section definition to instantiate parser plugin instances
config_section :parse, required: false, multi: true, param_name: :parser_configs do
config_argument :usage, :string, default: ''
config_param :@type, :string
end
end
end

attr_reader :_parsers # for tests

def initialize
super
@_parsers_started = false
@_parsers = {} # usage => parser
end

def configure(conf)
super

@parser_configs.each do |section|
if @_parsers[section.usage]
raise Fluent::ConfigError, "duplicated parsers configured: #{section.usage}"
end
parser = Plugin.new_parser(section[:@type], parent: self)
parser.configure(section.corresponding_config_element)
@_parsers[section.usage] = parser
end
end

def start
super
@_parsers_started = true
@_parsers.each_pair do |usage, parser|
parser.start
end
end

def parser_operate(method_name, &block)
@_parsers.each_pair do |usage, parser|
begin
parser.send(method_name)
block.call(parser) if block_given?
rescue => e
log.error "unexpected error while #{method_name}", usage: usage, parser: parser, error: e
end
end
end

def stop
super
# timer stops automatically in super
parser_operate(:stop)
end

def before_shutdown
parser_operate(:before_shutdown)
super
end

def shutdown
parser_operate(:shutdown)
super
end

def after_shutdown
parser_operate(:after_shutdown)
super
end

def close
parser_operate(:close)
super
end

def terminate
parser_operate(:terminate)
@_parsers = {}
super
end
end
end
end
Loading