Skip to content

Commit

Permalink
move build_action_tuple and friends out of commons
Browse files Browse the repository at this point in the history
  • Loading branch information
colinsurprenant committed Nov 24, 2020
1 parent 49a6906 commit 7b861ca
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 117 deletions.
116 changes: 114 additions & 2 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,62 @@ def stop_template_installer
@template_installer.join unless @template_installer.nil?
end

# not private for elasticsearch_spec.rb
# Convert the event into a 3-tuple of action, params, and event
def event_action_tuple(event)
action = event.sprintf(@action)

params = {
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => event.sprintf(@index),
routing_field_name => @routing ? event.sprintf(@routing) : nil
}

params[:_type] = get_event_type(event) if use_event_type?(nil)

if @pipeline
value = event.sprintf(@pipeline)
# convention: empty string equates to not using a pipeline
# this is useful when using a field reference in the pipeline setting, e.g.
# elasticsearch {
# pipeline => "%{[@metadata][pipeline]}"
# }
params[:pipeline] = value unless value.empty?
end

if @parent
if @join_field
join_value = event.get(@join_field)
parent_value = event.sprintf(@parent)
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
params[routing_field_name] = event.sprintf(@parent)
else
params[:parent] = event.sprintf(@parent)
end
end

if action == 'update'
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_script] = event.sprintf(@script) if @script != ""
params[retry_on_conflict_action_name] = @retry_on_conflict
end

if @version
params[:version] = event.sprintf(@version)
end

if @version_type
params[:version_type] = event.sprintf(@version_type)
end

[action, params, event]
end

# not private for elasticsearch_spec.rb
def retry_on_conflict_action_name
maximum_seen_major_version >= 7 ? :retry_on_conflict : :_retry_on_conflict
end

@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }

@@plugins.each do |plugin|
Expand All @@ -317,8 +373,58 @@ def stop_template_installer

private

def build_event_index(event)
event.sprintf(@index)
def discover_cluster_uuid
return unless defined?(plugin_metadata)
cluster_info = client.get('/')
plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
rescue => e
# TODO introducing this logging message breaks many tests that need refactoring
# @logger.error("Unable to retrieve elasticsearch cluster uuid", error => e.message)
end

def successful_connection?
!!maximum_seen_major_version
end

def routing_field_name
maximum_seen_major_version >= 6 ? :routing : :_routing
end

# Determine the correct value for the 'type' field for the given event
DEFAULT_EVENT_TYPE_ES6="doc".freeze
DEFAULT_EVENT_TYPE_ES7="_doc".freeze
def get_event_type(event)
# Set the 'type' value for the index.
type = if @document_type
event.sprintf(@document_type)
else
if maximum_seen_major_version < 6
event.get("type") || DEFAULT_EVENT_TYPE_ES6
elsif maximum_seen_major_version == 6
DEFAULT_EVENT_TYPE_ES6
elsif maximum_seen_major_version == 7
DEFAULT_EVENT_TYPE_ES7
else
nil
end
end

if !(type.is_a?(String) || type.is_a?(Numeric))
@logger.warn("Bad event type! Non-string/integer type value set!", :type_class => type.class, :type_value => type.to_s, :event => event)
end

type.to_s
end

##
# WARNING: This method is overridden in a subclass in Logstash Core 7.7-7.8's monitoring,
# where a `client` argument is both required and ignored. In later versions of
# Logstash Core it is optional and ignored, but to make it optional here would
# allow us to accidentally break compatibility with Logstashes where it was required.
# @param noop_required_client [nil]: required `nil` for legacy reasons.
# @return [Boolean]
def use_event_type?(noop_required_client)
maximum_seen_major_version < 8
end

def install_template
Expand Down Expand Up @@ -361,6 +467,12 @@ def setup_ecs_compatibility_related_defaults
@template_name ||= default_template_name
end

# To be overidden by the -java version
VALID_HTTP_ACTIONS=["index", "delete", "create", "update"]
def valid_actions
VALID_HTTP_ACTIONS
end

def check_action_validity
raise LogStash::ConfigurationError, "No action specified!" unless @action

Expand Down
115 changes: 0 additions & 115 deletions lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,6 @@ module Common
# will never succeed.
VERSION_TYPES_PERMITTING_CONFLICT = ["external", "external_gt", "external_gte"]

