Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(kubernetes plugin): Adds support for a kuberentes plugin #635

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,27 @@ github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757
github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804
github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d
github.com/blang/semver aea32c919a18e5ef4537bbd283ff29594b1b0165
github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/docker/docker 2b27fe17a1b3fb8472fde96d768fa70996adf201
github.com/docker/go-units 5d2041e26a699eaca682e2ea41c8f891e1060444
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
github.com/emicklei/go-restful/swagger b86acf97a74ed7603ac78d012f5535b4d587b156
github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4
github.com/ghodss/yaml 73d445a93680fa1a78ae23a5839bad48f32ba1ee
github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3
github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239
github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263
github.com/golang/glog fca8c8854093a154ff1eb580aae10276ad6b1b5f
github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3
github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a
github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
github.com/google/cadvisor/info/v1 bef8522964928c3de92319c548adbc07fa1ea113
github.com/google/gofuzz e4af62d086c303f2bed467b227fc0a034b218916
github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d
github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
Expand All @@ -28,13 +36,15 @@ github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24
github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6
github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6
github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264
github.com/juju/ratelimit 77ed1c8a01217656d2080ad51981f6e99adaa177
github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38
github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f
github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453
github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f
github.com/opencontainers/runc 072fa6fdccaba49b11ba91ad4265b1ec1043787e
github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
github.com/prometheus/client_golang 67994f177195311c3ea3d4407ed0175e34a4256f
Expand All @@ -47,6 +57,8 @@ github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94
github.com/stretchr/testify f390dcf405f7b83c997eac1b06768bb9f44dec18
github.com/spf13/pflag 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7
github.com/ugorji/go/codec 646ae4a518c1c3be0739df898118d9bccf993858
github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
Expand All @@ -57,3 +69,5 @@ gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
k8s.io/kubernetes/pkg 3254df3a7c9e25b243a3a9b0abdba1888cf52956
speter.net/go/exp/math/dec/inf 42ca6cd68aa922bc3f32f1e056e61b65945d9ad7
301 changes: 301 additions & 0 deletions internal/docker/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
package docker

import (
"fmt"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"

api "k8s.io/kubernetes/pkg/api"
kube "k8s.io/kubernetes/pkg/client/unversioned"

godocker "github.com/fsouza/go-dockerclient"
)

// CreateClient sets the client value in the Docker struct
func CreateClient(endpoint string) (*godocker.Client, error) {
var c *godocker.Client
var err error
if endpoint == "ENV" {
c, err = godocker.NewClientFromEnv()
} else if endpoint == "" {
c, err = godocker.NewClient("unix:///var/run/docker.sock")
} else {
c, err = godocker.NewClient(endpoint)
}
return c, err
}

func GatherContainerMetrics(client *godocker.Client, kubeClient *kube.Client, containerNames []string, acc telegraf.Accumulator) error {
opts := godocker.ListContainersOptions{}
containers, err := client.ListContainers(opts)
if err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(len(containers))
for _, container := range containers {
go func(c godocker.APIContainers) {
defer wg.Done()
err := gatherContainer(client, kubeClient, c, containerNames, acc)
if err != nil {
fmt.Println(err.Error())
}
}(container)
}
wg.Wait()
return nil
}

func gatherContainer(
client *godocker.Client,
kubeClient *kube.Client,
container godocker.APIContainers,
containerNames []string,
acc telegraf.Accumulator,
) error {
// Parse container name
cname := "unknown"
if len(container.Names) > 0 {
// Not sure what to do with other names, just take the first.
cname = strings.TrimPrefix(container.Names[0], "/")
}

tags := map[string]string{
"container_id": container.ID,
"container_name": cname,
"container_image": container.Image,
}

if kubeClient != nil {
tags = gatherKubernetesLabels(kubeClient, container.ID, tags)
}

if len(containerNames) > 0 {
if !sliceContains(cname, containerNames) {
return nil
}
}

statChan := make(chan *godocker.Stats)
done := make(chan bool)
statOpts := godocker.StatsOptions{
Stream: false,
ID: container.ID,
Stats: statChan,
Done: done,
Timeout: time.Duration(time.Second * 5),
}

go func() {
client.Stats(statOpts)
}()

stat := <-statChan
close(done)

// Add labels to tags
for k, v := range container.Labels {
tags[k] = v
}

gatherContainerStats(stat, acc, tags)

return nil
}

