diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 0ea7e86aca5..4081a6e5dec 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di folder, to maintain backward compatibility type aliasing was used to map the old type to the new one. This change also affect YAML configuration. {pull}6078[6078] - Refactor the Redis prospector to use the input interface {pull}6116[6116] +- Refactor the UDP prospector to use the input interface {pull}6118[6118] *Heartbeat* diff --git a/filebeat/input/udp/prospector.go b/filebeat/input/udp/input.go similarity index 58% rename from filebeat/input/udp/prospector.go rename to filebeat/input/udp/input.go index acf2d1b118d..5a5c7fdf5cc 100644 --- a/filebeat/input/udp/prospector.go +++ b/filebeat/input/udp/input.go @@ -3,27 +3,33 @@ package udp import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" - "github.com/elastic/beats/filebeat/prospector" + "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" ) func init() { - err := prospector.Register("udp", NewProspector) + err := input.Register("udp", NewInput) if err != nil { panic(err) } } -type Prospector struct { +// Input define a udp input +type Input struct { harvester *Harvester started bool outlet channel.Outleter } -func NewProspector(cfg *common.Config, outlet channel.Factory, context prospector.Context) (prospector.Prospectorer, error) { - cfgwarn.Experimental("UDP prospector type is used") +// NewInput creates a new udp input +func NewInput( + cfg *common.Config, + outlet channel.Factory, + context input.Context, +) (input.Input, error) { + cfgwarn.Experimental("UDP input type is used") out, err := outlet(cfg, context.DynamicFields) if err != nil { @@ -31,16 +37,17 @@ func NewProspector(cfg *common.Config, outlet channel.Factory, context prospecto } forwarder := harvester.NewForwarder(out) - return &Prospector{ + return &Input{ outlet: out, harvester: NewHarvester(forwarder, cfg), started: false, }, nil } -func (p *Prospector) Run() { +// Run starts and execute the UDP server. +func (p *Input) Run() { if !p.started { - logp.Info("Starting udp prospector") + logp.Info("Starting udp input") p.started = true go func() { defer p.outlet.Close() @@ -52,11 +59,13 @@ func (p *Prospector) Run() { } } -func (p *Prospector) Stop() { - logp.Info("Stopping udp prospector") +// Stop stops the UDP input +func (p *Input) Stop() { + logp.Info("stopping UDP input") p.harvester.Stop() } -func (p *Prospector) Wait() { +// Wait suspends the UDP input +func (p *Input) Wait() { p.Stop() } diff --git a/filebeat/tests/system/test_udp.py b/filebeat/tests/system/test_udp.py index 2efddfabd0b..88ac6304186 100644 --- a/filebeat/tests/system/test_udp.py +++ b/filebeat/tests/system/test_udp.py @@ -39,3 +39,4 @@ def test_udp(self): assert len(output) == 2 assert output[0]["prospector.type"] == "udp" + assert output[0]["input.type"] == "udp"