Skip to content

Commit

Permalink
Loki Exporter - Adding a feature for loki exporter to encode JSON for…
Browse files Browse the repository at this point in the history
… log entry (#5846)

* initial commit

* revert un-needed changes

* test json encode as entry

* add encoding files

* merge loki encoding with the resource atttribute branch

* add format option to loki exporter config

* add test for loki/json test data

* improve variable names and comments

* Create feature for loki exporter to encode json

* mapping convert functions

* Fix linting errors

* update pdata package

* update to use otel 0.36

* Add resources in encoded json structure

* Add missing licence

* Fix review

* Group similar errors

* Fix serialization

* Add a test for unsuported body type

* Fix tests

* Fix lint

* Change error level to debug for dropped logs to avoid flood, keeping a trace of the reason

Co-authored-by: Shaun Creary <shauncreary@rentalcars.com>
  • Loading branch information
gillg and crearys authored Dec 7, 2021
1 parent 67f9f9e commit 2c54472
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 13 deletions.
2 changes: 2 additions & 0 deletions exporter/lokiexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ The following settings can be optionally configured:

- `headers` (no default): Name/value pairs added to the HTTP request headers.

- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire JSON encoded log record) or 'body' (the log record body field as a string).

Example:

```yaml
Expand Down
2 changes: 2 additions & 0 deletions exporter/lokiexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {

// Labels defines how labels should be applied to log streams sent to Loki.
Labels LabelsConfig `mapstructure:"labels"`
// Allows you to choose the entry format in the exporter
Format string `mapstructure:"format"`
}

func (c *Config) validate() error {
Expand Down
55 changes: 54 additions & 1 deletion exporter/lokiexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 2, len(cfg.Exporters))
assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewComponentIDWithName(typeStr, "allsettings")].(*Config)
expectedCfg := Config{
Expand Down Expand Up @@ -87,6 +87,59 @@ func TestLoadConfig(t *testing.T) {
"severity": "severity",
},
},
Format: "body",
}
require.Equal(t, &expectedCfg, actualCfg)
}

func TestJSONLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)

factory := NewFactory()
factories.Exporters[config.Type(typeStr)] = factory
cfg, err := configtest.LoadConfig(path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewComponentIDWithName(typeStr, "json")].(*Config)
expectedCfg := Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "json")),
HTTPClientSettings: confighttp.HTTPClientSettings{
Headers: map[string]string{},
Endpoint: "https://loki:3100/loki/api/v1/push",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "",
CertFile: "",
KeyFile: "",
},
Insecure: false,
},
ReadBufferSize: 0,
WriteBufferSize: 524288,
Timeout: time.Second * 30,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 5 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
},
TenantID: "example",
Labels: LabelsConfig{
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
},
Format: "json",
}
require.Equal(t, &expectedCfg, actualCfg)
}
Expand Down
73 changes: 73 additions & 0 deletions exporter/lokiexporter/encode_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokiexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter"

import (
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/model/pdata"
)

// JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json

type lokiEntry struct {
Name string `json:"name,omitempty"`
Body string `json:"body,omitempty"`
TraceID string `json:"traceid,omitempty"`
SpanID string `json:"spanid,omitempty"`
Severity string `json:"severity,omitempty"`
Attributes map[string]interface{} `json:"attributes,omitempty"`
Resources map[string]interface{} `json:"resources,omitempty"`
}

func serializeBody(body pdata.AttributeValue) (string, error) {
str := ""
var err error
if body.Type() == pdata.AttributeValueTypeString {
str = body.StringVal()
} else {
err = fmt.Errorf("unsuported body type to serialize")
}
return str, err
}

func encodeJSON(lr pdata.LogRecord, res pdata.Resource) (string, error) {
var logRecord lokiEntry
var jsonRecord []byte
var err error
var body string

body, err = serializeBody(lr.Body())
if err != nil {
return "", err
}
logRecord = lokiEntry{
Name: lr.Name(),
Body: body,
TraceID: lr.TraceID().HexString(),
SpanID: lr.SpanID().HexString(),
Severity: lr.SeverityText(),
Attributes: lr.Attributes().AsRaw(),
Resources: res.Attributes().AsRaw(),
}
lr.Body().Type()

jsonRecord, err = json.Marshal(logRecord)
if err != nil {
return "", err
}
return string(jsonRecord), nil
}
66 changes: 66 additions & 0 deletions exporter/lokiexporter/encode_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokiexporter

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
)

func exampleLog() (pdata.LogRecord, pdata.Resource) {

buffer := pdata.NewLogRecord()
buffer.Body().SetStringVal("Example log")
buffer.SetName("name")
buffer.SetSeverityText("error")
buffer.Attributes().Insert("attr1", pdata.NewAttributeValueString("1"))
buffer.Attributes().Insert("attr2", pdata.NewAttributeValueString("2"))
buffer.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4}))
buffer.SetSpanID(pdata.NewSpanID([8]byte{5, 6, 7, 8}))

resource := pdata.NewResource()
resource.Attributes().Insert("host.name", pdata.NewAttributeValueString("something"))

return buffer, resource
}

func exampleJSON() string {
jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"}}`
return jsonExample
}

