Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor export and idxmgmt handling. #11777

Merged
merged 13 commits into from
Apr 23, 2019
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Added support for using PYTHON_EXE to control what Python interpreter is used
by `make` and `mage`. Example: `export PYTHON_EXE=python2.7`. {pull}11212[11212]
- Prometheus helper for metricbeat contains now `Namespace` field for `prometheus.MetricsMappings` {pull}11424[11424]
- Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}[]
15 changes: 4 additions & 11 deletions libbeat/cmd/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package export

import (
simitt marked this conversation as resolved.
Show resolved Hide resolved
"fmt"
"os"

"github.com/spf13/cobra"
Expand All @@ -40,26 +39,20 @@ func GenExportConfigCmd(settings instance.Settings) *cobra.Command {
}

func exportConfig(settings instance.Settings) error {
b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version)
if err != nil {
return fmt.Errorf("error initializing beat: %s", err)
}

settings.DisableConfigResolver = true

err = b.InitWithSettings(settings)
b, err := instance.NewInitializedBeat(settings)
if err != nil {
return fmt.Errorf("error initializing beat: %s", err)
fatalf("error initializing beat: %+v", err)
simitt marked this conversation as resolved.
Show resolved Hide resolved
}

var config map[string]interface{}
err = b.RawConfig.Unpack(&config)
if err != nil {
return fmt.Errorf("error unpacking config, error: %s", err)
fatalf("error unpacking config, error: %s", err)
simitt marked this conversation as resolved.
Show resolved Hide resolved
}
res, err := yaml.Marshal(config)
if err != nil {
return fmt.Errorf("Error converting config to YAML format, error: %s", err)
fatalf("error converting config to YAML format, error: %s", err)
}

os.Stdout.Write(res)
Expand Down
23 changes: 6 additions & 17 deletions libbeat/cmd/export/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package export

import (
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
Expand All @@ -40,15 +39,9 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command {
yml, _ := cmd.Flags().GetString("yml")
decode, _ := cmd.Flags().GetBool("decode")

b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version)
b, err := instance.NewInitializedBeat(settings)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating beat: %s\n", err)
os.Exit(1)
}
err = b.InitWithSettings(settings)
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err)
os.Exit(1)
fatalf("error initializing beat: %+v", err)
}

// Use empty config to use default configs if not set
Expand All @@ -58,16 +51,14 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command {

client, err := kibana.NewKibanaClient(b.Config.Kibana)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating Kibana client: %+v\n", err)
os.Exit(1)
fatalf("error creating Kibana client: %+v\n", err)
}

