Skip to content

Commit

Permalink
PR review improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Jun 19, 2023
1 parent 356d973 commit da2cd6d
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 822 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Breaking changes

- The algorithm for the "hash" action of `otelcol.processor.attributes` has changed.
The change was made in PR [#22831](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/22831) of opentelemetry-collector-contrib. (@ptodev)

- `otelcol.exporter.loki` now includes the instrumentation scope in its output. (@ptodev)

- `otelcol.extension.jaeger_remote_sampling` removes the `\` HTTP endpoint. The `/sampling` endpoint is still functional.
The change was made in PR [#18070](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/18070) of opentelemetry-collector-contrib. (@ptodev)

### Features

- The Pyroscope scrape component computes and sends delta profiles automatically when required to reduce bandwidth usage. (@cyriltovena)
Expand All @@ -18,7 +28,6 @@ Main (unreleased)

- Integrations: Introduce the `squid` integration. (@armstrmi)


- New Grafana Agent Flow components:

- `prometheus.exporter.kafka` collects metrics from Kafka Server (@oliver-zhang)
Expand All @@ -40,6 +49,8 @@ Main (unreleased)

- Update `node_exporter` dependency to v1.6.0. (@spartan0x117)

- Update OpenTelemetry Collector dependencies from v0.63.0 to v0.79.0. (@ptodev)

### Bugfixes

- Add signing region to remote.s3 component for use with custom endpoints so that Authorization Headers work correctly when
Expand Down
17 changes: 11 additions & 6 deletions component/otelcol/config_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ type GRPCClientArguments struct {
TLS TLSClientArguments `river:"tls,block,optional"`
Keepalive *KeepaliveClientArguments `river:"keepalive,block,optional"`

ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
WaitForReady bool `river:"wait_for_ready,attr,optional"`
Headers map[string]configopaque.String `river:"headers,attr,optional"`
BalancerName string `river:"balancer_name,attr,optional"`
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
WaitForReady bool `river:"wait_for_ready,attr,optional"`
Headers map[string]string `river:"headers,attr,optional"`
BalancerName string `river:"balancer_name,attr,optional"`

// Auth is a binding to an otelcol.auth.* component extension which handles
// authentication.
Expand All @@ -154,6 +154,11 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings {
return nil
}

opaqueHeaders := make(map[string]configopaque.String)
for headerName, headerVal := range args.Headers {
opaqueHeaders[headerName] = configopaque.String(headerVal)
}

// Configure the authentication if args.Auth is set.
var auth *otelconfigauth.Authentication
if args.Auth != nil {
Expand All @@ -171,7 +176,7 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings {
ReadBufferSize: int(args.ReadBufferSize),
WriteBufferSize: int(args.WriteBufferSize),
WaitForReady: args.WaitForReady,
Headers: args.Headers,
Headers: opaqueHeaders,
BalancerName: args.BalancerName,

Auth: auth,
Expand Down
17 changes: 11 additions & 6 deletions component/otelcol/config_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
otelcomponent "go.opentelemetry.io/collector/component"
otelconfigauth "go.opentelemetry.io/collector/config/configauth"
otelconfighttp "go.opentelemetry.io/collector/config/confighttp"
otelconfigopaque "go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configopaque"
otelextension "go.opentelemetry.io/collector/extension"
)

Expand Down Expand Up @@ -80,10 +80,10 @@ type HTTPClientArguments struct {

TLS TLSClientArguments `river:"tls,block,optional"`

ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
Timeout time.Duration `river:"timeout,attr,optional"`
Headers map[string]otelconfigopaque.String `river:"headers,attr,optional"`
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
Timeout time.Duration `river:"timeout,attr,optional"`
Headers map[string]string `river:"headers,attr,optional"`
// CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) TODO (@tpaschalis)
MaxIdleConns *int `river:"max_idle_conns,attr,optional"`
MaxIdleConnsPerHost *int `river:"max_idle_conns_per_host,attr,optional"`
Expand All @@ -107,6 +107,11 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings {
auth = &otelconfigauth.Authentication{AuthenticatorID: args.Auth.ID}
}

opaqueHeaders := make(map[string]configopaque.String)
for headerName, headerVal := range args.Headers {
opaqueHeaders[headerName] = configopaque.String(headerVal)
}

return &otelconfighttp.HTTPClientSettings{
Endpoint: args.Endpoint,

Expand All @@ -117,7 +122,7 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings {
ReadBufferSize: int(args.ReadBufferSize),
WriteBufferSize: int(args.WriteBufferSize),
Timeout: args.Timeout,
Headers: args.Headers,
Headers: opaqueHeaders,
// CustomRoundTripper: func(http.RoundTripper) (http.RoundTripper, error) { panic("not implemented") }, TODO (@tpaschalis)
MaxIdleConns: args.MaxIdleConns,
MaxIdleConnsPerHost: args.MaxIdleConnsPerHost,
Expand Down
3 changes: 1 addition & 2 deletions component/otelcol/exporter/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/grafana/agent/component/otelcol/exporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
otelpexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
otelextension "go.opentelemetry.io/collector/extension"
)
Expand Down Expand Up @@ -83,7 +82,7 @@ type GRPCClientArguments otelcol.GRPCClientArguments
// DefaultGRPCClientArguments holds component-specific default settings for
// GRPCClientArguments.
var DefaultGRPCClientArguments = GRPCClientArguments{
Headers: map[string]configopaque.String{},
Headers: map[string]string{},
Compression: otelcol.CompressionTypeGzip,
WriteBufferSize: 512 * 1024,
}
Expand Down
94 changes: 4 additions & 90 deletions component/otelcol/exporter/loki/internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ package convert

import (
"context"
"fmt"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component/common/loki"
loki_translator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand Down Expand Up @@ -69,31 +67,7 @@ func (conv *Converter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
for k := 0; k < logs.Len(); k++ {
conv.metrics.entriesTotal.Inc()

// we may remove attributes, so to avoid mutating the original
// log entry, we make our own copy and change that instead.
log := plog.NewLogRecord()
logs.At(k).CopyTo(log)

// similarly, we may remove resources, so to avoid mutating the
// original log entry, we make and use our own copy instead.
resource := pcommon.NewResource()
rls.At(i).Resource().CopyTo(resource)

// adds level attribute from log.severityNumber
addLogLevelAttributeAndHint(log)

// TODO (@tpaschalis) If we want to pre-populate a tenant
// label from the OTel hint, it should happen here. with the
// upstream getTenantFromTenantHint helper.

format := getFormatFromFormatHint(log.Attributes(), resource.Attributes())

mergedLabels := convertAttributesAndMerge(log.Attributes(), resource.Attributes())
// remove the attributes that were promoted to labels
removeAttributes(log.Attributes(), mergedLabels)
removeAttributes(resource.Attributes(), mergedLabels)

entry, err := convertLogToLokiEntry(log, resource, format, scope)
entry, err := loki_translator.LogToLokiEntry(logs.At(k), rls.At(i).Resource(), scope)
if err != nil {
level.Error(conv.log).Log("msg", "failed to convert log to loki entry", "err", err)
conv.metrics.entriesFailed.Inc()
Expand All @@ -102,8 +76,8 @@ func (conv *Converter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {

conv.metrics.entriesProcessed.Inc()
entries = append(entries, loki.Entry{
Labels: mergedLabels,
Entry: *entry,
Labels: entry.Labels,
Entry: *entry.Entry,
})
}
}
Expand Down Expand Up @@ -131,63 +105,3 @@ func (conv *Converter) UpdateFanout(fanout []loki.LogsReceiver) {

conv.next = fanout
}

func addLogLevelAttributeAndHint(log plog.LogRecord) {
if log.SeverityNumber() == plog.SeverityNumberUnspecified {
return
}
addHint(log)
if _, found := log.Attributes().Get(levelAttributeName); !found {
level := severityNumberToLevel[log.SeverityNumber().String()]
log.Attributes().PutStr(levelAttributeName, level)
}
}

func addHint(log plog.LogRecord) {
if value, found := log.Attributes().Get(hintAttributes); found && !strings.Contains(value.AsString(), levelAttributeName) {
log.Attributes().PutStr(hintAttributes, fmt.Sprintf("%s,%s", value.AsString(), levelAttributeName))
} else {
log.Attributes().PutStr(hintAttributes, levelAttributeName)
}
}

var severityNumberToLevel = map[string]string{
plog.SeverityNumberUnspecified.String(): "UNSPECIFIED",
plog.SeverityNumberTrace.String(): "TRACE",
plog.SeverityNumberTrace2.String(): "TRACE2",
plog.SeverityNumberTrace3.String(): "TRACE3",
plog.SeverityNumberTrace4.String(): "TRACE4",
plog.SeverityNumberDebug.String(): "DEBUG",
plog.SeverityNumberDebug2.String(): "DEBUG2",
plog.SeverityNumberDebug3.String(): "DEBUG3",
plog.SeverityNumberDebug4.String(): "DEBUG4",
plog.SeverityNumberInfo.String(): "INFO",
plog.SeverityNumberInfo2.String(): "INFO2",
plog.SeverityNumberInfo3.String(): "INFO3",
plog.SeverityNumberInfo4.String(): "INFO4",
plog.SeverityNumberWarn.String(): "WARN",
plog.SeverityNumberWarn2.String(): "WARN2",
plog.SeverityNumberWarn3.String(): "WARN3",
plog.SeverityNumberWarn4.String(): "WARN4",
plog.SeverityNumberError.String(): "ERROR",
plog.SeverityNumberError2.String(): "ERROR2",
plog.SeverityNumberError3.String(): "ERROR3",
plog.SeverityNumberError4.String(): "ERROR4",
plog.SeverityNumberFatal.String(): "FATAL",
plog.SeverityNumberFatal2.String(): "FATAL2",
plog.SeverityNumberFatal3.String(): "FATAL3",
plog.SeverityNumberFatal4.String(): "FATAL4",
}

func getFormatFromFormatHint(logAttr pcommon.Map, resourceAttr pcommon.Map) string {
format := formatJSON
formatVal, found := resourceAttr.Get(hintFormat)
if !found {
formatVal, found = logAttr.Get(hintFormat)
}

if found {
format = formatVal.AsString()
}
return format
}
Loading

0 comments on commit da2cd6d

Please sign in to comment.