Skip to content

Commit

Permalink
Abstracting pod interface in kubernetes plugin to enable easier vendo…
Browse files Browse the repository at this point in the history
…ring
  • Loading branch information
vjsamuel committed May 2, 2017
1 parent 4660a9b commit bcbae50
Show file tree
Hide file tree
Showing 57 changed files with 386 additions and 266 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Support new `index_patterns` field when loading templates for Elasticsearch >= 6.0 {pull}4056[4056]
- Adding goimports support to make check and fmt {pull}4114[4114]
- Make kubernetes indexers/matchers pluggable {pull}4151[4151]
- Abstracting pod interface in kubernetes plugin to enable easier vendoring {pull}4152[4152]

*Filebeat*

Expand Down
12 changes: 6 additions & 6 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,6 @@ Copyright 2014-2015 The Prometheus Authors
This product includes software developed at
SoundCloud Ltd. (http://soundcloud.com/).

--------------------------------------------------------------------
github.com/ericchiang/k8s
--------------------------------------------------------------------
Apache License


--------------------------------------------------------------------
github.com/garyburd/redigo
--------------------------------------------------------------------
Expand Down Expand Up @@ -1175,6 +1169,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
github.com/ericchiang/k8s
--------------------------------------------------------------------
Apache License


--------------------------------------------------------------------
github.com/Azure/go-ansiterm
--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/elastic/beats/filebeat/spooler"

// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processors/kubernetes"
_ "github.com/elastic/beats/filebeat/processor/kubernetes"
)

var (
Expand Down
File renamed without changes.
32 changes: 15 additions & 17 deletions libbeat/processors/kubernetes/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"

"github.com/elastic/beats/libbeat/common"

corev1 "github.com/ericchiang/k8s/api/v1"
)

//Names of indexers and matchers that have been defined.
Expand All @@ -27,11 +25,11 @@ var Indexing = NewRegister()
type Indexer interface {
// GetMetadata generates event metadata for the given pod, then returns the
// list of indexes to create, with the metadata to put on them
GetMetadata(pod *corev1.Pod) []MetadataIndex
GetMetadata(pod *Pod) []MetadataIndex

// GetIndexes return the list of indexes the given pod belongs to. This function
// must return the same indexes than GetMetadata
GetIndexes(pod *corev1.Pod) []string
GetIndexes(pod *Pod) []string
}