func TestConvertString(t *testing.T) {
in := exampleJSON()
out, err := encodeJSON(exampleLog())
t.Log(in)
t.Log(out, err)
assert.Equal(t, in, out)
}

func TestConvertNonString(t *testing.T) {
in := exampleJSON()
log, resource := exampleLog()
mapVal := pdata.NewAttributeValueMap()
mapVal.MapVal().Insert("key1", pdata.NewAttributeValueString("value"))
mapVal.MapVal().Insert("key2", pdata.NewAttributeValueString("value"))
mapVal.CopyTo(log.Body())

out, err := encodeJSON(log, resource)
t.Log(in)
t.Log(out, err)
assert.EqualError(t, err, "unsuported body type to serialize")
}
2 changes: 0 additions & 2 deletions exporter/lokiexporter/example/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ services:
container_name: otel
command:
- "--config=/etc/otel-collector-config.yml"
# Memory Ballast size should be max 1/3 to 1/2 of memory.
- "--mem-ballast-size-mib=683"
volumes:
- ./otel-collector-config.yml:/etc/otel-collector-config.yml
ports:
Expand Down
8 changes: 8 additions & 0 deletions exporter/lokiexporter/example/otel-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,23 @@ processors:
exporters:
loki:
endpoint: "http://loki:3100/loki/api/v1/push"
# Will encode the whole OTEL message in json
# This format happen after extracting labels
format: json
labels:
attributes:
container_name: ""
source: ""
resources:
host.name: "hostname"

extensions:
health_check:
pprof:
zpages:
memory_ballast:
# Memory Ballast size should be max 1/3 to 1/2 of memory.
size_mib: 64

service:
extensions: [pprof, zpages, health_check]
Expand Down
57 changes: 50 additions & 7 deletions exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package lokiexporter // import "github.com/open-telemetry/opentelemetry-collecto
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -30,23 +31,31 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/third_party/loki/logproto"
)

type lokiExporter struct {
config *Config
logger *zap.Logger
client *http.Client
wg sync.WaitGroup
config *Config
logger *zap.Logger
client *http.Client
wg sync.WaitGroup
convert func(pdata.LogRecord, pdata.Resource) (*logproto.Entry, error)
}

func newExporter(config *Config, logger *zap.Logger) *lokiExporter {
return &lokiExporter{
lokiexporter := &lokiExporter{
config: config,
logger: logger,
}
if config.Format == "json" {
lokiexporter.convert = convertLogToJSONEntry
} else {
lokiexporter.convert = convertLogBodyToEntry
}
return lokiexporter
}

func (l *lokiExporter) pushLogData(ctx context.Context, ld pdata.Logs) error {
Expand Down Expand Up @@ -119,6 +128,8 @@ func (l *lokiExporter) stop(context.Context) (err error) {
}

func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, numDroppedLogs int) {
var errs error

streams := make(map[string]*logproto.Stream)
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
Expand All @@ -135,7 +146,24 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
continue
}
labels := mergedLabels.String()
entry := convertLogToLokiEntry(log)
var entry *logproto.Entry
var err error
entry, err = l.convert(log, resource)
if err != nil {
// Couldn't convert so dropping log.
numDroppedLogs++
errs = multierr.Append(
errs,
errors.New(
fmt.Sprint(
"failed to convert, dropping log",
zap.String("format", l.config.Format),
zap.Error(err),
),
),
)
continue
}

if stream, ok := streams[labels]; ok {
stream.Entries = append(stream.Entries, *entry)
Expand All @@ -150,6 +178,10 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
}
}

if errs != nil {
l.logger.Debug("some logs has been dropped", zap.Error(errs))
}

pr = &logproto.PushRequest{
Streams: make([]logproto.Stream, len(streams)),
}
Expand Down Expand Up @@ -195,9 +227,20 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap,
return ls
}

func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry {
func convertLogBodyToEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) {
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: lr.Body().StringVal(),
}, nil
}

func convertLogToJSONEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) {
line, err := encodeJSON(lr, res)
if err != nil {
return nil, err
}
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: line,
}, nil
}
Loading

0 comments on commit 2c54472

Please sign in to comment.