Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstracting pod interface in kubernetes plugin to enable easier vendoring #4152

Merged
merged 1 commit into from
May 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Support new `index_patterns` field when loading templates for Elasticsearch >= 6.0 {pull}4056[4056]
- Adding goimports support to make check and fmt {pull}4114[4114]
- Make kubernetes indexers/matchers pluggable {pull}4151[4151]
- Abstracting pod interface in kubernetes plugin to enable easier vendoring {pull}4152[4152]

*Filebeat*

Expand Down
12 changes: 6 additions & 6 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,6 @@ Copyright 2014-2015 The Prometheus Authors
This product includes software developed at
SoundCloud Ltd. (http://soundcloud.com/).

--------------------------------------------------------------------
github.com/ericchiang/k8s
--------------------------------------------------------------------
Apache License


--------------------------------------------------------------------
github.com/garyburd/redigo
--------------------------------------------------------------------
Expand Down Expand Up @@ -1175,6 +1169,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
github.com/ericchiang/k8s
--------------------------------------------------------------------
Apache License


--------------------------------------------------------------------
github.com/Azure/go-ansiterm
--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/elastic/beats/filebeat/spooler"

// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processors/kubernetes"
_ "github.com/elastic/beats/filebeat/processor/kubernetes"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep consistency with libbeat (which uses processors)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@exekias I disagree on this one as we should change it for some time in libbeat to processor. It is processors for historical reasons.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok then, it seems this was agreed some time ago 👍

)

var (
Expand Down
32 changes: 15 additions & 17 deletions libbeat/processors/kubernetes/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"

"github.com/elastic/beats/libbeat/common"

corev1 "github.com/ericchiang/k8s/api/v1"
)

//Names of indexers and matchers that have been defined.
Expand All @@ -27,11 +25,11 @@ var Indexing = NewRegister()
type Indexer interface {
// GetMetadata generates event metadata for the given pod, then returns the
// list of indexes to create, with the metadata to put on them
GetMetadata(pod *corev1.Pod) []MetadataIndex
GetMetadata(pod *Pod) []MetadataIndex

// GetIndexes return the list of indexes the given pod belongs to. This function
// must return the same indexes than GetMetadata
GetIndexes(pod *corev1.Pod) []string
GetIndexes(pod *Pod) []string
}

