Skip to content

Commit

Permalink
Refactor: Renaming prospector to input (#6078)
Browse files Browse the repository at this point in the history
This refactor rename the prospector type to the input type, this allow
this project to be more aligned with Logstash naming convention and also remove
some of the last naming legacy of the `logstash fowarder`. The input
name is also more appropriate for the UDP and the Redis code.

The prospectors are now moved to the `input` folder. Backward
compatibility is keep by using type aliasing over the older `prospector`
types. Logs statements were also changed to reflect this refactor.

Currently all the code and YAML are still using the *prospector(s)* keys,
but other PRs will move the usage to the inputs. If the `input(s)` are
present we will use them instead of the *prospectors* key.
  • Loading branch information
ph authored and ruflin committed Jan 22, 2018
1 parent d8bd52d commit bc1a68e
Show file tree
Hide file tree
Showing 64 changed files with 570 additions and 349 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Filebeat*
- Switch to docker prospector in sample manifests for Kubernetes deployment {pull}5963[5963]
- Renaming of the prospector type to the input type and all prospectors are now moved to the input
folder, to maintain backward compatibility type aliasing was used to map the old type to the new
one. This change also affect YAML configuration. {pull}6078[6078]

*Heartbeat*

Expand Down
9 changes: 8 additions & 1 deletion filebeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@
- name: prospector.type
required: true
description: >
The prospector type from which the event was generated. This field is set to the value specified for the `type` option in the prospector section of the Filebeat config file.
The input type from which the event was generated. This field is set to the value specified
for the `type` option in the input section of the Filebeat config file. (DEPRECATED: see `event.type`)
- name: event.type
required: true
description: >
The input type from which the event was generated. This field is set to the value specified
for the `type` option in the input section of the Filebeat config file.
- name: read_timestamp
description: >
Expand Down
18 changes: 9 additions & 9 deletions filebeat/beater/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
"github.com/elastic/beats/libbeat/common/bus"
)

// AutodiscoverAdapter for Filebeat modules & prospectors
// AutodiscoverAdapter for Filebeat modules & input
type AutodiscoverAdapter struct {
prospectorFactory cfgfile.RunnerFactory
moduleFactory cfgfile.RunnerFactory
inputFactory cfgfile.RunnerFactory
moduleFactory cfgfile.RunnerFactory
}

// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & prospectors
func NewAutodiscoverAdapter(prospectorFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter {
// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & input
func NewAutodiscoverAdapter(inputFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter {
return &AutodiscoverAdapter{
prospectorFactory: prospectorFactory,
moduleFactory: moduleFactory,
inputFactory: inputFactory,
moduleFactory: moduleFactory,
}
}

Expand All @@ -37,12 +37,12 @@ func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
return nil
}

// Create a module or prospector from the given config
// Create a module or input from the given config
func (m *AutodiscoverAdapter) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
if c.HasField("module") {
return m.moduleFactory.Create(c, meta)
}
return m.prospectorFactory.Create(c, meta)
return m.inputFactory.Create(c, meta)
}

// StartFilter returns the bus filter to retrieve runner start triggering events
Expand Down
38 changes: 23 additions & 15 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, err
}

if len(config.Prospectors) > 0 {
cfgwarn.Deprecate("7.0.0", "prospectors are deprecated, Use `inputs` instead.")
if len(config.Inputs) > 0 {
return nil, fmt.Errorf("prospectors and inputs used in the configuration file, define only inputs not both")
}
config.Inputs = config.Prospectors
}

moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info.Version, true)
if err != nil {
return nil, err
Expand All @@ -62,7 +70,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
logp.Info("Enabled modules/filesets: %s", moduleRegistry.InfoString())
}

moduleProspectors, err := moduleRegistry.GetProspectorConfigs()
moduleInputs, err := moduleRegistry.GetInputConfigs()
if err != nil {
return nil, err
}
Expand All @@ -71,28 +79,28 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, err
}

// Add prospectors created by the modules
config.Prospectors = append(config.Prospectors, moduleProspectors...)
// Add inputs created by the modules
config.Inputs = append(config.Inputs, moduleInputs...)