func gatherContainerStats(
stat *godocker.Stats,
acc telegraf.Accumulator,
tags map[string]string,
) {
now := stat.Read

memfields := map[string]interface{}{
"max_usage": stat.MemoryStats.MaxUsage,
"usage": stat.MemoryStats.Usage,
"fail_count": stat.MemoryStats.Failcnt,
"limit": stat.MemoryStats.Limit,
"total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault,
"cache": stat.MemoryStats.Stats.Cache,
"mapped_file": stat.MemoryStats.Stats.MappedFile,
"total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile,
"pgpgout": stat.MemoryStats.Stats.Pgpgout,
"rss": stat.MemoryStats.Stats.Rss,
"total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile,
"writeback": stat.MemoryStats.Stats.Writeback,
"unevictable": stat.MemoryStats.Stats.Unevictable,
"pgpgin": stat.MemoryStats.Stats.Pgpgin,
"total_unevictable": stat.MemoryStats.Stats.TotalUnevictable,
"pgmajfault": stat.MemoryStats.Stats.Pgmajfault,
"total_rss": stat.MemoryStats.Stats.TotalRss,
"total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge,
"total_writeback": stat.MemoryStats.Stats.TotalWriteback,
"total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon,
"rss_huge": stat.MemoryStats.Stats.RssHuge,
"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit,
"total_pgfault": stat.MemoryStats.Stats.TotalPgfault,
"total_active_file": stat.MemoryStats.Stats.TotalActiveFile,
"active_anon": stat.MemoryStats.Stats.ActiveAnon,
"total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon,
"total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout,
"total_cache": stat.MemoryStats.Stats.TotalCache,
"inactive_anon": stat.MemoryStats.Stats.InactiveAnon,
"active_file": stat.MemoryStats.Stats.ActiveFile,
"pgfault": stat.MemoryStats.Stats.Pgfault,
"inactive_file": stat.MemoryStats.Stats.InactiveFile,
"total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin,
}
acc.AddFields("docker_mem", memfields, tags, now)

cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemCPUUsage,
"throttling_periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime,
}
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
acc.AddFields("docker_cpu", cpufields, cputags, now)

for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage {
percputags := copyTags(tags)
percputags["cpu"] = fmt.Sprintf("cpu%d", i)
acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now)
}

for network, netstats := range stat.Networks {
netfields := map[string]interface{}{
"rx_dropped": netstats.RxDropped,
"rx_bytes": netstats.RxBytes,
"rx_errors": netstats.RxErrors,
"tx_packets": netstats.TxPackets,
"tx_dropped": netstats.TxDropped,
"rx_packets": netstats.RxPackets,
"tx_errors": netstats.TxErrors,
"tx_bytes": netstats.TxBytes,
}
// Create a new network tag dictionary for the "network" tag
nettags := copyTags(tags)
nettags["network"] = network
acc.AddFields("docker_net", netfields, nettags, now)
}

gatherBlockIOMetrics(stat, acc, tags, now)
}

func gatherBlockIOMetrics(
stat *godocker.Stats,
acc telegraf.Accumulator,
tags map[string]string,
now time.Time,
) {
blkioStats := stat.BlkioStats
// Make a map of devices to their block io stats
deviceStatMap := make(map[string]map[string]interface{})

for _, metric := range blkioStats.IOServiceBytesRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device]
if !ok {
deviceStatMap[device] = make(map[string]interface{})
}

field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOServicedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device]
if !ok {
deviceStatMap[device] = make(map[string]interface{})
}

field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOQueueRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOServiceTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOWaitTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOMergedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.SectorsRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for device, fields := range deviceStatMap {
iotags := copyTags(tags)
iotags["device"] = device
acc.AddFields("docker_blkio", fields, iotags, now)
}
}

func copyTags(in map[string]string) map[string]string {
out := make(map[string]string)
for k, v := range in {
out[k] = v
}
return out
}

func gatherKubernetesLabels(kubeClient *kube.Client, containerID string, tags map[string]string) map[string]string {
podClient := kubeClient.Pods(api.NamespaceAll)
pods, _ := podClient.List(api.ListOptions{})

for _, pod := range pods.Items {
for _, containerStatus := range pod.Status.ContainerStatuses {
podContainerID := strings.TrimPrefix(containerStatus.ContainerID, "docker://")
if podContainerID == containerID {
for k, v := range pod.ObjectMeta.Labels {
tags[k] = v
}
return tags
}
}
}
return tags
}

func sliceContains(in string, sl []string) bool {
for _, str := range sl {
if str == in {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package system
package docker

import (
"testing"
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
Expand Down
6 changes: 3 additions & 3 deletions plugins/inputs/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ on the availability of per-cpu stats on your system.
### Tags:

- All stats have the following tags:
- cont_id (container ID)
- cont_image (container image)
- cont_name (container name)
- container_id (container ID)
- container_image (container image)
- container_name (container name)
- docker_cpu specific:
- cpu
- docker_net specific:
Expand Down
Loading