From 5309468cb46bb5708d70471305f99a0e23ff7898 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 27 Jul 2020 17:21:51 +0300 Subject: [PATCH 01/19] Add leader election for autodiscover Signed-off-by: chrismark --- deploy/kubernetes/metricbeat-kubernetes.yaml | 6 ++ .../metricbeat/metricbeat-role.yaml | 6 ++ go.mod | 2 +- .../providers/kubernetes/config.go | 5 ++ .../providers/kubernetes/kubernetes.go | 88 +++++++++++++++++-- 5 files changed, 99 insertions(+), 8 deletions(-) diff --git a/deploy/kubernetes/metricbeat-kubernetes.yaml b/deploy/kubernetes/metricbeat-kubernetes.yaml index bc687dfac9d6..32cd5568025d 100644 --- a/deploy/kubernetes/metricbeat-kubernetes.yaml +++ b/deploy/kubernetes/metricbeat-kubernetes.yaml @@ -367,6 +367,12 @@ rules: - "/metrics" verbs: - get +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - '*' --- apiVersion: v1 kind: ServiceAccount diff --git a/deploy/kubernetes/metricbeat/metricbeat-role.yaml b/deploy/kubernetes/metricbeat/metricbeat-role.yaml index 152f9c4e9deb..3f802c2bcf23 100644 --- a/deploy/kubernetes/metricbeat/metricbeat-role.yaml +++ b/deploy/kubernetes/metricbeat/metricbeat-role.yaml @@ -33,3 +33,9 @@ rules: - "/metrics" verbs: - get +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - '*' diff --git a/go.mod b/go.mod index 26aa275ef403..ecc79111d31d 100644 --- a/go.mod +++ b/go.mod @@ -167,7 +167,7 @@ require ( golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - golang.org/x/tools v0.0.0-20200701041122-1837592efa10 + golang.org/x/tools v0.0.0-20200727233628-55644ead90ce google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb google.golang.org/grpc v1.29.1 diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index a1ec2db5dd51..4629841005f0 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -44,6 +44,8 @@ type Config struct { // Scope can be either node or cluster. Scope string `config:"scope"` Resource string `config:"resource"` + // Unique identifies if this provider enables it's templates only when it is elected as leader in a k8s cluster + Unique bool `config:"unique"` Prefix string `config:"prefix"` Hints *common.Config `config:"hints"` @@ -98,6 +100,9 @@ func (c *Config) Validate() error { if c.Scope != "node" && c.Scope != "cluster" { return fmt.Errorf("invalid `scope` configured. supported values are `node` and `cluster`") } + if c.Unique && c.Scope != "cluster" { + logp.L().Warnf("can only set `unique` when scope is `cluster`") + } return nil } diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index e1a2cb02ee0f..c08f665e341c 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -20,7 +20,14 @@ package kubernetes import ( + "context" "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" "github.com/gofrs/uuid" "github.com/pkg/errors" @@ -49,13 +56,15 @@ type Eventer interface { // Provider implements autodiscover provider for docker containers type Provider struct { - config *Config - bus bus.Bus - templates template.Mapper - builders autodiscover.Builders - appenders autodiscover.Appenders - logger *logp.Logger - eventer Eventer + config *Config + bus bus.Bus + templates template.Mapper + builders autodiscover.Builders + appenders autodiscover.Appenders + logger *logp.Logger + eventer Eventer + leaderElection leaderelection.LeaderElectionConfig + cancel context.CancelFunc } // AutodiscoverBuilder builds and returns an autodiscover provider @@ -118,6 +127,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore return nil, errWrap(err) } + p.leaderElection = p.newLeaderElectionConfig(client, "some") return p, nil } @@ -126,11 +136,15 @@ func (p *Provider) Start() { if err := p.eventer.Start(); err != nil { p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) } + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + leaderelection.RunOrDie(ctx, p.leaderElection) } // Stop signals the stop channel to force the watch loop routine to stop. func (p *Provider) Stop() { p.eventer.Stop() + p.cancel() } // String returns a description of kubernetes autodiscover provider. @@ -154,3 +168,63 @@ func (p *Provider) publish(event bus.Event) { p.appenders.Append(event) p.bus.Publish(event) } + +func (p *Provider) startLeading(uuid string, eventID string) { + event := bus.Event{ + "start": true, + "provider": uuid, + "id": eventID, + "unique": "true", + } + if config := p.templates.GetConfig(event); config != nil { + event["config"] = config + } + p.bus.Publish(event) +} + +func (p *Provider) stopLeading(uuid string, eventID string) { + event := bus.Event{ + "stop": true, + "provider": uuid, + "id": eventID, + "unique": "true", + } + if config := p.templates.GetConfig(event); config != nil { + event["config"] = config + } + p.bus.Publish(event) +} + +func (p *Provider) newLeaderElectionConfig(client k8s.Interface, uuid string) leaderelection.LeaderElectionConfig { + id := "beats-leader-" + uuid + lease := metav1.ObjectMeta{ + Name: "beats-cluster-leader", + Namespace: "default", + } + metaUID := lease.GetObjectMeta().GetUID() + return leaderelection.LeaderElectionConfig{ + Lock: &resourcelock.LeaseLock{ + LeaseMeta: lease, + Client: client.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + }, + }, + ReleaseOnCancel: true, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + p.logger.Debugf("leader election lock GAINED, id %v", id) + eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano()) + p.startLeading(uuid, eventID) + }, + OnStoppedLeading: func() { + p.logger.Debugf("leader election lock LOST, id %v", id) + eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano()) + p.stopLeading(uuid, eventID) + }, + }, + } +} From 3e3e324243845d590f5d9ae6bda8813618ede332 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 30 Jul 2020 15:54:30 +0300 Subject: [PATCH 02/19] Add unique condition programmatically Signed-off-by: chrismark --- .../providers/kubernetes/config.go | 1 + .../providers/kubernetes/kubernetes.go | 42 +++++++++++++++---- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index 4629841005f0..d09137c21bf6 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -62,6 +62,7 @@ func defaultConfig() *Config { Resource: "pod", CleanupTimeout: 60 * time.Second, Prefix: "co.elastic", + Unique: false, } } diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index c08f665e341c..7528bb4ee7c2 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -31,13 +31,14 @@ import ( "github.com/gofrs/uuid" "github.com/pkg/errors" - + "github.com/elastic/beats/v7/libbeat/autodiscover" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/common/kubernetes" "github.com/elastic/beats/v7/libbeat/common/kubernetes/k8skeystore" + "github.com/elastic/beats/v7/libbeat/conditions" "github.com/elastic/beats/v7/libbeat/keystore" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -81,6 +82,11 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore return nil, errWrap(err) } + if config.Unique { + // enrich the config with Unique templates before building Mapper + initUniqueTemplate(config) + } + client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { return nil, errWrap(err) @@ -127,7 +133,10 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore return nil, errWrap(err) } - p.leaderElection = p.newLeaderElectionConfig(client, "some") + if p.config.Unique { + p.initLeaderElectionConfig(client, "some") + } + return p, nil } @@ -136,15 +145,20 @@ func (p *Provider) Start() { if err := p.eventer.Start(); err != nil { p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) } - ctx, cancel := context.WithCancel(context.Background()) - p.cancel = cancel - leaderelection.RunOrDie(ctx, p.leaderElection) + + if p.config.Unique { + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + leaderelection.RunOrDie(ctx, p.leaderElection) + } } // Stop signals the stop channel to force the watch loop routine to stop. func (p *Provider) Stop() { p.eventer.Stop() - p.cancel() + if p.cancel != nil { + p.cancel() + } } // String returns a description of kubernetes autodiscover provider. @@ -195,14 +209,14 @@ func (p *Provider) stopLeading(uuid string, eventID string) { p.bus.Publish(event) } -func (p *Provider) newLeaderElectionConfig(client k8s.Interface, uuid string) leaderelection.LeaderElectionConfig { +func (p *Provider) initLeaderElectionConfig(client k8s.Interface, uuid string) { id := "beats-leader-" + uuid lease := metav1.ObjectMeta{ Name: "beats-cluster-leader", Namespace: "default", } metaUID := lease.GetObjectMeta().GetUID() - return leaderelection.LeaderElectionConfig{ + p.leaderElection = leaderelection.LeaderElectionConfig{ Lock: &resourcelock.LeaseLock{ LeaseMeta: lease, Client: client.CoordinationV1(), @@ -228,3 +242,15 @@ func (p *Provider) newLeaderElectionConfig(client k8s.Interface, uuid string) le }, } } + +func initUniqueTemplate(config *Config) { + m := make(map[string]interface{}) + m["unique"] = "true" + fields := &conditions.Fields{} + fields.Unpack(m) + for _, template := range config.Templates { + template.ConditionConfig = &conditions.Config{ + Contains: fields, + } + } +} From a328803421bb057ecf2eaddc7480aeeb7f09cff1 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 30 Jul 2020 16:53:44 +0300 Subject: [PATCH 03/19] Adopt async start of leader elector Signed-off-by: chrismark --- .../providers/kubernetes/kubernetes.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 7528bb4ee7c2..216688d85654 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -31,7 +31,7 @@ import ( "github.com/gofrs/uuid" "github.com/pkg/errors" - + "github.com/elastic/beats/v7/libbeat/autodiscover" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" @@ -149,8 +149,21 @@ func (p *Provider) Start() { if p.config.Unique { ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel - leaderelection.RunOrDie(ctx, p.leaderElection) + p.StartLeaderElector(ctx, p.leaderElection) + } +} + +// StartLeaderElector starts a Leader Elector in the background with the provided config +func (p *Provider) StartLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) { + le, err := leaderelection.NewLeaderElector(lec) + if err != nil { + p.logger.Errorf("leader election lock GAINED, id %v", err) + } + if lec.WatchDog != nil { + lec.WatchDog.SetLeaderElection(le) } + p.logger.Debugf("Starting Leader Elector") + go le.Run(ctx) } // Stop signals the stop channel to force the watch loop routine to stop. From ebdbf7070c59809d2e3b240811d7184adff79226 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 30 Jul 2020 18:03:15 +0300 Subject: [PATCH 04/19] Add identifier parameter Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/config.go | 3 ++- libbeat/autodiscover/providers/kubernetes/kubernetes.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index d09137c21bf6..5b4a0a77bfce 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -45,7 +45,8 @@ type Config struct { Scope string `config:"scope"` Resource string `config:"resource"` // Unique identifies if this provider enables it's templates only when it is elected as leader in a k8s cluster - Unique bool `config:"unique"` + Unique bool `config:"unique"` + Identifier string `config:"identifier"` Prefix string `config:"prefix"` Hints *common.Config `config:"hints"` diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 216688d85654..dfc355327b2e 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -77,6 +77,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore } config := defaultConfig() + config.Identifier = uuid.String() err := c.Unpack(&config) if err != nil { return nil, errWrap(err) @@ -134,7 +135,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore } if p.config.Unique { - p.initLeaderElectionConfig(client, "some") + p.initLeaderElectionConfig(client, p.config.Identifier) } return p, nil From 5603106f0476e9be275b4a4b7607f3dd8740b0c3 Mon Sep 17 00:00:00 2001 From: chrismark Date: Fri, 31 Jul 2020 14:57:59 +0300 Subject: [PATCH 05/19] Improve cancel func naming Signed-off-by: chrismark --- go.mod | 2 +- .../providers/kubernetes/config.go | 1 + .../providers/kubernetes/kubernetes.go | 33 ++++++++++--------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index ecc79111d31d..91d5e961a260 100644 --- a/go.mod +++ b/go.mod @@ -167,7 +167,7 @@ require ( golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - golang.org/x/tools v0.0.0-20200727233628-55644ead90ce + golang.org/x/tools v0.0.0-20200731060945-b5fad4ed8dd6 google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb google.golang.org/grpc v1.29.1 diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index 5b4a0a77bfce..d94d26b917c7 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -64,6 +64,7 @@ func defaultConfig() *Config { CleanupTimeout: 60 * time.Second, Prefix: "co.elastic", Unique: false, + Identifier: "", } } diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index dfc355327b2e..b203457b4437 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -57,15 +57,15 @@ type Eventer interface { // Provider implements autodiscover provider for docker containers type Provider struct { - config *Config - bus bus.Bus - templates template.Mapper - builders autodiscover.Builders - appenders autodiscover.Appenders - logger *logp.Logger - eventer Eventer - leaderElection leaderelection.LeaderElectionConfig - cancel context.CancelFunc + config *Config + bus bus.Bus + templates template.Mapper + builders autodiscover.Builders + appenders autodiscover.Appenders + logger *logp.Logger + eventer Eventer + leaderElection leaderelection.LeaderElectionConfig + cancelLeaderElection context.CancelFunc } // AutodiscoverBuilder builds and returns an autodiscover provider @@ -135,7 +135,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore } if p.config.Unique { - p.initLeaderElectionConfig(client, p.config.Identifier) + p.initLeaderElectionConfig(client, uuid.String()) } return p, nil @@ -149,7 +149,7 @@ func (p *Provider) Start() { if p.config.Unique { ctx, cancel := context.WithCancel(context.Background()) - p.cancel = cancel + p.cancelLeaderElection = cancel p.StartLeaderElector(ctx, p.leaderElection) } } @@ -170,8 +170,8 @@ func (p *Provider) StartLeaderElector(ctx context.Context, lec leaderelection.Le // Stop signals the stop channel to force the watch loop routine to stop. func (p *Provider) Stop() { p.eventer.Stop() - if p.cancel != nil { - p.cancel() + if p.cancelLeaderElection != nil { + p.cancelLeaderElection() } } @@ -224,7 +224,10 @@ func (p *Provider) stopLeading(uuid string, eventID string) { } func (p *Provider) initLeaderElectionConfig(client k8s.Interface, uuid string) { - id := "beats-leader-" + uuid + id := "beats-leader-" + p.config.Node + if p.config.Identifier != "" { + id = id + "-" + p.config.Identifier + } lease := metav1.ObjectMeta{ Name: "beats-cluster-leader", Namespace: "default", @@ -241,7 +244,7 @@ func (p *Provider) initLeaderElectionConfig(client k8s.Interface, uuid string) { ReleaseOnCancel: true, LeaseDuration: 15 * time.Second, RenewDeadline: 10 * time.Second, - RetryPeriod: 5 * time.Second, + RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { p.logger.Debugf("leader election lock GAINED, id %v", id) From 7544de539975b7754481d6567e8ba41abdecd9de Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 3 Aug 2020 10:39:05 +0300 Subject: [PATCH 06/19] minor typo Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index d94d26b917c7..6ab8220146d7 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -44,7 +44,7 @@ type Config struct { // Scope can be either node or cluster. Scope string `config:"scope"` Resource string `config:"resource"` - // Unique identifies if this provider enables it's templates only when it is elected as leader in a k8s cluster + // Unique identifies if this provider enables its templates only when it is elected as leader in a k8s cluster Unique bool `config:"unique"` Identifier string `config:"identifier"` From 29563ea912a190a8bfcd28660e414d229368baac Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 3 Aug 2020 15:09:12 +0300 Subject: [PATCH 07/19] Add LeaderLease config Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/config.go | 6 +++--- .../autodiscover/providers/kubernetes/kubernetes.go | 11 ++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index 6ab8220146d7..d14ef50dff4f 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -45,8 +45,8 @@ type Config struct { Scope string `config:"scope"` Resource string `config:"resource"` // Unique identifies if this provider enables its templates only when it is elected as leader in a k8s cluster - Unique bool `config:"unique"` - Identifier string `config:"identifier"` + Unique bool `config:"unique"` + LeaderLease string `config:"leader_lease"` Prefix string `config:"prefix"` Hints *common.Config `config:"hints"` @@ -64,7 +64,7 @@ func defaultConfig() *Config { CleanupTimeout: 60 * time.Second, Prefix: "co.elastic", Unique: false, - Identifier: "", + LeaderLease: "beats-cluster-leader", } } diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index b203457b4437..bf2e8ec9f2c9 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -77,7 +77,6 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore } config := defaultConfig() - config.Identifier = uuid.String() err := c.Unpack(&config) if err != nil { return nil, errWrap(err) @@ -224,12 +223,14 @@ func (p *Provider) stopLeading(uuid string, eventID string) { } func (p *Provider) initLeaderElectionConfig(client k8s.Interface, uuid string) { - id := "beats-leader-" + p.config.Node - if p.config.Identifier != "" { - id = id + "-" + p.config.Identifier + var id string + if p.config.Node != "" { + id = "beats-leader-" + p.config.Node + } else { + id = "beats-leader-" + uuid } lease := metav1.ObjectMeta{ - Name: "beats-cluster-leader", + Name: p.config.LeaderLease, Namespace: "default", } metaUID := lease.GetObjectMeta().GetUID() From 0fc279a87eee340a526a559b0d9c44d2966d042e Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 5 Aug 2020 12:07:39 +0300 Subject: [PATCH 08/19] Disable eventers Signed-off-by: chrismark --- .../providers/kubernetes/kubernetes.go | 60 +++++++------------ 1 file changed, 22 insertions(+), 38 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index bf2e8ec9f2c9..fbe3f51683b8 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -38,7 +38,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/common/kubernetes" "github.com/elastic/beats/v7/libbeat/common/kubernetes/k8skeystore" - "github.com/elastic/beats/v7/libbeat/conditions" "github.com/elastic/beats/v7/libbeat/keystore" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -82,11 +81,6 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore return nil, errWrap(err) } - if config.Unique { - // enrich the config with Unique templates before building Mapper - initUniqueTemplate(config) - } - client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { return nil, errWrap(err) @@ -118,23 +112,23 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore logger: logger, } - switch config.Resource { - case "pod": - p.eventer, err = NewPodEventer(uuid, c, client, p.publish) - case "node": - p.eventer, err = NewNodeEventer(uuid, c, client, p.publish) - case "service": - p.eventer, err = NewServiceEventer(uuid, c, client, p.publish) - default: - return nil, fmt.Errorf("unsupported autodiscover resource %s", config.Resource) - } - - if err != nil { - return nil, errWrap(err) - } - if p.config.Unique { p.initLeaderElectionConfig(client, uuid.String()) + } else { + switch config.Resource { + case "pod": + p.eventer, err = NewPodEventer(uuid, c, client, p.publish) + case "node": + p.eventer, err = NewNodeEventer(uuid, c, client, p.publish) + case "service": + p.eventer, err = NewServiceEventer(uuid, c, client, p.publish) + default: + return nil, fmt.Errorf("unsupported autodiscover resource %s", config.Resource) + } + + if err != nil { + return nil, errWrap(err) + } } return p, nil @@ -142,14 +136,14 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore // Start for Runner interface. func (p *Provider) Start() { - if err := p.eventer.Start(); err != nil { - p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) - } - if p.config.Unique { ctx, cancel := context.WithCancel(context.Background()) p.cancelLeaderElection = cancel p.StartLeaderElector(ctx, p.leaderElection) + } else { + if err := p.eventer.Start(); err != nil { + p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) + } } } @@ -168,7 +162,9 @@ func (p *Provider) StartLeaderElector(ctx context.Context, lec leaderelection.Le // Stop signals the stop channel to force the watch loop routine to stop. func (p *Provider) Stop() { - p.eventer.Stop() + if p.eventer != nil { + p.eventer.Stop() + } if p.cancelLeaderElection != nil { p.cancelLeaderElection() } @@ -260,15 +256,3 @@ func (p *Provider) initLeaderElectionConfig(client k8s.Interface, uuid string) { }, } } - -func initUniqueTemplate(config *Config) { - m := make(map[string]interface{}) - m["unique"] = "true" - fields := &conditions.Fields{} - fields.Unpack(m) - for _, template := range config.Templates { - template.ConditionConfig = &conditions.Config{ - Contains: fields, - } - } -} From 37878a964b5bbb175e4935af3ed0fba2ebc9979f Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 5 Aug 2020 12:49:32 +0300 Subject: [PATCH 09/19] Split unique provider to different interfae implementation Signed-off-by: chrismark --- go.mod | 2 +- .../providers/kubernetes/kubernetes.go | 92 +++++++++++-------- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 4f58b09a530a..dcbadfbb2bff 100644 --- a/go.mod +++ b/go.mod @@ -167,7 +167,7 @@ require ( golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - golang.org/x/tools v0.0.0-20200731060945-b5fad4ed8dd6 + golang.org/x/tools v0.0.0-20200804234916-fec4f28ebb08 google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb google.golang.org/grpc v1.29.1 diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index fbe3f51683b8..ac4a07324cc0 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -56,13 +56,18 @@ type Eventer interface { // Provider implements autodiscover provider for docker containers type Provider struct { - config *Config - bus bus.Bus - templates template.Mapper - builders autodiscover.Builders - appenders autodiscover.Appenders - logger *logp.Logger - eventer Eventer + config *Config + bus bus.Bus + templates template.Mapper + builders autodiscover.Builders + appenders autodiscover.Appenders + logger *logp.Logger + eventer Eventer +} + +// UniqueProvider implements unique autodiscover provider for Kubernetes clusters +type UniqueProvider struct { + Provider leaderElection leaderelection.LeaderElectionConfig cancelLeaderElection context.CancelFunc } @@ -113,7 +118,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore } if p.config.Unique { - p.initLeaderElectionConfig(client, uuid.String()) + return uniqueProviderBuilder(p, client, uuid) } else { switch config.Resource { case "pod": @@ -136,38 +141,14 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore // Start for Runner interface. func (p *Provider) Start() { - if p.config.Unique { - ctx, cancel := context.WithCancel(context.Background()) - p.cancelLeaderElection = cancel - p.StartLeaderElector(ctx, p.leaderElection) - } else { - if err := p.eventer.Start(); err != nil { - p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) - } - } -} - -// StartLeaderElector starts a Leader Elector in the background with the provided config -func (p *Provider) StartLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) { - le, err := leaderelection.NewLeaderElector(lec) - if err != nil { - p.logger.Errorf("leader election lock GAINED, id %v", err) + if err := p.eventer.Start(); err != nil { + p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) } - if lec.WatchDog != nil { - lec.WatchDog.SetLeaderElection(le) - } - p.logger.Debugf("Starting Leader Elector") - go le.Run(ctx) } // Stop signals the stop channel to force the watch loop routine to stop. func (p *Provider) Stop() { - if p.eventer != nil { - p.eventer.Stop() - } - if p.cancelLeaderElection != nil { - p.cancelLeaderElection() - } + p.eventer.Stop() } // String returns a description of kubernetes autodiscover provider. @@ -192,7 +173,42 @@ func (p *Provider) publish(event bus.Event) { p.bus.Publish(event) } -func (p *Provider) startLeading(uuid string, eventID string) { +func uniqueProviderBuilder(p *Provider, client k8s.Interface, uuid uuid.UUID) (autodiscover.Provider, error) { + uniqueProvider := &UniqueProvider{ + Provider: *p, + } + uniqueProvider.initLeaderElectionConfig(client, uuid.String()) + return uniqueProvider, nil +} + +// Start for Runner interface. +func (p *UniqueProvider) Start() { + ctx, cancel := context.WithCancel(context.Background()) + p.cancelLeaderElection = cancel + p.startLeaderElector(ctx, p.leaderElection) +} + +// Stop signals the stop channel to force the watch loop routine to stop. +func (p *UniqueProvider) Stop() { + if p.cancelLeaderElection != nil { + p.cancelLeaderElection() + } +} + +// startLeaderElector starts a Leader Elector in the background with the provided config +func (p *UniqueProvider) startLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) { + le, err := leaderelection.NewLeaderElector(lec) + if err != nil { + p.logger.Errorf("leader election lock GAINED, id %v", err) + } + if lec.WatchDog != nil { + lec.WatchDog.SetLeaderElection(le) + } + p.logger.Debugf("Starting Leader Elector") + go le.Run(ctx) +} + +func (p *UniqueProvider) startLeading(uuid string, eventID string) { event := bus.Event{ "start": true, "provider": uuid, @@ -205,7 +221,7 @@ func (p *Provider) startLeading(uuid string, eventID string) { p.bus.Publish(event) } -func (p *Provider) stopLeading(uuid string, eventID string) { +func (p *UniqueProvider) stopLeading(uuid string, eventID string) { event := bus.Event{ "stop": true, "provider": uuid, @@ -218,7 +234,7 @@ func (p *Provider) stopLeading(uuid string, eventID string) { p.bus.Publish(event) } -func (p *Provider) initLeaderElectionConfig(client k8s.Interface, uuid string) { +func (p *UniqueProvider) initLeaderElectionConfig(client k8s.Interface, uuid string) { var id string if p.config.Node != "" { id = "beats-leader-" + p.config.Node From 5ef9a9a28930a8de2f307a8a6780aa411bfc1196 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 5 Aug 2020 15:17:44 +0300 Subject: [PATCH 10/19] Introduce EventManager wrapper Signed-off-by: chrismark --- .../providers/kubernetes/kubernetes.go | 202 +++++++++++------- .../providers/kubernetes/node_test.go | 11 +- .../providers/kubernetes/pod_test.go | 8 +- .../providers/kubernetes/service_test.go | 9 +- 4 files changed, 148 insertions(+), 82 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index ac4a07324cc0..28c08d80a1bb 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -54,22 +54,35 @@ type Eventer interface { Stop() } +// EventManager allows defining ways in which kubernetes resource events are observed and processed +type EventManager interface { + GenerateHints(event bus.Event) bus.Event + Start() + Stop() +} + // Provider implements autodiscover provider for docker containers type Provider struct { - config *Config - bus bus.Bus - templates template.Mapper - builders autodiscover.Builders - appenders autodiscover.Appenders - logger *logp.Logger - eventer Eventer + config *Config + bus bus.Bus + templates template.Mapper + builders autodiscover.Builders + appenders autodiscover.Appenders + logger *logp.Logger + eventManager EventManager +} + +// eventerManager implements unique autodiscover provider for Kubernetes clusters +type eventerManager struct { + eventer Eventer + logger *logp.Logger } -// UniqueProvider implements unique autodiscover provider for Kubernetes clusters -type UniqueProvider struct { - Provider +// leaderElectionManager implements unique autodiscover provider for Kubernetes clusters +type leaderElectionManager struct { leaderElection leaderelection.LeaderElectionConfig cancelLeaderElection context.CancelFunc + logger *logp.Logger } // AutodiscoverBuilder builds and returns an autodiscover provider @@ -118,22 +131,9 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore } if p.config.Unique { - return uniqueProviderBuilder(p, client, uuid) + p.eventManager, err = NewLeaderElectionManager(uuid, config, client, p.startLeading, p.stopLeading, logger) } else { - switch config.Resource { - case "pod": - p.eventer, err = NewPodEventer(uuid, c, client, p.publish) - case "node": - p.eventer, err = NewNodeEventer(uuid, c, client, p.publish) - case "service": - p.eventer, err = NewServiceEventer(uuid, c, client, p.publish) - default: - return nil, fmt.Errorf("unsupported autodiscover resource %s", config.Resource) - } - - if err != nil { - return nil, errWrap(err) - } + p.eventManager, err = NewEventerManager(uuid, c, config, client, p.publish, errWrap) } return p, nil @@ -141,14 +141,12 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore // Start for Runner interface. func (p *Provider) Start() { - if err := p.eventer.Start(); err != nil { - p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) - } + p.eventManager.Start() } // Stop signals the stop channel to force the watch loop routine to stop. func (p *Provider) Stop() { - p.eventer.Stop() + p.eventManager.Stop() } // String returns a description of kubernetes autodiscover provider. @@ -162,7 +160,7 @@ func (p *Provider) publish(event bus.Event) { event["config"] = config } else { // If there isn't a default template then attempt to use builders - e := p.eventer.GenerateHints(event) + e := p.eventManager.GenerateHints(event) if config := p.builders.GetConfig(e); config != nil { event["config"] = config } @@ -173,42 +171,7 @@ func (p *Provider) publish(event bus.Event) { p.bus.Publish(event) } -func uniqueProviderBuilder(p *Provider, client k8s.Interface, uuid uuid.UUID) (autodiscover.Provider, error) { - uniqueProvider := &UniqueProvider{ - Provider: *p, - } - uniqueProvider.initLeaderElectionConfig(client, uuid.String()) - return uniqueProvider, nil -} - -// Start for Runner interface. -func (p *UniqueProvider) Start() { - ctx, cancel := context.WithCancel(context.Background()) - p.cancelLeaderElection = cancel - p.startLeaderElector(ctx, p.leaderElection) -} - -// Stop signals the stop channel to force the watch loop routine to stop. -func (p *UniqueProvider) Stop() { - if p.cancelLeaderElection != nil { - p.cancelLeaderElection() - } -} - -// startLeaderElector starts a Leader Elector in the background with the provided config -func (p *UniqueProvider) startLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) { - le, err := leaderelection.NewLeaderElector(lec) - if err != nil { - p.logger.Errorf("leader election lock GAINED, id %v", err) - } - if lec.WatchDog != nil { - lec.WatchDog.SetLeaderElection(le) - } - p.logger.Debugf("Starting Leader Elector") - go le.Run(ctx) -} - -func (p *UniqueProvider) startLeading(uuid string, eventID string) { +func (p *Provider) startLeading(uuid string, eventID string) { event := bus.Event{ "start": true, "provider": uuid, @@ -221,7 +184,7 @@ func (p *UniqueProvider) startLeading(uuid string, eventID string) { p.bus.Publish(event) } -func (p *UniqueProvider) stopLeading(uuid string, eventID string) { +func (p *Provider) stopLeading(uuid string, eventID string) { event := bus.Event{ "stop": true, "provider": uuid, @@ -234,19 +197,54 @@ func (p *UniqueProvider) stopLeading(uuid string, eventID string) { p.bus.Publish(event) } -func (p *UniqueProvider) initLeaderElectionConfig(client k8s.Interface, uuid string) { +func NewEventerManager( + uuid uuid.UUID, + c *common.Config, + cfg *Config, + client k8s.Interface, + publish func(event bus.Event), + errWrap func(err error) error, +) (EventManager, error) { + var err error + em := &eventerManager{} + switch cfg.Resource { + case "pod": + em.eventer, err = NewPodEventer(uuid, c, client, publish) + case "node": + em.eventer, err = NewNodeEventer(uuid, c, client, publish) + case "service": + em.eventer, err = NewServiceEventer(uuid, c, client, publish) + default: + return nil, fmt.Errorf("unsupported autodiscover resource %s", cfg.Resource) + } + + if err != nil { + return nil, errWrap(err) + } + return em, nil +} + +func NewLeaderElectionManager( + uuid uuid.UUID, + cfg *Config, + client k8s.Interface, + startLeading func(uuid string, eventID string), + stopLeading func(uuid string, eventID string), + logger *logp.Logger, +) (EventManager, error) { + lem := &leaderElectionManager{} var id string - if p.config.Node != "" { - id = "beats-leader-" + p.config.Node + if cfg.Node != "" { + id = "beats-leader-" + cfg.Node } else { - id = "beats-leader-" + uuid + id = "beats-leader-" + uuid.String() } lease := metav1.ObjectMeta{ - Name: p.config.LeaderLease, + Name: cfg.LeaderLease, Namespace: "default", } metaUID := lease.GetObjectMeta().GetUID() - p.leaderElection = leaderelection.LeaderElectionConfig{ + lem.leaderElection = leaderelection.LeaderElectionConfig{ Lock: &resourcelock.LeaseLock{ LeaseMeta: lease, Client: client.CoordinationV1(), @@ -260,15 +258,65 @@ func (p *UniqueProvider) initLeaderElectionConfig(client k8s.Interface, uuid str RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { - p.logger.Debugf("leader election lock GAINED, id %v", id) + logger.Debugf("leader election lock GAINED, id %v", id) eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano()) - p.startLeading(uuid, eventID) + startLeading(uuid.String(), eventID) }, OnStoppedLeading: func() { - p.logger.Debugf("leader election lock LOST, id %v", id) + logger.Debugf("leader election lock LOST, id %v", id) eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano()) - p.stopLeading(uuid, eventID) + stopLeading(uuid.String(), eventID) }, }, } + return lem, nil +} + +// Start for EventManager interface. +func (p *eventerManager) Start() { + if err := p.eventer.Start(); err != nil { + p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err) + } +} + +// Stop signals the stop channel to force the watch loop routine to stop. +func (p *eventerManager) Stop() { + p.eventer.Stop() +} + +// GenerateHints for EventManager interface. +func (p *eventerManager) GenerateHints(event bus.Event) bus.Event { + return p.eventer.GenerateHints(event) +} + +// Start for EventManager interface. +func (p *leaderElectionManager) Start() { + ctx, cancel := context.WithCancel(context.Background()) + p.cancelLeaderElection = cancel + p.startLeaderElector(ctx, p.leaderElection) +} + +// Stop signals the stop channel to force the leader election loop routine to stop. +func (p *leaderElectionManager) Stop() { + if p.cancelLeaderElection != nil { + p.cancelLeaderElection() + } +} + +// GenerateHints for EventManager interface. +func (p *leaderElectionManager) GenerateHints(event bus.Event) bus.Event { + return event +} + +// startLeaderElector starts a Leader Elector in the background with the provided config +func (p *leaderElectionManager) startLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) { + le, err := leaderelection.NewLeaderElector(lec) + if err != nil { + p.logger.Errorf("leader election lock GAINED, id %v", err) + } + if lec.WatchDog != nil { + lec.WatchDog.SetLeaderElection(le) + } + p.logger.Debugf("Starting Leader Elector") + go le.Run(ctx) } diff --git a/libbeat/autodiscover/providers/kubernetes/node_test.go b/libbeat/autodiscover/providers/kubernetes/node_test.go index 736bd153cf20..8c29c008fc8a 100644 --- a/libbeat/autodiscover/providers/kubernetes/node_test.go +++ b/libbeat/autodiscover/providers/kubernetes/node_test.go @@ -246,8 +246,9 @@ func TestEmitEvent_Node(t *testing.T) { } metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil) + config := defaultConfig() p := &Provider{ - config: defaultConfig(), + config: config, bus: bus.New(logp.NewLogger("bus"), "test"), templates: mapper, logger: logp.NewLogger("kubernetes"), @@ -261,7 +262,7 @@ func TestEmitEvent_Node(t *testing.T) { logger: logp.NewLogger("kubernetes.no"), } - p.eventer = no + p.eventManager = NewMockNodeEventerManager(no) listener := p.bus.Subscribe() @@ -279,6 +280,12 @@ func TestEmitEvent_Node(t *testing.T) { } } +func NewMockNodeEventerManager(no *node) EventManager { + em := &eventerManager{} + em.eventer = no + return em +} + func TestNode_isUpdated(t *testing.T) { tests := []struct { old *kubernetes.Node diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index f22bfc64ceac..0e435fda4c5f 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -1013,7 +1013,7 @@ func TestEmitEvent(t *testing.T) { logger: logp.NewLogger("kubernetes.pod"), } - p.eventer = pod + p.eventManager = NewMockPodEventerManager(pod) listener := p.bus.Subscribe() @@ -1034,6 +1034,12 @@ func TestEmitEvent(t *testing.T) { } } +func NewMockPodEventerManager(pod *pod) EventManager { + em := &eventerManager{} + em.eventer = pod + return em +} + func getNestedAnnotations(in common.MapStr) common.MapStr { out := common.MapStr{} diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index 7ead61fc3a03..c45c691f75e6 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -414,8 +414,7 @@ func TestEmitEvent_Service(t *testing.T) { logger: logp.NewLogger("kubernetes.service"), } - p.eventer = service - + p.eventManager = NewMockServiceEventerManager(service) listener := p.bus.Subscribe() service.emit(test.Service, test.Flag) @@ -431,3 +430,9 @@ func TestEmitEvent_Service(t *testing.T) { }) } } + +func NewMockServiceEventerManager(svc *service) EventManager { + em := &eventerManager{} + em.eventer = svc + return em +} From 8fd60699f3d6ebef7a23beca49f2d3773302d89b Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 5 Aug 2020 15:19:37 +0300 Subject: [PATCH 11/19] fix docstring Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/kubernetes.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 28c08d80a1bb..774d05c80067 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -72,13 +72,13 @@ type Provider struct { eventManager EventManager } -// eventerManager implements unique autodiscover provider for Kubernetes clusters +// eventerManager implements start/stop methods for autodiscover provider with resource eventer type eventerManager struct { eventer Eventer logger *logp.Logger } -// leaderElectionManager implements unique autodiscover provider for Kubernetes clusters +// leaderElectionManager implements start/stop methods for unique autodiscover provider for Kubernetes clusters type leaderElectionManager struct { leaderElection leaderelection.LeaderElectionConfig cancelLeaderElection context.CancelFunc From def947ac389cc6edd6486b8ed1d751964f58af80 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 5 Aug 2020 15:31:22 +0300 Subject: [PATCH 12/19] fix docstring Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/kubernetes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 774d05c80067..9c764d731f79 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -78,7 +78,7 @@ type eventerManager struct { logger *logp.Logger } -// leaderElectionManager implements start/stop methods for unique autodiscover provider for Kubernetes clusters +// leaderElectionManager implements start/stop methods for autodiscover provider with leaderElection type leaderElectionManager struct { leaderElection leaderelection.LeaderElectionConfig cancelLeaderElection context.CancelFunc From b1a1905fb0e64955cc19979a7b372289d782736b Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 5 Aug 2020 17:33:51 +0300 Subject: [PATCH 13/19] Improve error handling Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/kubernetes.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 9c764d731f79..904c6b8cf708 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -133,7 +133,11 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore if p.config.Unique { p.eventManager, err = NewLeaderElectionManager(uuid, config, client, p.startLeading, p.stopLeading, logger) } else { - p.eventManager, err = NewEventerManager(uuid, c, config, client, p.publish, errWrap) + p.eventManager, err = NewEventerManager(uuid, c, config, client, p.publish) + } + + if err != nil { + return nil, errWrap(err) } return p, nil @@ -203,7 +207,6 @@ func NewEventerManager( cfg *Config, client k8s.Interface, publish func(event bus.Event), - errWrap func(err error) error, ) (EventManager, error) { var err error em := &eventerManager{} @@ -219,7 +222,7 @@ func NewEventerManager( } if err != nil { - return nil, errWrap(err) + return nil, err } return em, nil } From 536cedee954b3824e439597e33ce8d06a566679b Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 6 Aug 2020 11:38:12 +0300 Subject: [PATCH 14/19] Add documentation for new settings Signed-off-by: chrismark --- go.mod | 2 +- libbeat/docs/shared-autodiscover.asciidoc | 34 +++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index dcbadfbb2bff..71acd2ff53fd 100644 --- a/go.mod +++ b/go.mod @@ -167,7 +167,7 @@ require ( golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - golang.org/x/tools v0.0.0-20200804234916-fec4f28ebb08 + golang.org/x/tools v0.0.0-20200806022845-90696ccdc692 google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb google.golang.org/grpc v1.29.1 diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index 7c59b9f84520..a3c4ab1279c7 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -242,6 +242,40 @@ running configuration for a container, 60s by default. include_labels: ["nodelabel2"] ------------------------------------------------------------------------------------- +`unique`:: (Optional) Defaults to `false`. Marking an autodiscover provider as unique results into + making the provider to enable the provided templates only when it will gain the leader lease. + This setting can only be combined with `cluster` scope. When `unique` is enabled enabled, `resource` + and `add_resource_metadata` settings are not taken into account. +`leader_lease`:: (Optional) Defaults to `beats-cluster-leader`. This will be name of the lock lease. + One can monitor the status of the lease with `kubectl describe lease beats-cluster-leader`. + Different Beats that refer to the same leader lease will be competetitors in holding the lease + and only one will be elected as leader each time. Example: + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +metricbeat.autodiscover: + providers: + - type: kubernetes + scope: cluster + node: ${NODE_NAME} + unique: true + identifier: leader-election-metricbeat + templates: + - config: + - module: kubernetes + hosts: ["kube-state-metrics:8080"] + period: 10s + add_metadata: true + metricsets: + - state_node +------------------------------------------------------------------------------------- + +The above configuration when deployed on one or more Metribceat instances will enable `state_node` +metricset only for the Metricbeat instance that will gain the leader lease/lock. With this deployment +strategy we can ensure that cluster-wide metricsets are only enabled by one Beat instance when +deploying a Beat as Daemonset. + + include::../../{beatname_lc}/docs/autodiscover-kubernetes-config.asciidoc[] ifdef::autodiscoverJolokia[] From ced788a514ed60963c68ea723e296c553450037d Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 6 Aug 2020 11:40:30 +0300 Subject: [PATCH 15/19] Add changelog entry Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c12c2a9574f3..07210c99efb2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -364,6 +364,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added the `max_cached_sessions` option to the script processor. {pull}19562[19562] - Add support for DNS over TLS for the dns_processor. {pull}19321[19321] - Set index.max_docvalue_fields_search in index template to increase value to 200 fields. {issue}20215[20215] +- Add leader election for autodiscover. {pull}20281[20281] *Auditbeat* From 7de6f96c18ed5cbbabe433317922b0c064da6bd8 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 6 Aug 2020 14:40:15 +0300 Subject: [PATCH 16/19] Update default lease name to fallback to Beats' name Signed-off-by: chrismark --- libbeat/autodiscover/autodiscover.go | 2 +- libbeat/autodiscover/provider.go | 6 +++--- libbeat/autodiscover/providers/docker/docker.go | 8 +++++++- libbeat/autodiscover/providers/jolokia/jolokia.go | 8 +++++++- libbeat/autodiscover/providers/kubernetes/config.go | 1 - .../autodiscover/providers/kubernetes/kubernetes.go | 11 +++++++++-- libbeat/docs/shared-autodiscover.asciidoc | 2 +- .../autodiscover/providers/aws/ec2/provider.go | 8 +++++++- .../autodiscover/providers/aws/elb/provider.go | 8 +++++++- 9 files changed, 42 insertions(+), 12 deletions(-) diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index e26a2521c16d..ce175b650721 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -83,7 +83,7 @@ func NewAutodiscover( // Init providers var providers []Provider for _, providerCfg := range config.Providers { - provider, err := Registry.BuildProvider(bus, providerCfg, keystore) + provider, err := Registry.BuildProvider(name, bus, providerCfg, keystore) if err != nil { return nil, errors.Wrap(err, "error in autodiscover provider settings") } diff --git a/libbeat/autodiscover/provider.go b/libbeat/autodiscover/provider.go index 510e09ab4bf4..ce4a0c504236 100644 --- a/libbeat/autodiscover/provider.go +++ b/libbeat/autodiscover/provider.go @@ -35,7 +35,7 @@ type Provider interface { } // ProviderBuilder creates a new provider based on the given config and returns it -type ProviderBuilder func(bus.Bus, uuid.UUID, *common.Config, keystore.Keystore) (Provider, error) +type ProviderBuilder func(string, bus.Bus, uuid.UUID, *common.Config, keystore.Keystore) (Provider, error) // AddProvider registers a new ProviderBuilder func (r *registry) AddProvider(name string, provider ProviderBuilder) error { @@ -70,7 +70,7 @@ func (r *registry) GetProvider(name string) ProviderBuilder { } // BuildProvider reads provider configuration and instantiate one -func (r *registry) BuildProvider(bus bus.Bus, c *common.Config, keystore keystore.Keystore) (Provider, error) { +func (r *registry) BuildProvider(beatName string, bus bus.Bus, c *common.Config, keystore keystore.Keystore) (Provider, error) { var config ProviderConfig err := c.Unpack(&config) if err != nil { @@ -87,5 +87,5 @@ func (r *registry) BuildProvider(bus bus.Bus, c *common.Config, keystore keystor return nil, err } - return builder(bus, uuid, c, keystore) + return builder(beatName, bus, uuid, c, keystore) } diff --git a/libbeat/autodiscover/providers/docker/docker.go b/libbeat/autodiscover/providers/docker/docker.go index 553b981177ec..2680eab54b3b 100644 --- a/libbeat/autodiscover/providers/docker/docker.go +++ b/libbeat/autodiscover/providers/docker/docker.go @@ -59,7 +59,13 @@ type Provider struct { } // AutodiscoverBuilder builds and returns an autodiscover provider -func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) { +func AutodiscoverBuilder( + beatName string, + bus bus.Bus, + uuid uuid.UUID, + c *common.Config, + keystore keystore.Keystore, +) (autodiscover.Provider, error) { logger := logp.NewLogger("docker") errWrap := func(err error) error { diff --git a/libbeat/autodiscover/providers/jolokia/jolokia.go b/libbeat/autodiscover/providers/jolokia/jolokia.go index 5a8876a011ac..928c1cc8e786 100644 --- a/libbeat/autodiscover/providers/jolokia/jolokia.go +++ b/libbeat/autodiscover/providers/jolokia/jolokia.go @@ -53,7 +53,13 @@ type Provider struct { // AutodiscoverBuilder builds a Jolokia Discovery autodiscover provider, it fails if // there is some problem with the configuration -func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) { +func AutodiscoverBuilder( + beatName string, + bus bus.Bus, + uuid uuid.UUID, + c *common.Config, + keystore keystore.Keystore, +) (autodiscover.Provider, error) { errWrap := func(err error) error { return errors.Wrap(err, "error setting up jolokia autodiscover provider") } diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index d14ef50dff4f..84672659f742 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -64,7 +64,6 @@ func defaultConfig() *Config { CleanupTimeout: 60 * time.Second, Prefix: "co.elastic", Unique: false, - LeaderLease: "beats-cluster-leader", } } diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 904c6b8cf708..3307a7397206 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -86,7 +86,13 @@ type leaderElectionManager struct { } // AutodiscoverBuilder builds and returns an autodiscover provider -func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) { +func AutodiscoverBuilder( + beatName string, + bus bus.Bus, + uuid uuid.UUID, + c *common.Config, + keystore keystore.Keystore, +) (autodiscover.Provider, error) { logger := logp.NewLogger("autodiscover") errWrap := func(err error) error { @@ -94,6 +100,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore } config := defaultConfig() + config.LeaderLease = fmt.Sprintf("%v-cluster-leader", beatName) err := c.Unpack(&config) if err != nil { return nil, errWrap(err) @@ -235,7 +242,7 @@ func NewLeaderElectionManager( stopLeading func(uuid string, eventID string), logger *logp.Logger, ) (EventManager, error) { - lem := &leaderElectionManager{} + lem := &leaderElectionManager{logger: logger} var id string if cfg.Node != "" { id = "beats-leader-" + cfg.Node diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index a3c4ab1279c7..c7993c29bef5 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -246,7 +246,7 @@ running configuration for a container, 60s by default. making the provider to enable the provided templates only when it will gain the leader lease. This setting can only be combined with `cluster` scope. When `unique` is enabled enabled, `resource` and `add_resource_metadata` settings are not taken into account. -`leader_lease`:: (Optional) Defaults to `beats-cluster-leader`. This will be name of the lock lease. +`leader_lease`:: (Optional) Defaults to `{beatname_lc}-cluster-leader`. This will be name of the lock lease. One can monitor the status of the lease with `kubectl describe lease beats-cluster-leader`. Different Beats that refer to the same leader lease will be competetitors in holding the lease and only one will be elected as leader each time. Example: diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go index 06b153626fe1..029a54d5403b 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -37,7 +37,13 @@ type Provider struct { } // AutodiscoverBuilder is the main builder for this provider. -func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) { +func AutodiscoverBuilder( + beatName string, + bus bus.Bus, + uuid uuid.UUID, + c *common.Config, + keystore keystore.Keystore, +) (autodiscover.Provider, error) { cfgwarn.Experimental("aws_ec2 autodiscover is experimental") config := awsauto.DefaultConfig() diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go index 54021a8db883..b0617baad4da 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go @@ -39,7 +39,13 @@ type Provider struct { } // AutodiscoverBuilder is the main builder for this provider. -func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) { +func AutodiscoverBuilder( + beatName string, + bus bus.Bus, + uuid uuid.UUID, + c *common.Config, + keystore keystore.Keystore, +) (autodiscover.Provider, error) { cfgwarn.Experimental("aws_elb autodiscover is experimental") config := awsauto.DefaultConfig() From c0b4d23a80eac495f70125ad05d8cacd1624259a Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 6 Aug 2020 15:08:57 +0300 Subject: [PATCH 17/19] fix tests Signed-off-by: chrismark --- libbeat/autodiscover/autodiscover_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index deec66ece8e7..49dc50509e66 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -140,7 +140,7 @@ func TestAutodiscover(t *testing.T) { // Register mock autodiscover provider busChan := make(chan bus.Bus, 1) Registry = NewRegistry() - Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { + Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { // intercept bus to mock events busChan <- b @@ -259,7 +259,7 @@ func TestAutodiscoverHash(t *testing.T) { busChan := make(chan bus.Bus, 1) Registry = NewRegistry() - Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { + Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { // intercept bus to mock events busChan <- b @@ -323,7 +323,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) { // Register mock autodiscover provider busChan := make(chan bus.Bus, 1) Registry = NewRegistry() - Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { + Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { // intercept bus to mock events busChan <- b From 3f725d5d7fe9b5417db63e6dc1269ba44cabc29d Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 6 Aug 2020 16:09:31 +0300 Subject: [PATCH 18/19] Fix docker test Signed-off-by: chrismark --- .../autodiscover/providers/docker/docker_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/autodiscover/providers/docker/docker_integration_test.go b/libbeat/autodiscover/providers/docker/docker_integration_test.go index 5d6baaa0b83b..bbb2bc979bcc 100644 --- a/libbeat/autodiscover/providers/docker/docker_integration_test.go +++ b/libbeat/autodiscover/providers/docker/docker_integration_test.go @@ -56,7 +56,7 @@ func TestDockerStart(t *testing.T) { s := &template.MapperSettings{nil, nil} config.Templates = *s k, _ := keystore.NewFileKeystore("test") - provider, err := AutodiscoverBuilder(bus, UUID, common.MustNewConfigFrom(config), k) + provider, err := AutodiscoverBuilder("mockBeat", bus, UUID, common.MustNewConfigFrom(config), k) if err != nil { t.Fatal(err) } From fc3f8473b367ea0f7a14268b728e75e11f7eccd5 Mon Sep 17 00:00:00 2001 From: chrismark Date: Fri, 7 Aug 2020 12:08:32 +0300 Subject: [PATCH 19/19] review changes Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 2 +- libbeat/autodiscover/providers/kubernetes/kubernetes.go | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7df4c49c81a2..6737decd3ccc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -367,7 +367,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added the `max_cached_sessions` option to the script processor. {pull}19562[19562] - Add support for DNS over TLS for the dns_processor. {pull}19321[19321] - Set index.max_docvalue_fields_search in index template to increase value to 200 fields. {issue}20215[20215] -- Add leader election for autodiscover. {pull}20281[20281] +- Add leader election for Kubernetes autodiscover. {pull}20281[20281] - Add capability of enriching process metadata with contianer id also for non-privileged containers in `add_process_metadata` processor. {pull}19767[19767] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 3307a7397206..190c646ef0c9 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -301,7 +301,7 @@ func (p *eventerManager) GenerateHints(event bus.Event) bus.Event { // Start for EventManager interface. func (p *leaderElectionManager) Start() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) p.cancelLeaderElection = cancel p.startLeaderElector(ctx, p.leaderElection) } @@ -322,10 +322,7 @@ func (p *leaderElectionManager) GenerateHints(event bus.Event) bus.Event { func (p *leaderElectionManager) startLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) { le, err := leaderelection.NewLeaderElector(lec) if err != nil { - p.logger.Errorf("leader election lock GAINED, id %v", err) - } - if lec.WatchDog != nil { - lec.WatchDog.SetLeaderElection(le) + p.logger.Errorf("error while creating Leader Elector: %v", err) } p.logger.Debugf("Starting Leader Elector") go le.Run(ctx)