haveEnabledProspectors := false
for _, prospector := range config.Prospectors {
if prospector.Enabled() {
haveEnabledProspectors = true
haveEnabledInputs := false
for _, input := range config.Inputs {
if input.Enabled() {
haveEnabledInputs = true
break
}
}

if !config.ConfigProspector.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledProspectors && config.Autodiscover == nil {
if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil {
if !b.InSetupCmd {
return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?")
return nil, errors.New("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?")
}

// in the `setup` command, log this only as a warning
logp.Warn("Setup called, but no modules enabled.")
}

if *once && config.ConfigProspector.Enabled() && config.ConfigModules.Enabled() {
return nil, errors.New("prospector configs and -once cannot be used together")
if *once && config.ConfigInput.Enabled() && config.ConfigModules.Enabled() {
return nil, errors.New("input configs and -once cannot be used together")
}

fb := &Filebeat{
Expand Down Expand Up @@ -220,7 +228,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
outDone := make(chan struct{}) // outDone closes down all active pipeline connections
crawler, err := crawler.New(
channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
config.Prospectors,
config.Inputs,
b.Info.Version,
fb.done,
*once)
Expand Down Expand Up @@ -261,7 +269,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Warn(pipelinesWarning)
}

err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)
err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory)
if err != nil {
crawler.Stop()
return err
Expand All @@ -279,7 +287,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := NewAutodiscoverAdapter(crawler.ProspectorsFactory, crawler.ModulesFactory)
adapter := NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory)
adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover)
if err != nil {
return err
Expand All @@ -291,7 +299,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
waitFinished.AddChan(fb.done)
waitFinished.Wait()

// Stop autodiscover -> Stop crawler -> stop prospectors -> stop harvesters
// Stop autodiscover -> Stop crawler -> stop inputs -> stop harvesters
// Note: waiting for crawlers to stop here in order to install wgEvents.Wait
// after all events have been enqueued for publishing. Otherwise wgEvents.Wait
// or publisher might panic due to concurrent updates.
Expand Down
17 changes: 10 additions & 7 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ type clientEventer struct {
wgEvents eventCounter
}

// prospectorOutletConfig defines common prospector settings
// inputOutletConfig defines common input settings
// for the publisher pipline.
type prospectorOutletConfig struct {
type inputOutletConfig struct {
// event processing
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`

// implicit event fields
Type string `config:"type"` // prospector.type
Type string `config:"type"` // input.type

// hidden filebeat modules settings
Module string `config:"_module_name"` // hidden setting
Expand All @@ -44,7 +44,7 @@ type prospectorOutletConfig struct {
}

// NewOutletFactory creates a new outlet factory for
// connecting a prospector to the publisher pipeline.
// connecting an input to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
pipeline beat.Pipeline,
Expand All @@ -63,12 +63,12 @@ func NewOutletFactory(
return o
}

// Create builds a new Outleter, while applying common prospector settings.
// Prospectors and all harvesters use the same pipeline client instance.
// Create builds a new Outleter, while applying common input settings.
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) {
config := prospectorOutletConfig{}
config := inputOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
Expand Down Expand Up @@ -99,6 +99,9 @@ func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPoint
fields["prospector"] = common.MapStr{
"type": config.Type,
}
fields["event"] = common.MapStr{
"type": config.Type,
}
}

client, err := f.pipeline.ConnectWith(beat.ClientConfig{
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// Factory is used to create a new Outlet instance
type Factory func(*common.Config, *common.MapStrPointer) (Outleter, error)

// Outleter is the outlet for a prospector
// Outleter is the outlet for an input
type Outleter interface {
Close() error
OnEvent(data *util.Data) bool
Expand Down
31 changes: 20 additions & 11 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ const (
)

type Config struct {
Prospectors []*common.Config `config:"prospectors"`
RegistryFile string `config:"registry_file"`
RegistryFlush time.Duration `config:"registry_flush"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigProspector *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
Inputs []*common.Config `config:"inputs"`
Prospectors []*common.Config `config:"prospectors"`
RegistryFile string `config:"registry_file"`
RegistryFlush time.Duration `config:"registry_flush"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigInput *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
}

var (
Expand Down Expand Up @@ -82,7 +83,15 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
return fmt.Errorf("Failed to read %s: %s", file, err)
}

config.Prospectors = append(config.Prospectors, tmpConfig.Filebeat.Prospectors...)
if len(tmpConfig.Filebeat.Prospectors) > 0 {
cfgwarn.Deprecate("7.0.0", "prospectors are deprecated, Use `inputs` instead.")
if len(tmpConfig.Filebeat.Inputs) > 0 {
return fmt.Errorf("prospectors and inputs used in the configuration file, define only inputs not both")
}
tmpConfig.Filebeat.Inputs = append(tmpConfig.Filebeat.Inputs, tmpConfig.Filebeat.Prospectors...)
}

config.Inputs = append(config.Inputs, tmpConfig.Filebeat.Inputs...)
}

return nil
Expand All @@ -97,7 +106,7 @@ func (config *Config) FetchConfigs() error {
return nil
}

cfgwarn.Deprecate("7.0.0", "config_dir is deprecated. Use `filebeat.config.prospectors` instead.")
cfgwarn.Deprecate("7.0.0", "config_dir is deprecated. Use `filebeat.config.inputs` instead.")

// If configDir is relative, consider it relative to the config path
configDir = paths.Resolve(paths.Config, configDir)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,5 @@ func TestMergeConfigFiles(t *testing.T) {
config := &Config{}
mergeConfigFiles(files, config)

assert.Equal(t, 4, len(config.Prospectors))
assert.Equal(t, 4, len(config.Inputs))
}
Loading

0 comments on commit bc1a68e

Please sign in to comment.