diff --git a/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go b/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go index e6ff0d8dbf76..f4db79155d52 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go +++ b/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go @@ -6,22 +6,27 @@ package pipelinemanager import ( "crypto/sha1" + "encoding/json" "fmt" + "io" "os" "sort" "time" + "github.com/gofrs/uuid" "github.com/pkg/errors" yaml "gopkg.in/yaml.v2" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cloudid" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/file" "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher/pipeline" "github.com/elastic/beats/libbeat/publisher/processing" + "github.com/elastic/beats/libbeat/version" ) // makeConfigHash is the helper function that turns a user config into a hash @@ -47,18 +52,6 @@ func makeConfigHash(cfg map[string]string) string { // load pipeline starts up a new pipeline with the given config func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Logger) (*Pipeline, error) { - hostname, err := os.Hostname() - if err != nil { - return nil, errors.Wrap(err, "error getting hostname") - } - - info := beat.Info{ - Beat: "elastic-logging-plugin", - Version: "0", - Name: name, - Hostname: hostname, - } - newCfg, err := parseCfgKeys(logOptsConfig) if err != nil { return nil, errors.Wrap(err, "error parsing config keys") @@ -81,7 +74,12 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log return nil, fmt.Errorf("unpacking config failed: %v", err) } - processing, err := processing.MakeDefaultSupport(false)(info, log, cfg) + info, err := getBeatInfo(cfg) + if err != nil { + return nil, err + } + + processing, err := processing.MakeDefaultBeatSupport(true)(info, log, cfg) if err != nil { return nil, errors.Wrap(err, "error in MakeDefaultSupport") } @@ -128,7 +126,6 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log // parseCfgKeys helpfully parses the values in the map, so users can specify yml structures. func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) { - outMap := make(map[string]interface{}) for cfgKey, strVal := range cfg { @@ -141,3 +138,129 @@ func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) { return outMap, nil } + +// getBeatInfo returns the beat.Info type needed to start the pipeline +func getBeatInfo(cfg *common.Config) (beat.Info, error) { + vers := version.GetDefaultVersion() + hostname, err := os.Hostname() + if err != nil { + return beat.Info{}, errors.Wrap(err, "error getting hostname") + } + eid, err := uuid.NewV4() + if err != nil { + return beat.Info{}, errors.Wrap(err, "error creating ephemeral ID") + } + + type nameStr struct { + Name string `config:"name"` + } + name := nameStr{} + err = cfg.Unpack(&name) + if err != nil { + return beat.Info{}, fmt.Errorf("unpacking config failed: %v", err) + } + + if name.Name == "" { + name.Name = "elastic-log-driver-" + hostname + } + id, err := loadMeta("/tmp/meta.json") + if err != nil { + return beat.Info{}, errors.Wrap(err, "error loading UUID") + } + + info := beat.Info{ + Beat: "elastic-logging-plugin", + Name: name.Name, + Hostname: hostname, + Version: vers, + EphemeralID: eid, + ID: id, + } + + return info, nil + +} + +// loadMeta loads the metadata file that contains the UUID +func loadMeta(metaPath string) (uuid.UUID, error) { + type meta struct { + UUID uuid.UUID `json:"uuid"` + } + // check for an existing file + f, err := openRegular(metaPath) + if err != nil && !os.IsNotExist(err) { + return uuid.Nil, errors.Wrapf(err, "beat meta file %s failed to open", metaPath) + } + + //return the UUID if it exists + if err == nil { + m := meta{} + if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF { + f.Close() + return uuid.Nil, errors.Wrapf(err, "Error reading %s", metaPath) + } + + f.Close() + if m.UUID != uuid.Nil { + return m.UUID, nil + } + } + + // file does not exist or ID is invalid, let's create a new one + newID, err := uuid.NewV4() + if err != nil { + return uuid.Nil, errors.Wrap(err, "error creating ID") + } + // write temporary file first + tempFile := metaPath + ".new" + f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return uuid.Nil, errors.Wrapf(err, "failed to create Beat meta file at %s", tempFile) + } + + encodeErr := json.NewEncoder(f).Encode(meta{UUID: newID}) + err = f.Sync() + if err != nil { + return uuid.Nil, errors.Wrapf(err, "beat meta file at %s failed to write", tempFile) + } + + err = f.Close() + if err != nil { + return uuid.Nil, errors.Wrapf(err, "beat meta file at %s failed to close", tempFile) + } + + if encodeErr != nil { + return uuid.Nil, errors.Wrapf(err, "beat meta file at %s failed to write", tempFile) + } + + // move temporary file into final location + err = file.SafeFileRotate(metaPath, tempFile) + if err != nil { + return uuid.Nil, errors.Wrapf(err, "error rotating file to %s", metaPath) + } + return newID, nil +} + +// openRegular is a wrapper to handle a file based on a path +func openRegular(filename string) (*os.File, error) { + f, err := os.Open(filename) + if err != nil { + return f, errors.Wrapf(err, "error opening file %s", filename) + } + + info, err := f.Stat() + if err != nil { + f.Close() + return nil, errors.Wrapf(err, "error statting %s", filename) + } + + if !info.Mode().IsRegular() { + f.Close() + if info.IsDir() { + return nil, fmt.Errorf("%s is a directory", filename) + } + return nil, fmt.Errorf("%s is not a regular file", filename) + } + + return f, nil +}