Skip to content

Commit

Permalink
Process scraper: Use same scrape time for all data points coming from…
Browse files Browse the repository at this point in the history
… same process
  • Loading branch information
james-bebbington committed Aug 12, 2020
1 parent e68440e commit 81485d8
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

// scraper for Process Metrics
Expand Down Expand Up @@ -98,15 +99,17 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.ResourceMetricsSlice,
ilms.Resize(1)
metrics := ilms.At(0).Metrics()

if err = scrapeAndAppendCPUTimeMetric(metrics, s.startTime, md.handle); err != nil {
now := internal.TimeToUnixNano(time.Now())

if err = scrapeAndAppendCPUTimeMetric(metrics, s.startTime, now, md.handle); err != nil {
errs = append(errs, errors.Wrapf(err, "error reading cpu times for process %q (pid %v)", md.executable.name, md.pid))
}

if err = scrapeAndAppendMemoryUsageMetrics(metrics, md.handle); err != nil {
if err = scrapeAndAppendMemoryUsageMetrics(metrics, now, md.handle); err != nil {
errs = append(errs, errors.Wrapf(err, "error reading memory info for process %q (pid %v)", md.executable.name, md.pid))
}

if err = scrapeAndAppendDiskIOMetric(metrics, s.startTime, md.handle); err != nil {
if err = scrapeAndAppendDiskIOMetric(metrics, s.startTime, now, md.handle); err != nil {
errs = append(errs, errors.Wrapf(err, "error reading disk usage for process %q (pid %v)", md.executable.name, md.pid))
}
}
Expand Down Expand Up @@ -166,77 +169,77 @@ func (s *scraper) getProcessMetadata() ([]*processMetadata, error) {
return metadata, componenterror.CombineErrors(errs)
}

func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano, handle processHandle) error {
func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime, now pdata.TimestampUnixNano, handle processHandle) error {
times, err := handle.Times()
if err != nil {
return err
}

startIdx := metrics.Len()
metrics.Resize(startIdx + 1)
initializeCPUTimeMetric(metrics.At(startIdx), startTime, times)
initializeCPUTimeMetric(metrics.At(startIdx), startTime, now, times)
return nil
}

func initializeCPUTimeMetric(metric pdata.Metric, startTime pdata.TimestampUnixNano, times *cpu.TimesStat) {
func initializeCPUTimeMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, times *cpu.TimesStat) {
cpuTimeDescriptor.CopyTo(metric.MetricDescriptor())

ddps := metric.DoubleDataPoints()
ddps.Resize(cpuStatesLen)
appendCPUTimeStateDataPoints(ddps, startTime, times)
appendCPUTimeStateDataPoints(ddps, startTime, now, times)
}

func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, handle processHandle) error {
func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, now pdata.TimestampUnixNano, handle processHandle) error {
mem, err := handle.MemoryInfo()
if err != nil {
return err
}

startIdx := metrics.Len()
metrics.Resize(startIdx + 2)
initializeMemoryUsageMetric(metrics.At(startIdx+0), physicalMemoryUsageDescriptor, int64(mem.RSS))
initializeMemoryUsageMetric(metrics.At(startIdx+1), virtualMemoryUsageDescriptor, int64(mem.VMS))
initializeMemoryUsageMetric(metrics.At(startIdx+0), physicalMemoryUsageDescriptor, now, int64(mem.RSS))
initializeMemoryUsageMetric(metrics.At(startIdx+1), virtualMemoryUsageDescriptor, now, int64(mem.VMS))
return nil
}

