Skip to content

Commit

Permalink
migrated gelfTarget to go-gelf/v2 library that contains fix for chunk…
Browse files Browse the repository at this point in the history
…ed messages (#5992)
  • Loading branch information
vlad-diachenko authored Apr 25, 2022
1 parent 7c462ca commit 4a91fdc
Show file tree
Hide file tree
Showing 25 changed files with 626 additions and 217 deletions.
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/gelf/gelftarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/go-gelf/v2/gelf"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"gopkg.in/Graylog2/go-gelf.v2/gelf"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
Expand Down
110 changes: 102 additions & 8 deletions clients/pkg/promtail/targets/gelf/gelftarget_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package gelf

import (
"crypto/rand"
"fmt"
"io"
"net"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/go-gelf/v2/gelf"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
"gopkg.in/Graylog2/go-gelf.v2/gelf"

"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
)

func Test_Gelf(t *testing.T) {
client := fake.New(func() {})

tm, err := NewTargetManager(NewMetrics(nil), log.NewNopLogger(), client, []scrapeconfig.Config{
{
JobName: "gelf",
GelfConfig: &scrapeconfig.GelfTargetConfig{
ListenAddress: ":12201",
ListenAddress: ":0",
UseIncomingTimestamp: true,
Labels: model.LabelSet{"cfg": "true"},
},
Expand Down Expand Up @@ -51,9 +55,14 @@ func Test_Gelf(t *testing.T) {
},
})
require.NoError(t, err)

w, err := gelf.NewUDPWriter(":12201")
defer tm.Stop()
target := tm.targets["gelf"]
require.NotNil(t, target)
w, err := gelf.NewUDPWriter(target.gelfReader.Addr())
require.NoError(t, err)
defer func() {
require.NoError(t, w.Close())
}()
baseTs := float64(time.Unix(10, 0).Unix()) + 0.250
ts := baseTs

Expand All @@ -75,7 +84,7 @@ func Test_Gelf(t *testing.T) {

require.Eventually(t, func() bool {
return len(client.Received()) == 10
}, 200*time.Millisecond, 20*time.Millisecond)
}, 1*time.Second, 20*time.Millisecond)

for i, actual := range client.Received() {
require.Equal(t, "error", string(actual.Labels["level"]))
Expand All @@ -98,10 +107,95 @@ func Test_Gelf(t *testing.T) {
require.Equal(t, "gelftest", gelfMsg.Facility)

}

tm.Stop()
}

func TestConvertTime(t *testing.T) {
require.Equal(t, time.Unix(0, int64(time.Second+(time.Duration(250)*time.Millisecond))), secondsToUnixTimestamp(float64(time.Unix(1, 0).Unix())+0.250))
}

func Test_GelfChunksUnordered(t *testing.T) {
client := fake.New(func() {})

tm, err := NewTargetManager(NewMetrics(nil), log.NewNopLogger(), client, []scrapeconfig.Config{
{
JobName: "gelf",
GelfConfig: &scrapeconfig.GelfTargetConfig{
ListenAddress: ":0",
},
},
})
require.NoError(t, err)
defer tm.Stop()

target := tm.targets["gelf"]
require.NotNil(t, target)
connection, err := net.Dial("udp", target.gelfReader.Addr())
require.NoError(t, err)
defer func() {
require.NoError(t, connection.Close())
}()

chunksA := createChunks(t, "a")
chunksB := createChunks(t, "b")
// send messages(a, b) chunks in order: chunk-0a, chunk-0b, chunk-1a, chunk-1b
for i := 0; i < len(chunksB); i++ {
writeA, err := connection.Write(chunksA[i])
require.NoError(t, err)
require.Equal(t, len(chunksA[i]), writeA)

writeB, err := connection.Write(chunksB[i])
require.NoError(t, err)
require.Equal(t, len(chunksB[i]), writeB)
}

require.Eventually(t, func() bool {
return len(client.Received()) == 2
}, 2*time.Second, 100*time.Millisecond, "expected 2 messages to be received")
}

func createChunks(t *testing.T, char string) [][]byte {
chunksA, err := splitToChunks([]byte(fmt.Sprintf("{\"short_message\":\"%v\"}", strings.Repeat(char, gelf.ChunkSize*2))))
require.NoError(t, err)
return chunksA
}

// static value that indicated that GELF message is chunked
var magicChunked = []byte{0x1e, 0x0f}

const (
chunkedHeaderLen = 12
chunkedDataLen = gelf.ChunkSize - chunkedHeaderLen
)

func splitToChunks(messageBytes []byte) ([][]byte, error) {
chunksCount := uint8(len(messageBytes)/chunkedDataLen + 1)
messageID := make([]byte, 8)
n, err := io.ReadFull(rand.Reader, messageID)
if err != nil || n != 8 {
return nil, fmt.Errorf("rand.Reader: %d/%s", n, err)
}
chunks := make([][]byte, 0, chunksCount)
bytesLeft := len(messageBytes)
for i := uint8(0); i < chunksCount; i++ {
buf := make([]byte, 0, gelf.ChunkSize)
buf = append(buf, magicChunked...)
buf = append(buf, messageID...)
buf = append(buf, i)
buf = append(buf, chunksCount)
chunkLen := chunkedDataLen
if chunkLen > bytesLeft {
chunkLen = bytesLeft
}
off := int(i) * chunkedDataLen
chunkData := messageBytes[off : off+chunkLen]
buf = append(buf, chunkData...)

chunks = append(chunks, buf)
bytesLeft -= chunkLen
}

if bytesLeft != 0 {
return nil, fmt.Errorf("error: %d bytes left after splitting", bytesLeft)
}
return chunks, nil
}
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
Expand Down Expand Up @@ -85,7 +86,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.6.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/thanos-io/thanos v0.22.0
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.30.0+incompatible
Expand All @@ -101,7 +102,6 @@ require (
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/api v0.70.0
google.golang.org/grpc v1.44.0
gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -311,7 +311,4 @@ replace github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-
// is v0.19.1. We pin version from late september here. Feel free to remove when updating to later version.
replace github.com/thanos-io/thanos v0.22.0 => github.com/thanos-io/thanos v0.19.1-0.20211126105533-c5505f5eaa7d

// We use a fork of Graylog to avoid leaking goroutine when closing the Promtail target.
replace gopkg.in/Graylog2/go-gelf.v2 => github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8

replace github.com/cloudflare/cloudflare-go => github.com/cyriltovena/cloudflare-go v0.27.1-0.20211118103540-ff77400bcb93
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1037,8 +1037,8 @@ github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk=
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0=
github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 h1:aEOagXOTqtN9gd4jiDuP/5a81HdoJBqkVfn8WaxbsK4=
github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8/go.mod h1:QAvS2C7TtQRhhv9Uf/sxD+BUhpkrPFm5jK/9MzUiDCY=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/grafana/regexp v0.0.0-20220202152315-e74e38789280/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
Expand Down Expand Up @@ -1820,8 +1820,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down
File renamed without changes.
53 changes: 53 additions & 0 deletions vendor/github.com/grafana/go-gelf/v2/gelf/defragmentator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4a91fdc

Please sign in to comment.