Skip to content

Commit

Permalink
Use constants for pipeline defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Nov 25, 2015
1 parent 0d09556 commit 5aff6ac
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
15 changes: 9 additions & 6 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
module LogStash; class Pipeline
attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id

DEFAULT_SETTINGS = {
# We don't set pipeline workers defaults here, but rather in safe_pipeline_workers
:default_pipeline_workers => LogStash::Config::CpuCoreStrategy.fifty_percent,
:pipeline_batch_size => 125,
:pipeline_batch_delay => 5 # in milliseconds
}

def initialize(config_str, settings = {})
@pipeline_id = settings[:pipeline_id] || self.object_id
@logger = Cabin::Channel.get(LogStash)
Expand Down Expand Up @@ -58,11 +65,7 @@ def initialize(config_str, settings = {})

@input_threads = []

@settings = {
# We don't set pipeline workers defaults here, but rather in safe_pipeline_workers
:pipeline_batch_size => 125,
:pipeline_batch_delay => 50 # in milliseconds
}
@settings = DEFAULT_SETTINGS.clone

# @ready requires thread safety since it is typically polled from outside the pipeline thread
@ready = Concurrent::AtomicBoolean.new(false)
Expand All @@ -79,7 +82,7 @@ def configure(setting, value)
end

def safe_pipeline_worker_count
default = LogStash::Config::CpuCoreStrategy.fifty_percent
default = DEFAULT_SETTINGS[:default_pipeline_workers]
thread_count = @settings[:pipeline_workers] #override from args "-w 8" or config
safe_filters, unsafe_filters = @filters.partition(&:threadsafe?)

Expand Down
6 changes: 3 additions & 3 deletions logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ class LogStash::Runner < Clamp::Command
option ["-w", "--pipeline-workers"], "COUNT",
I18n.t("logstash.runner.flag.pipeline-workers"),
:attribute_name => :pipeline_workers,
:default => LogStash::Config::CpuCoreStrategy.fifty_percent, &:to_i
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers], &:to_i

option ["-b", "--pipeline-batch-size"], "SIZE",
I18n.t("logstash.runner.flag.pipeline-batch-size"),
:attribute_name => :pipeline_batch_size,
:default => 125, &:to_i
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size], &:to_i

option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS",
I18n.t("logstash.runner.flag.pipeline-batch-delay"),
:attribute_name => :pipeline_batch_delay,
:default => 50, &:to_i
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay], &:to_i

option ["-l", "--log"], "FILE",
I18n.t("logstash.runner.flag.log"),
Expand Down
3 changes: 1 addition & 2 deletions logstash-core/spec/logstash/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TestPipeline < LogStash::Pipeline
end

describe LogStash::Pipeline do
let(:worker_thread_count) { 8 }
let(:worker_thread_count) { LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] }
let(:safe_thread_count) { 1 }
let(:override_thread_count) { 42 }

Expand All @@ -95,7 +95,6 @@ class TestPipeline < LogStash::Pipeline
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummysafefilter").and_return(DummySafeFilter)
allow(LogStash::Config::CpuCoreStrategy).to receive(:fifty_percent).and_return(worker_thread_count)
end

context "when there are some not threadsafe filters" do
Expand Down

0 comments on commit 5aff6ac

Please sign in to comment.