Skip to content

Commit

Permalink
Filebeat inputv2 integration (#19686)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jul 7, 2020
1 parent 3dca9ea commit bce6dea
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 31 deletions.
42 changes: 39 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"fmt"
"strings"
"time"

"github.com/pkg/errors"

Expand All @@ -29,6 +30,8 @@ import (
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/compat"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -42,6 +45,8 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/go-concert/unison"

_ "github.com/elastic/beats/v7/filebeat/include"

Expand All @@ -66,12 +71,26 @@ var (
type Filebeat struct {
config *cfg.Config
moduleRegistry *fileset.ModuleRegistry
pluginFactory PluginFactory
done chan struct{}
pipeline beat.PipelineConnector
}

type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin

type StateStore interface {
Access() (*statestore.Store, error)
CleanupInterval() time.Duration
}

// New creates a new Filebeat pointer instance.
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
func New(plugins PluginFactory) beat.Creator {
return func(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return newBeater(b, plugins, rawConfig)
}
}

func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *common.Config) (beat.Beater, error) {
config := cfg.DefaultConfig
if err := rawConfig.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
Expand Down Expand Up @@ -135,6 +154,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
done: make(chan struct{}),
config: &config,
moduleRegistry: moduleRegistry,
pluginFactory: plugins,
}

err = fb.setupPipelineLoaderCallback(b)
Expand Down Expand Up @@ -268,8 +288,24 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Warn(pipelinesWarning)
}

inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info,
input.NewRunnerFactory(pipelineConnector, registrar, fb.done))
inputsLogger := logp.NewLogger("input")
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore)
v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
if err != nil {
panic(err) // loader detected invalid state.
}

var inputTaskGroup unison.TaskGroup
defer inputTaskGroup.Stop()
if err := v2InputLoader.Init(&inputTaskGroup, v2.ModeRun); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
}

inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, compat.Combine(
compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader),
input.NewRunnerFactory(pipelineConnector, registrar, fb.done),
))
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)

crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
Expand Down
22 changes: 11 additions & 11 deletions filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ import (
)

// Name of this beat
var Name = "filebeat"
const Name = "filebeat"

// RootCmd to handle beats cli
var RootCmd *cmd.BeatsRootCmd

func init() {
// Filebeat build the beat root command for executing filebeat and it's subcommands.
func Filebeat(inputs beater.PluginFactory) *cmd.BeatsRootCmd {
var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("once"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("modules"))
Expand All @@ -47,10 +45,12 @@ func init() {
Name: Name,
HasDashboards: true,
}
RootCmd = cmd.GenRootCmdWithSettings(beater.New, settings)
RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
RootCmd.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
RootCmd.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
RootCmd.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager))
RootCmd.AddCommand(genGenerateCmd())

command := cmd.GenRootCmdWithSettings(beater.New(inputs), settings)
command.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
command.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
command.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
command.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager))
command.AddCommand(genGenerateCmd())
return command
}
36 changes: 36 additions & 0 deletions filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inputs

import (
"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.Plugin {
return append(
genericInputs(),
osInputs(info, log, components)...,
)
}

func genericInputs() []v2.Plugin {
return []v2.Plugin{}
}
38 changes: 38 additions & 0 deletions filebeat/input/default-inputs/inputs_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inputs

import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

// inputs that are only supported on linux

type osComponents interface {
cursor.StateStore
}

func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
return []v2.Plugin{
// XXX: journald is currently disable.
// journald.Plugin(log, components),
}
}
32 changes: 32 additions & 0 deletions filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows,!linux

package inputs

import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

type osComponents interface{}

func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
return nil
}
36 changes: 36 additions & 0 deletions filebeat/input/default-inputs/inputs_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inputs

import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

type osComponents interface {
cursor.StateStore
}

func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
return []v2.Plugin{
// windows events logs are not available yet
// winlog.Plugin(log, components),
}
}
3 changes: 2 additions & 1 deletion filebeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"

"github.com/elastic/beats/v7/filebeat/cmd"
inputs "github.com/elastic/beats/v7/filebeat/input/default-inputs"
)

// The basic model of execution:
Expand All @@ -32,7 +33,7 @@ import (
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
if err := cmd.Filebeat(inputs.Init).Execute(); err != nil {
os.Exit(1)
}
}
17 changes: 12 additions & 5 deletions filebeat/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,35 @@ package main

import (
"flag"
"os"
"testing"

"github.com/elastic/beats/v7/filebeat/cmd"
fbcmd "github.com/elastic/beats/v7/filebeat/cmd"
inputs "github.com/elastic/beats/v7/filebeat/input/default-inputs"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/tests/system/template"
)

var systemTest *bool
var fbCommand *cmd.BeatsRootCmd

func init() {
testing.Init()
systemTest = flag.Bool("systemTest", false, "Set to true when running system tests")
cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest"))
cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile"))
fbCommand = fbcmd.Filebeat(inputs.Init)
fbCommand.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest"))
fbCommand.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile"))
}

// Test started when the test binary is started. Only calls main.
func TestSystem(t *testing.T) {
if *systemTest {
main()
if err := fbCommand.Execute(); err != nil {
os.Exit(1)
}
}
}

func TestTemplate(t *testing.T) {
template.TestTemplate(t, cmd.Name)
template.TestTemplate(t, fbCommand.Name())
}
14 changes: 9 additions & 5 deletions x-pack/filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
package cmd

import (
"github.com/elastic/beats/v7/filebeat/cmd"
fbcmd "github.com/elastic/beats/v7/filebeat/cmd"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
xpackcmd "github.com/elastic/beats/v7/x-pack/libbeat/cmd"

// Register the includes.
_ "github.com/elastic/beats/v7/x-pack/filebeat/include"
inputs "github.com/elastic/beats/v7/x-pack/filebeat/input/default-inputs"
)

// RootCmd to handle beats CLI.
var RootCmd = cmd.RootCmd
const Name = fbcmd.Name

func init() {
xpackcmd.AddXPack(RootCmd, cmd.Name)
// Filebeat build the beat root command for executing filebeat and it's subcommands.
func Filebeat() *cmd.BeatsRootCmd {
command := fbcmd.Filebeat(inputs.Init)
xpackcmd.AddXPack(command, Name)
return command
}
24 changes: 24 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package inputs

import (
"github.com/elastic/beats/v7/filebeat/beater"
ossinputs "github.com/elastic/beats/v7/filebeat/input/default-inputs"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
return append(
xpackInputs(info, log, store),
ossinputs.Init(info, log, store)...,
)
}

func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
return []v2.Plugin{}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
if err := cmd.Filebeat().Execute(); err != nil {
os.Exit(1)
}
}
Loading

0 comments on commit bce6dea

Please sign in to comment.