// MetadataIndex holds a pair of index -> metadata info
Expand All @@ -51,7 +49,7 @@ type Matcher interface {
//GenMeta takes in pods to generate metadata for them
type GenMeta interface {
//GenerateMetaData generates metadata by taking in a pod as an input
GenerateMetaData(pod *corev1.Pod) common.MapStr
GenerateMetaData(pod *Pod) common.MapStr
}

type Indexers struct {
Expand Down Expand Up @@ -133,7 +131,7 @@ func (r *Register) GetMatcher(name string) MatcherConstructor {
}

// GetMetadata returns the composed metadata list from all registered indexers
func (i *Indexers) GetMetadata(pod *corev1.Pod) []MetadataIndex {
func (i *Indexers) GetMetadata(pod *Pod) []MetadataIndex {
var metadata []MetadataIndex
i.RLock()
defer i.RUnlock()
Expand All @@ -146,7 +144,7 @@ func (i *Indexers) GetMetadata(pod *corev1.Pod) []MetadataIndex {
}

// GetIndexes returns the composed index list from all registered indexers
func (i *Indexers) GetIndexes(pod *corev1.Pod) []string {
func (i *Indexers) GetIndexes(pod *Pod) []string {
var indexes []string
i.RLock()
defer i.RUnlock()
Expand Down Expand Up @@ -179,7 +177,7 @@ type GenDefaultMeta struct {
}

// GenerateMetaData generates default metadata for the given pod taking to account certain filters
func (g *GenDefaultMeta) GenerateMetaData(pod *corev1.Pod) common.MapStr {
func (g *GenDefaultMeta) GenerateMetaData(pod *Pod) common.MapStr {
labelMap := common.MapStr{}
annotationsMap := common.MapStr{}

Expand All @@ -195,9 +193,9 @@ func (g *GenDefaultMeta) GenerateMetaData(pod *corev1.Pod) common.MapStr {

meta := common.MapStr{
"pod": common.MapStr{
"name": pod.Metadata.GetName(),
"name": pod.Metadata.Name,
},
"namespace": pod.Metadata.GetNamespace(),
"namespace": pod.Metadata.Namespace,
}

if len(labelMap) != 0 {
Expand Down Expand Up @@ -236,18 +234,18 @@ func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &PodNameIndexer{genMeta: genMeta}, nil
}

func (p *PodNameIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
func (p *PodNameIndexer) GetMetadata(pod *Pod) []MetadataIndex {
data := p.genMeta.GenerateMetaData(pod)
return []MetadataIndex{
{
Index: pod.Metadata.GetName(),
Index: pod.Metadata.Name,
Data: data,
},
}
}

func (p *PodNameIndexer) GetIndexes(pod *corev1.Pod) []string {
return []string{pod.Metadata.GetName()}
func (p *PodNameIndexer) GetIndexes(pod *Pod) []string {
return []string{pod.Metadata.Name}
}

// ContainerIndexer indexes pods based on all their containers IDs
Expand All @@ -259,7 +257,7 @@ func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &ContainerIndexer{genMeta: genMeta}, nil
}

func (c *ContainerIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
func (c *ContainerIndexer) GetMetadata(pod *Pod) []MetadataIndex {
commonMeta := c.genMeta.GenerateMetaData(pod)
containers := c.GetIndexes(pod)
var metadata []MetadataIndex
Expand All @@ -277,10 +275,10 @@ func (c *ContainerIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
return metadata
}

func (c *ContainerIndexer) GetIndexes(pod *corev1.Pod) []string {
func (c *ContainerIndexer) GetIndexes(pod *Pod) []string {
var containers []string
for _, status := range pod.Status.ContainerStatuses {
cID := status.GetContainerID()
cID := status.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
Expand Down
40 changes: 19 additions & 21 deletions libbeat/processors/kubernetes/indexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"testing"

"github.com/elastic/beats/libbeat/common"
corev1 "github.com/ericchiang/k8s/api/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/stretchr/testify/assert"
)

Expand All @@ -19,15 +17,15 @@ func TestPodIndexer(t *testing.T) {

podName := "testpod"
ns := "testns"
pod := corev1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &podName,
Namespace: &ns,
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"labelkey": "labelvalue",
},
},
Spec: &corev1.PodSpec{},
Spec: PodSpec{},
}

indexers := podIndexer.GetMetadata(&pod)
Expand Down Expand Up @@ -61,16 +59,16 @@ func TestContainerIndexer(t *testing.T) {
ns := "testns"
container := "container"

pod := corev1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &podName,
Namespace: &ns,
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"labelkey": "labelvalue",
},
},
Status: &corev1.PodStatus{
ContainerStatuses: make([]*corev1.ContainerStatus, 0),
Status: PodStatus{
ContainerStatuses: make([]PodContainerStatus, 0),
},
}

Expand All @@ -90,10 +88,10 @@ func TestContainerIndexer(t *testing.T) {

cid := "docker://abcde"

pod.Status.ContainerStatuses = []*corev1.ContainerStatus{
pod.Status.ContainerStatuses = []PodContainerStatus{
{
Name: &container,
ContainerID: &cid,
Name: container,
ContainerID: cid,
},
}
expected["container"] = common.MapStr{
Expand Down Expand Up @@ -152,10 +150,10 @@ func TestFilteredGenMeta(t *testing.T) {

podName := "testpod"
ns := "testns"
pod := corev1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &podName,
Namespace: &ns,
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"foo": "bar",
"x": "y",
Expand All @@ -165,7 +163,7 @@ func TestFilteredGenMeta(t *testing.T) {
"c": "d",
},
},
Spec: &corev1.PodSpec{},
Spec: PodSpec{},
}

indexers := podIndexer.GetMetadata(&pod)
Expand Down
46 changes: 33 additions & 13 deletions libbeat/processors/kubernetes/podwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"context"
"encoding/json"
"sync"
"time"

Expand All @@ -28,7 +29,7 @@ type PodWatcher struct {
type annotationCache struct {
sync.Mutex
annotations map[string]common.MapStr
pods map[string]*corev1.Pod // pod uid -> Pod
pods map[string]*Pod // pod uid -> Pod
}

type NodeOption struct{}
Expand All @@ -48,7 +49,7 @@ func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod time.D
stop: cancel,
annotationCache: annotationCache{
annotations: make(map[string]common.MapStr),
pods: make(map[string]*corev1.Pod),
pods: make(map[string]*Pod),
},
}
}
Expand Down Expand Up @@ -124,44 +125,63 @@ func (p *PodWatcher) Run() bool {
}
}

func (p *PodWatcher) onPodAdd(pod *corev1.Pod) {
func (p *PodWatcher) onPodAdd(pod *Pod) {
metadata := p.indexers.GetMetadata(pod)
p.annotationCache.Lock()
defer p.annotationCache.Unlock()

p.annotationCache.pods[pod.Metadata.GetUid()] = pod
p.annotationCache.pods[pod.Metadata.UID] = pod

for _, m := range metadata {
p.annotationCache.annotations[m.Index] = m.Data
}
}

func (p *PodWatcher) onPodUpdate(pod *corev1.Pod) {
oldPod := p.GetPod(pod.Metadata.GetUid())
if oldPod.Metadata.GetResourceVersion() != pod.Metadata.GetResourceVersion() {
func (p *PodWatcher) onPodUpdate(pod *Pod) {
oldPod := p.GetPod(pod.Metadata.UID)
if oldPod.Metadata.ResourceVersion != pod.Metadata.ResourceVersion {
//Process the new pod changes
p.onPodDelete(oldPod)
p.onPodAdd(pod)
}
}

func (p *PodWatcher) onPodDelete(pod *corev1.Pod) {
func (p *PodWatcher) onPodDelete(pod *Pod) {
p.annotationCache.Lock()
defer p.annotationCache.Unlock()

delete(p.annotationCache.pods, pod.Metadata.GetUid())
delete(p.annotationCache.pods, pod.Metadata.UID)

for _, index := range p.indexers.GetIndexes(pod) {
delete(p.annotationCache.annotations, index)
}
}

func (p *PodWatcher) getPodMeta(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

po := &Pod{}
err = json.Unmarshal(bytes, po)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

return po

}

func (p *PodWatcher) worker() {
for pod := range p.podQueue {
if pod.Metadata.GetDeletionTimestamp() != nil {
for po := range p.podQueue {
pod := p.getPodMeta(po)
if pod.Metadata.DeletionTimestamp != "" {
p.onPodDelete(pod)
} else {
existing := p.GetPod(pod.Metadata.GetUid())
existing := p.GetPod(pod.Metadata.UID)
if existing != nil {
p.onPodUpdate(pod)
} else {
Expand All @@ -181,7 +201,7 @@ func (p *PodWatcher) GetMetaData(arg string) common.MapStr {
return nil
}

func (p *PodWatcher) GetPod(uid string) *corev1.Pod {
func (p *PodWatcher) GetPod(uid string) *Pod {
p.annotationCache.Lock()
defer p.annotationCache.Unlock()
return p.annotationCache.pods[uid]
Expand Down
Loading