Skip to content

Commit

Permalink
NG Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Dec 16, 2015
1 parent c95947d commit ff274dc
Show file tree
Hide file tree
Showing 23 changed files with 2,465 additions and 300 deletions.
28 changes: 21 additions & 7 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "logstash/config/cpu_core_strategy"
require "uri"
require "net/http"
require "logstash/pipeline"
LogStash::Environment.load_locale!

class LogStash::Agent < Clamp::Command
Expand All @@ -20,10 +21,20 @@ class LogStash::Agent < Clamp::Command
:default_input => DEFAULT_INPUT, :default_output => DEFAULT_OUTPUT),
:default => "", :attribute_name => :config_string

option ["-w", "--filterworkers"], "COUNT",
I18n.t("logstash.agent.flag.filterworkers"),
:attribute_name => :filter_workers,
:default => 0, &:to_i
option ["-w", "--pipeline-workers"], "COUNT",
I18n.t("logstash.runner.flag.pipeline-workers"),
:attribute_name => :pipeline_workers,
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers], &:to_i

option ["-b", "--pipeline-batch-size"], "SIZE",
I18n.t("logstash.runner.flag.pipeline-batch-size"),
:attribute_name => :pipeline_batch_size,
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size], &:to_i

option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS",
I18n.t("logstash.runner.flag.pipeline-batch-delay"),
:attribute_name => :pipeline_batch_delay,
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay], &:to_i

option ["-l", "--log"], "FILE",
I18n.t("logstash.agent.flag.log"),
Expand Down Expand Up @@ -121,7 +132,12 @@ def execute
end

begin
pipeline = LogStash::Pipeline.new(@config_string)
pipeline = LogStash::Pipeline.new(@config_string, {
:pipeline_workers => pipeline_workers,
:pipeline_batch_size => pipeline_batch_size,
:pipeline_batch_delay => pipeline_batch_delay,
:pipeline_id => "base"
})
rescue LoadError => e
fail("Configuration problem.")
end
Expand Down Expand Up @@ -151,8 +167,6 @@ def execute
configure_logging(log_file)
end

pipeline.configure("filter-workers", filter_workers) if filter_workers > 0

# Stop now if we are only asking for a config test.
if config_test?
report "Configuration OK"
Expand Down
Loading

0 comments on commit ff274dc

Please sign in to comment.