// Export dashboards from yml file
if yml != "" {
results, info, err := dashboards.ExportAllFromYml(client, yml)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting dashboards from yml: %+v\n", err)
os.Exit(1)
fatalf("error getting dashboards from yml: %+v\n", err)
}
for i, r := range results {
if decode {
Expand All @@ -76,9 +67,8 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command {

err = dashboards.SaveToFile(r, info.Dashboards[i].File, filepath.Dir(yml), client.GetVersion())
if err != nil {
fmt.Fprintf(os.Stderr, "Error saving dashboard '%s' to file '%s' : %+v\n",
fatalf("error saving dashboard '%s' to file '%s' : %+v\n",
info.Dashboards[i].ID, info.Dashboards[i].File, err)
os.Exit(1)
}
}
return
Expand All @@ -88,8 +78,7 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command {
if dashboard != "" {
result, err := dashboards.Export(client, dashboard)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting dashboard: %+v\n", err)
os.Exit(1)
fatalf("error getting dashboard: %+v\n", err)
}

if decode {
Expand Down
31 changes: 8 additions & 23 deletions libbeat/cmd/export/ilm_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@
package export

import (
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/elastic/beats/libbeat/cmd/instance"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/libbeat/idxmgmt"
simitt marked this conversation as resolved.
Show resolved Hide resolved
)

// GenGetILMPolicyCmd is the command used to export the ilm policy.
Expand All @@ -34,28 +30,17 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command {
Use: "ilm-policy",
Short: "Export ILM policy",
Run: func(cmd *cobra.Command, args []string) {
b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version)
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err)
os.Exit(1)
}
err = b.InitWithSettings(settings)
b, err := instance.NewInitializedBeat(settings)
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err)
os.Exit(1)
fatalf("error initializing beat: %+v", err)
}

ilmFactory := settings.ILM
if ilmFactory == nil {
ilmFactory = ilm.DefaultSupport
idxManager := b.IdxMgmtSupporter().Manager(nil, idxmgmt.BeatsAssets(b.Fields))
templateLoadCfg := idxmgmt.SetupConfig{Load: new(bool)}
ilmLoadCfg := idxmgmt.DefaultSetupConfig()
if err := idxManager.Setup(templateLoadCfg, ilmLoadCfg); err != nil {
fatalf("exporting ilm-policy failed: %+v", err)
}

ilm, err := ilmFactory(nil, b.Info, b.RawConfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing ILM support: %s\n", err)
}

fmt.Println(common.MapStr(ilm.Policy().Body).StringToPrint())
},
}

Expand Down
8 changes: 2 additions & 6 deletions libbeat/cmd/export/index_pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,9 @@ func GenIndexPatternConfigCmd(settings instance.Settings) *cobra.Command {
Run: func(cmd *cobra.Command, args []string) {
version, _ := cmd.Flags().GetString("es.version")

b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version)
b, err := instance.NewInitializedBeat(settings)
if err != nil {
fatalf("Error initializing beat: %+v", err)
}
err = b.InitWithSettings(settings)
if err != nil {
fatalf("Error initializing beat: %+v", err)
fatalf("error initializing beat: %+v", err)
}

if version == "" {
Expand Down
77 changes: 22 additions & 55 deletions libbeat/cmd/export/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,84 +21,41 @@ import (
"fmt"
"os"

"github.com/elastic/beats/libbeat/idxmgmt/ilm"

simitt marked this conversation as resolved.
Show resolved Hide resolved
"github.com/spf13/cobra"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cmd/instance"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/template"
)

func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command {
genTemplateConfigCmd := &cobra.Command{
Use: "template",
Short: "Export index template to stdout",
Run: func(cmd *cobra.Command, args []string) {
version, _ := cmd.Flags().GetString("es.version")
index, _ := cmd.Flags().GetString("index")
noILM, _ := cmd.Flags().GetBool("noilm")

b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version)
if err != nil {
fatalf("Error initializing beat: %+v", err)
}
err = b.InitWithSettings(settings)
if err != nil {
fatalf("Error initializing beat: %+v", err)
}

if version == "" {
version = b.Info.Version
}
esVersion, err := common.NewVersion(version)
if err != nil {
fatalf("Invalid Elasticsearch version: %+v", err)
}

imFactory := settings.IndexManagement
if imFactory == nil {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM)
}
indexManager, err := imFactory(logp.NewLogger("index-management"), b.Info, b.RawConfig)
if err != nil {
fatalf("Error initializing the index manager: %+v", err)
if noILM {
settings.ILM = ilmNoopSupport
}

tmplCfg, err := indexManager.TemplateConfig(!noILM)
b, err := instance.NewInitializedBeat(settings)
if err != nil {
fatalf("Template error detected: %+v", err)
}
if tmplCfg.Enabled == false {
tmplCfg = template.DefaultConfig()
}

tmpl, err := template.New(b.Info.Version, index, *esVersion, tmplCfg, b.Config.Migration.Enabled())
if err != nil {
fatalf("Error generating template: %+v", err)
fatalf("error initializing beat: %+v", err)
}

var templateString common.MapStr
if tmplCfg.Fields != "" {
fieldsPath := paths.Resolve(paths.Config, tmplCfg.Fields)
templateString, err = tmpl.LoadFile(fieldsPath)
} else {
templateString, err = tmpl.LoadBytes(b.Fields)
}
if err != nil {
fatalf("Error generating template: %+v", err)
}

_, err = os.Stdout.WriteString(templateString.StringToPrint() + "\n")
if err != nil {
fatalf("Error writing template: %+v", err)
idxManager := b.IdxMgmtSupporter().Manager(nil, idxmgmt.BeatsAssets(b.Fields))
simitt marked this conversation as resolved.
Show resolved Hide resolved
ilmLoadCfg := idxmgmt.SetupConfig{Load: new(bool)}
templateLoadCfg := idxmgmt.DefaultSetupConfig()
if err := idxManager.Setup(templateLoadCfg, ilmLoadCfg); err != nil {
fatalf("exporting template failed: %+v", err)
}
},
}

genTemplateConfigCmd.Flags().String("es.version", settings.Version, "Elasticsearch version")
genTemplateConfigCmd.Flags().String("index", settings.IndexPrefix, "Base index name")
genTemplateConfigCmd.Flags().Bool("noilm", false, "Generate template with ILM disabled")

return genTemplateConfigCmd
Expand All @@ -109,3 +66,13 @@ func fatalf(msg string, vs ...interface{}) {
fmt.Fprintln(os.Stderr)
os.Exit(1)
}

func ilmNoopSupport(log *logp.Logger, info beat.Info, config *common.Config) (ilm.Supporter, error) {
if log == nil {
log = logp.NewLogger("export template")
} else {
log = log.Named("export template")
}

return ilm.NoopSupport(info, config)
}
21 changes: 19 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ func Run(settings Settings, bt beat.Creator) error {
}())
}

// NewInitializedBeat creates a new beat where all information and initialization is derived from settings
func NewInitializedBeat(settings Settings) (*Beat, error) {
simitt marked this conversation as resolved.
Show resolved Hide resolved
b, err := NewBeat(settings.Name, settings.IndexPrefix, settings.Version)
if err != nil {
return nil, err
}
if err := b.InitWithSettings(settings); err != nil {
return nil, err
}
return b, nil
}

// NewBeat creates a new beat instance
func NewBeat(name, indexPrefix, v string) (*Beat, error) {
if v == "" {
Expand Down Expand Up @@ -271,6 +283,11 @@ func (b *Beat) Keystore() keystore.Keystore {
return b.keystore
}

// IdxMgmtSupporter return the configured indexmanager for this beat
func (b *Beat) IdxMgmtSupporter() idxmgmt.Supporter {
simitt marked this conversation as resolved.
Show resolved Hide resolved
return b.index
}

// create and return the beater, this method also initializes all needed items,
// including template registering, publisher, xpack monitoring
func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Expand Down Expand Up @@ -477,7 +494,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
// prepare index by loading templates, lifecycle policies and write aliases

m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields))
err = m.Setup(setup.Template, setup.ILMPolicy)
err = m.Setup(idxmgmt.SetupConfig{Force: &setup.Template}, idxmgmt.SetupConfig{Force: &setup.ILMPolicy})
if err != nil {
return err
}
Expand Down Expand Up @@ -776,7 +793,7 @@ func (b *Beat) registerESIndexManagement() error {
func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *elasticsearch.Client) error {
m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields))
return m.Setup(false, false)
return m.Setup(idxmgmt.DefaultSetupConfig(), idxmgmt.DefaultSetupConfig())
}
}