func initializeMemoryUsageMetric(metric pdata.Metric, descriptor pdata.MetricDescriptor, usage int64) {
func initializeMemoryUsageMetric(metric pdata.Metric, descriptor pdata.MetricDescriptor, now pdata.TimestampUnixNano, usage int64) {
descriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(1)
initializeMemoryUsageDataPoint(idps.At(0), usage)
initializeMemoryUsageDataPoint(idps.At(0), now, usage)
}

func initializeMemoryUsageDataPoint(dataPoint pdata.Int64DataPoint, usage int64) {
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
func initializeMemoryUsageDataPoint(dataPoint pdata.Int64DataPoint, now pdata.TimestampUnixNano, usage int64) {
dataPoint.SetTimestamp(now)
dataPoint.SetValue(usage)
}

func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano, handle processHandle) error {
func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime, now pdata.TimestampUnixNano, handle processHandle) error {
io, err := handle.IOCounters()
if err != nil {
return err
}

startIdx := metrics.Len()
metrics.Resize(startIdx + 1)
initializeDiskIOMetric(metrics.At(startIdx), startTime, io)
initializeDiskIOMetric(metrics.At(startIdx), startTime, now, io)
return nil
}

func initializeDiskIOMetric(metric pdata.Metric, startTime pdata.TimestampUnixNano, io *process.IOCountersStat) {
func initializeDiskIOMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, io *process.IOCountersStat) {
diskIODescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2)
initializeDiskIODataPoint(idps.At(0), startTime, int64(io.ReadBytes), readDirectionLabelValue)
initializeDiskIODataPoint(idps.At(1), startTime, int64(io.WriteBytes), writeDirectionLabelValue)
initializeDiskIODataPoint(idps.At(0), startTime, now, int64(io.ReadBytes), readDirectionLabelValue)
initializeDiskIODataPoint(idps.At(1), startTime, now, int64(io.WriteBytes), writeDirectionLabelValue)
}

func initializeDiskIODataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.TimestampUnixNano, value int64, directionLabel string) {
func initializeDiskIODataPoint(dataPoint pdata.Int64DataPoint, startTime, now pdata.TimestampUnixNano, value int64, directionLabel string) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(directionLabelName, directionLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetTimestamp(now)
dataPoint.SetValue(value)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,24 @@
package processscraper

import (
"time"

"github.com/shirou/gopsutil/cpu"

"go.opentelemetry.io/collector/consumer/pdata"
)

const cpuStatesLen = 3

func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, cpuTime.System, systemStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(2), startTime, cpuTime.Iowait, waitStateLabelValue)
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, now, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, now, cpuTime.System, systemStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(2), startTime, now, cpuTime.Iowait, waitStateLabelValue)
}

func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime pdata.TimestampUnixNano, value float64, stateLabel string) {
func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, value float64, stateLabel string) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(stateLabelName, stateLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetTimestamp(now)
dataPoint.SetValue(value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

const cpuStatesLen = 0

func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
}

func getProcessExecutable(proc processHandle) (*executableMetadata, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ func TestScrapeMetrics(t *testing.T) {
}

require.Greater(t, resourceMetrics.Len(), 1)
assertResourceAttributes(t, resourceMetrics)
assertProcessResourceAttributesExist(t, resourceMetrics)
assertCPUTimeMetricValid(t, resourceMetrics, expectedStartTime)
assertMemoryUsageMetricValid(t, physicalMemoryUsageDescriptor, resourceMetrics)
assertMemoryUsageMetricValid(t, virtualMemoryUsageDescriptor, resourceMetrics)
assertDiskIOMetricValid(t, resourceMetrics, expectedStartTime)
assertSameTimeStampForAllMetricsWithinResource(t, resourceMetrics)
}

func assertResourceAttributes(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) {
func assertProcessResourceAttributesExist(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) {
for i := 0; i < resourceMetrics.Len(); i++ {
attr := resourceMetrics.At(0).Resource().Attributes()
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessID)
Expand Down Expand Up @@ -116,6 +117,15 @@ func assertDiskIOMetricValid(t *testing.T, resourceMetrics pdata.ResourceMetrics
internal.AssertInt64MetricLabelHasValue(t, diskIOMetric, 1, directionLabelName, writeDirectionLabelValue)
}

func assertSameTimeStampForAllMetricsWithinResource(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) {
for i := 0; i < resourceMetrics.Len(); i++ {
ilms := resourceMetrics.At(i).InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
internal.AssertSameTimeStampForAllMetrics(t, ilms.At(j).Metrics())
}
}
}

func getMetric(t *testing.T, descriptor pdata.MetricDescriptor, rms pdata.ResourceMetricsSlice) pdata.Metric {
for i := 0; i < rms.Len(); i++ {
metrics := getMetricSlice(t, rms.At(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package processscraper
import (
"path/filepath"
"regexp"
"time"

"github.com/shirou/gopsutil/cpu"

Expand All @@ -28,16 +27,16 @@ import (

const cpuStatesLen = 2

func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, cpuTime.System, systemStateLabelValue)
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, now, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, now, cpuTime.System, systemStateLabelValue)
}

func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime pdata.TimestampUnixNano, value float64, stateLabel string) {
func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, value float64, stateLabel string) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(stateLabelName, stateLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetTimestamp(now)
dataPoint.SetValue(value)
}

Expand Down

0 comments on commit 81485d8

Please sign in to comment.