Skip to content

Commit

Permalink
Migrate: UDP prospector the input interface (#6118)
Browse files Browse the repository at this point in the history
  • Loading branch information
ph authored and ruflin committed Feb 1, 2018
1 parent c42f32c commit 67ab782
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
folder, to maintain backward compatibility type aliasing was used to map the old type to the new
one. This change also affect YAML configuration. {pull}6078[6078]
- Refactor the Redis prospector to use the input interface {pull}6116[6116]
- Refactor the UDP prospector to use the input interface {pull}6118[6118]

*Heartbeat*

Expand Down
31 changes: 20 additions & 11 deletions filebeat/input/udp/prospector.go → filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,51 @@ package udp
import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
)

func init() {
err := prospector.Register("udp", NewProspector)
err := input.Register("udp", NewInput)
if err != nil {
panic(err)
}
}

type Prospector struct {
// Input define a udp input
type Input struct {
harvester *Harvester
started bool
outlet channel.Outleter
}

func NewProspector(cfg *common.Config, outlet channel.Factory, context prospector.Context) (prospector.Prospectorer, error) {
cfgwarn.Experimental("UDP prospector type is used")
// NewInput creates a new udp input
func NewInput(
cfg *common.Config,
outlet channel.Factory,
context input.Context,
) (input.Input, error) {
cfgwarn.Experimental("UDP input type is used")

out, err := outlet(cfg, context.DynamicFields)
if err != nil {
return nil, err
}

forwarder := harvester.NewForwarder(out)
return &Prospector{
return &Input{
outlet: out,
harvester: NewHarvester(forwarder, cfg),
started: false,
}, nil
}

func (p *Prospector) Run() {
// Run starts and execute the UDP server.
func (p *Input) Run() {
if !p.started {
logp.Info("Starting udp prospector")
logp.Info("Starting udp input")
p.started = true
go func() {
defer p.outlet.Close()
Expand All @@ -52,11 +59,13 @@ func (p *Prospector) Run() {
}
}

func (p *Prospector) Stop() {
logp.Info("Stopping udp prospector")
// Stop stops the UDP input
func (p *Input) Stop() {
logp.Info("stopping UDP input")
p.harvester.Stop()
}

func (p *Prospector) Wait() {
// Wait suspends the UDP input
func (p *Input) Wait() {
p.Stop()
}
1 change: 1 addition & 0 deletions filebeat/tests/system/test_udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ def test_udp(self):

assert len(output) == 2
assert output[0]["prospector.type"] == "udp"
assert output[0]["input.type"] == "udp"

0 comments on commit 67ab782

Please sign in to comment.