[Agent] Support Node and Service autodiscovery in k8s provider #26801

Merged: ChrsMark merged 25 commits into elastic:master from ChrsMark:add_more_k8s_resources_k8s_provider on Jul 22, 2021.

The diff below shows changes from 6 of the 25 commits. Full commit list:
8e06537  First refactoring to the extendable provider (ChrsMark)
8737d7b  Add node discovery (ChrsMark)
8ff2e13  Add service discovery (ChrsMark)
743348b  Add changelog entry (ChrsMark)
a7e6ccf  Add changelog entry (ChrsMark)
eb4ffa0  Fix lint (ChrsMark)
bd6956c  Fix service watcher (ChrsMark)
5dd5b94  Refactor pod provider and add tests (ChrsMark)
6e71318  Refactor node provider and add tests (ChrsMark)
fc494de  Refactor svc provider and add tests (ChrsMark)
4f12589  Add annotations for pod too (ChrsMark)
929c183  Improve Pod's watcher options (ChrsMark)
43f5136  Improve container data emission (ChrsMark)
54c912d  Move scope definition to global level for all resources (ChrsMark)
6002dd1  Improve config wording (ChrsMark)
865eabb  Fix lint issue (ChrsMark)
31211cf  Fix lint issue (ChrsMark)
975746d  Give option for unified configs to be backwards compatible (ChrsMark)
ea014a1  Expose config options of resources at root level too (ChrsMark)
88260fd  Review comments (ChrsMark)
2823bb6  Fix config defaults logic (ChrsMark)
f778936  Lint (ChrsMark)
6f39378  Remove usage of channels (ChrsMark)
d42013f  Add TODO comment about ephemeral containers (ChrsMark)
701236c  Remove dedoting for now (ChrsMark)
Diff view:

```diff
@@ -6,7 +6,8 @@ package kubernetes

 import (
 	"fmt"
 	"time"

+	k8s "k8s.io/client-go/kubernetes"

 	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"

@@ -20,6 +21,10 @@ const (
 	PodPriority = 0
 	// ContainerPriority is the priority that container mappings are added to the provider.
 	ContainerPriority = 1
+	// NodePriority is the priority that node mappings are added to the provider.
+	NodePriority = 0
+	// ServicePriority is the priority that service mappings are added to the provider.
+	ServicePriority = 0
 )

 func init() {
```

Review comment on the new priority constants: these should each be unique, and they should be tiered following the containment hierarchy, because a Node contains pods, a pod contains containers, and services point at pods.
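For illustration, a unique, tiered set of constants consistent with that comment might look like the sketch below. The reviewer's exact proposed values are not visible in this thread, so the numbers here are an assumption:

```go
package kubernetes

// Hypothetical tiered priorities following the containment argument in the
// review comment above; these values are an illustration, not the ones
// actually proposed in the thread.
const (
	// NodePriority: a node contains pods.
	NodePriority = 0
	// PodPriority: a pod contains containers.
	PodPriority = 1
	// ContainerPriority: containers are the most specific mapping.
	ContainerPriority = 2
	// ServicePriority: services point at pods.
	ServicePriority = 3
)
```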
```diff
@@ -31,12 +36,6 @@ type dynamicProvider struct {
 	config *Config
 }

-type eventWatcher struct {
-	logger         *logger.Logger
-	cleanupTimeout time.Duration
-	comm           composable.DynamicProviderComm
-}
-
 // DynamicProviderBuilder builds the dynamic provider.
 func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
 	var cfg Config

@@ -69,15 +68,10 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
 		p.config.Node = ""
 	}

-	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
-		SyncTimeout: p.config.SyncPeriod,
-		Node:        p.config.Node,
-		//Namespace: p.config.Namespace,
-	}, nil)
+	watcher, err := p.newWatcher(comm, client)
 	if err != nil {
 		return errors.New(err, "couldn't create kubernetes watcher")
 	}
-	watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})

 	err = watcher.Start()
 	if err != nil {
```
```diff
@@ -87,135 +81,28 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
 	return nil
 }

-func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
-	mapping := map[string]interface{}{
-		"namespace": pod.GetNamespace(),
-		"pod": map[string]interface{}{
-			"uid":    string(pod.GetUID()),
-			"name":   pod.GetName(),
-			"labels": pod.GetLabels(),
-			"ip":     pod.Status.PodIP,
-		},
-	}
-
-	processors := []map[string]interface{}{
-		{
-			"add_fields": map[string]interface{}{
-				"fields": mapping,
-				"target": "kubernetes",
-			},
-		},
-	}
-
-	// Emit the pod
-	// We emit Pod + containers to ensure that configs matching Pod only
-	// get Pod metadata (not specific to any container)
-	p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)
-
-	// Emit all containers in the pod
-	p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
-
-	// TODO deal with init containers stopping after initialization
-	p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
-}
-
-func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
-	// Collect all runtimes from status information.
-	containerIDs := map[string]string{}
-	runtimes := map[string]string{}
-	for _, c := range containerstatuses {
-		cid, runtime := kubernetes.ContainerIDWithRuntime(c)
-		containerIDs[c.Name] = cid
-		runtimes[c.Name] = runtime
-	}
-
-	for _, c := range containers {
-		// If it doesn't have an ID, the container doesn't exist in
-		// the runtime; only emit an event if we are stopping, so
-		// we are sure of cleaning up configurations.
-		cid := containerIDs[c.Name]
-		if cid == "" {
-			continue
-		}
-
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-
-		mapping := map[string]interface{}{
-			"namespace": pod.GetNamespace(),
-			"pod": map[string]interface{}{
-				"uid":    string(pod.GetUID()),
-				"name":   pod.GetName(),
-				"labels": pod.GetLabels(),
-				"ip":     pod.Status.PodIP,
-			},
-			"container": map[string]interface{}{
-				"id":      cid,
-				"name":    c.Name,
-				"image":   c.Image,
-				"runtime": runtimes[c.Name],
-			},
-		}
-
-		processors := []map[string]interface{}{
-			{
-				"add_fields": map[string]interface{}{
-					"fields": mapping,
-					"target": "kubernetes",
-				},
-			},
-		}
-
-		// Emit the container
-		p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
-	}
-}
-
-func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) {
-	p.comm.Remove(string(pod.GetUID()))
-
-	for _, c := range pod.Spec.Containers {
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-		p.comm.Remove(eventID)
-	}
-
-	for _, c := range pod.Spec.InitContainers {
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-		p.comm.Remove(eventID)
-	}
-}
-
-// OnAdd ensures processing of pod objects that are newly added
-func (p *eventWatcher) OnAdd(obj interface{}) {
-	p.logger.Debugf("pod add: %+v", obj)
-	p.emitRunning(obj.(*kubernetes.Pod))
-}
-
-// OnUpdate emits events for a given pod depending on its state: if the pod
-// is terminating, a stop event is scheduled; otherwise, stop and start events
-// are sent sequentially to recreate the resources associated with the pod.
-func (p *eventWatcher) OnUpdate(obj interface{}) {
-	pod := obj.(*kubernetes.Pod)
-
-	p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
-	switch pod.Status.Phase {
-	case kubernetes.PodSucceeded, kubernetes.PodFailed:
-		time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
-		return
-	case kubernetes.PodPending:
-		p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj)
-		return
-	}
-
-	p.logger.Debugf("pod update: %+v", obj)
-	p.emitRunning(pod)
-}
-
-// OnDelete stops pod objects that are deleted
-func (p *eventWatcher) OnDelete(obj interface{}) {
-	p.logger.Debugf("pod delete: %+v", obj)
-	pod := obj.(*kubernetes.Pod)
-	time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
-}
+// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
+func (p *dynamicProvider) newWatcher(comm composable.DynamicProviderComm, client k8s.Interface) (kubernetes.Watcher, error) {
+	switch p.config.Resource {
+	case "pod":
+		watcher, err := NewPodWatcher(comm, p.config, p.logger, client)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	case "node":
+		watcher, err := NewNodeWatcher(comm, p.config, p.logger, client)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	case "service":
+		watcher, err := NewServiceWatcher(comm, p.config, p.logger, client)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	default:
+		return nil, fmt.Errorf("unsupported autodiscover resource %s", p.config.Resource)
+	}
+}
```
Review comment: It's interesting that you can override almost all settings per resource, but not scope.
Reply: See #26801 (comment).