Skip to content

Commit

Permalink
Add complete k8s metadata through composable provider (elastic#27691)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored and wiwen committed Nov 1, 2021
1 parent 71fc5e2 commit e1761fd
Show file tree
Hide file tree
Showing 10 changed files with 899 additions and 319 deletions.
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newLocker(b *Beat) *locker {
}
}

// lock attemps to acquire a lock on the data path for the currently-running
// lock attempts to acquire a lock on the data path for the currently-running
// Beat instance. If another Beats instance already has a lock on the same data path
// an ErrAlreadyLocked error is returned.
func (l *locker) lock() error {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,5 @@
- Add new --enroll-delay option for install and enroll commands. {pull}27118[27118]
- Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236]
- Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429]
- Support ephemeral containers in Kubernetes dynamic provider. {issue}#27020[#27020] {pull}27707[27707]
- Support ephemeral containers in Kubernetes dynamic provider. {issue}27020[#27020] {pull}27707[27707]
- Add complete k8s metadata through composable provider. {pull}27691[27691]
14 changes: 14 additions & 0 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package kubernetes
import (
"time"

"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -25,6 +26,16 @@ type Config struct {

// Needed when resource is a Pod or Node
Node string `config:"node"`

AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`

LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`

// Undocumented settings, to be deprecated in favor of `drop_fields` processor:
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
}

// Resources config section for resources' config blocks
Expand All @@ -44,6 +55,9 @@ func (c *Config) InitDefaults() {
c.CleanupTimeout = 60 * time.Second
c.SyncPeriod = 10 * time.Minute
c.Scope = "node"
c.IncludeCreatorMetadata = true
c.LabelsDedot = true
c.AnnotationsDedot = true
}

// Validate ensures correctness of config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,26 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}

return &dynamicProvider{logger, &cfg}, nil
}

// Run runs the kubernetes context provider.
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
if p.config.Resources.Pod.Enabled {
err := p.watchResource(comm, "pod", p.config)
err := p.watchResource(comm, "pod")
if err != nil {
return err
}
}
if p.config.Resources.Node.Enabled {
err := p.watchResource(comm, "node", p.config)
err := p.watchResource(comm, "node")
if err != nil {
return err
}
}
if p.config.Resources.Service.Enabled {
err := p.watchResource(comm, "service", p.config)
err := p.watchResource(comm, "service")
if err != nil {
return err
}
Expand All @@ -76,9 +77,8 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
// and starts watching for such resource's events.
func (p *dynamicProvider) watchResource(
comm composable.DynamicProviderComm,
resourceType string,
config *Config) error {
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
resourceType string) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)
Expand All @@ -93,24 +93,24 @@ func (p *dynamicProvider) watchResource(
p.logger.Debugf(
"Initializing Kubernetes watcher for resource %s using node: %v",
resourceType,
config.Node)
p.config.Node)
nd := &kubernetes.DiscoverKubernetesNodeParams{
ConfigHost: config.Node,
ConfigHost: p.config.Node,
Client: client,
IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
IsInCluster: kubernetes.IsInCluster(p.config.KubeConfig),
HostUtils: &kubernetes.DefaultDiscoveryUtils{},
}
config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
p.config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
if err != nil {
p.logger.Debugf("Kubernetes provider skipped, unable to discover node: %w", err)
return nil
}

} else {
config.Node = ""
p.config.Node = ""
}

watcher, err := p.newWatcher(resourceType, comm, client, config)
watcher, err := p.newWatcher(resourceType, comm, client)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
}
Expand All @@ -126,23 +126,22 @@ func (p *dynamicProvider) watchResource(
func (p *dynamicProvider) newWatcher(
resourceType string,
comm composable.DynamicProviderComm,
client k8s.Interface,
config *Config) (kubernetes.Watcher, error) {
client k8s.Interface) (kubernetes.Watcher, error) {
switch resourceType {
case "pod":
watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
case "node":
watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
case "service":
watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
Expand Down
61 changes: 37 additions & 24 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand All @@ -25,6 +26,7 @@ type node struct {
comm composable.DynamicProviderComm
scope string
config *Config
metagen metadata.MetaGen
}

type nodeData struct {
Expand All @@ -49,13 +51,25 @@ func NewNodeWatcher(
if err != nil {
return nil, errors.New(err, "couldn't create kubernetes watcher")
}
watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg})

rawConfig, err := common.NewConfigFrom(cfg)
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client)
watcher.AddEventHandler(&node{
logger,
cfg.CleanupTimeout,
comm,
scope,
cfg,
metaGen})

return watcher, nil
}

func (n *node) emitRunning(node *kubernetes.Node) {
data := generateNodeData(node, n.config)
data := generateNodeData(node, n.config, n.metagen)
if data == nil {
return
}
Expand Down Expand Up @@ -165,7 +179,7 @@ func isNodeReady(node *kubernetes.Node) bool {
return false
}

func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.MetaGen) *nodeData {
host := getAddress(node)

// If a node doesn't have an IP then dont monitor it
Expand All @@ -178,41 +192,40 @@ func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
return nil
}

//TODO: add metadata here too ie -> meta := n.metagen.Generate(node)
meta := kubeMetaGen.Generate(node)
kubemetaMap, err := meta.GetValue("kubernetes")
if err != nil {
return &nodeData{}
}

// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := common.MapStr{}
for k, v := range node.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}

