Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cloud_pubsub): Add support for gzip compression #13094

Merged
merged 16 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions plugins/inputs/cloud_pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## PubSub message data before parsing. Many GCP services that
## output JSON to Google PubSub base64-encode the JSON payload.
# base64_data = false

## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"

## If content encoding is not "identity", sets the maximum allowed size,
## in bytes, for a message payload when it's decompressed. Can be increased
## for larger payloads or reduced to protect against decompression bombs.
## Acceptable units are B, KiB, KB, MiB, MB...
# max_decompression_size = "500MB"
```

### Multiple Subscriptions and Topics
Expand Down
86 changes: 67 additions & 19 deletions plugins/inputs/cloud_pubsub/cloud_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ type PubSub struct {

Base64Data bool `toml:"base64_data"`

Log telegraf.Logger
ContentEncoding string `toml:"content_encoding"`
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
Log telegraf.Logger `toml:"-"`

sub subscription
stubSub func() subscription
Expand All @@ -62,6 +64,7 @@ type PubSub struct {

undelivered map[telegraf.TrackingID]message
sem semaphore
decoder internal.ContentDecoder
}

func (*PubSub) SampleConfig() string {
Expand All @@ -82,14 +85,6 @@ func (ps *PubSub) SetParser(parser parsers.Parser) {
// Two goroutines are started - one pulling for the subscription, one
// receiving delivery notifications from the accumulator.
func (ps *PubSub) Start(ac telegraf.Accumulator) error {
if ps.Subscription == "" {
return fmt.Errorf(`"subscription" is required`)
}

if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}

ps.sem = make(semaphore, ps.MaxUndeliveredMessages)
ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages)

Expand Down Expand Up @@ -176,21 +171,20 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen)
}

var data []byte
if ps.Base64Data {
strData, err := base64.StdEncoding.DecodeString(string(msg.Data()))
if err != nil {
return fmt.Errorf("unable to base64 decode message: %w", err)
}
data = strData
} else {
data = msg.Data()
data, err := ps.decompressData(msg.Data())
if err != nil {
return fmt.Errorf("unable to decompress %s message: %w", ps.ContentEncoding, err)
}

data, err = ps.decodeB64Data(data)
if err != nil {
return fmt.Errorf("unable to decode base64 message: %w", err)
}

metrics, err := ps.parser.Parse(data)
if err != nil {
msg.Ack()
return err
return fmt.Errorf("unable to parse message: %w", err)
}

if len(metrics) == 0 {
Expand All @@ -217,6 +211,31 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return nil
}

func (ps *PubSub) decompressData(data []byte) ([]byte, error) {
if ps.ContentEncoding == "identity" {
return data, nil
}

data, err := ps.decoder.Decode(data, int64(ps.MaxDecompressionSize))
if err != nil {
return nil, err
}

decompressedData := make([]byte, len(data))
copy(decompressedData, data)
data = decompressedData

return data, nil
}

func (ps *PubSub) decodeB64Data(data []byte) ([]byte, error) {
if ps.Base64Data {
return base64.StdEncoding.DecodeString(string(data))
}

return data, nil
}

func (ps *PubSub) waitForDelivery(parentCtx context.Context) {
for {
select {
Expand Down Expand Up @@ -286,6 +305,35 @@ func (ps *PubSub) getGCPSubscription(subID string) (subscription, error) {
return &gcpSubscription{s}, nil
}

func (ps *PubSub) Init() error {
if ps.Subscription == "" {
return fmt.Errorf(`"subscription" is required`)
}

if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}

switch ps.ContentEncoding {
case "", "identity":
ps.ContentEncoding = "identity"
case "gzip":
var err error
ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding)
}

if ps.MaxDecompressionSize <= 0 {
ps.MaxDecompressionSize = internal.DefaultMaxDecompressionSize
}

return nil
}

func init() {
inputs.Add("cloud_pubsub", func() telegraf.Input {
ps := &PubSub{
Expand Down
114 changes: 84 additions & 30 deletions plugins/inputs/cloud_pubsub/cloud_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
Expand All @@ -28,24 +30,24 @@ func TestRunParse(t *testing.T) {
}
sub.receiver = testMessagesReceive(sub)

decoder, _ := internal.NewContentDecoder("identity")

ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
}

acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()

if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
require.NotNil(t, ps.sub)

testTracker := &testTracker{}
msg := &testMsg{
Expand Down Expand Up @@ -73,6 +75,8 @@ func TestRunBase64(t *testing.T) {
}
sub.receiver = testMessagesReceive(sub)

decoder, _ := internal.NewContentDecoder("identity")

ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
Expand All @@ -81,17 +85,15 @@ func TestRunBase64(t *testing.T) {
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Base64Data: true,
decoder: decoder,
}

acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()

if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
require.NotNil(t, ps.sub)

testTracker := &testTracker{}
msg := &testMsg{
Expand All @@ -106,6 +108,55 @@ func TestRunBase64(t *testing.T) {
validateTestInfluxMetric(t, metric)
}

func TestRunGzipDecode(t *testing.T) {
subID := "sub-run-gzip"

testParser := &influx.Parser{}
require.NoError(t, testParser.Init())

sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)

decoder, err := internal.NewContentDecoder("gzip")
require.NoError(t, err)

ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
ContentEncoding: "gzip",
MaxDecompressionSize: internal.DefaultMaxDecompressionSize,
decoder: decoder,
}

acc := &testutil.Accumulator{}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()

require.NotNil(t, ps.sub)

testTracker := &testTracker{}
enc := internal.NewGzipEncoder()
gzippedMsg, err := enc.Encode([]byte(msgInflux))
require.NoError(t, err)
msg := &testMsg{
value: string(gzippedMsg),
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}

func TestRunInvalidMessages(t *testing.T) {
subID := "sub-invalid-messages"

Expand All @@ -118,24 +169,25 @@ func TestRunInvalidMessages(t *testing.T) {
}
sub.receiver = testMessagesReceive(sub)

decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
}

acc := &testutil.Accumulator{}

if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}

require.NotNil(t, ps.sub)

testTracker := &testTracker{}
msg := &testMsg{
Expand Down Expand Up @@ -166,24 +218,25 @@ func TestRunOverlongMessages(t *testing.T) {
}
sub.receiver = testMessagesReceive(sub)

decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
// Add MaxMessageLen Param
MaxMessageLen: 1,
}

if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}

require.NotNil(t, ps.sub)

testTracker := &testTracker{}
msg := &testMsg{
Expand Down Expand Up @@ -215,24 +268,25 @@ func TestRunErrorInSubscriber(t *testing.T) {
fakeErrStr := "a fake error"
sub.receiver = testMessagesError(errors.New("a fake error"))

decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
RetryReceiveDelaySeconds: 1,
}

if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()

if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
require.NotNil(t, ps.sub)

acc.WaitError(1)
require.Regexp(t, fakeErrStr, acc.Errors[0])
}
Expand Down
10 changes: 10 additions & 0 deletions plugins/inputs/cloud_pubsub/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,13 @@
## PubSub message data before parsing. Many GCP services that
## output JSON to Google PubSub base64-encode the JSON payload.
# base64_data = false

## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"

## If content encoding is not "identity", sets the maximum allowed size,
## in bytes, for a message payload when it's decompressed. Can be increased
## for larger payloads or reduced to protect against decompression bombs.
## Acceptable units are B, KiB, KB, MiB, MB...
# max_decompression_size = "500MB"
4 changes: 4 additions & 0 deletions plugins/outputs/cloud_pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Required. Name of PubSub topic to publish metrics to.
topic = "my-topic"

## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"

## Required. Data format to consume.
## Each data format has its own unique set of configuration options.
## Read more about them here:
Expand Down
Loading