Skip to content

Commit

Permalink
Add network device dimension and option to filter by it
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington committed Jul 29, 2020
1 parent 71392e5 commit 1815991
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 53 deletions.
80 changes: 71 additions & 9 deletions receiver/hostmetricsreceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
# Host Metrics Receiver

The Host Metrics receiver generates metrics about the host system. This is
intended to be used when the collector is deployed as an agent.
The Host Metrics receiver generates metrics about the host system scraped
from various sources. This is intended to be used when the collector is
deployed as an agent.

The categories of metrics scraped can be configured under the `scrapers` key.
For example:
If you are only interested in a subset of metrics from a particular source,
it is recommended you use this receiver with the
[Filter Processor](https://github.com/open-telemetry/opentelemetry-collector/tree/master/processor/filterprocessor).

## Configuration

The collection interval and the categories of metrics to be scraped can be
configured:

```yaml
hostmetrics:
collection_interval: 1m
collection_interval: <duration> # default = 1m
scrapers:
cpu:
memory:
disk:
<scraper1>:
<scraper2>:
...
```

If you would like to scrape some metrics at a different frequency than others,
you can configure multiple `hostmetrics` receivers with different
`collection_interval values`. For example:
`collection_interval` values. For example:

```yaml
receivers:
Expand All @@ -38,3 +45,58 @@ service:
metrics:
receivers: [hostmetrics, hostmetrics/disk]
```
## Scrapers
The available scrapers are:
Scraper | Supported OSs | Description
-----------|--------------------|-------------
cpu | All | CPU utilization metrics
disk | All | Disk I/O metrics
load | All | CPU load metrics
filesystem | All | File System utilization metrics
memory | All | Memory utilization metrics
network | All | Network interface I/O metrics & TCP connection metrics
processes | Linux | Process count metrics
swap | All | Swap space utilization and I/O metrics
process | Linux & Windows | Per process CPU, Memory, and Disk I/O metrics
Several scrapers support additional configuration:
#### Disk
```yaml
disk:
<include|exclude>:
devices: [ <device name>, ... ]
match_type: <strict|regexp>
```
#### File System
```yaml
filesystem:
<include|exclude>:
devices: [ <device name>, ... ]
match_type: <strict|regexp>
```
#### Network
```yaml
network:
<include|exclude>:
interfaces: [ <interface name>, ... ]
match_type: <strict|regexp>
```
#### Process
```yaml
process:
disk:
<include|exclude>:
names: [ <process name>, ... ]
match_type: <strict|regexp>
```
13 changes: 9 additions & 4 deletions receiver/hostmetricsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@ func TestLoadConfig(t *testing.T) {
loadscraper.TypeStr: &loadscraper.Config{},
filesystemscraper.TypeStr: &filesystemscraper.Config{},
memoryscraper.TypeStr: &memoryscraper.Config{},
networkscraper.TypeStr: &networkscraper.Config{},
processesscraper.TypeStr: &processesscraper.Config{},
swapscraper.TypeStr: &swapscraper.Config{},
networkscraper.TypeStr: &networkscraper.Config{
Include: networkscraper.MatchConfig{
Interfaces: []string{"test1"},
Config: filterset.Config{MatchType: "strict"},
},
},
processesscraper.TypeStr: &processesscraper.Config{},
swapscraper.TypeStr: &swapscraper.Config{},
processscraper.TypeStr: &processscraper.Config{
Include: processscraper.MatchConfig{
Names: []string{"test1", "test2"},
Names: []string{"test2", "test3"},
Config: filterset.Config{MatchType: "regexp"},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,23 @@

package networkscraper

import "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
import (
"go.opentelemetry.io/collector/internal/processor/filterset"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

// Config relating to Network Metric Scraper.
type Config struct {
internal.ConfigSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Include specifies a filter on the network interfaces that should be included from the generated metrics.
Include MatchConfig `mapstructure:"include"`
// Exclude specifies a filter on the network interfaces that should be excluded from the generated metrics.
Exclude MatchConfig `mapstructure:"exclude"`
}

type MatchConfig struct {
filterset.Config `mapstructure:",squash"`

Interfaces []string `mapstructure:"interfaces"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (f *Factory) CreateMetricsScraper(
logger *zap.Logger,
config internal.Config,
) (internal.Scraper, error) {
cfg := config.(*Config)
return obsreportscraper.WrapScraper(newNetworkScraper(ctx, cfg), TypeStr), nil
scraper, err := newNetworkScraper(ctx, config.(*Config))
if err != nil {
return nil, err
}

return obsreportscraper.WrapScraper(scraper, TypeStr), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,12 @@ func TestCreateMetricsScraper(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, scraper)
}

func TestCreateMetricsScraper_Error(t *testing.T) {
factory := &Factory{}
cfg := &Config{Include: MatchConfig{Interfaces: []string{""}}}

_, err := factory.CreateMetricsScraper(context.Background(), zap.NewNop(), cfg)

assert.Error(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// network metric constants

const (
interfaceLabelName = "interface"
directionLabelName = "direction"
stateLabelName = "state"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@ package networkscraper

import (
"context"
"fmt"
"time"

"github.com/shirou/gopsutil/host"
"github.com/shirou/gopsutil/net"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
)

// scraper for Network Metrics
type scraper struct {
config *Config
startTime pdata.TimestampUnixNano
includeFS filterset.FilterSet
excludeFS filterset.FilterSet

// for mocking
bootTime func() (uint64, error)
Expand All @@ -37,8 +41,26 @@ type scraper struct {
}

// newNetworkScraper creates a set of Network related metrics
func newNetworkScraper(_ context.Context, cfg *Config) *scraper {
return &scraper{config: cfg, bootTime: host.BootTime, ioCounters: net.IOCounters, connections: net.Connections}
func newNetworkScraper(_ context.Context, cfg *Config) (*scraper, error) {
scraper := &scraper{config: cfg, bootTime: host.BootTime, ioCounters: net.IOCounters, connections: net.Connections}

var err error

if len(cfg.Include.Interfaces) > 0 {
scraper.includeFS, err = filterset.CreateFilterSet(cfg.Include.Interfaces, &cfg.Include.Config)
if err != nil {
return nil, fmt.Errorf("error creating network interface include filters: %w", err)
}
}

if len(cfg.Exclude.Interfaces) > 0 {
scraper.excludeFS, err = filterset.CreateFilterSet(cfg.Exclude.Interfaces, &cfg.Exclude.Config)
if err != nil {
return nil, fmt.Errorf("error creating network interface exclude filters: %w", err)
}
}

return scraper, nil
}

// Initialize
Expand Down Expand Up @@ -82,33 +104,73 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) {

func (s *scraper) scrapeAndAppendNetworkCounterMetrics(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano) error {
// get total stats only
networkStatsSlice, err := s.ioCounters( /*perNetworkInterfaceController=*/ false)
ioCounters, err := s.ioCounters( /*perNetworkInterfaceController=*/ true)
if err != nil {
return err
}

networkStats := networkStatsSlice[0]
// filter network interfaces by name
ioCounters = s.filterByInterface(ioCounters)

if len(ioCounters) > 0 {
startIdx := metrics.Len()
metrics.Resize(startIdx + 4)
initializeNetworkPacketsMetric(metrics.At(startIdx+0), networkPacketsDescriptor, startTime, ioCounters)
initializeNetworkDroppedPacketsMetric(metrics.At(startIdx+1), networkDroppedPacketsDescriptor, startTime, ioCounters)
initializeNetworkErrorsMetric(metrics.At(startIdx+2), networkErrorsDescriptor, startTime, ioCounters)
initializeNetworkIOMetric(metrics.At(startIdx+3), networkIODescriptor, startTime, ioCounters)
}

startIdx := metrics.Len()
metrics.Resize(startIdx + 4)
initializeNetworkMetric(metrics.At(startIdx+0), networkPacketsDescriptor, startTime, networkStats.PacketsSent, networkStats.PacketsRecv)
initializeNetworkMetric(metrics.At(startIdx+1), networkDroppedPacketsDescriptor, startTime, networkStats.Dropout, networkStats.Dropin)
initializeNetworkMetric(metrics.At(startIdx+2), networkErrorsDescriptor, startTime, networkStats.Errout, networkStats.Errin)
initializeNetworkMetric(metrics.At(startIdx+3), networkIODescriptor, startTime, networkStats.BytesSent, networkStats.BytesRecv)
return nil
}

func initializeNetworkMetric(metric pdata.Metric, metricDescriptor pdata.MetricDescriptor, startTime pdata.TimestampUnixNano, transmitValue, receiveValue uint64) {
func initializeNetworkPacketsMetric(metric pdata.Metric, metricDescriptor pdata.MetricDescriptor, startTime pdata.TimestampUnixNano, ioCountersSlice []net.IOCountersStat) {
metricDescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2)
initializeNetworkDataPoint(idps.At(0), startTime, transmitDirectionLabelValue, int64(transmitValue))
initializeNetworkDataPoint(idps.At(1), startTime, receiveDirectionLabelValue, int64(receiveValue))
idps.Resize(2 * len(ioCountersSlice))
for idx, ioCounters := range ioCountersSlice {
initializeNetworkDataPoint(idps.At(2*idx+0), startTime, ioCounters.Name, transmitDirectionLabelValue, int64(ioCounters.PacketsSent))
initializeNetworkDataPoint(idps.At(2*idx+1), startTime, ioCounters.Name, receiveDirectionLabelValue, int64(ioCounters.PacketsRecv))
}
}

func initializeNetworkDataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.TimestampUnixNano, directionLabel string, value int64) {
func initializeNetworkDroppedPacketsMetric(metric pdata.Metric, metricDescriptor pdata.MetricDescriptor, startTime pdata.TimestampUnixNano, ioCountersSlice []net.IOCountersStat) {
metricDescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2 * len(ioCountersSlice))
for idx, ioCounters := range ioCountersSlice {
initializeNetworkDataPoint(idps.At(2*idx+0), startTime, ioCounters.Name, transmitDirectionLabelValue, int64(ioCounters.Dropout))
initializeNetworkDataPoint(idps.At(2*idx+1), startTime, ioCounters.Name, receiveDirectionLabelValue, int64(ioCounters.Dropin))
}
}

func initializeNetworkErrorsMetric(metric pdata.Metric, metricDescriptor pdata.MetricDescriptor, startTime pdata.TimestampUnixNano, ioCountersSlice []net.IOCountersStat) {
metricDescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2 * len(ioCountersSlice))
for idx, ioCounters := range ioCountersSlice {
initializeNetworkDataPoint(idps.At(2*idx+0), startTime, ioCounters.Name, transmitDirectionLabelValue, int64(ioCounters.Errout))
initializeNetworkDataPoint(idps.At(2*idx+1), startTime, ioCounters.Name, receiveDirectionLabelValue, int64(ioCounters.Errin))
}
}

func initializeNetworkIOMetric(metric pdata.Metric, metricDescriptor pdata.MetricDescriptor, startTime pdata.TimestampUnixNano, ioCountersSlice []net.IOCountersStat) {
metricDescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2 * len(ioCountersSlice))
for idx, ioCounters := range ioCountersSlice {
initializeNetworkDataPoint(idps.At(2*idx+0), startTime, ioCounters.Name, transmitDirectionLabelValue, int64(ioCounters.BytesSent))
initializeNetworkDataPoint(idps.At(2*idx+1), startTime, ioCounters.Name, receiveDirectionLabelValue, int64(ioCounters.BytesRecv))
}
}

func initializeNetworkDataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.TimestampUnixNano, interfaceLabel, directionLabel string, value int64) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(interfaceLabelName, interfaceLabel)
labelsMap.Insert(directionLabelName, directionLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
Expand Down Expand Up @@ -170,3 +232,22 @@ func initializeNetworkTCPConnectionsDataPoint(dataPoint pdata.Int64DataPoint, st
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetValue(value)
}

func (s *scraper) filterByInterface(ioCounters []net.IOCountersStat) []net.IOCountersStat {
if s.includeFS == nil && s.excludeFS == nil {
return ioCounters
}

filteredIOCounters := make([]net.IOCountersStat, 0, len(ioCounters))
for _, io := range ioCounters {
if s.includeInterface(io.Name) {
filteredIOCounters = append(filteredIOCounters, io)
}
}
return filteredIOCounters
}

func (s *scraper) includeInterface(interfaceName string) bool {
return (s.includeFS == nil || s.includeFS.Matches(interfaceName)) &&
(s.excludeFS == nil || !s.excludeFS.Matches(interfaceName))
}
Loading

0 comments on commit 1815991

Please sign in to comment.