Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add leader election for autodiscover #20281

Merged
merged 22 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add leader election for autodiscover
Signed-off-by: chrismark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark committed Jul 28, 2020
commit 5309468cb46bb5708d70471305f99a0e23ff7898
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
---
apiVersion: v1
kind: ServiceAccount
Expand Down
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ require (
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200701041122-1837592efa10
golang.org/x/tools v0.0.0-20200727233628-55644ead90ce
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
5 changes: 5 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Config struct {
// Scope can be either node or cluster.
Scope string `config:"scope"`
Resource string `config:"resource"`
// Unique defines whether this provider enables its templates only when it is elected as leader in a k8s cluster
Unique bool `config:"unique"`

Prefix string `config:"prefix"`
Hints *common.Config `config:"hints"`
Expand Down Expand Up @@ -98,6 +100,9 @@ func (c *Config) Validate() error {
if c.Scope != "node" && c.Scope != "cluster" {
return fmt.Errorf("invalid `scope` configured. supported values are `node` and `cluster`")
}
if c.Unique && c.Scope != "cluster" {
logp.L().Warnf("can only set `unique` when scope is `cluster`")
}

return nil
}
88 changes: 81 additions & 7 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
package kubernetes

import (
"context"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/gofrs/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -49,13 +56,15 @@ type Eventer interface {

// Provider implements the autodiscover provider for Kubernetes.
// NOTE(review): the first seven fields are listed twice below; this looks like the
// removed and added sides of the diff shown together — confirm against the real file.
type Provider struct {
config *Config
bus bus.Bus
templates template.Mapper
builders autodiscover.Builders
appenders autodiscover.Appenders
logger *logp.Logger
eventer Eventer
config *Config
bus bus.Bus
templates template.Mapper
builders autodiscover.Builders
appenders autodiscover.Appenders
logger *logp.Logger
eventer Eventer
// leaderElection holds the settings produced by newLeaderElectionConfig;
// Start hands it to leaderelection.RunOrDie.
leaderElection leaderelection.LeaderElectionConfig
// cancel cancels the context that Start creates for the leader election run;
// it is assigned in Start and invoked from Stop.
cancel context.CancelFunc
}

// AutodiscoverBuilder builds and returns an autodiscover provider
Expand Down Expand Up @@ -118,6 +127,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore
return nil, errWrap(err)
}

p.leaderElection = p.newLeaderElectionConfig(client, "some")
return p, nil
}

Expand All @@ -126,11 +136,15 @@ func (p *Provider) Start() {
if err := p.eventer.Start(); err != nil {
p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
leaderelection.RunOrDie(ctx, p.leaderElection)
}

// Stop shuts the provider down: it stops the eventer and then cancels the
// context that Start passed to the leader election run.
func (p *Provider) Stop() {
	p.eventer.Stop()
	// cancel is only assigned inside Start. Calling a nil CancelFunc panics,
	// so guard it to keep Stop safe on a provider that was never started.
	if p.cancel != nil {
		p.cancel()
	}
}

// String returns a description of kubernetes autodiscover provider.
Expand All @@ -154,3 +168,63 @@ func (p *Provider) publish(event bus.Event) {
p.appenders.Append(event)
p.bus.Publish(event)
}

// startLeading publishes a "start" event, marked as unique, for the given
// provider uuid and event ID. When the templates mapper returns a config for
// the event, it is attached under the "config" key before publishing.
// The event goes straight to the bus, not through p.publish.
func (p *Provider) startLeading(uuid string, eventID string) {
	startEvent := bus.Event{
		"id":       eventID,
		"provider": uuid,
		"unique":   "true",
		"start":    true,
	}

	matched := p.templates.GetConfig(startEvent)
	if matched != nil {
		startEvent["config"] = matched
	}

	p.bus.Publish(startEvent)
}

// stopLeading publishes a "stop" event, marked as unique, for the given
// provider uuid and event ID. When the templates mapper returns a config for
// the event, it is attached under the "config" key before publishing.
// The event goes straight to the bus, not through p.publish.
func (p *Provider) stopLeading(uuid string, eventID string) {
	stopEvent := bus.Event{
		"id":       eventID,
		"provider": uuid,
		"unique":   "true",
		"stop":     true,
	}

	matched := p.templates.GetConfig(stopEvent)
	if matched != nil {
		stopEvent["config"] = matched
	}

	p.bus.Publish(stopEvent)
}

// newLeaderElectionConfig assembles the leader election settings for this
// provider. It uses a Lease lock named "beats-cluster-leader" in the "default"
// namespace, with the identity "beats-leader-<uuid>". The callbacks log the
// transition at debug level and publish a start or stop event through
// startLeading / stopLeading.
func (p *Provider) newLeaderElectionConfig(client k8s.Interface, uuid string) leaderelection.LeaderElectionConfig {
	identity := "beats-leader-" + uuid

	leaseMeta := metav1.ObjectMeta{
		Name:      "beats-cluster-leader",
		Namespace: "default",
	}
	// NOTE(review): leaseMeta is built right here with only Name and Namespace
	// set, so the UID read from it is presumably empty — confirm this is intended.
	leaseUID := leaseMeta.GetObjectMeta().GetUID()

	// newEventID joins the lease UID with the current time in nanoseconds.
	// Every call yields a fresh value, so the start and stop events carry
	// different IDs.
	newEventID := func() string {
		return fmt.Sprintf("%v-%v", leaseUID, time.Now().UnixNano())
	}

	lock := &resourcelock.LeaseLock{
		LeaseMeta: leaseMeta,
		Client:    client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: identity,
		},
	}

	callbacks := leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) {
			p.logger.Debugf("leader election lock GAINED, id %v", identity)
			p.startLeading(uuid, newEventID())
		},
		OnStoppedLeading: func() {
			p.logger.Debugf("leader election lock LOST, id %v", identity)
			p.stopLeading(uuid, newEventID())
		},
	}

	return leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks:       callbacks,
	}
}