// MetadataIndex holds a pair of index -> metadata info
Expand All @@ -51,7 +49,7 @@ type Matcher interface {
//GenMeta takes in pods to generate metadata for them
type GenMeta interface {
//GenerateMetaData generates metadata by taking in a pod as an input
GenerateMetaData(pod *corev1.Pod) common.MapStr
GenerateMetaData(pod *Pod) common.MapStr
}

type Indexers struct {
Expand Down Expand Up @@ -133,7 +131,7 @@ func (r *Register) GetMatcher(name string) MatcherConstructor {
}

// GetMetadata returns the composed metadata list from all registered indexers
func (i *Indexers) GetMetadata(pod *corev1.Pod) []MetadataIndex {
func (i *Indexers) GetMetadata(pod *Pod) []MetadataIndex {
var metadata []MetadataIndex
i.RLock()
defer i.RUnlock()
Expand All @@ -146,7 +144,7 @@ func (i *Indexers) GetMetadata(pod *corev1.Pod) []MetadataIndex {
}

// GetIndexes returns the composed index list from all registered indexers
func (i *Indexers) GetIndexes(pod *corev1.Pod) []string {
func (i *Indexers) GetIndexes(pod *Pod) []string {
var indexes []string
i.RLock()
defer i.RUnlock()
Expand Down Expand Up @@ -179,7 +177,7 @@ type GenDefaultMeta struct {
}

// GenerateMetaData generates default metadata for the given pod taking to account certain filters
func (g *GenDefaultMeta) GenerateMetaData(pod *corev1.Pod) common.MapStr {
func (g *GenDefaultMeta) GenerateMetaData(pod *Pod) common.MapStr {
labelMap := common.MapStr{}
annotationsMap := common.MapStr{}

Expand All @@ -195,9 +193,9 @@ func (g *GenDefaultMeta) GenerateMetaData(pod *corev1.Pod) common.MapStr {

meta := common.MapStr{
"pod": common.MapStr{
"name": pod.Metadata.GetName(),
"name": pod.Metadata.Name,
},
"namespace": pod.Metadata.GetNamespace(),
"namespace": pod.Metadata.Namespace,
}

if len(labelMap) != 0 {
Expand Down Expand Up @@ -236,18 +234,18 @@ func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &PodNameIndexer{genMeta: genMeta}, nil
}

func (p *PodNameIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
func (p *PodNameIndexer) GetMetadata(pod *Pod) []MetadataIndex {
data := p.genMeta.GenerateMetaData(pod)
return []MetadataIndex{
{
Index: pod.Metadata.GetName(),
Index: pod.Metadata.Name,
Data: data,
},
}
}

func (p *PodNameIndexer) GetIndexes(pod *corev1.Pod) []string {
return []string{pod.Metadata.GetName()}
func (p *PodNameIndexer) GetIndexes(pod *Pod) []string {
return []string{pod.Metadata.Name}
}

// ContainerIndexer indexes pods based on all their containers IDs
Expand All @@ -259,7 +257,7 @@ func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &ContainerIndexer{genMeta: genMeta}, nil
}

func (c *ContainerIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
func (c *ContainerIndexer) GetMetadata(pod *Pod) []MetadataIndex {
commonMeta := c.genMeta.GenerateMetaData(pod)
containers := c.GetIndexes(pod)
var metadata []MetadataIndex
Expand All @@ -277,10 +275,10 @@ func (c *ContainerIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
return metadata
}

func (c *ContainerIndexer) GetIndexes(pod *corev1.Pod) []string {
func (c *ContainerIndexer) GetIndexes(pod *Pod) []string {
var containers []string
for _, status := range pod.Status.ContainerStatuses {
cID := status.GetContainerID()
cID := status.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
Expand Down
40 changes: 19 additions & 21 deletions libbeat/processors/kubernetes/indexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"testing"

"github.com/elastic/beats/libbeat/common"
corev1 "github.com/ericchiang/k8s/api/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/stretchr/testify/assert"
)

Expand All @@ -19,15 +17,15 @@ func TestPodIndexer(t *testing.T) {

podName := "testpod"
ns := "testns"
pod := corev1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &podName,
Namespace: &ns,
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"labelkey": "labelvalue",
},
},
Spec: &corev1.PodSpec{},
Spec: PodSpec{},
}

indexers := podIndexer.GetMetadata(&pod)
Expand Down Expand Up @@ -61,16 +59,16 @@ func TestContainerIndexer(t *testing.T) {
ns := "testns"
container := "container"

pod := corev1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &podName,
Namespace: &ns,
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"labelkey": "labelvalue",
},
},
Status: &corev1.PodStatus{
ContainerStatuses: make([]*corev1.ContainerStatus, 0),
Status: PodStatus{
ContainerStatuses: make([]PodContainerStatus, 0),
},
}

Expand All @@ -90,10 +88,10 @@ func TestContainerIndexer(t *testing.T) {

cid := "docker://abcde"

pod.Status.ContainerStatuses = []*corev1.ContainerStatus{
pod.Status.ContainerStatuses = []PodContainerStatus{
{
Name: &container,
ContainerID: &cid,
Name: container,
ContainerID: cid,
},
}
expected["container"] = common.MapStr{
Expand Down Expand Up @@ -152,10 +150,10 @@ func TestFilteredGenMeta(t *testing.T) {

podName := "testpod"
ns := "testns"
pod := corev1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &podName,
Namespace: &ns,
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"foo": "bar",
"x": "y",
Expand All @@ -165,7 +163,7 @@ func TestFilteredGenMeta(t *testing.T) {
"c": "d",
},
},
Spec: &corev1.PodSpec{},
Spec: PodSpec{},
}

indexers := podIndexer.GetMetadata(&pod)
Expand Down
46 changes: 33 additions & 13 deletions libbeat/processors/kubernetes/podwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"context"
"encoding/json"
"sync"
"time"

Expand All @@ -28,7 +29,7 @@ type PodWatcher struct {
type annotationCache struct {
sync.Mutex
annotations map[string]common.MapStr
pods map[string]*corev1.Pod // pod uid -> Pod
pods map[string]*Pod // pod uid -> Pod
}

type NodeOption struct{}
Expand All @@ -48,7 +49,7 @@ func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod time.D
stop: cancel,
annotationCache: annotationCache{
annotations: make(map[string]common.MapStr),
pods: make(map[string]*corev1.Pod),
pods: make(map[string]*Pod),
},
}
}
Expand Down Expand Up @@ -124,44 +125,63 @@ func (p *PodWatcher) Run() bool {
}
}

func (p *PodWatcher) onPodAdd(pod *corev1.Pod) {
func (p *PodWatcher) onPodAdd(pod *Pod) {
metadata := p.indexers.GetMetadata(pod)
p.annotationCache.Lock()
defer p.annotationCache.Unlock()

p.annotationCache.pods[pod.Metadata.GetUid()] = pod
p.annotationCache.pods[pod.Metadata.UID] = pod

for _, m := range metadata {
p.annotationCache.annotations[m.Index] = m.Data
}
}

func (p *PodWatcher) onPodUpdate(pod *corev1.Pod) {
oldPod := p.GetPod(pod.Metadata.GetUid())
if oldPod.Metadata.GetResourceVersion() != pod.Metadata.GetResourceVersion() {
func (p *PodWatcher) onPodUpdate(pod *Pod) {
oldPod := p.GetPod(pod.Metadata.UID)
if oldPod.Metadata.ResourceVersion != pod.Metadata.ResourceVersion {
//Process the new pod changes
p.onPodDelete(oldPod)
p.onPodAdd(pod)
}
}

func (p *PodWatcher) onPodDelete(pod *corev1.Pod) {
func (p *PodWatcher) onPodDelete(pod *Pod) {
p.annotationCache.Lock()
defer p.annotationCache.Unlock()

delete(p.annotationCache.pods, pod.Metadata.GetUid())
delete(p.annotationCache.pods, pod.Metadata.UID)

for _, index := range p.indexers.GetIndexes(pod) {
delete(p.annotationCache.annotations, index)
}
}

func (p *PodWatcher) getPodMeta(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

po := &Pod{}
err = json.Unmarshal(bytes, po)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

return po

}

func (p *PodWatcher) worker() {
for pod := range p.podQueue {
if pod.Metadata.GetDeletionTimestamp() != nil {
for po := range p.podQueue {
pod := p.getPodMeta(po)
if pod.Metadata.DeletionTimestamp != "" {
p.onPodDelete(pod)
} else {
existing := p.GetPod(pod.Metadata.GetUid())
existing := p.GetPod(pod.Metadata.UID)
if existing != nil {
p.onPodUpdate(pod)
} else {
Expand All @@ -181,7 +201,7 @@ func (p *PodWatcher) GetMetaData(arg string) common.MapStr {
return nil
}

func (p *PodWatcher) GetPod(uid string) *corev1.Pod {
func (p *PodWatcher) GetPod(uid string) *Pod {
p.annotationCache.Lock()
defer p.annotationCache.Unlock()
return p.annotationCache.pods[uid]
Expand Down
Loading

0 comments on commit bcbae50

Please sign in to comment.