Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process scraper: Use same scrape time for all data points coming from same process #1539

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

// scraper for Process Metrics
Expand Down Expand Up @@ -98,15 +99,17 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.ResourceMetricsSlice,
ilms.Resize(1)
metrics := ilms.At(0).Metrics()

if err = scrapeAndAppendCPUTimeMetric(metrics, s.startTime, md.handle); err != nil {
now := internal.TimeToUnixNano(time.Now())

if err = scrapeAndAppendCPUTimeMetric(metrics, s.startTime, now, md.handle); err != nil {
errs = append(errs, errors.Wrapf(err, "error reading cpu times for process %q (pid %v)", md.executable.name, md.pid))
}

if err = scrapeAndAppendMemoryUsageMetrics(metrics, md.handle); err != nil {
if err = scrapeAndAppendMemoryUsageMetrics(metrics, now, md.handle); err != nil {
errs = append(errs, errors.Wrapf(err, "error reading memory info for process %q (pid %v)", md.executable.name, md.pid))
}

if err = scrapeAndAppendDiskIOMetric(metrics, s.startTime, md.handle); err != nil {
if err = scrapeAndAppendDiskIOMetric(metrics, s.startTime, now, md.handle); err != nil {
errs = append(errs, errors.Wrapf(err, "error reading disk usage for process %q (pid %v)", md.executable.name, md.pid))
}
}
Expand Down Expand Up @@ -166,77 +169,77 @@ func (s *scraper) getProcessMetadata() ([]*processMetadata, error) {
return metadata, componenterror.CombineErrors(errs)
}

func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano, handle processHandle) error {
func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime, now pdata.TimestampUnixNano, handle processHandle) error {
times, err := handle.Times()
if err != nil {
return err
}

startIdx := metrics.Len()
metrics.Resize(startIdx + 1)
initializeCPUTimeMetric(metrics.At(startIdx), startTime, times)
initializeCPUTimeMetric(metrics.At(startIdx), startTime, now, times)
return nil
}

func initializeCPUTimeMetric(metric pdata.Metric, startTime pdata.TimestampUnixNano, times *cpu.TimesStat) {
func initializeCPUTimeMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, times *cpu.TimesStat) {
cpuTimeDescriptor.CopyTo(metric.MetricDescriptor())

ddps := metric.DoubleDataPoints()
ddps.Resize(cpuStatesLen)
appendCPUTimeStateDataPoints(ddps, startTime, times)
appendCPUTimeStateDataPoints(ddps, startTime, now, times)
}

func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, handle processHandle) error {
func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, now pdata.TimestampUnixNano, handle processHandle) error {
mem, err := handle.MemoryInfo()
if err != nil {
return err
}

startIdx := metrics.Len()
metrics.Resize(startIdx + 2)
initializeMemoryUsageMetric(metrics.At(startIdx+0), physicalMemoryUsageDescriptor, int64(mem.RSS))
initializeMemoryUsageMetric(metrics.At(startIdx+1), virtualMemoryUsageDescriptor, int64(mem.VMS))
initializeMemoryUsageMetric(metrics.At(startIdx+0), physicalMemoryUsageDescriptor, now, int64(mem.RSS))
initializeMemoryUsageMetric(metrics.At(startIdx+1), virtualMemoryUsageDescriptor, now, int64(mem.VMS))
return nil
}

func initializeMemoryUsageMetric(metric pdata.Metric, descriptor pdata.MetricDescriptor, usage int64) {
func initializeMemoryUsageMetric(metric pdata.Metric, descriptor pdata.MetricDescriptor, now pdata.TimestampUnixNano, usage int64) {
descriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(1)
initializeMemoryUsageDataPoint(idps.At(0), usage)
initializeMemoryUsageDataPoint(idps.At(0), now, usage)
}

func initializeMemoryUsageDataPoint(dataPoint pdata.Int64DataPoint, usage int64) {
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
func initializeMemoryUsageDataPoint(dataPoint pdata.Int64DataPoint, now pdata.TimestampUnixNano, usage int64) {
dataPoint.SetTimestamp(now)
dataPoint.SetValue(usage)
}

func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano, handle processHandle) error {
func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime, now pdata.TimestampUnixNano, handle processHandle) error {
io, err := handle.IOCounters()
if err != nil {
return err
}

startIdx := metrics.Len()
metrics.Resize(startIdx + 1)
initializeDiskIOMetric(metrics.At(startIdx), startTime, io)
initializeDiskIOMetric(metrics.At(startIdx), startTime, now, io)
return nil
}

func initializeDiskIOMetric(metric pdata.Metric, startTime pdata.TimestampUnixNano, io *process.IOCountersStat) {
func initializeDiskIOMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, io *process.IOCountersStat) {
diskIODescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2)
initializeDiskIODataPoint(idps.At(0), startTime, int64(io.ReadBytes), readDirectionLabelValue)
initializeDiskIODataPoint(idps.At(1), startTime, int64(io.WriteBytes), writeDirectionLabelValue)
initializeDiskIODataPoint(idps.At(0), startTime, now, int64(io.ReadBytes), readDirectionLabelValue)
initializeDiskIODataPoint(idps.At(1), startTime, now, int64(io.WriteBytes), writeDirectionLabelValue)
}

func initializeDiskIODataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.TimestampUnixNano, value int64, directionLabel string) {
func initializeDiskIODataPoint(dataPoint pdata.Int64DataPoint, startTime, now pdata.TimestampUnixNano, value int64, directionLabel string) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(directionLabelName, directionLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetTimestamp(now)
dataPoint.SetValue(value)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,24 @@
package processscraper

import (
"time"

"github.com/shirou/gopsutil/cpu"

"go.opentelemetry.io/collector/consumer/pdata"
)

const cpuStatesLen = 3

func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, cpuTime.System, systemStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(2), startTime, cpuTime.Iowait, waitStateLabelValue)
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, now, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, now, cpuTime.System, systemStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(2), startTime, now, cpuTime.Iowait, waitStateLabelValue)
}

func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime pdata.TimestampUnixNano, value float64, stateLabel string) {
func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, value float64, stateLabel string) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(stateLabelName, stateLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetTimestamp(now)
dataPoint.SetValue(value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

const cpuStatesLen = 0

func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
}

func getProcessExecutable(proc processHandle) (*executableMetadata, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ func TestScrapeMetrics(t *testing.T) {
}

require.Greater(t, resourceMetrics.Len(), 1)
assertResourceAttributes(t, resourceMetrics)
assertProcessResourceAttributesExist(t, resourceMetrics)
assertCPUTimeMetricValid(t, resourceMetrics, expectedStartTime)
assertMemoryUsageMetricValid(t, physicalMemoryUsageDescriptor, resourceMetrics)
assertMemoryUsageMetricValid(t, virtualMemoryUsageDescriptor, resourceMetrics)
assertDiskIOMetricValid(t, resourceMetrics, expectedStartTime)
assertSameTimeStampForAllMetricsWithinResource(t, resourceMetrics)
}

func assertResourceAttributes(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) {
func assertProcessResourceAttributesExist(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) {
for i := 0; i < resourceMetrics.Len(); i++ {
attr := resourceMetrics.At(0).Resource().Attributes()
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessID)
Expand Down Expand Up @@ -116,6 +117,15 @@ func assertDiskIOMetricValid(t *testing.T, resourceMetrics pdata.ResourceMetrics
internal.AssertInt64MetricLabelHasValue(t, diskIOMetric, 1, directionLabelName, writeDirectionLabelValue)
}

func assertSameTimeStampForAllMetricsWithinResource(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) {
for i := 0; i < resourceMetrics.Len(); i++ {
ilms := resourceMetrics.At(i).InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
internal.AssertSameTimeStampForAllMetrics(t, ilms.At(j).Metrics())
}
}
}

func getMetric(t *testing.T, descriptor pdata.MetricDescriptor, rms pdata.ResourceMetricsSlice) pdata.Metric {
for i := 0; i < rms.Len(); i++ {
metrics := getMetricSlice(t, rms.At(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package processscraper
import (
"path/filepath"
"regexp"
"time"

"github.com/shirou/gopsutil/cpu"

Expand All @@ -28,16 +27,16 @@ import (

const cpuStatesLen = 2

func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, cpuTime.System, systemStateLabelValue)
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(0), startTime, now, cpuTime.User, userStateLabelValue)
initializeCPUTimeDataPoint(ddps.At(1), startTime, now, cpuTime.System, systemStateLabelValue)
}

func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime pdata.TimestampUnixNano, value float64, stateLabel string) {
func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, value float64, stateLabel string) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(stateLabelName, stateLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetTimestamp(now)
dataPoint.SetValue(value)
}

Expand Down