Expand Down
32 changes: 22 additions & 10 deletions libbeat/idxmgmt/idxmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ type Supporter interface {
// ILM, or aliases.
Enabled() bool

// ILM provides access to the configured ILM support.
ILM() ilm.Supporter

// TemplateConfig returns the template configuration used by the index supporter.
TemplateConfig(withILM bool) (template.TemplateConfig, error)

// BuildSelector create an index selector.
// The defaultIndex string is interpreted as format string. It is used
// as default index if the configuration provided does not define an index or
Expand All @@ -57,25 +51,43 @@ type Supporter interface {

// Manager creates a new manager that can be used to execute the required steps
// for initializing an index, ILM policies, and write aliases.
Manager(client ESClient, assets Asseter) Manager
Manager(client Client, assets Asseter) Manager
}

// Asseter provides access to beats assets required to load the template.
type Asseter interface {
Fields(name string) []byte
}

// ESClient defines the minimal interface required for the index manager to
// Client defines the minimal interface required for the index manager to
// prepare an index.
type ESClient interface {
type Client interface {
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
GetVersion() common.Version
}
simitt marked this conversation as resolved.
Show resolved Hide resolved

// Manager is used to initialize indices, ILM policies, and aliases within the
// Elastic Stack.
type Manager interface {
Setup(forceTemplate, forcePolicy bool) error
Setup(template, ilm SetupConfig) error
}

// SetupConfig is used to define loading and forcing to overwrite during the setup process
type SetupConfig struct {
Load *bool
Force *bool
}
simitt marked this conversation as resolved.
Show resolved Hide resolved

func DefaultSetupConfig() SetupConfig {
simitt marked this conversation as resolved.
Show resolved Hide resolved
simitt marked this conversation as resolved.
Show resolved Hide resolved
return SetupConfig{Force: new(bool)}
}

func (c *SetupConfig) ShouldLoad() bool {
simitt marked this conversation as resolved.
Show resolved Hide resolved
simitt marked this conversation as resolved.
Show resolved Hide resolved
return c.Load == nil || *c.Load
}

func (c *SetupConfig) ShouldForce() bool {
simitt marked this conversation as resolved.
Show resolved Hide resolved
simitt marked this conversation as resolved.
Show resolved Hide resolved
return c.Force != nil && *c.Force
}

// DefaultSupport initializes the default index management support used by most Beats.
Expand Down
Loading