##
# WARNING: This method is overridden in a subclass in Logstash Core 7.7-7.8's monitoring,
# where a `client` argument is both required and ignored. In later versions of
# Logstash Core it is optional and ignored, but to make it optional here would
# allow us to accidentally break compatibility with Logstashes where it was required.
# @param noop_required_client [nil]: required `nil` for legacy reasons.
# @return [Boolean]
def use_event_type?(noop_required_client)
maximum_seen_major_version < 8
end

# Convert the event into a 3-tuple of action, params, and event
def event_action_tuple(event)
action = event.sprintf(@action)

params = {
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => build_event_index(event),
routing_field_name => @routing ? event.sprintf(@routing) : nil
}

params[:_type] = get_event_type(event) if use_event_type?(nil)

if @pipeline
value = event.sprintf(@pipeline)
# convention: empty string equates to not using a pipeline
# this is useful when using a field reference in the pipeline setting, e.g.
# elasticsearch {
# pipeline => "%{[@metadata][pipeline]}"
# }
params[:pipeline] = value unless value.empty?
end

if @parent
if @join_field
join_value = event.get(@join_field)
parent_value = event.sprintf(@parent)
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
params[routing_field_name] = event.sprintf(@parent)
else
params[:parent] = event.sprintf(@parent)
end
end

if action == 'update'
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_script] = event.sprintf(@script) if @script != ""
params[retry_on_conflict_action_name] = @retry_on_conflict
end

if @version
params[:version] = event.sprintf(@version)
end

if @version_type
params[:version_type] = event.sprintf(@version_type)
end

[action, params, event]
end

# Perform some ES options validations and Build the HttpClient.
# Note that this methods may sets the @user, @password, @hosts and @client ivars as a side effect.
# @param license_checker [#appropriate_license?] An optional license checker that will be used by the Pool class.
Expand All @@ -101,7 +40,6 @@ def build_client(license_checker = nil)
@client ||= ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
end


def validate_authentication
authn_options = 0
authn_options += 1 if @cloud_auth
Expand Down Expand Up @@ -187,33 +125,6 @@ def maximum_seen_major_version
client.maximum_seen_major_version
end

def routing_field_name
maximum_seen_major_version >= 6 ? :routing : :_routing
end

def retry_on_conflict_action_name
maximum_seen_major_version >= 7 ? :retry_on_conflict : :_retry_on_conflict
end

def successful_connection?
!!maximum_seen_major_version
end

def discover_cluster_uuid
return unless defined?(plugin_metadata)
cluster_info = client.get('/')
plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
rescue => e
# TODO introducing this logging message breaks many tests that need refactoring
# @logger.error("Unable to retrieve elasticsearch cluster uuid", error => e.message)
end

# To be overidden by the -java version
VALID_HTTP_ACTIONS=["index", "delete", "create", "update"]
def valid_actions
VALID_HTTP_ACTIONS
end

def retrying_submit(actions)
# Initially we submit the full list of actions
submit_actions = actions
Expand Down Expand Up @@ -322,32 +233,6 @@ def handle_dlq_status(message, action, status, response)
end
end

# Determine the correct value for the 'type' field for the given event
DEFAULT_EVENT_TYPE_ES6="doc".freeze
DEFAULT_EVENT_TYPE_ES7="_doc".freeze
def get_event_type(event)
# Set the 'type' value for the index.
type = if @document_type
event.sprintf(@document_type)
else
if maximum_seen_major_version < 6
event.get("type") || DEFAULT_EVENT_TYPE_ES6
elsif maximum_seen_major_version == 6
DEFAULT_EVENT_TYPE_ES6
elsif maximum_seen_major_version == 7
DEFAULT_EVENT_TYPE_ES7
else
nil
end
end

if !(type.is_a?(String) || type.is_a?(Numeric))
@logger.warn("Bad event type! Non-string/integer type value set!", :type_class => type.class, :type_value => type.to_s, :event => event)
end

type.to_s
end

# Rescue retryable errors during bulk submission
def safe_bulk(actions)
sleep_interval = @retry_initial_interval
Expand Down

0 comments on commit 7b861ca

Please sign in to comment.