From bce6deaca053b6a72999393657d0e3e80c0f0359 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Tue, 7 Jul 2020 14:27:10 +0200 Subject: [PATCH] Filebeat inputv2 integration (#19686) --- filebeat/beater/filebeat.go | 42 +++++++++++++++++-- filebeat/cmd/root.go | 22 +++++----- filebeat/input/default-inputs/inputs.go | 36 ++++++++++++++++ filebeat/input/default-inputs/inputs_linux.go | 38 +++++++++++++++++ filebeat/input/default-inputs/inputs_other.go | 32 ++++++++++++++ .../input/default-inputs/inputs_windows.go | 36 ++++++++++++++++ filebeat/main.go | 3 +- filebeat/main_test.go | 17 +++++--- x-pack/filebeat/cmd/root.go | 14 ++++--- .../filebeat/input/default-inputs/inputs.go | 24 +++++++++++ x-pack/filebeat/main.go | 2 +- x-pack/filebeat/main_test.go | 16 ++++--- 12 files changed, 251 insertions(+), 31 deletions(-) create mode 100644 filebeat/input/default-inputs/inputs.go create mode 100644 filebeat/input/default-inputs/inputs_linux.go create mode 100644 filebeat/input/default-inputs/inputs_other.go create mode 100644 filebeat/input/default-inputs/inputs_windows.go create mode 100644 x-pack/filebeat/input/default-inputs/inputs.go diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 357c5d790ab..837fc341a79 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "strings" + "time" "github.com/pkg/errors" @@ -29,6 +30,8 @@ import ( "github.com/elastic/beats/v7/filebeat/fileset" _ "github.com/elastic/beats/v7/filebeat/include" "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/filebeat/input/v2/compat" "github.com/elastic/beats/v7/filebeat/registrar" "github.com/elastic/beats/v7/libbeat/autodiscover" "github.com/elastic/beats/v7/libbeat/beat" @@ -42,6 +45,8 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/go-concert/unison" _ "github.com/elastic/beats/v7/filebeat/include" @@ -66,12 +71,26 @@ var ( type Filebeat struct { config *cfg.Config moduleRegistry *fileset.ModuleRegistry + pluginFactory PluginFactory done chan struct{} pipeline beat.PipelineConnector } +type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin + +type StateStore interface { + Access() (*statestore.Store, error) + CleanupInterval() time.Duration +} + // New creates a new Filebeat pointer instance. -func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { +func New(plugins PluginFactory) beat.Creator { + return func(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { + return newBeater(b, plugins, rawConfig) + } +} + +func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *common.Config) (beat.Beater, error) { config := cfg.DefaultConfig if err := rawConfig.Unpack(&config); err != nil { return nil, fmt.Errorf("Error reading config file: %v", err) @@ -135,6 +154,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { done: make(chan struct{}), config: &config, moduleRegistry: moduleRegistry, + pluginFactory: plugins, } err = fb.setupPipelineLoaderCallback(b) @@ -268,8 +288,24 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Warn(pipelinesWarning) } - inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, - input.NewRunnerFactory(pipelineConnector, registrar, fb.done)) + inputsLogger := logp.NewLogger("input") + v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore) + v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType) + if err != nil { + panic(err) // loader detected invalid state. + } + + var inputTaskGroup unison.TaskGroup + defer inputTaskGroup.Stop() + if err := v2InputLoader.Init(&inputTaskGroup, v2.ModeRun); err != nil { + logp.Err("Failed to initialize the input managers: %v", err) + return err + } + + inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, compat.Combine( + compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader), + input.NewRunnerFactory(pipelineConnector, registrar, fb.done), + )) moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once) diff --git a/filebeat/cmd/root.go b/filebeat/cmd/root.go index 43b42a56b67..f2062a35dd7 100644 --- a/filebeat/cmd/root.go +++ b/filebeat/cmd/root.go @@ -33,12 +33,10 @@ import ( ) // Name of this beat -var Name = "filebeat" +const Name = "filebeat" -// RootCmd to handle beats cli -var RootCmd *cmd.BeatsRootCmd - -func init() { +// Filebeat build the beat root command for executing filebeat and it's subcommands. +func Filebeat(inputs beater.PluginFactory) *cmd.BeatsRootCmd { var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError) runFlags.AddGoFlag(flag.CommandLine.Lookup("once")) runFlags.AddGoFlag(flag.CommandLine.Lookup("modules")) @@ -47,10 +45,12 @@ func init() { Name: Name, HasDashboards: true, } - RootCmd = cmd.GenRootCmdWithSettings(beater.New, settings) - RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M")) - RootCmd.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) - RootCmd.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) - RootCmd.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager)) - RootCmd.AddCommand(genGenerateCmd()) + + command := cmd.GenRootCmdWithSettings(beater.New(inputs), settings) + command.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M")) + command.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) + command.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) + command.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager)) + command.AddCommand(genGenerateCmd()) + return command } diff --git a/filebeat/input/default-inputs/inputs.go b/filebeat/input/default-inputs/inputs.go new file mode 100644 index 00000000000..1cfce53a3eb --- /dev/null +++ b/filebeat/input/default-inputs/inputs.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inputs + +import ( + "github.com/elastic/beats/v7/filebeat/beater" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.Plugin { + return append( + genericInputs(), + osInputs(info, log, components)..., + ) +} + +func genericInputs() []v2.Plugin { + return []v2.Plugin{} +} diff --git a/filebeat/input/default-inputs/inputs_linux.go b/filebeat/input/default-inputs/inputs_linux.go new file mode 100644 index 00000000000..c2ec4960e92 --- /dev/null +++ b/filebeat/input/default-inputs/inputs_linux.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inputs + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// inputs that are only supported on linux + +type osComponents interface { + cursor.StateStore +} + +func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin { + return []v2.Plugin{ + // XXX: journald is currently disable. + // journald.Plugin(log, components), + } +} diff --git a/filebeat/input/default-inputs/inputs_other.go b/filebeat/input/default-inputs/inputs_other.go new file mode 100644 index 00000000000..ccd6ce41752 --- /dev/null +++ b/filebeat/input/default-inputs/inputs_other.go @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !windows,!linux + +package inputs + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type osComponents interface{} + +func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin { + return nil +} diff --git a/filebeat/input/default-inputs/inputs_windows.go b/filebeat/input/default-inputs/inputs_windows.go new file mode 100644 index 00000000000..3e734dfb1a2 --- /dev/null +++ b/filebeat/input/default-inputs/inputs_windows.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inputs + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type osComponents interface { + cursor.StateStore +} + +func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin { + return []v2.Plugin{ + // windows events logs are not available yet + // winlog.Plugin(log, components), + } +} diff --git a/filebeat/main.go b/filebeat/main.go index 21289acdf6f..e8f36acde01 100644 --- a/filebeat/main.go +++ b/filebeat/main.go @@ -21,6 +21,7 @@ import ( "os" "github.com/elastic/beats/v7/filebeat/cmd" + inputs "github.com/elastic/beats/v7/filebeat/input/default-inputs" ) // The basic model of execution: @@ -32,7 +33,7 @@ import ( // Finally, input uses the registrar information, on restart, to // determine where in each file to restart a harvester. func main() { - if err := cmd.RootCmd.Execute(); err != nil { + if err := cmd.Filebeat(inputs.Init).Execute(); err != nil { os.Exit(1) } } diff --git a/filebeat/main_test.go b/filebeat/main_test.go index 84f3e686e4c..5a16694d1bd 100644 --- a/filebeat/main_test.go +++ b/filebeat/main_test.go @@ -21,28 +21,35 @@ package main import ( "flag" + "os" "testing" - "github.com/elastic/beats/v7/filebeat/cmd" + fbcmd "github.com/elastic/beats/v7/filebeat/cmd" + inputs "github.com/elastic/beats/v7/filebeat/input/default-inputs" + cmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/tests/system/template" ) var systemTest *bool +var fbCommand *cmd.BeatsRootCmd func init() { testing.Init() systemTest = flag.Bool("systemTest", false, "Set to true when running system tests") - cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest")) - cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile")) + fbCommand = fbcmd.Filebeat(inputs.Init) + fbCommand.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest")) + fbCommand.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile")) } // Test started when the test binary is started. Only calls main. func TestSystem(t *testing.T) { if *systemTest { - main() + if err := fbCommand.Execute(); err != nil { + os.Exit(1) + } } } func TestTemplate(t *testing.T) { - template.TestTemplate(t, cmd.Name) + template.TestTemplate(t, fbCommand.Name()) } diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index db3b5884be4..dc2ebc6fdb6 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -5,16 +5,20 @@ package cmd import ( - "github.com/elastic/beats/v7/filebeat/cmd" + fbcmd "github.com/elastic/beats/v7/filebeat/cmd" + cmd "github.com/elastic/beats/v7/libbeat/cmd" xpackcmd "github.com/elastic/beats/v7/x-pack/libbeat/cmd" // Register the includes. _ "github.com/elastic/beats/v7/x-pack/filebeat/include" + inputs "github.com/elastic/beats/v7/x-pack/filebeat/input/default-inputs" ) -// RootCmd to handle beats CLI. -var RootCmd = cmd.RootCmd +const Name = fbcmd.Name -func init() { - xpackcmd.AddXPack(RootCmd, cmd.Name) +// Filebeat build the beat root command for executing filebeat and it's subcommands. +func Filebeat() *cmd.BeatsRootCmd { + command := fbcmd.Filebeat(inputs.Init) + xpackcmd.AddXPack(command, Name) + return command } diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go new file mode 100644 index 00000000000..525bbe2a578 --- /dev/null +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -0,0 +1,24 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package inputs + +import ( + "github.com/elastic/beats/v7/filebeat/beater" + ossinputs "github.com/elastic/beats/v7/filebeat/input/default-inputs" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { + return append( + xpackInputs(info, log, store), + ossinputs.Init(info, log, store)..., + ) +} + +func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { + return []v2.Plugin{} +} diff --git a/x-pack/filebeat/main.go b/x-pack/filebeat/main.go index 3c5849a2e4c..4902d835fe6 100644 --- a/x-pack/filebeat/main.go +++ b/x-pack/filebeat/main.go @@ -19,7 +19,7 @@ import ( // Finally, input uses the registrar information, on restart, to // determine where in each file to restart a harvester. func main() { - if err := cmd.RootCmd.Execute(); err != nil { + if err := cmd.Filebeat().Execute(); err != nil { os.Exit(1) } } diff --git a/x-pack/filebeat/main_test.go b/x-pack/filebeat/main_test.go index 33c32879fb9..6fde6697e3d 100644 --- a/x-pack/filebeat/main_test.go +++ b/x-pack/filebeat/main_test.go @@ -6,28 +6,34 @@ package main // This file is mandatory as otherwise the filebeat.test binary is not generated correctly. import ( "flag" + "os" "testing" - "github.com/elastic/beats/v7/filebeat/cmd" + cmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/tests/system/template" + fbcmd "github.com/elastic/beats/v7/x-pack/filebeat/cmd" ) var systemTest *bool +var fbCommand *cmd.BeatsRootCmd func init() { testing.Init() systemTest = flag.Bool("systemTest", false, "Set to true when running system tests") - cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest")) - cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile")) + fbCommand = fbcmd.Filebeat() + fbCommand.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest")) + fbCommand.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile")) } // Test started when the test binary is started. Only calls main. func TestSystem(t *testing.T) { if *systemTest { - main() + if err := fbCommand.Execute(); err != nil { + os.Exit(1) + } } } func TestTemplate(t *testing.T) { - template.TestTemplate(t, cmd.Name) + template.TestTemplate(t, fbCommand.Name()) }