diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index a960dcf227a..b7d54805e19 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -18,6 +18,13 @@ module LogStash; class Pipeline attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id + DEFAULT_SETTINGS = { + # We don't set pipeline workers defaults here, but rather in safe_pipeline_workers + :default_pipeline_workers => LogStash::Config::CpuCoreStrategy.fifty_percent, + :pipeline_batch_size => 125, + :pipeline_batch_delay => 5 # in milliseconds + } + def initialize(config_str, settings = {}) @pipeline_id = settings[:pipeline_id] || self.object_id @logger = Cabin::Channel.get(LogStash) @@ -58,11 +65,7 @@ def initialize(config_str, settings = {}) @input_threads = [] - @settings = { - # We don't set pipeline workers defaults here, but rather in safe_pipeline_workers - :pipeline_batch_size => 125, - :pipeline_batch_delay => 50 # in milliseconds - } + @settings = DEFAULT_SETTINGS.clone # @ready requires thread safety since it is typically polled from outside the pipeline thread @ready = Concurrent::AtomicBoolean.new(false) @@ -79,7 +82,7 @@ def configure(setting, value) end def safe_pipeline_worker_count - default = LogStash::Config::CpuCoreStrategy.fifty_percent + default = DEFAULT_SETTINGS[:default_pipeline_workers] thread_count = @settings[:pipeline_workers] #override from args "-w 8" or config safe_filters, unsafe_filters = @filters.partition(&:threadsafe?) diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 27e7ae25c58..13e74d4c8a9 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -29,17 +29,17 @@ class LogStash::Runner < Clamp::Command option ["-w", "--pipeline-workers"], "COUNT", I18n.t("logstash.runner.flag.pipeline-workers"), :attribute_name => :pipeline_workers, - :default => LogStash::Config::CpuCoreStrategy.fifty_percent, &:to_i + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers], &:to_i option ["-b", "--pipeline-batch-size"], "SIZE", I18n.t("logstash.runner.flag.pipeline-batch-size"), :attribute_name => :pipeline_batch_size, - :default => 125, &:to_i + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size], &:to_i option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS", I18n.t("logstash.runner.flag.pipeline-batch-delay"), :attribute_name => :pipeline_batch_delay, - :default => 50, &:to_i + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay], &:to_i option ["-l", "--log"], "FILE", I18n.t("logstash.runner.flag.log"), diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index 13cf6587db5..6851afd7ce9 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -84,7 +84,7 @@ class TestPipeline < LogStash::Pipeline end describe LogStash::Pipeline do - let(:worker_thread_count) { 8 } + let(:worker_thread_count) { LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] } let(:safe_thread_count) { 1 } let(:override_thread_count) { 42 } @@ -95,7 +95,6 @@ class TestPipeline < LogStash::Pipeline allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput) allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter) allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummysafefilter").and_return(DummySafeFilter) - allow(LogStash::Config::CpuCoreStrategy).to receive(:fifty_percent).and_return(worker_thread_count) end context "when there are some not threadsafe filters" do