Skip to content

Commit

Permalink
Migrate: STDIN prospector to the input interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ph committed Feb 2, 2018
1 parent c42f32c commit bedda18
Showing 1 changed file with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,38 @@ import (

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/input/log"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

func init() {
err := prospector.Register("stdin", NewProspector)
err := input.Register("stdin", NewInput)
if err != nil {
panic(err)
}
}

// Prospector is a prospector for stdin
type Prospector struct {
// Input is an input for stdin
type Input struct {
harvester *log.Harvester
started bool
cfg *common.Config
outlet channel.Outleter
registry *harvester.Registry
}

// NewStdin creates a new stdin prospector
// This prospector contains one harvester which is reading from stdin
func NewProspector(cfg *common.Config, outlet channel.Factory, context prospector.Context) (prospector.Prospectorer, error) {
// NewInput creates a new stdin input
// This input contains one harvester which is reading from stdin
func NewInput(cfg *common.Config, outlet channel.Factory, context input.Context) (input.Input, error) {
out, err := outlet(cfg, context.DynamicFields)
if err != nil {
return nil, err
}

p := &Prospector{
p := &Input{
started: false,
cfg: cfg,
outlet: out,
Expand All @@ -51,8 +51,8 @@ func NewProspector(cfg *common.Config, outlet channel.Factory, context prospecto
return p, nil
}

// Run runs the prospector
func (p *Prospector) Run() {
// Run runs the input
func (p *Input) Run() {
// Make sure stdin harvester is only started once
if !p.started {
err := p.harvester.Setup()
Expand All @@ -68,7 +68,7 @@ func (p *Prospector) Run() {
}

// createHarvester creates a new harvester instance from the given state
func (p *Prospector) createHarvester(state file.State) (*log.Harvester, error) {
func (p *Input) createHarvester(state file.State) (*log.Harvester, error) {
// Each harvester gets its own copy of the outlet
h, err := log.NewHarvester(
p.cfg,
Expand All @@ -79,10 +79,10 @@ func (p *Prospector) createHarvester(state file.State) (*log.Harvester, error) {
return h, err
}

// Wait waits for completion of the prospector.
func (p *Prospector) Wait() {}
// Wait waits for completion of the input.
func (p *Input) Wait() {}

// Stop stops the prospector.
func (p *Prospector) Stop() {
// Stop stops the input
func (p *Input) Stop() {
p.outlet.Close()
}

0 comments on commit bedda18

Please sign in to comment.