-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature(kubernetes plugin): Adds support for a kuberentes plugin
fixes #569 This commit also refactors the existing docker plugin so that the kubernetes plugin can gather metrics from the docker system.
- Loading branch information
Jonathan Chauncey
committed
Feb 2, 2016
1 parent
a11e07e
commit 7c162b0
Showing
8 changed files
with
417 additions
and
280 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,301 @@ | ||
package docker | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
|
||
api "k8s.io/kubernetes/pkg/api" | ||
kube "k8s.io/kubernetes/pkg/client/unversioned" | ||
|
||
godocker "github.com/fsouza/go-dockerclient" | ||
) | ||
|
||
// CreateClient sets the client value in the Docker struct | ||
func CreateClient(endpoint string) (*godocker.Client, error) { | ||
var c *godocker.Client | ||
var err error | ||
if endpoint == "ENV" { | ||
c, err = godocker.NewClientFromEnv() | ||
} else if endpoint == "" { | ||
c, err = godocker.NewClient("unix:///var/run/docker.sock") | ||
} else { | ||
c, err = godocker.NewClient(endpoint) | ||
} | ||
return c, err | ||
} | ||
|
||
func GatherContainerMetrics(client *godocker.Client, kubeClient *kube.Client, containerNames []string, acc telegraf.Accumulator) error { | ||
opts := godocker.ListContainersOptions{} | ||
containers, err := client.ListContainers(opts) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(len(containers)) | ||
for _, container := range containers { | ||
go func(c godocker.APIContainers) { | ||
defer wg.Done() | ||
err := gatherContainer(client, kubeClient, c, containerNames, acc) | ||
if err != nil { | ||
fmt.Println(err.Error()) | ||
} | ||
}(container) | ||
} | ||
wg.Wait() | ||
return nil | ||
} | ||
|
||
func gatherContainer( | ||
client *godocker.Client, | ||
kubeClient *kube.Client, | ||
container godocker.APIContainers, | ||
containerNames []string, | ||
acc telegraf.Accumulator, | ||
) error { | ||
// Parse container name | ||
cname := "unknown" | ||
if len(container.Names) > 0 { | ||
// Not sure what to do with other names, just take the first. | ||
cname = strings.TrimPrefix(container.Names[0], "/") | ||
} | ||
|
||
tags := map[string]string{ | ||
"container_id": container.ID, | ||
"container_name": cname, | ||
"container_image": container.Image, | ||
} | ||
|
||
if kubeClient != nil { | ||
tags = gatherKubernetesLabels(kubeClient, container.ID, tags) | ||
} | ||
|
||
if len(containerNames) > 0 { | ||
if !sliceContains(cname, containerNames) { | ||
return nil | ||
} | ||
} | ||
|
||
statChan := make(chan *godocker.Stats) | ||
done := make(chan bool) | ||
statOpts := godocker.StatsOptions{ | ||
Stream: false, | ||
ID: container.ID, | ||
Stats: statChan, | ||
Done: done, | ||
Timeout: time.Duration(time.Second * 5), | ||
} | ||
|
||
go func() { | ||
client.Stats(statOpts) | ||
}() | ||
|
||
stat := <-statChan | ||
close(done) | ||
|
||
// Add labels to tags | ||
for k, v := range container.Labels { | ||
tags[k] = v | ||
} | ||
|
||
gatherContainerStats(stat, acc, tags) | ||
|
||
return nil | ||
} | ||
|
||
func gatherContainerStats( | ||
stat *godocker.Stats, | ||
acc telegraf.Accumulator, | ||
tags map[string]string, | ||
) { | ||
now := stat.Read | ||
|
||
memfields := map[string]interface{}{ | ||
"max_usage": stat.MemoryStats.MaxUsage, | ||
"usage": stat.MemoryStats.Usage, | ||
"fail_count": stat.MemoryStats.Failcnt, | ||
"limit": stat.MemoryStats.Limit, | ||
"total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault, | ||
"cache": stat.MemoryStats.Stats.Cache, | ||
"mapped_file": stat.MemoryStats.Stats.MappedFile, | ||
"total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile, | ||
"pgpgout": stat.MemoryStats.Stats.Pgpgout, | ||
"rss": stat.MemoryStats.Stats.Rss, | ||
"total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile, | ||
"writeback": stat.MemoryStats.Stats.Writeback, | ||
"unevictable": stat.MemoryStats.Stats.Unevictable, | ||
"pgpgin": stat.MemoryStats.Stats.Pgpgin, | ||
"total_unevictable": stat.MemoryStats.Stats.TotalUnevictable, | ||
"pgmajfault": stat.MemoryStats.Stats.Pgmajfault, | ||
"total_rss": stat.MemoryStats.Stats.TotalRss, | ||
"total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge, | ||
"total_writeback": stat.MemoryStats.Stats.TotalWriteback, | ||
"total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon, | ||
"rss_huge": stat.MemoryStats.Stats.RssHuge, | ||
"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit, | ||
"total_pgfault": stat.MemoryStats.Stats.TotalPgfault, | ||
"total_active_file": stat.MemoryStats.Stats.TotalActiveFile, | ||
"active_anon": stat.MemoryStats.Stats.ActiveAnon, | ||
"total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon, | ||
"total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout, | ||
"total_cache": stat.MemoryStats.Stats.TotalCache, | ||
"inactive_anon": stat.MemoryStats.Stats.InactiveAnon, | ||
"active_file": stat.MemoryStats.Stats.ActiveFile, | ||
"pgfault": stat.MemoryStats.Stats.Pgfault, | ||
"inactive_file": stat.MemoryStats.Stats.InactiveFile, | ||
"total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin, | ||
} | ||
acc.AddFields("docker_mem", memfields, tags, now) | ||
|
||
cpufields := map[string]interface{}{ | ||
"usage_total": stat.CPUStats.CPUUsage.TotalUsage, | ||
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode, | ||
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode, | ||
"usage_system": stat.CPUStats.SystemCPUUsage, | ||
"throttling_periods": stat.CPUStats.ThrottlingData.Periods, | ||
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods, | ||
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime, | ||
} | ||
cputags := copyTags(tags) | ||
cputags["cpu"] = "cpu-total" | ||
acc.AddFields("docker_cpu", cpufields, cputags, now) | ||
|
||
for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage { | ||
percputags := copyTags(tags) | ||
percputags["cpu"] = fmt.Sprintf("cpu%d", i) | ||
acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now) | ||
} | ||
|
||
for network, netstats := range stat.Networks { | ||
netfields := map[string]interface{}{ | ||
"rx_dropped": netstats.RxDropped, | ||
"rx_bytes": netstats.RxBytes, | ||
"rx_errors": netstats.RxErrors, | ||
"tx_packets": netstats.TxPackets, | ||
"tx_dropped": netstats.TxDropped, | ||
"rx_packets": netstats.RxPackets, | ||
"tx_errors": netstats.TxErrors, | ||
"tx_bytes": netstats.TxBytes, | ||
} | ||
// Create a new network tag dictionary for the "network" tag | ||
nettags := copyTags(tags) | ||
nettags["network"] = network | ||
acc.AddFields("docker_net", netfields, nettags, now) | ||
} | ||
|
||
gatherBlockIOMetrics(stat, acc, tags, now) | ||
} | ||
|
||
func gatherBlockIOMetrics( | ||
stat *godocker.Stats, | ||
acc telegraf.Accumulator, | ||
tags map[string]string, | ||
now time.Time, | ||
) { | ||
blkioStats := stat.BlkioStats | ||
// Make a map of devices to their block io stats | ||
deviceStatMap := make(map[string]map[string]interface{}) | ||
|
||
for _, metric := range blkioStats.IOServiceBytesRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
_, ok := deviceStatMap[device] | ||
if !ok { | ||
deviceStatMap[device] = make(map[string]interface{}) | ||
} | ||
|
||
field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for _, metric := range blkioStats.IOServicedRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
_, ok := deviceStatMap[device] | ||
if !ok { | ||
deviceStatMap[device] = make(map[string]interface{}) | ||
} | ||
|
||
field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for _, metric := range blkioStats.IOQueueRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for _, metric := range blkioStats.IOServiceTimeRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for _, metric := range blkioStats.IOWaitTimeRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for _, metric := range blkioStats.IOMergedRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for _, metric := range blkioStats.IOTimeRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for _, metric := range blkioStats.SectorsRecursive { | ||
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) | ||
field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op)) | ||
deviceStatMap[device][field] = metric.Value | ||
} | ||
|
||
for device, fields := range deviceStatMap { | ||
iotags := copyTags(tags) | ||
iotags["device"] = device | ||
acc.AddFields("docker_blkio", fields, iotags, now) | ||
} | ||
} | ||
|
||
func copyTags(in map[string]string) map[string]string { | ||
out := make(map[string]string) | ||
for k, v := range in { | ||
out[k] = v | ||
} | ||
return out | ||
} | ||
|
||
func gatherKubernetesLabels(kubeClient *kube.Client, containerID string, tags map[string]string) map[string]string { | ||
podClient := kubeClient.Pods(api.NamespaceAll) | ||
pods, _ := podClient.List(api.ListOptions{}) | ||
|
||
for _, pod := range pods.Items { | ||
for _, containerStatus := range pod.Status.ContainerStatuses { | ||
podContainerID := strings.TrimPrefix(containerStatus.ContainerID, "docker://") | ||
if podContainerID == containerID { | ||
for k, v := range pod.ObjectMeta.Labels { | ||
tags[k] = v | ||
} | ||
return tags | ||
} | ||
} | ||
} | ||
return tags | ||
} | ||
|
||
func sliceContains(in string, sl []string) bool { | ||
for _, str := range sl { | ||
if str == in { | ||
return true | ||
} | ||
} | ||
return false | ||
} |
2 changes: 1 addition & 1 deletion
2
plugins/inputs/docker/docker_test.go → internal/docker/docker_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
package system | ||
package docker | ||
|
||
import ( | ||
"testing" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.