diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 653b82e8aebe..8246ee93d592 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -31,6 +31,9 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Filebeat* - Switch to docker prospector in sample manifests for Kubernetes deployment {pull}5963[5963] +- Renaming of the prospector type to the input type and all prospectors are now moved to the input + folder, to maintain backward compatibility type aliasing was used to map the old type to the new + one. This change also affect YAML configuration. {pull}6078[6078] *Heartbeat* diff --git a/filebeat/_meta/fields.common.yml b/filebeat/_meta/fields.common.yml index 886cde082faa..e1873a9191ac 100644 --- a/filebeat/_meta/fields.common.yml +++ b/filebeat/_meta/fields.common.yml @@ -32,7 +32,14 @@ - name: prospector.type required: true description: > - The prospector type from which the event was generated. This field is set to the value specified for the `type` option in the prospector section of the Filebeat config file. + The input type from which the event was generated. This field is set to the value specified + for the `type` option in the input section of the Filebeat config file. (DEPRECATED: see `event.type`) + + - name: event.type + required: true + description: > + The input type from which the event was generated. This field is set to the value specified + for the `type` option in the input section of the Filebeat config file. - name: read_timestamp description: > diff --git a/filebeat/beater/autodiscover.go b/filebeat/beater/autodiscover.go index 5fe4e5320f54..acd6838872ca 100644 --- a/filebeat/beater/autodiscover.go +++ b/filebeat/beater/autodiscover.go @@ -8,17 +8,17 @@ import ( "github.com/elastic/beats/libbeat/common/bus" ) -// AutodiscoverAdapter for Filebeat modules & prospectors +// AutodiscoverAdapter for Filebeat modules & input type AutodiscoverAdapter struct { - prospectorFactory cfgfile.RunnerFactory - moduleFactory cfgfile.RunnerFactory + inputFactory cfgfile.RunnerFactory + moduleFactory cfgfile.RunnerFactory } -// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & prospectors -func NewAutodiscoverAdapter(prospectorFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter { +// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & input +func NewAutodiscoverAdapter(inputFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter { return &AutodiscoverAdapter{ - prospectorFactory: prospectorFactory, - moduleFactory: moduleFactory, + inputFactory: inputFactory, + moduleFactory: moduleFactory, } } @@ -37,12 +37,12 @@ func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error { return nil } -// Create a module or prospector from the given config +// Create a module or input from the given config func (m *AutodiscoverAdapter) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { if c.HasField("module") { return m.moduleFactory.Create(c, meta) } - return m.prospectorFactory.Create(c, meta) + return m.inputFactory.Create(c, meta) } // StartFilter returns the bus filter to retrieve runner start triggering events diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 0d7ef66935e3..9912bf0f1cc1 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -54,6 +54,14 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return nil, err } + if len(config.Prospectors) > 0 { + cfgwarn.Deprecate("7.0.0", "prospectors are deprecated, Use `inputs` instead.") + if len(config.Inputs) > 0 { + return nil, fmt.Errorf("prospectors and inputs used in the configuration file, define only inputs not both") + } + config.Inputs = config.Prospectors + } + moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info.Version, true) if err != nil { return nil, err @@ -62,7 +70,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { logp.Info("Enabled modules/filesets: %s", moduleRegistry.InfoString()) } - moduleProspectors, err := moduleRegistry.GetProspectorConfigs() + moduleInputs, err := moduleRegistry.GetInputConfigs() if err != nil { return nil, err } @@ -71,28 +79,28 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return nil, err } - // Add prospectors created by the modules - config.Prospectors = append(config.Prospectors, moduleProspectors...) + // Add inputs created by the modules + config.Inputs = append(config.Inputs, moduleInputs...) - haveEnabledProspectors := false - for _, prospector := range config.Prospectors { - if prospector.Enabled() { - haveEnabledProspectors = true + haveEnabledInputs := false + for _, input := range config.Inputs { + if input.Enabled() { + haveEnabledInputs = true break } } - if !config.ConfigProspector.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledProspectors && config.Autodiscover == nil { + if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil { if !b.InSetupCmd { - return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?") + return nil, errors.New("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?") } // in the `setup` command, log this only as a warning logp.Warn("Setup called, but no modules enabled.") } - if *once && config.ConfigProspector.Enabled() && config.ConfigModules.Enabled() { - return nil, errors.New("prospector configs and -once cannot be used together") + if *once && config.ConfigInput.Enabled() && config.ConfigModules.Enabled() { + return nil, errors.New("input configs and -once cannot be used together") } fb := &Filebeat{ @@ -220,7 +228,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { outDone := make(chan struct{}) // outDone closes down all active pipeline connections crawler, err := crawler.New( channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create, - config.Prospectors, + config.Inputs, b.Info.Version, fb.done, *once) @@ -261,7 +269,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Warn(pipelinesWarning) } - err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory) + err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory) if err != nil { crawler.Stop() return err @@ -279,7 +287,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { - adapter := NewAutodiscoverAdapter(crawler.ProspectorsFactory, crawler.ModulesFactory) + adapter := NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory) adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover) if err != nil { return err @@ -291,7 +299,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { waitFinished.AddChan(fb.done) waitFinished.Wait() - // Stop autodiscover -> Stop crawler -> stop prospectors -> stop harvesters + // Stop autodiscover -> Stop crawler -> stop inputs -> stop harvesters // Note: waiting for crawlers to stop here in order to install wgEvents.Wait // after all events have been enqueued for publishing. Otherwise wgEvents.Wait // or publisher might panic due to concurrent updates. diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index 025df89a2342..c77fb0afba50 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -24,15 +24,15 @@ type clientEventer struct { wgEvents eventCounter } -// prospectorOutletConfig defines common prospector settings +// inputOutletConfig defines common input settings // for the publisher pipline. -type prospectorOutletConfig struct { +type inputOutletConfig struct { // event processing common.EventMetadata `config:",inline"` // Fields and tags to add to events. Processors processors.PluginConfig `config:"processors"` // implicit event fields - Type string `config:"type"` // prospector.type + Type string `config:"type"` // input.type // hidden filebeat modules settings Module string `config:"_module_name"` // hidden setting @@ -44,7 +44,7 @@ type prospectorOutletConfig struct { } // NewOutletFactory creates a new outlet factory for -// connecting a prospector to the publisher pipeline. +// connecting an input to the publisher pipeline. func NewOutletFactory( done <-chan struct{}, pipeline beat.Pipeline, @@ -63,12 +63,12 @@ func NewOutletFactory( return o } -// Create builds a new Outleter, while applying common prospector settings. -// Prospectors and all harvesters use the same pipeline client instance. +// Create builds a new Outleter, while applying common input settings. +// Inputs and all harvesters use the same pipeline client instance. // This guarantees ordering between events as required by the registrar for // file.State updates func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) { - config := prospectorOutletConfig{} + config := inputOutletConfig{} if err := cfg.Unpack(&config); err != nil { return nil, err } @@ -99,6 +99,9 @@ func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPoint fields["prospector"] = common.MapStr{ "type": config.Type, } + fields["event"] = common.MapStr{ + "type": config.Type, + } } client, err := f.pipeline.ConnectWith(beat.ClientConfig{ diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 2fbed63ba832..06bbcd432774 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -8,7 +8,7 @@ import ( // Factory is used to create a new Outlet instance type Factory func(*common.Config, *common.MapStrPointer) (Outleter, error) -// Outleter is the outlet for a prospector +// Outleter is the outlet for an input type Outleter interface { Close() error OnEvent(data *util.Data) bool diff --git a/filebeat/config/config.go b/filebeat/config/config.go index d3a0ee31d730..40ec527253e6 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -21,15 +21,16 @@ const ( ) type Config struct { - Prospectors []*common.Config `config:"prospectors"` - RegistryFile string `config:"registry_file"` - RegistryFlush time.Duration `config:"registry_flush"` - ConfigDir string `config:"config_dir"` - ShutdownTimeout time.Duration `config:"shutdown_timeout"` - Modules []*common.Config `config:"modules"` - ConfigProspector *common.Config `config:"config.prospectors"` - ConfigModules *common.Config `config:"config.modules"` - Autodiscover *autodiscover.Config `config:"autodiscover"` + Inputs []*common.Config `config:"inputs"` + Prospectors []*common.Config `config:"prospectors"` + RegistryFile string `config:"registry_file"` + RegistryFlush time.Duration `config:"registry_flush"` + ConfigDir string `config:"config_dir"` + ShutdownTimeout time.Duration `config:"shutdown_timeout"` + Modules []*common.Config `config:"modules"` + ConfigInput *common.Config `config:"config.prospectors"` + ConfigModules *common.Config `config:"config.modules"` + Autodiscover *autodiscover.Config `config:"autodiscover"` } var ( @@ -82,7 +83,15 @@ func mergeConfigFiles(configFiles []string, config *Config) error { return fmt.Errorf("Failed to read %s: %s", file, err) } - config.Prospectors = append(config.Prospectors, tmpConfig.Filebeat.Prospectors...) + if len(tmpConfig.Filebeat.Prospectors) > 0 { + cfgwarn.Deprecate("7.0.0", "prospectors are deprecated, Use `inputs` instead.") + if len(tmpConfig.Filebeat.Inputs) > 0 { + return fmt.Errorf("prospectors and inputs used in the configuration file, define only inputs not both") + } + tmpConfig.Filebeat.Inputs = append(tmpConfig.Filebeat.Inputs, tmpConfig.Filebeat.Prospectors...) + } + + config.Inputs = append(config.Inputs, tmpConfig.Filebeat.Inputs...) } return nil @@ -97,7 +106,7 @@ func (config *Config) FetchConfigs() error { return nil } - cfgwarn.Deprecate("7.0.0", "config_dir is deprecated. Use `filebeat.config.prospectors` instead.") + cfgwarn.Deprecate("7.0.0", "config_dir is deprecated. Use `filebeat.config.inputs` instead.") // If configDir is relative, consider it relative to the config path configDir = paths.Resolve(paths.Config, configDir) diff --git a/filebeat/config/config_test.go b/filebeat/config/config_test.go index 0c95a5fa387b..c8df1b614368 100644 --- a/filebeat/config/config_test.go +++ b/filebeat/config/config_test.go @@ -93,5 +93,5 @@ func TestMergeConfigFiles(t *testing.T) { config := &Config{} mergeConfigFiles(files, config) - assert.Equal(t, 4, len(config.Prospectors)) + assert.Equal(t, 4, len(config.Inputs)) } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 54a2162aca1c..16e65f1d335f 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -7,7 +7,7 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/fileset" "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/filebeat/prospector" + input "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/registrar" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" @@ -17,53 +17,53 @@ import ( ) type Crawler struct { - prospectors map[uint64]*prospector.Prospector - prospectorConfigs []*common.Config - out channel.Factory - wg sync.WaitGroup - ProspectorsFactory cfgfile.RunnerFactory - ModulesFactory cfgfile.RunnerFactory - modulesReloader *cfgfile.Reloader - prospectorsReloader *cfgfile.Reloader - once bool - beatVersion string - beatDone chan struct{} + inputs map[uint64]*input.Runner + inputConfigs []*common.Config + out channel.Factory + wg sync.WaitGroup + InputsFactory cfgfile.RunnerFactory + ModulesFactory cfgfile.RunnerFactory + modulesReloader *cfgfile.Reloader + inputReloader *cfgfile.Reloader + once bool + beatVersion string + beatDone chan struct{} } -func New(out channel.Factory, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) { +func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) { return &Crawler{ - out: out, - prospectors: map[uint64]*prospector.Prospector{}, - prospectorConfigs: prospectorConfigs, - once: once, - beatVersion: beatVersion, - beatDone: beatDone, + out: out, + inputs: map[uint64]*input.Runner{}, + inputConfigs: inputConfigs, + once: once, + beatVersion: beatVersion, + beatDone: beatDone, }, nil } -// Start starts the crawler with all prospectors -func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, +// Start starts the crawler with all inputs +func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory) error { - logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs)) + logp.Info("Loading Inputs: %v", len(c.inputConfigs)) // Prospect the globs/paths given on the command line and launch harvesters - for _, prospectorConfig := range c.prospectorConfigs { - err := c.startProspector(prospectorConfig, r.GetStates()) + for _, inputConfig := range c.inputConfigs { + err := c.startInput(inputConfig, r.GetStates()) if err != nil { return err } } - c.ProspectorsFactory = prospector.NewRunnerFactory(c.out, r, c.beatDone) - if configProspectors.Enabled() { - c.prospectorsReloader = cfgfile.NewReloader(configProspectors) - if err := c.prospectorsReloader.Check(c.ProspectorsFactory); err != nil { + c.InputsFactory = input.NewRunnerFactory(c.out, r, c.beatDone) + if configInputs.Enabled() { + c.inputReloader = cfgfile.NewReloader(configInputs) + if err := c.inputReloader.Check(c.InputsFactory); err != nil { return err } go func() { - c.prospectorsReloader.Run(c.ProspectorsFactory) + c.inputReloader.Run(c.InputsFactory) }() } @@ -79,26 +79,26 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config }() } - logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors)) + logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs)) return nil } -func (c *Crawler) startProspector(config *common.Config, states []file.State) error { +func (c *Crawler) startInput(config *common.Config, states []file.State) error { if !config.Enabled() { return nil } - p, err := prospector.New(config, c.out, c.beatDone, states, nil) + p, err := input.New(config, c.out, c.beatDone, states, nil) if err != nil { - return fmt.Errorf("Error in initing prospector: %s", err) + return fmt.Errorf("Error in initing input: %s", err) } p.Once = c.once - if _, ok := c.prospectors[p.ID]; ok { - return fmt.Errorf("Prospector with same ID already exists: %d", p.ID) + if _, ok := c.inputs[p.ID]; ok { + return fmt.Errorf("Input with same ID already exists: %d", p.ID) } - c.prospectors[p.ID] = p + c.inputs[p.ID] = p p.Start() @@ -116,14 +116,14 @@ func (c *Crawler) Stop() { }() } - logp.Info("Stopping %v prospectors", len(c.prospectors)) - for _, p := range c.prospectors { - // Stop prospectors in parallel + logp.Info("Stopping %v inputs", len(c.inputs)) + for _, p := range c.inputs { + // Stop inputs in parallel asyncWaitStop(p.Stop) } - if c.prospectorsReloader != nil { - asyncWaitStop(c.prospectorsReloader.Stop) + if c.inputReloader != nil { + asyncWaitStop(c.inputReloader.Stop) } if c.modulesReloader != nil { diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index c0bd070decc7..f010c8c999c7 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -6,7 +6,7 @@ services: - proxy_dep env_file: - ${PWD}/build/test.env - - ${PWD}/prospector/redis/_meta/env + - ${PWD}/input/redis/_meta/env environment: - KIBANA_HOST=kibana - KIBANA_PORT=5601 @@ -37,4 +37,4 @@ services: service: kibana redis: - build: ${PWD}/prospector/redis/_meta + build: ${PWD}/input/redis/_meta diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index e50ad1e96179..dfeec75dd36e 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -957,7 +957,15 @@ Log stream when reading container logs, can be 'stdout' or 'stderr' required: True -The prospector type from which the event was generated. This field is set to the value specified for the `type` option in the prospector section of the Filebeat config file. +The input type from which the event was generated. This field is set to the value specified for the `type` option in the input section of the Filebeat config file. (DEPRECATED: see `event.type`) + + +[float] +=== `event.type` + +required: True + +The input type from which the event was generated. This field is set to the value specified for the `type` option in the input section of the Filebeat config file. [float] diff --git a/filebeat/fileset/config.go b/filebeat/fileset/config.go index 7a20fa158c81..ac6d46e502be 100644 --- a/filebeat/fileset/config.go +++ b/filebeat/fileset/config.go @@ -1,5 +1,12 @@ package fileset +import ( + "fmt" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" +) + // ModuleConfig contains the configuration file options for a module type ModuleConfig struct { Module string `config:"module" validate:"required"` @@ -13,5 +20,24 @@ type ModuleConfig struct { type FilesetConfig struct { Enabled *bool `config:"enabled"` Var map[string]interface{} `config:"var"` + Input map[string]interface{} `config:"input"` Prospector map[string]interface{} `config:"prospector"` } + +// NewFilesetConfig creates a new FilesetConfig from a common.Config. +func NewFilesetConfig(cfg *common.Config) (*FilesetConfig, error) { + var fcfg FilesetConfig + err := cfg.Unpack(&fcfg) + if err != nil { + return nil, fmt.Errorf("error unpacking configuration") + } + + if len(fcfg.Prospector) > 0 { + cfgwarn.Deprecate("7.0.0", "prospector is deprecated. Use `input` instead.") + if len(fcfg.Input) > 0 { + return nil, fmt.Errorf("error prospector and input are defined in the fileset, use only input") + } + fcfg.Input = fcfg.Prospector + } + return &fcfg, nil +} diff --git a/filebeat/fileset/config_test.go b/filebeat/fileset/config_test.go new file mode 100644 index 000000000000..f96a345ca98e --- /dev/null +++ b/filebeat/fileset/config_test.go @@ -0,0 +1,62 @@ +package fileset + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestProspectorDeprecation(t *testing.T) { + cfg := map[string]interface{}{ + "enabled": true, + "prospector": map[string]interface{}{ + "close_eof": true, + }, + } + + c, err := common.NewConfigFrom(cfg) + assert.NoError(t, err) + + f, err := NewFilesetConfig(c) + if assert.NoError(t, err) { + assert.Equal(t, f.Input["close_eof"], true) + } +} + +func TestInputSettings(t *testing.T) { + cfg := map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "close_eof": true, + }, + } + + c, err := common.NewConfigFrom(cfg) + assert.NoError(t, err) + + f, err := NewFilesetConfig(c) + if assert.NoError(t, err) { + assert.Equal(t, f.Input["close_eof"], true) + assert.Nil(t, f.Prospector) + } +} + +func TestProspectorDeprecationWhenInputIsAlsoDefined(t *testing.T) { + cfg := map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "close_eof": true, + }, + "prospector": map[string]interface{}{ + "close_eof": true, + }, + } + + c, err := common.NewConfigFrom(cfg) + assert.NoError(t, err) + + _, err = NewFilesetConfig(c) + assert.Error(t, err, "error prospector and input are defined in the fileset, use only input") +} diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index d946578ba99a..4becbc73b8b1 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -4,7 +4,7 @@ import ( "github.com/mitchellh/hashstructure" "github.com/elastic/beats/filebeat/channel" - "github.com/elastic/beats/filebeat/prospector" + input "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/registrar" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" @@ -21,11 +21,11 @@ type Factory struct { beatDone chan struct{} } -// Wrap an array of prospectors and implements cfgfile.Runner interface -type prospectorsRunner struct { +// Wrap an array of inputs and implements cfgfile.Runner interface +type inputsRunner struct { id uint64 moduleRegistry *ModuleRegistry - prospectors []*prospector.Prospector + inputs []*input.Runner pipelineLoaderFactory PipelineLoaderFactory } @@ -49,7 +49,7 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile. return nil, err } - pConfigs, err := m.GetProspectorConfigs() + pConfigs, err := m.GetInputConfigs() if err != nil { return nil, err } @@ -62,24 +62,24 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile. return nil, err } - prospectors := make([]*prospector.Prospector, len(pConfigs)) + inputs := make([]*input.Runner, len(pConfigs)) for i, pConfig := range pConfigs { - prospectors[i], err = prospector.New(pConfig, f.outlet, f.beatDone, f.registrar.GetStates(), meta) + inputs[i], err = input.New(pConfig, f.outlet, f.beatDone, f.registrar.GetStates(), meta) if err != nil { - logp.Err("Error creating prospector: %s", err) + logp.Err("Error creating input: %s", err) return nil, err } } - return &prospectorsRunner{ + return &inputsRunner{ id: id, moduleRegistry: m, - prospectors: prospectors, + inputs: inputs, pipelineLoaderFactory: f.pipelineLoaderFactory, }, nil } -func (p *prospectorsRunner) Start() { +func (p *inputsRunner) Start() { // Load pipelines if p.pipelineLoaderFactory != nil { // Load pipelines instantly and then setup a callback for reconnections: @@ -101,16 +101,16 @@ func (p *prospectorsRunner) Start() { elasticsearch.RegisterConnectCallback(callback) } - for _, prospector := range p.prospectors { - prospector.Start() + for _, input := range p.inputs { + input.Start() } } -func (p *prospectorsRunner) Stop() { - for _, prospector := range p.prospectors { - prospector.Stop() +func (p *inputsRunner) Stop() { + for _, input := range p.inputs { + input.Stop() } } -func (p *prospectorsRunner) String() string { +func (p *inputsRunner) String() string { return p.moduleRegistry.InfoString() } diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index 519ada7eeae1..8992b29a474d 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -84,6 +84,7 @@ type manifest struct { ModuleVersion string `config:"module_version"` Vars []map[string]interface{} `config:"var"` IngestPipeline string `config:"ingest_pipeline"` + Input string `config:"input"` Prospector string `config:"prospector"` MachineLearning []struct { Name string `config:"name"` @@ -96,6 +97,18 @@ type manifest struct { } `config:"requires"` } +func newManifest(cfg *common.Config) (*manifest, error) { + var manifest manifest + err := cfg.Unpack(&manifest) + if err != nil { + return nil, err + } + if manifest.Prospector != "" { + manifest.Input = manifest.Prospector + } + return &manifest, nil +} + // ProcessorRequirement represents the declaration of a dependency to a particular // Ingest Node processor / plugin. type ProcessorRequirement struct { @@ -109,12 +122,11 @@ func (fs *Fileset) readManifest() (*manifest, error) { if err != nil { return nil, fmt.Errorf("Error reading manifest file: %v", err) } - var manifest manifest - err = cfg.Unpack(&manifest) + manifest, err := newManifest(cfg) if err != nil { return nil, fmt.Errorf("Error unpacking manifest: %v", err) } - return &manifest, nil + return manifest, nil } // evaluateVars resolves the fileset variables. @@ -265,31 +277,31 @@ func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) { }, nil } -func (fs *Fileset) getProspectorConfig() (*common.Config, error) { - path, err := applyTemplate(fs.vars, fs.manifest.Prospector, false) +func (fs *Fileset) getInputConfig() (*common.Config, error) { + path, err := applyTemplate(fs.vars, fs.manifest.Input, false) if err != nil { - return nil, fmt.Errorf("Error expanding vars on the prospector path: %v", err) + return nil, fmt.Errorf("Error expanding vars on the input path: %v", err) } contents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path)) if err != nil { - return nil, fmt.Errorf("Error reading prospector file %s: %v", path, err) + return nil, fmt.Errorf("Error reading input file %s: %v", path, err) } yaml, err := applyTemplate(fs.vars, string(contents), false) if err != nil { - return nil, fmt.Errorf("Error interpreting the template of the prospector: %v", err) + return nil, fmt.Errorf("Error interpreting the template of the input: %v", err) } cfg, err := common.NewConfigWithYAML([]byte(yaml), "") if err != nil { - return nil, fmt.Errorf("Error reading prospector config: %v", err) + return nil, fmt.Errorf("Error reading input config: %v", err) } // overrides - if len(fs.fcfg.Prospector) > 0 { - overrides, err := common.NewConfigFrom(fs.fcfg.Prospector) + if len(fs.fcfg.Input) > 0 { + overrides, err := common.NewConfigFrom(fs.fcfg.Input) if err != nil { - return nil, fmt.Errorf("Error creating config from prospector overrides: %v", err) + return nil, fmt.Errorf("Error creating config from input overrides: %v", err) } cfg, err = common.MergeConfigs(cfg, overrides) if err != nil { @@ -300,20 +312,20 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) { // force our pipeline ID err = cfg.SetString("pipeline", -1, fs.pipelineID) if err != nil { - return nil, fmt.Errorf("Error setting the pipeline ID in the prospector config: %v", err) + return nil, fmt.Errorf("Error setting the pipeline ID in the input config: %v", err) } // force our the module/fileset name err = cfg.SetString("_module_name", -1, fs.mcfg.Module) if err != nil { - return nil, fmt.Errorf("Error setting the _module_name cfg in the prospector config: %v", err) + return nil, fmt.Errorf("Error setting the _module_name cfg in the input config: %v", err) } err = cfg.SetString("_fileset_name", -1, fs.name) if err != nil { - return nil, fmt.Errorf("Error setting the _fileset_name cfg in the prospector config: %v", err) + return nil, fmt.Errorf("Error setting the _fileset_name cfg in the input config: %v", err) } - cfg.PrintDebugf("Merged prospector config for fileset %s/%s", fs.mcfg.Module, fs.name) + cfg.PrintDebugf("Merged input config for fileset %s/%s", fs.mcfg.Module, fs.name) return cfg, nil } diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index da63567b127b..55355218d9d5 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -30,7 +30,7 @@ func TestLoadManifestNginx(t *testing.T) { assert.NoError(t, err) assert.Equal(t, manifest.ModuleVersion, "1.0") assert.Equal(t, manifest.IngestPipeline, "ingest/default.json") - assert.Equal(t, manifest.Prospector, "config/nginx-access.yml") + assert.Equal(t, manifest.Input, "config/nginx-access.yml") vars := manifest.Vars assert.Equal(t, "paths", vars[0]["name"]) @@ -145,11 +145,11 @@ func TestResolveVariable(t *testing.T) { } } -func TestGetProspectorConfigNginx(t *testing.T) { +func TestGetInputConfigNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") assert.NoError(t, fs.Read("5.2.0")) - cfg, err := fs.getProspectorConfig() + cfg, err := fs.getInputConfig() assert.NoError(t, err) assert.True(t, cfg.HasField("paths")) @@ -160,11 +160,11 @@ func TestGetProspectorConfigNginx(t *testing.T) { assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID) } -func TestGetProspectorConfigNginxOverrides(t *testing.T) { +func TestGetInputConfigNginxOverrides(t *testing.T) { modulesPath, err := filepath.Abs("../module") assert.NoError(t, err) fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{ - Prospector: map[string]interface{}{ + Input: map[string]interface{}{ "close_eof": true, }, }) @@ -172,7 +172,7 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) { assert.NoError(t, fs.Read("5.2.0")) - cfg, err := fs.getProspectorConfig() + cfg, err := fs.getInputConfig() assert.NoError(t, err) assert.True(t, cfg.HasField("paths")) diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 550a304265a9..0eea39306a8f 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -133,7 +133,7 @@ func mcfgFromConfig(cfg *common.Config) (*ModuleConfig, error) { err = cfg.Unpack(&dict) if err != nil { - return nil, fmt.Errorf("Error unpacking module %s in a dict: %v", mcfg.Module, err) + return nil, fmt.Errorf("error unpacking module %s in a dict: %v", mcfg.Module, err) } mcfg.Filesets = map[string]*FilesetConfig{} @@ -142,17 +142,16 @@ func mcfgFromConfig(cfg *common.Config) (*ModuleConfig, error) { continue } - var fcfg FilesetConfig tmpCfg, err := common.NewConfigFrom(filesetConfig) if err != nil { - return nil, fmt.Errorf("Error creating config from fileset %s/%s: %v", mcfg.Module, name, err) + return nil, fmt.Errorf("error creating config from fileset %s/%s: %v", mcfg.Module, name, err) } - err = tmpCfg.Unpack(&fcfg) + + fcfg, err := NewFilesetConfig(tmpCfg) if err != nil { - return nil, fmt.Errorf("Error unpacking fileset %s/%s: %v", mcfg.Module, name, err) + return nil, fmt.Errorf("error creating config from fileset %s/%s: %v", mcfg.Module, name, err) } - mcfg.Filesets[name] = &fcfg - + mcfg.Filesets[name] = fcfg } return &mcfg, nil @@ -204,13 +203,12 @@ func applyOverrides(fcfg *FilesetConfig, return nil, fmt.Errorf("Error merging configs: %v", err) } - var res FilesetConfig - err = resultConfig.Unpack(&res) + res, err := NewFilesetConfig(resultConfig) if err != nil { return nil, fmt.Errorf("Error unpacking configs: %v", err) } - return &res, nil + return res, nil } // appendWithoutDuplicates appends basic module configuration for each module in the @@ -238,11 +236,11 @@ func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([ return moduleConfigs, nil } -func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) { +func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { result := []*common.Config{} for module, filesets := range reg.registry { for name, fileset := range filesets { - fcfg, err := fileset.getProspectorConfig() + fcfg, err := fileset.getInputConfig() if err != nil { return result, fmt.Errorf("Error getting config for fielset %s/%s: %v", module, name, err) diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index feed6e6cb9f8..7be009065399 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -58,7 +58,7 @@ func TestNewModuleRegistry(t *testing.T) { for module, filesets := range reg.registry { for name, fileset := range filesets { - cfg, err := fileset.getProspectorConfig() + cfg, err := fileset.getInputConfig() assert.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module, name)) moduleName, err := cfg.String("_module_name", -1) @@ -181,11 +181,32 @@ func TestApplyOverrides(t *testing.T) { }, }, expected: FilesetConfig{ + Input: map[string]interface{}{ + "close_eof": true, + }, Prospector: map[string]interface{}{ "close_eof": true, }, }, }, + { + name: "input overrides", + fcfg: FilesetConfig{}, + module: "nginx", + fileset: "access", + overrides: &ModuleOverrides{ + "nginx": map[string]*common.Config{ + "access": load(t, map[string]interface{}{ + "input.close_eof": true, + }), + }, + }, + expected: FilesetConfig{ + Input: map[string]interface{}{ + "close_eof": true, + }, + }, + }, } for _, test := range tests { @@ -351,9 +372,9 @@ func TestMissingModuleFolder(t *testing.T) { assert.NotNil(t, reg) // this should return an empty list, but no error - prospectors, err := reg.GetProspectorConfigs() + inputs, err := reg.GetInputConfigs() assert.NoError(t, err) - assert.Equal(t, 0, len(prospectors)) + assert.Equal(t, 0, len(inputs)) } func TestInterpretError(t *testing.T) { diff --git a/filebeat/harvester/forwarder.go b/filebeat/harvester/forwarder.go index 3bc813f989a8..d89812b0606f 100644 --- a/filebeat/harvester/forwarder.go +++ b/filebeat/harvester/forwarder.go @@ -27,13 +27,13 @@ func NewForwarder(outlet Outlet) *Forwarder { return &Forwarder{Outlet: outlet} } -// Send updates the prospector state and sends the event to the spooler -// All state updates done by the prospector itself are synchronous to make sure no states are overwritten +// Send updates the input state and sends the event to the spooler +// All state updates done by the input itself are synchronous to make sure no states are overwritten func (f *Forwarder) Send(data *util.Data) error { ok := f.Outlet.OnEvent(data) if !ok { - logp.Info("Prospector outlet closed") - return errors.New("prospector outlet closed") + logp.Info("Input outlet closed") + return errors.New("input outlet closed") } return nil diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 50d06169cf7f..6931b467e5ca 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -5,7 +5,7 @@ import ( ) // Harvester contains all methods which must be supported by each harvester -// so the registry can be used by the prospector. +// so the registry can be used by the input type Harvester interface { ID() uuid.UUID Run() error diff --git a/filebeat/harvester/registry.go b/filebeat/harvester/registry.go index 9b78ee0bf15a..5d0bdce0823e 100644 --- a/filebeat/harvester/registry.go +++ b/filebeat/harvester/registry.go @@ -80,7 +80,7 @@ func (r *Registry) Start(h Harvester) error { // Starts harvester and picks the right type. In case type is not set, set it to default (log) err := h.Run() if err != nil { - logp.Err("Error running prospector: %v", err) + logp.Err("Error running input: %v", err) } }() return nil diff --git a/filebeat/harvester/util.go b/filebeat/harvester/util.go index 640d5bb2af67..c144f2d3f6bf 100644 --- a/filebeat/harvester/util.go +++ b/filebeat/harvester/util.go @@ -2,7 +2,7 @@ package harvester import "github.com/elastic/beats/libbeat/common/match" -// Contains available prospector types +// Contains available input types const ( LogType = "log" StdinType = "stdin" diff --git a/filebeat/include/list.go b/filebeat/include/list.go index 3ac8060e384f..e1aae4e763ac 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -1,16 +1,16 @@ /* -Package include imports all prospector packages so that they register +Package include imports all input packages so that they register their factories with the global registry. This package can be imported in the -main package to automatically register all of the standard supported prospectors +main package to automatically register all of the standard supported inputs modules. */ package include import ( // This list is automatically generated by `make imports` - _ "github.com/elastic/beats/filebeat/prospector/docker" - _ "github.com/elastic/beats/filebeat/prospector/log" - _ "github.com/elastic/beats/filebeat/prospector/redis" - _ "github.com/elastic/beats/filebeat/prospector/stdin" - _ "github.com/elastic/beats/filebeat/prospector/udp" + _ "github.com/elastic/beats/filebeat/input/docker" + _ "github.com/elastic/beats/filebeat/input/log" + _ "github.com/elastic/beats/filebeat/input/redis" + _ "github.com/elastic/beats/filebeat/input/stdin" + _ "github.com/elastic/beats/filebeat/input/udp" ) diff --git a/filebeat/prospector/config.go b/filebeat/input/config.go similarity index 66% rename from filebeat/prospector/config.go rename to filebeat/input/config.go index e4eadc2da5a2..6f64678b84a0 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/input/config.go @@ -1,4 +1,4 @@ -package prospector +package input import ( "time" @@ -8,21 +8,21 @@ import ( ) var ( - defaultConfig = prospectorConfig{ + defaultConfig = inputConfig{ ScanFrequency: 10 * time.Second, Type: cfg.DefaultType, } ) -type prospectorConfig struct { +type inputConfig struct { ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` Type string `config:"type"` InputType string `config:"input_type"` } -func (c *prospectorConfig) Validate() error { +func (c *inputConfig) Validate() error { if c.InputType != "" { - cfgwarn.Deprecate("6.0.0", "input_type prospector config is deprecated. Use type instead.") + cfgwarn.Deprecate("6.0.0", "input_type input config is deprecated. Use type instead.") c.Type = c.InputType } return nil diff --git a/filebeat/prospector/docker/config.go b/filebeat/input/docker/config.go similarity index 100% rename from filebeat/prospector/docker/config.go rename to filebeat/input/docker/config.go diff --git a/filebeat/prospector/docker/prospector.go b/filebeat/input/docker/prospector.go similarity index 96% rename from filebeat/prospector/docker/prospector.go rename to filebeat/input/docker/prospector.go index 49b451d86f02..76a3b4d0c7da 100644 --- a/filebeat/prospector/docker/prospector.go +++ b/filebeat/input/docker/prospector.go @@ -5,8 +5,8 @@ import ( "path" "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/input/log" "github.com/elastic/beats/filebeat/prospector" - "github.com/elastic/beats/filebeat/prospector/log" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 06dcd323812d..0b7b8cd53c1b 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -79,7 +79,7 @@ func (s *States) Update(newState State) { } else { // No existing state found, add new one s.states = append(s.states, newState) - logp.Debug("prospector", "New state added for %s", newState.Source) + logp.Debug("input", "New state added for %s", newState.Source) } } diff --git a/filebeat/input/input.go b/filebeat/input/input.go new file mode 100644 index 000000000000..89c882a82444 --- /dev/null +++ b/filebeat/input/input.go @@ -0,0 +1,150 @@ +package input + +import ( + "fmt" + "sync" + "time" + + "github.com/mitchellh/hashstructure" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +// Input is the interface common to all input +type Input interface { + Run() + Stop() + Wait() +} + +// Runner encapsulate the lifecycle of the input +type Runner struct { + config inputConfig + input Input + done chan struct{} + wg *sync.WaitGroup + ID uint64 + Once bool + beatDone chan struct{} +} + +// New instantiates a new Runner +func New( + conf *common.Config, + outlet channel.Factory, + beatDone chan struct{}, + states []file.State, + dynFields *common.MapStrPointer, +) (*Runner, error) { + input := &Runner{ + config: defaultConfig, + wg: &sync.WaitGroup{}, + done: make(chan struct{}), + Once: false, + beatDone: beatDone, + } + + var err error + if err = conf.Unpack(&input.config); err != nil { + return nil, err + } + + var h map[string]interface{} + conf.Unpack(&h) + input.ID, err = hashstructure.Hash(h, nil) + if err != nil { + return nil, err + } + + var f Factory + f, err = GetFactory(input.config.Type) + if err != nil { + return input, err + } + + context := Context{ + States: states, + Done: input.done, + BeatDone: input.beatDone, + DynamicFields: dynFields, + } + var ipt Input + ipt, err = f(conf, outlet, context) + if err != nil { + return input, err + } + input.input = ipt + + return input, nil +} + +// Start starts the input +func (p *Runner) Start() { + p.wg.Add(1) + logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID) + + onceWg := sync.WaitGroup{} + if p.Once { + // Make sure start is only completed when Run did a complete first scan + defer onceWg.Wait() + } + + onceWg.Add(1) + // Add waitgroup to make sure input is finished + go func() { + defer func() { + onceWg.Done() + p.stop() + p.wg.Done() + }() + + p.Run() + }() +} + +// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file +func (p *Runner) Run() { + // Initial input run + p.input.Run() + + // Shuts down after the first complete run of all input + if p.Once { + return + } + + for { + select { + case <-p.done: + logp.Info("input ticker stopped") + return + case <-time.After(p.config.ScanFrequency): + logp.Debug("input", "Run input") + p.input.Run() + } + } +} + +// Stop stops the input and with it all harvesters +func (p *Runner) Stop() { + // Stop scanning and wait for completion + close(p.done) + p.wg.Wait() +} + +func (p *Runner) stop() { + logp.Info("Stopping Input: %d", p.ID) + + // In case of once, it will be waited until harvesters close itself + if p.Once { + p.input.Wait() + } else { + p.input.Stop() + } +} + +func (p *Runner) String() string { + return fmt.Sprintf("input [type=%s, ID=%d]", p.config.Type, p.ID) +} diff --git a/filebeat/prospector/prospector_test.go b/filebeat/input/input_test.go similarity index 55% rename from filebeat/prospector/prospector_test.go rename to filebeat/input/input_test.go index c28447a115c6..db0f75623b60 100644 --- a/filebeat/prospector/prospector_test.go +++ b/filebeat/input/input_test.go @@ -1,3 +1,3 @@ // +build !integration -package prospector +package input diff --git a/filebeat/prospector/log/config.go b/filebeat/input/log/config.go similarity index 100% rename from filebeat/prospector/log/config.go rename to filebeat/input/log/config.go diff --git a/filebeat/prospector/log/config_test.go b/filebeat/input/log/config_test.go similarity index 100% rename from filebeat/prospector/log/config_test.go rename to filebeat/input/log/config_test.go diff --git a/filebeat/prospector/log/file.go b/filebeat/input/log/file.go similarity index 100% rename from filebeat/prospector/log/file.go rename to filebeat/input/log/file.go diff --git a/filebeat/prospector/log/harvester.go b/filebeat/input/log/harvester.go similarity index 100% rename from filebeat/prospector/log/harvester.go rename to filebeat/input/log/harvester.go diff --git a/filebeat/prospector/log/harvester_test.go b/filebeat/input/log/harvester_test.go similarity index 100% rename from filebeat/prospector/log/harvester_test.go rename to filebeat/input/log/harvester_test.go diff --git a/filebeat/prospector/log/log.go b/filebeat/input/log/log.go similarity index 100% rename from filebeat/prospector/log/log.go rename to filebeat/input/log/log.go diff --git a/filebeat/prospector/log/prospector.go b/filebeat/input/log/prospector.go similarity index 100% rename from filebeat/prospector/log/prospector.go rename to filebeat/input/log/prospector.go diff --git a/filebeat/prospector/log/prospector_other_test.go b/filebeat/input/log/prospector_other_test.go similarity index 100% rename from filebeat/prospector/log/prospector_other_test.go rename to filebeat/input/log/prospector_other_test.go diff --git a/filebeat/prospector/log/prospector_test.go b/filebeat/input/log/prospector_test.go similarity index 100% rename from filebeat/prospector/log/prospector_test.go rename to filebeat/input/log/prospector_test.go diff --git a/filebeat/prospector/log/prospector_windows_test.go b/filebeat/input/log/prospector_windows_test.go similarity index 100% rename from filebeat/prospector/log/prospector_windows_test.go rename to filebeat/input/log/prospector_windows_test.go diff --git a/filebeat/prospector/log/stdin.go b/filebeat/input/log/stdin.go similarity index 100% rename from filebeat/prospector/log/stdin.go rename to filebeat/input/log/stdin.go diff --git a/filebeat/prospector/plugin.go b/filebeat/input/plugin.go similarity index 59% rename from filebeat/prospector/plugin.go rename to filebeat/input/plugin.go index 0467736026d8..251c8baacf09 100644 --- a/filebeat/prospector/plugin.go +++ b/filebeat/input/plugin.go @@ -1,4 +1,4 @@ -package prospector +package input import ( "errors" @@ -6,18 +6,18 @@ import ( "github.com/elastic/beats/libbeat/plugin" ) -type prospectorPlugin struct { +type inputPlugin struct { name string factory Factory } -const pluginKey = "filebeat.prospector" +const pluginKey = "filebeat.input" func init() { plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error { - p, ok := ifc.(prospectorPlugin) + p, ok := ifc.(inputPlugin) if !ok { - return errors.New("plugin does not match filebeat prospector plugin type") + return errors.New("plugin does not match filebeat input plugin type") } if p.factory != nil { @@ -34,5 +34,5 @@ func Plugin( module string, factory Factory, ) map[string][]interface{} { - return plugin.MakePlugin(pluginKey, prospectorPlugin{module, factory}) + return plugin.MakePlugin(pluginKey, inputPlugin{module, factory}) } diff --git a/filebeat/prospector/redis/_meta/Dockerfile b/filebeat/input/redis/_meta/Dockerfile similarity index 100% rename from filebeat/prospector/redis/_meta/Dockerfile rename to filebeat/input/redis/_meta/Dockerfile diff --git a/filebeat/prospector/redis/_meta/env b/filebeat/input/redis/_meta/env similarity index 100% rename from filebeat/prospector/redis/_meta/env rename to filebeat/input/redis/_meta/env diff --git a/filebeat/prospector/redis/config.go b/filebeat/input/redis/config.go similarity index 100% rename from filebeat/prospector/redis/config.go rename to filebeat/input/redis/config.go diff --git a/filebeat/prospector/redis/doc.go b/filebeat/input/redis/doc.go similarity index 100% rename from filebeat/prospector/redis/doc.go rename to filebeat/input/redis/doc.go diff --git a/filebeat/prospector/redis/harvester.go b/filebeat/input/redis/harvester.go similarity index 100% rename from filebeat/prospector/redis/harvester.go rename to filebeat/input/redis/harvester.go diff --git a/filebeat/prospector/redis/prospector.go b/filebeat/input/redis/prospector.go similarity index 100% rename from filebeat/prospector/redis/prospector.go rename to filebeat/input/redis/prospector.go diff --git a/filebeat/prospector/registry.go b/filebeat/input/registry.go similarity index 55% rename from filebeat/prospector/registry.go rename to filebeat/input/registry.go index e11928852bb3..a94391489fbe 100644 --- a/filebeat/prospector/registry.go +++ b/filebeat/input/registry.go @@ -1,4 +1,4 @@ -package prospector +package input import ( "fmt" @@ -16,31 +16,31 @@ type Context struct { DynamicFields *common.MapStrPointer } -type Factory func(config *common.Config, outletFactory channel.Factory, context Context) (Prospectorer, error) +type Factory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error) var registry = make(map[string]Factory) func Register(name string, factory Factory) error { - logp.Info("Registering prospector factory") + logp.Info("Registering input factory") if name == "" { - return fmt.Errorf("Error registering prospector: name cannot be empty") + return fmt.Errorf("Error registering input: name cannot be empty") } if factory == nil { - return fmt.Errorf("Error registering prospector '%v': factory cannot be empty", name) + return fmt.Errorf("Error registering input '%v': factory cannot be empty", name) } if _, exists := registry[name]; exists { - return fmt.Errorf("Error registering prospector '%v': already registered", name) + return fmt.Errorf("Error registering input '%v': already registered", name) } registry[name] = factory - logp.Info("Successfully registered prospector") + logp.Info("Successfully registered input") return nil } func GetFactory(name string) (Factory, error) { if _, exists := registry[name]; !exists { - return nil, fmt.Errorf("Error creating prospector. No such prospector type exist: '%v'", name) + return nil, fmt.Errorf("Error creating input. No such input type exist: '%v'", name) } return registry[name], nil } diff --git a/filebeat/prospector/registry_test.go b/filebeat/input/registry_test.go similarity index 66% rename from filebeat/prospector/registry_test.go rename to filebeat/input/registry_test.go index 52179b5e5b69..a1247f72b23a 100644 --- a/filebeat/prospector/registry_test.go +++ b/filebeat/input/registry_test.go @@ -1,4 +1,4 @@ -package prospector +package input import ( "testing" @@ -9,21 +9,21 @@ import ( "github.com/elastic/beats/libbeat/common" ) -var fakeFactory = func(config *common.Config, outletFactory channel.Factory, context Context) (Prospectorer, error) { +var fakeFactory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error) { return nil, nil } func TestAddFactoryEmptyName(t *testing.T) { err := Register("", nil) if assert.Error(t, err) { - assert.Equal(t, "Error registering prospector: name cannot be empty", err.Error()) + assert.Equal(t, "Error registering input: name cannot be empty", err.Error()) } } func TestAddNilFactory(t *testing.T) { err := Register("name", nil) if assert.Error(t, err) { - assert.Equal(t, "Error registering prospector 'name': factory cannot be empty", err.Error()) + assert.Equal(t, "Error registering input 'name': factory cannot be empty", err.Error()) } } @@ -36,7 +36,7 @@ func TestAddFactoryTwice(t *testing.T) { err = Register("name", fakeFactory) if assert.Error(t, err) { - assert.Equal(t, "Error registering prospector 'name': already registered", err.Error()) + assert.Equal(t, "Error registering input 'name': already registered", err.Error()) } } @@ -52,6 +52,6 @@ func TestGetNonExistentFactory(t *testing.T) { f, err := GetFactory("noSuchFactory") assert.Nil(t, f) if assert.Error(t, err) { - assert.Equal(t, "Error creating prospector. No such prospector type exist: 'noSuchFactory'", err.Error()) + assert.Equal(t, "Error creating input. No such input type exist: 'noSuchFactory'", err.Error()) } } diff --git a/filebeat/prospector/runnerfactory.go b/filebeat/input/runnerfactory.go similarity index 85% rename from filebeat/prospector/runnerfactory.go rename to filebeat/input/runnerfactory.go index 641de1facc67..ae557f8ecb0b 100644 --- a/filebeat/prospector/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -1,4 +1,4 @@ -package prospector +package input import ( "github.com/elastic/beats/filebeat/channel" @@ -23,11 +23,11 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be } } -// Create creates a prospector based on a config +// Create creates a input based on a config func (r *RunnerFactory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { p, err := New(c, r.outlet, r.beatDone, r.registrar.GetStates(), meta) if err != nil { - // In case of error with loading state, prospector is still returned + // In case of error with loading state, input is still returned return p, err } diff --git a/filebeat/prospector/stdin/prospector.go b/filebeat/input/stdin/prospector.go similarity index 97% rename from filebeat/prospector/stdin/prospector.go rename to filebeat/input/stdin/prospector.go index a1e81ef52bee..781e5582af41 100644 --- a/filebeat/prospector/stdin/prospector.go +++ b/filebeat/input/stdin/prospector.go @@ -6,8 +6,8 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/filebeat/input/log" "github.com/elastic/beats/filebeat/prospector" - "github.com/elastic/beats/filebeat/prospector/log" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) diff --git a/filebeat/prospector/udp/config.go b/filebeat/input/udp/config.go similarity index 100% rename from filebeat/prospector/udp/config.go rename to filebeat/input/udp/config.go diff --git a/filebeat/prospector/udp/harvester.go b/filebeat/input/udp/harvester.go similarity index 100% rename from filebeat/prospector/udp/harvester.go rename to filebeat/input/udp/harvester.go diff --git a/filebeat/prospector/udp/prospector.go b/filebeat/input/udp/prospector.go similarity index 100% rename from filebeat/prospector/udp/prospector.go rename to filebeat/input/udp/prospector.go diff --git a/filebeat/main.go b/filebeat/main.go index c7b9961982c7..a40fadac8135 100644 --- a/filebeat/main.go +++ b/filebeat/main.go @@ -7,12 +7,12 @@ import ( ) // The basic model of execution: -// - prospector: finds files in paths/globs to harvest, starts harvesters +// - input: finds files in paths/globs to harvest, starts harvesters // - harvester: reads a file, sends events to the spooler // - spooler: buffers events until ready to flush to the publisher // - publisher: writes to the network, notifies registrar // - registrar: records positions of files read -// Finally, prospector uses the registrar information, on restart, to +// Finally, input uses the registrar information, on restart, to // determine where in each file to restart a harvester. func main() { if err := cmd.RootCmd.Execute(); err != nil { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index f2b9a8673d90..c6e4ca2c760c 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -1,150 +1,37 @@ +// Package prospector allows to define new way of reading data in Filebeat +// Deprecated: See the input package package prospector -import ( - "fmt" - "sync" - "time" +import "github.com/elastic/beats/filebeat/input" - "github.com/mitchellh/hashstructure" +// Prospectorer defines how to read new data +// Deprecated: See input.input +type Prospectorer = input.Input - "github.com/elastic/beats/filebeat/channel" - "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) +// Runner encapsulate the lifecycle of a prospectorer +// Deprecated: See input.Runner +type Runner = input.Runner -// Prospectorer is the interface common to all prospectors -type Prospectorer interface { - Run() - Stop() - Wait() -} +// Context wrapper for backward compatibility +// Deprecated: See input.Context +type Context = input.Context -// Prospector contains the prospector -type Prospector struct { - config prospectorConfig - prospectorer Prospectorer - done chan struct{} - wg *sync.WaitGroup - ID uint64 - Once bool - beatDone chan struct{} -} +// Factory wrapper for backward compatibility +// Deprecated: See input.Factory +type Factory = input.Factory -// NewProspector instantiates a new prospector -func New( - conf *common.Config, - outlet channel.Factory, - beatDone chan struct{}, - states []file.State, - dynFields *common.MapStrPointer, -) (*Prospector, error) { - prospector := &Prospector{ - config: defaultConfig, - wg: &sync.WaitGroup{}, - done: make(chan struct{}), - Once: false, - beatDone: beatDone, - } +// Register wrapper for backward compatibility +// Deprecated: See input.Register +var Register = input.Register - var err error - if err = conf.Unpack(&prospector.config); err != nil { - return nil, err - } +// GetFactory wrapper for backward compatibility +// Deprecated: See input.GetFactory +var GetFactory = input.GetFactory - var h map[string]interface{} - conf.Unpack(&h) - prospector.ID, err = hashstructure.Hash(h, nil) - if err != nil { - return nil, err - } +// New wrapper for backward compatibility +// Deprecated: see input.New +var New = input.New - var f Factory - f, err = GetFactory(prospector.config.Type) - if err != nil { - return prospector, err - } - - context := Context{ - States: states, - Done: prospector.done, - BeatDone: prospector.beatDone, - DynamicFields: dynFields, - } - var prospectorer Prospectorer - prospectorer, err = f(conf, outlet, context) - if err != nil { - return prospector, err - } - prospector.prospectorer = prospectorer - - return prospector, nil -} - -// Start starts the prospector -func (p *Prospector) Start() { - p.wg.Add(1) - logp.Debug("prospector", "Starting prospector of type: %v; ID: %d", p.config.Type, p.ID) - - onceWg := sync.WaitGroup{} - if p.Once { - // Make sure start is only completed when Run did a complete first scan - defer onceWg.Wait() - } - - onceWg.Add(1) - // Add waitgroup to make sure prospectors finished - go func() { - defer func() { - onceWg.Done() - p.stop() - p.wg.Done() - }() - - p.Run() - }() -} - -// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file -func (p *Prospector) Run() { - // Initial prospector run - p.prospectorer.Run() - - // Shuts down after the first complete run of all prospectors - if p.Once { - return - } - - for { - select { - case <-p.done: - logp.Info("Prospector ticker stopped") - return - case <-time.After(p.config.ScanFrequency): - logp.Debug("prospector", "Run prospector") - p.prospectorer.Run() - } - } -} - -// Stop stops the prospector and with it all harvesters -func (p *Prospector) Stop() { - // Stop scanning and wait for completion - close(p.done) - p.wg.Wait() -} - -func (p *Prospector) stop() { - logp.Info("Stopping Prospector: %d", p.ID) - - // In case of once, it will be waited until harvesters close itself - if p.Once { - p.prospectorer.Wait() - } else { - p.prospectorer.Stop() - } -} - -func (p *Prospector) String() string { - return fmt.Sprintf("prospector [type=%s, ID=%d]", p.config.Type, p.ID) -} +// NewRunnerFactory wrapper for backward compatibility +// Deprecated: see input.NewRunnerFactory +var NewRunnerFactory = input.NewRunnerFactory diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 0dc312ddf57c..1be9d0e79f73 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -122,11 +122,11 @@ func (r *Registrar) loadStates() error { } // resetStates sets all states to finished and disable TTL on restart -// For all states covered by a prospector, TTL will be overwritten with the prospector value +// For all states covered by an input, TTL will be overwritten with the input value func resetStates(states []file.State) []file.State { for key, state := range states { state.Finished = true - // Set ttl to -2 to easily spot which states are not managed by a prospector + // Set ttl to -2 to easily spot which states are not managed by a input state.TTL = -2 states[key] = state } @@ -134,7 +134,7 @@ func resetStates(states []file.State) []file.State { } func (r *Registrar) Start() error { - // Load the previous log file locations now, for use in prospector + // Load the previous log file locations now, for use in input err := r.loadStates() if err != nil { return fmt.Errorf("Error loading state: %v", err) diff --git a/filebeat/scripts/generate_imports_helper.py b/filebeat/scripts/generate_imports_helper.py index 7104812886bb..5e8fcc0df9f6 100644 --- a/filebeat/scripts/generate_imports_helper.py +++ b/filebeat/scripts/generate_imports_helper.py @@ -2,19 +2,21 @@ from os import listdir -comment = """Package include imports all prospector packages so that they register +comment = """Package include imports all input packages so that they register their factories with the global registry. This package can be imported in the -main package to automatically register all of the standard supported prospectors +main package to automatically register all of the standard supported inputs modules.""" def get_importable_lines(go_beat_path, import_line): - path = abspath("prospector") + path = abspath("input") imported_prospector_lines = [] - prospectors = [p for p in listdir(path) if isdir(join(path, p))] + + # Skip the file folder, its not an input but I will do the move with another PR + prospectors = [p for p in listdir(path) if isdir(join(path, p)) and p.find("file") is -1] for prospector in sorted(prospectors): - prospector_import = import_line.format(beat_path=go_beat_path, module="prospector", name=prospector) + prospector_import = import_line.format(beat_path=go_beat_path, module="input", name=prospector) imported_prospector_lines.append(prospector_import) return imported_prospector_lines diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 4c1e9e7b36dc..a95f3a8be804 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -1,6 +1,6 @@ ###################### Filebeat Config Template ############################### -filebeat.prospectors: +filebeat.{{input_config | default("prospectors")}}: {% if prospectors is not defined %} {% set prospectors = true %} {% endif %} diff --git a/filebeat/tests/system/test_autodiscover.py b/filebeat/tests/system/test_autodiscover.py index df8fc479b8cf..930a9f24a6e1 100644 --- a/filebeat/tests/system/test_autodiscover.py +++ b/filebeat/tests/system/test_autodiscover.py @@ -14,7 +14,7 @@ class TestAutodiscover(filebeat.BaseTest): "integration test not available on 2.x") def test_docker(self): """ - Test docker autodiscover starts prospector + Test docker autodiscover starts input """ import docker docker_client = docker.from_env() @@ -42,8 +42,8 @@ def test_docker(self): docker_client.images.pull('busybox') docker_client.containers.run('busybox', 'sleep 1') - self.wait_until(lambda: self.log_contains('Autodiscover starting runner: prospector')) - self.wait_until(lambda: self.log_contains('Autodiscover stopping runner: prospector')) + self.wait_until(lambda: self.log_contains('Autodiscover starting runner: input')) + self.wait_until(lambda: self.log_contains('Autodiscover stopping runner: input')) output = self.read_output_json() proc.check_kill_and_wait() diff --git a/filebeat/tests/system/test_deprecated.py b/filebeat/tests/system/test_deprecated.py index 0dd25fba926c..3c4b1652334f 100644 --- a/filebeat/tests/system/test_deprecated.py +++ b/filebeat/tests/system/test_deprecated.py @@ -2,8 +2,6 @@ from filebeat import BaseTest import os -import codecs -import time """ Test Harvesters @@ -37,4 +35,31 @@ def test_input_type_deprecated(self): filebeat.check_kill_and_wait() - assert self.log_contains("DEPRECATED: input_type prospector config is deprecated") + assert self.log_contains("DEPRECATED: input_type input config is deprecated") + + def test_prospectors_deprecated(self): + """ + Checks that harvesting works with deprecated prospectors but a deprecation warning is printed. + """ + + self.render_config_template( + input_config="prospectors", + path=os.path.abspath(self.working_dir) + "/log/test.log", + scan_frequency="0.1s" + ) + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + filebeat = self.start_beat() + + # Let it read the file + self.wait_until( + lambda: self.output_has(lines=1), max_timeout=10) + + filebeat.check_kill_and_wait() + + assert self.log_contains("DEPRECATED: prospectors are deprecated, Use `inputs` instead.") diff --git a/filebeat/tests/system/test_load.py b/filebeat/tests/system/test_load.py index 56dd81866ba1..bc33dbdae070 100644 --- a/filebeat/tests/system/test_load.py +++ b/filebeat/tests/system/test_load.py @@ -56,7 +56,7 @@ def test_no_missing_events(self): # wait until filebeat is fully running self.wait_until( lambda: self.log_contains( - "Loading and starting Prospectors completed."), + "Loading and starting Inputs completed."), max_timeout=15) # Start logging and rotating diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index ab057aff7c3b..a22151adcb0c 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -281,7 +281,7 @@ def test_shutdown_no_prospectors(self): self.wait_until( lambda: self.log_contains( - "No modules or prospectors enabled"), + "no modules or inputs enabled"), max_timeout=10) filebeat.check_wait(exit_code=1) diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index eac4888cdd8a..531767aae3e7 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -1034,7 +1034,7 @@ def test_restart_state_reset(self): # Wait until prospectors are started self.wait_until( lambda: self.log_contains_count( - "Starting prospector of type: log", logfile="filebeat2.log") >= 1, + "Starting input of type: log", logfile="filebeat2.log") >= 1, max_timeout=10) filebeat.check_kill_and_wait()