Skip to content

Commit

Permalink
Make incoming labels from gcp into Loki internal labels. (#3285)
Browse files Browse the repository at this point in the history
* Make incoming labels from gcp into Loki internal labels.

If these labels are significant, we can always use relabeling.

* Add support for relabeling in gcptarget

* Test for SnakeCase
  • Loading branch information
kavirajk authored Feb 5, 2021
1 parent 923671a commit 59a34f9
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 7 deletions.
10 changes: 10 additions & 0 deletions cmd/promtail/promtail-local-pubsub-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,13 @@ scrape_configs:
use_incoming_timestamp: false # default rewrite timestamp.
labels:
job: pubsub-gcp

relabel_configs:
- action: replace
source_labels:
- __bucket_name
target_label: bucket_name
- action: replace
source_labels:
- __backend_service_name
target_label: backend_service_name
44 changes: 40 additions & 4 deletions pkg/promtail/targets/gcplog/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"cloud.google.com/go/pubsub"
json "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/util"
)

// LogEntry that will be written to the pubsub topic.
Expand All @@ -35,18 +38,51 @@ type GCPLogEntry struct {
// anyway we will be sending the entire entry to Loki.
}

func format(m *pubsub.Message, other model.LabelSet, useIncomingTimestamp bool) (api.Entry, error) {
func format(
m *pubsub.Message,
other model.LabelSet,
useIncomingTimestamp bool,
relabelConfig []*relabel.Config,
) (api.Entry, error) {
var ge GCPLogEntry

if err := json.Unmarshal(m.Data, &ge); err != nil {
return api.Entry{}, err
}

labels := model.LabelSet{
"resource_type": model.LabelValue(ge.Resource.Type),
// mandatory label for gcplog
lbs := labels.NewBuilder(nil)
lbs.Set("resource_type", ge.Resource.Type)

// labels from gcp log entry. Add it as internal labels
for k, v := range ge.Resource.Labels {
lbs.Set("__"+util.SnakeCase(k), v)
}

var processed labels.Labels

// apply relabeling
if len(relabelConfig) > 0 {
processed = relabel.Process(lbs.Labels(), relabelConfig...)
} else {
processed = lbs.Labels()
}

// final labelset that will be sent to loki
labels := make(model.LabelSet)
for _, lbl := range processed {
// ignore internal labels
if strings.HasPrefix(lbl.Name, "__") {
continue
}
// ignore invalid labels
if !model.LabelName(lbl.Name).IsValid() || !model.LabelValue(lbl.Value).IsValid() {
continue
}
labels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

// add labels from config as well.
// add labels coming from scrapeconfig
labels = labels.Merge(other)

ts := time.Now()
Expand Down
46 changes: 44 additions & 2 deletions pkg/promtail/targets/gcplog/formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"cloud.google.com/go/pubsub"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -18,9 +19,50 @@ func TestFormat(t *testing.T) {
name string
msg *pubsub.Message
labels model.LabelSet
relabel []*relabel.Config
useIncomingTimestamp bool
expected api.Entry
}{
{
name: "relabelling",
msg: &pubsub.Message{
Data: []byte(withAllFields),
},
labels: model.LabelSet{
"jobname": "pubsub-test",
},
relabel: []*relabel.Config{
{
SourceLabels: model.LabelNames{"__backend_service_name"},
Separator: ";",
Regex: relabel.MustNewRegexp("(.*)"),
TargetLabel: "backend_service_name",
Action: "replace",
Replacement: "$1",
},
{
SourceLabels: model.LabelNames{"__bucket_name"},
Separator: ";",
Regex: relabel.MustNewRegexp("(.*)"),
TargetLabel: "bucket_name",
Action: "replace",
Replacement: "$1",
},
},
useIncomingTimestamp: true,
expected: api.Entry{
Labels: model.LabelSet{
"jobname": "pubsub-test",
"resource_type": "gcs",
"backend_service_name": "http-loki",
"bucket_name": "loki-bucket",
},
Entry: logproto.Entry{
Timestamp: mustTime(t, "2020-12-22T15:01:23.045123456Z"),
Line: withAllFields,
},
},
},
{
name: "use-original-timestamp",
msg: &pubsub.Message{
Expand Down Expand Up @@ -64,7 +106,7 @@ func TestFormat(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got, err := format(c.msg, c.labels, c.useIncomingTimestamp)
got, err := format(c.msg, c.labels, c.useIncomingTimestamp, c.relabel)

require.NoError(t, err)

Expand All @@ -90,5 +132,5 @@ func mustTime(t *testing.T, v string) time.Time {
}

const (
withAllFields = `{"logName": "https://project/gcs", "resource": {"type": "gcs", "labels": {"instanceId": "344555"}}, "timestamp": "2020-12-22T15:01:23.045123456Z"}`
withAllFields = `{"logName": "https://project/gcs", "resource": {"type": "gcs", "labels": {"backendServiceName": "http-loki", "bucketName": "loki-bucket", "instanceId": "344555"}}, "timestamp": "2020-12-22T15:01:23.045123456Z"}`
)
2 changes: 1 addition & 1 deletion pkg/promtail/targets/gcplog/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (t *GcplogTarget) run() error {
case <-t.ctx.Done():
return t.ctx.Err()
case m := <-t.msgs:
entry, err := format(m, t.config.Labels, t.config.UseIncomingTimestamp)
entry, err := format(m, t.config.Labels, t.config.UseIncomingTimestamp, t.relabelConfig)
if err != nil {
level.Error(t.logger).Log("event", "error formating log entry", "cause", err)
m.Ack()
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/string.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package util

import (
"bytes"
"fmt"
"unicode"
)

// StringRef returns a pointer to a fresh string holding a copy of value.
// Each call yields a distinct pointer.
func StringRef(value string) *string {
	ref := new(string)
	*ref = value
	return ref
}
Expand All @@ -13,3 +19,16 @@ func StringSliceContains(slice []string, value string) bool {

return false
}

// SnakeCase returns `s` rewritten in `snake_case`.
//
// Every rune is lower-cased. An underscore is inserted in front of each
// upper-case rune, except when that rune is the first one in `s` or the byte
// immediately before it is already an underscore. Consecutive upper-case
// runes are each treated separately (e.g. "ABC" becomes "a_b_c").
func SnakeCase(s string) string {
	var out bytes.Buffer
	for pos, ch := range s {
		// pos is a byte offset into s, so s[pos-1] is the byte just before this rune.
		needsSeparator := pos > 0 && unicode.IsUpper(ch) && s[pos-1] != '_'
		if needsSeparator {
			fmt.Fprint(&out, "_")
		}
		fmt.Fprintf(&out, "%c", unicode.ToLower(ch))
	}
	return out.String()
}
37 changes: 37 additions & 0 deletions pkg/util/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,40 @@ func TestStringSliceContains(t *testing.T) {
})
}
}

// TestStringSnakeCase runs SnakeCase over a table of inputs covering a plain
// camelCase word and inputs that already contain underscores in the middle,
// at the start, and at the end.
func TestStringSnakeCase(t *testing.T) {
	t.Parallel()

	type snakeCase struct {
		name     string
		input    string
		expected string
	}

	tests := []snakeCase{
		{name: "simple", input: "snakeCase", expected: "snake_case"},
		// An underscore already preceding an upper-case letter is kept as-is;
		// SnakeCase does not add a second one.
		{name: "mix", input: "Snake_Case", expected: "snake_case"},
		{name: "begin-with-underscore", input: "_Snake_Case", expected: "_snake_case"},
		{name: "end-with-underscore", input: "Snake_Case_", expected: "snake_case_"},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			actual := SnakeCase(tc.input)
			assert.Equal(t, tc.expected, actual)
		})
	}
}

0 comments on commit 59a34f9

Please sign in to comment.