labels := common.MapStr{}
for k, v := range node.GetObjectMeta().GetLabels() {
// TODO: add dedoting option
safemapstr.Put(labels, k, v)
}
// k8sMapping includes only the metadata that fall under kubernetes.*
// and these are available as dynamic vars through the provider
k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone())

mapping := map[string]interface{}{
"node": map[string]interface{}{
"uid": string(node.GetUID()),
"name": node.GetName(),
"labels": labels,
"annotations": annotations,
"ip": host,
},
}
// add annotations to be discoverable by templates
k8sMapping["annotations"] = annotations

processors := []map[string]interface{}{
{
processors := []map[string]interface{}{}
// meta map includes metadata that go under kubernetes.*
// but also other ECS fields like orchestrator.*
for field, metaMap := range meta {
processor := map[string]interface{}{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
"fields": metaMap,
"target": field,
},
},
}
processors = append(processors, processor)
}
return &nodeData{
node: node,
mapping: mapping,
mapping: k8sMapping,
processors: processors,
}
}
100 changes: 83 additions & 17 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package kubernetes
import (
"testing"

"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -41,32 +43,96 @@ func TestGenerateNodeData(t *testing.T) {
},
}

data := generateNodeData(node, &Config{})
data := generateNodeData(node, &Config{}, &nodeMeta{})

mapping := map[string]interface{}{
"node": map[string]interface{}{
"node": common.MapStr{
"uid": string(node.GetUID()),
"name": node.GetName(),
"labels": common.MapStr{
"foo": "bar",
},
"annotations": common.MapStr{
"baz": "ban",
},
"ip": "node1",
"ip": "node1",
},
"annotations": common.MapStr{
"baz": "ban",
},
"labels": common.MapStr{
"foo": "bar",
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
processors := map[string]interface{}{
"orchestrator": common.MapStr{
"cluster": common.MapStr{
"name": "devcluster",
"url": "8.8.8.8:9090"},
}, "kubernetes": common.MapStr{
"labels": common.MapStr{"foo": "bar"},
"annotations": common.MapStr{"baz": "ban"},
"node": common.MapStr{
"ip": "node1",
"name": "testnode",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133"},
},
}

assert.Equal(t, node, data.node)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)
for _, v := range data.processors {
k := v["add_fields"].(map[string]interface{})
target := k["target"].(string)
fields := k["fields"]
assert.Equal(t, processors[target], fields)
}
}

type nodeMeta struct{}

// Generate generates node metadata from a resource object
// Metadata map is in the following form:
// {
// "kubernetes": {},
// "some.ecs.field": "asdf"
// }
// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by
// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method
func (n *nodeMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
ecsFields := n.GenerateECS(obj)
meta := common.MapStr{
"kubernetes": n.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)
return meta
}

// GenerateECS generates node ECS metadata from a resource object
func (n *nodeMeta) GenerateECS(obj kubernetes.Resource) common.MapStr {
return common.MapStr{
"orchestrator": common.MapStr{
"cluster": common.MapStr{
"name": "devcluster",
"url": "8.8.8.8:9090",
},
},
}
}

// GenerateK8s generates node metadata from a resource object
func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
k8sNode := obj.(*kubernetes.Node)
return common.MapStr{
"node": common.MapStr{
"uid": string(k8sNode.GetUID()),
"name": k8sNode.GetName(),
"ip": "node1",
},
"labels": common.MapStr{
"foo": "bar",
},
"annotations": common.MapStr{
"baz": "ban",
},
}
}

// GenerateFromName generates node metadata from a node name
func (n *nodeMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr {
return nil
}
Loading

0 comments on commit e1761fd

Please sign in to comment.