Replace usage of Map.Insert* and Map.Update* with Map.Upsert (open-te…
dmitryax authored Sep 9, 2022
1 parent 8fc9514 commit 82b68cb
Showing 12 changed files with 177 additions and 231 deletions.
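For readers unfamiliar with the deprecated helpers, the semantic difference is: Insert* writes only when the key is absent, Update* writes only when the key already exists, and Upsert* writes unconditionally. Below is a minimal sketch (not part of this commit), assuming the pcommon.Map API from go.opentelemetry.io/collector/pdata that these files target:

package main

import (
    "fmt"

    "go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
    attrs := pcommon.NewMap()

    attrs.InsertString("job.id", "1") // Insert*: sets the key only if it does not exist yet
    attrs.InsertString("job.id", "2") // no-op: "job.id" is already present

    attrs.UpdateString("task.id", "a") // Update*: sets the key only if it already exists -> no-op here

    attrs.UpsertString("job.id", "3")  // Upsert*: add-or-overwrite, regardless of prior state
    attrs.UpsertString("task.id", "a") // adds the missing key

    v, _ := attrs.Get("job.id")
    fmt.Println(v.StringVal()) // prints "3"
}

Since the call sites changed below each populate a freshly created attribute map once per key, swapping Insert* for Upsert* should be behavior-preserving there.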
19 changes: 4 additions & 15 deletions internal/receiver/databricksreceiver/metrics_provider.go
@@ -17,7 +17,6 @@ package databricksreceiver
 import (
 "fmt"

-"go.opentelemetry.io/collector/pdata/pcommon"
 "go.opentelemetry.io/collector/pdata/pmetric"

 "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/metadata"
@@ -50,24 +49,14 @@ func (p metricsProvider) addJobStatusMetrics(ms pmetric.MetricSlice) ([]int, err
 jobPt := jobPts.AppendEmpty()
 pauseStatus := pauseStatusToInt(j.Settings.Schedule.PauseStatus)
 jobPt.SetIntVal(pauseStatus)
-jobIDAttr := pcommon.NewValueInt(int64(j.JobID))
-jobPt.Attributes().Insert(metadata.A.JobID, jobIDAttr)
+jobPt.Attributes().UpsertInt(metadata.A.JobID, int64(j.JobID))
 for _, task := range j.Settings.Tasks {
 taskPt := taskPts.AppendEmpty()
 taskPt.SetIntVal(pauseStatus)
 taskAttrs := taskPt.Attributes()
-taskAttrs.Insert(
-metadata.A.JobID,
-jobIDAttr,
-)
-taskAttrs.Insert(
-metadata.A.TaskID,
-pcommon.NewValueString(task.TaskKey),
-)
-taskAttrs.Insert(
-metadata.A.TaskType,
-pcommon.NewValueString(taskType(task)),
-)
+taskAttrs.UpsertInt(metadata.A.JobID, int64(j.JobID))
+taskAttrs.UpsertString(metadata.A.TaskID, task.TaskKey)
+taskAttrs.UpsertString(metadata.A.TaskType, taskType(task))
 }
 }
 return jobIDs, nil
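The hunk above is the core of the change: a self-contained sketch (not from this commit), under the same assumed pdata version, of how the typed UpsertInt/UpsertString helpers write directly into the data point's attribute map, removing both the intermediate pcommon.Value and the pcommon import. The attribute keys and values here are illustrative stand-ins for metadata.A.* and the Databricks job/task fields.

package main

import (
    "fmt"

    "go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
    pt := pmetric.NewNumberDataPoint()
    pt.SetIntVal(1) // e.g. the job's pause status

    attrs := pt.Attributes()
    attrs.UpsertInt("databricks.job.id", 42)             // was: attrs.Insert(key, pcommon.NewValueInt(42))
    attrs.UpsertString("databricks.task.id", "my-task")  // was: attrs.Insert(key, pcommon.NewValueString("my-task"))

    fmt.Println(attrs.AsRaw()) // map[databricks.job.id:42 databricks.task.id:my-task]
}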
8 changes: 3 additions & 5 deletions internal/receiver/databricksreceiver/run_metrics_provider.go
@@ -17,7 +17,6 @@ package databricksreceiver
 import (
 "fmt"

-"go.opentelemetry.io/collector/pdata/pcommon"
 "go.opentelemetry.io/collector/pdata/pmetric"

 "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/metadata"
@@ -67,14 +66,13 @@ func (p runMetricsProvider) addSingleJobRunMetrics(
 }
 jobPt := jobPts.AppendEmpty()
 jobPt.SetIntVal(int64(run.ExecutionDuration))
-jobIDAttr := pcommon.NewValueInt(int64(jobID))
-jobPt.Attributes().Insert(metadata.Attributes.JobID, jobIDAttr)
+jobPt.Attributes().UpsertInt(metadata.Attributes.JobID, int64(jobID))
 for _, task := range run.Tasks {
 taskPt := taskPts.AppendEmpty()
 taskPt.SetIntVal(int64(task.ExecutionDuration))
 taskAttrs := taskPt.Attributes()
-taskAttrs.Insert(metadata.Attributes.JobID, jobIDAttr)
-taskAttrs.Insert(metadata.Attributes.TaskID, pcommon.NewValueString(task.TaskKey))
+taskAttrs.UpsertInt(metadata.Attributes.JobID, int64(jobID))
+taskAttrs.UpsertString(metadata.Attributes.TaskID, task.TaskKey)
 }
 }
 return nil
6 changes: 1 addition & 5 deletions internal/receiver/databricksreceiver/scraper.go
@@ -18,7 +18,6 @@ import (
 "context"
 "fmt"

-"go.opentelemetry.io/collector/pdata/pcommon"
 "go.opentelemetry.io/collector/pdata/pmetric"

 "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/metadata"
@@ -37,10 +36,7 @@ func (s scraper) scrape(_ context.Context) (pmetric.Metrics, error) {
 out := pmetric.NewMetrics()
 rms := out.ResourceMetrics()
 rm := rms.AppendEmpty()
-rm.Resource().Attributes().Insert(
-metadata.A.DatabricksInstanceName,
-pcommon.NewValueString(s.instanceName),
-)
+rm.Resource().Attributes().UpsertString(metadata.A.DatabricksInstanceName, s.instanceName)
 ilms := rm.ScopeMetrics()
 ilm := ilms.AppendEmpty()
 ms := ilm.Metrics()
28 changes: 12 additions & 16 deletions internal/receiver/discoveryreceiver/endpoint_tracker.go
@@ -115,9 +115,9 @@ func endpointToPLogs(observerID config.ComponentID, eventType string, endpoints
 pLogs = plog.NewLogs()
 rlog := pLogs.ResourceLogs().AppendEmpty()
 rAttrs := rlog.Resource().Attributes()
-rAttrs.InsertString(eventTypeAttr, eventType)
-rAttrs.InsertString(observerNameAttr, observerID.Name())
-rAttrs.InsertString(observerTypeAttr, string(observerID.Type()))
+rAttrs.UpsertString(eventTypeAttr, eventType)
+rAttrs.UpsertString(observerNameAttr, observerID.Name())
+rAttrs.UpsertString(observerTypeAttr, string(observerID.Type()))
 sl := rlog.ScopeLogs().AppendEmpty()
 for _, endpoint := range endpoints {
 logRecord := sl.LogRecords().AppendEmpty()
@@ -134,12 +134,12 @@ func endpointToPLogs(observerID config.ComponentID, eventType string, endpoints
 // this must be the first mutation of attrs since it's destructive
 envAttrs.CopyTo(attrs)
 }
-attrs.InsertString("type", string(endpoint.Details.Type()))
+attrs.UpsertString("type", string(endpoint.Details.Type()))
 } else {
 logRecord.Body().SetStringVal(fmt.Sprintf("%s endpoint %s", eventType, endpoint.ID))
 }
-attrs.InsertString("endpoint", endpoint.Target)
-attrs.InsertString("id", string(endpoint.ID))
+attrs.UpsertString("endpoint", endpoint.Target)
+attrs.UpsertString("id", string(endpoint.ID))

 // sorted log record attributes for determinism
 attrs.Sort()
@@ -155,38 +155,34 @@ func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer
 // should result in a ValueMap
 case shouldEmbedMap(endpointType, k):
 if asMap, ok := v.(map[string]string); ok {
-val := pcommon.NewValueMap()
-mapVal := val.MapVal()
+mapVal := attrs.UpsertEmptyMap(k)
 for item, itemVal := range asMap {
-mapVal.InsertString(item, itemVal)
+mapVal.UpsertString(item, itemVal)
 }
 mapVal.Sort()
-attrs.Insert(k, val)
 } else {
 return attrs, fmt.Errorf("failed parsing %v env attributes", endpointType)
 }
 // pod EndpointEnv is the value of the "pod" field for observer.PortType and should be
 // embedded as ValueMap
 case observer.EndpointType(k) == observer.PodType && endpointType == observer.PortType:
 if podEnv, ok := v.(observer.EndpointEnv); ok {
-val := pcommon.NewValueMap()
 podAttrs, e := endpointEnvToAttrs(observer.PodType, podEnv)
 if e != nil {
 return attrs, fmt.Errorf("failed parsing %v pod attributes ", endpointType)
 }
-podAttrs.CopyTo(val.MapVal())
-attrs.Insert(k, val)
+podAttrs.CopyTo(attrs.UpsertEmptyMap(k))
 } else {
 return attrs, fmt.Errorf("failed parsing %v pod env %#v", endpointType, v)
 }
 default:
 switch vVal := v.(type) {
 case uint16:
-attrs.InsertInt(k, int64(vVal))
+attrs.UpsertInt(k, int64(vVal))
 case bool:
-attrs.InsertBool(k, vVal)
+attrs.UpsertBool(k, vVal)
 default:
-attrs.InsertString(k, fmt.Sprintf("%v", v))
+attrs.UpsertString(k, fmt.Sprintf("%v", v))
 }
 }
 }
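The last hunk above also replaces the build-a-ValueMap-then-Insert pattern with UpsertEmptyMap, which places an empty map under the key and returns it so it can be populated in place. A minimal sketch under the same assumed pdata version; the key and values are illustrative:

package main

import (
    "fmt"

    "go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
    attrs := pcommon.NewMap()

    // Old pattern: build a detached pcommon.NewValueMap(), fill its MapVal(), then Insert it under the key.
    // New pattern: UpsertEmptyMap puts an empty map under the key and returns it for direct population.
    podMap := attrs.UpsertEmptyMap("pod")
    podMap.UpsertString("name", "my-pod")
    podMap.UpsertString("namespace", "default")
    podMap.Sort()

    fmt.Println(attrs.AsRaw()) // map[pod:map[name:my-pod namespace:default]]
}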