diff --git a/plugins/inputs/cloud_pubsub/README.md b/plugins/inputs/cloud_pubsub/README.md index 7dc4ef590309b..efb23a92ebfc2 100644 --- a/plugins/inputs/cloud_pubsub/README.md +++ b/plugins/inputs/cloud_pubsub/README.md @@ -101,6 +101,16 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## PubSub message data before parsing. Many GCP services that ## output JSON to Google PubSub base64-encode the JSON payload. # base64_data = false + + ## Content encoding for message payloads, can be set to "gzip" or + ## "identity" to apply no encoding. + # content_encoding = "identity" + + ## If content encoding is not "identity", sets the maximum allowed size, + ## in bytes, for a message payload when it's decompressed. Can be increased + ## for larger payloads or reduced to protect against decompression bombs. + ## Acceptable units are B, KiB, KB, MiB, MB... + # max_decompression_size = "500MB" ``` ### Multiple Subscriptions and Topics diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub.go b/plugins/inputs/cloud_pubsub/cloud_pubsub.go index 70e723d59ee83..4467f74725ee8 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub.go @@ -49,7 +49,9 @@ type PubSub struct { Base64Data bool `toml:"base64_data"` - Log telegraf.Logger + ContentEncoding string `toml:"content_encoding"` + MaxDecompressionSize config.Size `toml:"max_decompression_size"` + Log telegraf.Logger `toml:"-"` sub subscription stubSub func() subscription @@ -62,6 +64,7 @@ type PubSub struct { undelivered map[telegraf.TrackingID]message sem semaphore + decoder internal.ContentDecoder } func (*PubSub) SampleConfig() string { @@ -82,14 +85,6 @@ func (ps *PubSub) SetParser(parser parsers.Parser) { // Two goroutines are started - one pulling for the subscription, one // receiving delivery notifications from the accumulator. func (ps *PubSub) Start(ac telegraf.Accumulator) error { - if ps.Subscription == "" { - return fmt.Errorf(`"subscription" is required`) - } - - if ps.Project == "" { - return fmt.Errorf(`"project" is required`) - } - ps.sem = make(semaphore, ps.MaxUndeliveredMessages) ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages) @@ -176,21 +171,20 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error { return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen) } - var data []byte - if ps.Base64Data { - strData, err := base64.StdEncoding.DecodeString(string(msg.Data())) - if err != nil { - return fmt.Errorf("unable to base64 decode message: %w", err) - } - data = strData - } else { - data = msg.Data() + data, err := ps.decompressData(msg.Data()) + if err != nil { + return fmt.Errorf("unable to decompress %s message: %w", ps.ContentEncoding, err) + } + + data, err = ps.decodeB64Data(data) + if err != nil { + return fmt.Errorf("unable to decode base64 message: %w", err) } metrics, err := ps.parser.Parse(data) if err != nil { msg.Ack() - return err + return fmt.Errorf("unable to parse message: %w", err) } if len(metrics) == 0 { @@ -217,6 +211,31 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error { return nil } +func (ps *PubSub) decompressData(data []byte) ([]byte, error) { + if ps.ContentEncoding == "identity" { + return data, nil + } + + data, err := ps.decoder.Decode(data, int64(ps.MaxDecompressionSize)) + if err != nil { + return nil, err + } + + decompressedData := make([]byte, len(data)) + copy(decompressedData, data) + data = decompressedData + + return data, nil +} + +func (ps *PubSub) decodeB64Data(data []byte) ([]byte, error) { + if ps.Base64Data { + return base64.StdEncoding.DecodeString(string(data)) + } + + return data, nil +} + func (ps *PubSub) waitForDelivery(parentCtx context.Context) { for { select { @@ -286,6 +305,35 @@ func (ps *PubSub) getGCPSubscription(subID string) (subscription, error) { return &gcpSubscription{s}, nil } +func (ps *PubSub) Init() error { + if ps.Subscription == "" { + return fmt.Errorf(`"subscription" is required`) + } + + if ps.Project == "" { + return fmt.Errorf(`"project" is required`) + } + + switch ps.ContentEncoding { + case "", "identity": + ps.ContentEncoding = "identity" + case "gzip": + var err error + ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding) + if err != nil { + return err + } + default: + return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding) + } + + if ps.MaxDecompressionSize <= 0 { + ps.MaxDecompressionSize = internal.DefaultMaxDecompressionSize + } + + return nil +} + func init() { inputs.Add("cloud_pubsub", func() telegraf.Input { ps := &PubSub{ diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go index 5c5edf455a667..a444fb8d7a91e 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go @@ -5,8 +5,10 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -28,6 +30,8 @@ func TestRunParse(t *testing.T) { } sub.receiver = testMessagesReceive(sub) + decoder, _ := internal.NewContentDecoder("identity") + ps := &PubSub{ Log: testutil.Logger{}, parser: testParser, @@ -35,17 +39,15 @@ func TestRunParse(t *testing.T) { Project: "projectIDontMatterForTests", Subscription: subID, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + decoder: decoder, } acc := &testutil.Accumulator{} - if err := ps.Start(acc); err != nil { - t.Fatalf("test PubSub failed to start: %s", err) - } + require.NoError(t, ps.Init()) + require.NoError(t, ps.Start(acc)) defer ps.Stop() - if ps.sub == nil { - t.Fatal("expected plugin subscription to be non-nil") - } + require.NotNil(t, ps.sub) testTracker := &testTracker{} msg := &testMsg{ @@ -73,6 +75,8 @@ func TestRunBase64(t *testing.T) { } sub.receiver = testMessagesReceive(sub) + decoder, _ := internal.NewContentDecoder("identity") + ps := &PubSub{ Log: testutil.Logger{}, parser: testParser, @@ -81,17 +85,15 @@ func TestRunBase64(t *testing.T) { Subscription: subID, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Base64Data: true, + decoder: decoder, } acc := &testutil.Accumulator{} - if err := ps.Start(acc); err != nil { - t.Fatalf("test PubSub failed to start: %s", err) - } + require.NoError(t, ps.Init()) + require.NoError(t, ps.Start(acc)) defer ps.Stop() - if ps.sub == nil { - t.Fatal("expected plugin subscription to be non-nil") - } + require.NotNil(t, ps.sub) testTracker := &testTracker{} msg := &testMsg{ @@ -106,6 +108,55 @@ func TestRunBase64(t *testing.T) { validateTestInfluxMetric(t, metric) } +func TestRunGzipDecode(t *testing.T) { + subID := "sub-run-gzip" + + testParser := &influx.Parser{} + require.NoError(t, testParser.Init()) + + sub := &stubSub{ + id: subID, + messages: make(chan *testMsg, 100), + } + sub.receiver = testMessagesReceive(sub) + + decoder, err := internal.NewContentDecoder("gzip") + require.NoError(t, err) + + ps := &PubSub{ + Log: testutil.Logger{}, + parser: testParser, + stubSub: func() subscription { return sub }, + Project: "projectIDontMatterForTests", + Subscription: subID, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + ContentEncoding: "gzip", + MaxDecompressionSize: internal.DefaultMaxDecompressionSize, + decoder: decoder, + } + + acc := &testutil.Accumulator{} + require.NoError(t, ps.Init()) + require.NoError(t, ps.Start(acc)) + defer ps.Stop() + + require.NotNil(t, ps.sub) + + testTracker := &testTracker{} + enc := internal.NewGzipEncoder() + gzippedMsg, err := enc.Encode([]byte(msgInflux)) + require.NoError(t, err) + msg := &testMsg{ + value: string(gzippedMsg), + tracker: testTracker, + } + sub.messages <- msg + acc.Wait(1) + assert.Equal(t, acc.NFields(), 1) + metric := acc.Metrics[0] + validateTestInfluxMetric(t, metric) +} + func TestRunInvalidMessages(t *testing.T) { subID := "sub-invalid-messages" @@ -118,6 +169,8 @@ func TestRunInvalidMessages(t *testing.T) { } sub.receiver = testMessagesReceive(sub) + decoder, err := internal.NewContentDecoder("identity") + require.NoError(t, err) ps := &PubSub{ Log: testutil.Logger{}, parser: testParser, @@ -125,17 +178,16 @@ func TestRunInvalidMessages(t *testing.T) { Project: "projectIDontMatterForTests", Subscription: subID, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + decoder: decoder, } acc := &testutil.Accumulator{} - if err := ps.Start(acc); err != nil { - t.Fatalf("test PubSub failed to start: %s", err) - } + require.NoError(t, ps.Init()) + require.NoError(t, ps.Start(acc)) defer ps.Stop() - if ps.sub == nil { - t.Fatal("expected plugin subscription to be non-nil") - } + + require.NotNil(t, ps.sub) testTracker := &testTracker{} msg := &testMsg{ @@ -166,6 +218,8 @@ func TestRunOverlongMessages(t *testing.T) { } sub.receiver = testMessagesReceive(sub) + decoder, err := internal.NewContentDecoder("identity") + require.NoError(t, err) ps := &PubSub{ Log: testutil.Logger{}, parser: testParser, @@ -173,17 +227,16 @@ func TestRunOverlongMessages(t *testing.T) { Project: "projectIDontMatterForTests", Subscription: subID, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + decoder: decoder, // Add MaxMessageLen Param MaxMessageLen: 1, } - if err := ps.Start(acc); err != nil { - t.Fatalf("test PubSub failed to start: %s", err) - } + require.NoError(t, ps.Init()) + require.NoError(t, ps.Start(acc)) defer ps.Stop() - if ps.sub == nil { - t.Fatal("expected plugin subscription to be non-nil") - } + + require.NotNil(t, ps.sub) testTracker := &testTracker{} msg := &testMsg{ @@ -215,6 +268,8 @@ func TestRunErrorInSubscriber(t *testing.T) { fakeErrStr := "a fake error" sub.receiver = testMessagesError(errors.New("a fake error")) + decoder, err := internal.NewContentDecoder("identity") + require.NoError(t, err) ps := &PubSub{ Log: testutil.Logger{}, parser: testParser, @@ -222,17 +277,16 @@ func TestRunErrorInSubscriber(t *testing.T) { Project: "projectIDontMatterForTests", Subscription: subID, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + decoder: decoder, RetryReceiveDelaySeconds: 1, } - if err := ps.Start(acc); err != nil { - t.Fatalf("test PubSub failed to start: %s", err) - } + require.NoError(t, ps.Init()) + require.NoError(t, ps.Start(acc)) defer ps.Stop() - if ps.sub == nil { - t.Fatal("expected plugin subscription to be non-nil") - } + require.NotNil(t, ps.sub) + acc.WaitError(1) require.Regexp(t, fakeErrStr, acc.Errors[0]) } diff --git a/plugins/inputs/cloud_pubsub/sample.conf b/plugins/inputs/cloud_pubsub/sample.conf index 218df968370b3..7a6a53076008a 100644 --- a/plugins/inputs/cloud_pubsub/sample.conf +++ b/plugins/inputs/cloud_pubsub/sample.conf @@ -73,3 +73,13 @@ ## PubSub message data before parsing. Many GCP services that ## output JSON to Google PubSub base64-encode the JSON payload. # base64_data = false + + ## Content encoding for message payloads, can be set to "gzip" or + ## "identity" to apply no encoding. + # content_encoding = "identity" + + ## If content encoding is not "identity", sets the maximum allowed size, + ## in bytes, for a message payload when it's decompressed. Can be increased + ## for larger payloads or reduced to protect against decompression bombs. + ## Acceptable units are B, KiB, KB, MiB, MB... + # max_decompression_size = "500MB" diff --git a/plugins/outputs/cloud_pubsub/README.md b/plugins/outputs/cloud_pubsub/README.md index 99451f8221ae7..d1edf10a0b863 100644 --- a/plugins/outputs/cloud_pubsub/README.md +++ b/plugins/outputs/cloud_pubsub/README.md @@ -24,6 +24,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Required. Name of PubSub topic to publish metrics to. topic = "my-topic" + ## Content encoding for message payloads, can be set to "gzip" or + ## "identity" to apply no encoding. + # content_encoding = "identity" + ## Required. Data format to consume. ## Each data format has its own unique set of configuration options. ## Read more about them here: diff --git a/plugins/outputs/cloud_pubsub/cloud_pubsub.go b/plugins/outputs/cloud_pubsub/cloud_pubsub.go index 3961b964e036f..18f58c3c77013 100644 --- a/plugins/outputs/cloud_pubsub/cloud_pubsub.go +++ b/plugins/outputs/cloud_pubsub/cloud_pubsub.go @@ -35,6 +35,7 @@ type PubSub struct { PublishNumGoroutines int `toml:"publish_num_go_routines"` PublishTimeout config.Duration `toml:"publish_timeout"` Base64Data bool `toml:"base64_data"` + ContentEncoding string `toml:"content_encoding"` Log telegraf.Logger `toml:"-"` @@ -45,6 +46,7 @@ type PubSub struct { serializer serializers.Serializer publishResults []publishResult + encoder internal.ContentEncoder } func (*PubSub) SampleConfig() string { @@ -56,17 +58,10 @@ func (ps *PubSub) SetSerializer(serializer serializers.Serializer) { } func (ps *PubSub) Connect() error { - if ps.Topic == "" { - return fmt.Errorf(`"topic" is required`) - } - - if ps.Project == "" { - return fmt.Errorf(`"project" is required`) - } - if ps.stubTopic == nil { return ps.initPubSubClient() } + return nil } @@ -168,9 +163,11 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro return nil, err } - if ps.Base64Data { - encoded := base64.StdEncoding.EncodeToString(b) - b = []byte(encoded) + b = ps.encodeB64Data(b) + + b, err = ps.compressData(b) + if err != nil { + return nil, fmt.Errorf("unable to compress message with %s: %w", ps.ContentEncoding, err) } msg := &pubsub.Message{Data: b} @@ -188,9 +185,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro continue } - if ps.Base64Data { - encoded := base64.StdEncoding.EncodeToString(b) - b = []byte(encoded) + b = ps.encodeB64Data(b) + + b, err = ps.compressData(b) + if err != nil { + ps.Log.Errorf("unable to compress message with %s: %w", ps.ContentEncoding, err) + continue } msg := &pubsub.Message{ @@ -205,6 +205,32 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro return msgs, nil } +func (ps *PubSub) encodeB64Data(data []byte) []byte { + if ps.Base64Data { + encoded := base64.StdEncoding.EncodeToString(data) + data = []byte(encoded) + } + + return data +} + +func (ps *PubSub) compressData(data []byte) ([]byte, error) { + if ps.ContentEncoding == "identity" { + return data, nil + } + + data, err := ps.encoder.Encode(data) + if err != nil { + return nil, err + } + + compressedData := make([]byte, len(data)) + copy(compressedData, data) + data = compressedData + + return data, nil +} + func (ps *PubSub) waitForResults(ctx context.Context, cancel context.CancelFunc) error { var pErr error var setErr sync.Once @@ -230,6 +256,31 @@ func (ps *PubSub) waitForResults(ctx context.Context, cancel context.CancelFunc) return pErr } +func (ps *PubSub) Init() error { + if ps.Topic == "" { + return fmt.Errorf(`"topic" is required`) + } + + if ps.Project == "" { + return fmt.Errorf(`"project" is required`) + } + + switch ps.ContentEncoding { + case "", "identity": + ps.ContentEncoding = "identity" + case "gzip": + var err error + ps.encoder, err = internal.NewContentEncoder(ps.ContentEncoding) + if err != nil { + return err + } + default: + return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding) + } + + return nil +} + func init() { outputs.Add("cloud_pubsub", func() telegraf.Output { return &PubSub{} diff --git a/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go b/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go index ebe491efdca47..1b7fefb3813aa 100644 --- a/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go +++ b/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -21,10 +22,7 @@ func TestPubSub_WriteSingle(t *testing.T) { settings.CountThreshold = 1 ps, topic, metrics := getTestResources(t, settings, testMetrics) - err := ps.Write(metrics) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } + require.NoError(t, ps.Write(metrics)) for _, testM := range testMetrics { verifyRawMetricPublished(t, testM.m, topic.published) @@ -43,10 +41,7 @@ func TestPubSub_WriteWithAttribute(t *testing.T) { "foo2": "bar2", } - err := ps.Write(metrics) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } + require.NoError(t, ps.Write(metrics)) for _, testM := range testMetrics { msg := verifyRawMetricPublished(t, testM.m, topic.published) @@ -65,10 +60,7 @@ func TestPubSub_WriteMultiple(t *testing.T) { ps, topic, metrics := getTestResources(t, settings, testMetrics) - err := ps.Write(metrics) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } + require.NoError(t, ps.Write(metrics)) for _, testM := range testMetrics { verifyRawMetricPublished(t, testM.m, topic.published) @@ -89,10 +81,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) { ps, topic, metrics := getTestResources(t, settings, testMetrics) - err := ps.Write(metrics) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } + require.NoError(t, ps.Write(metrics)) for _, testM := range testMetrics { verifyRawMetricPublished(t, testM.m, topic.published) @@ -112,10 +101,7 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) { ps, topic, metrics := getTestResources(t, settings, testMetrics) - err := ps.Write(metrics) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } + require.NoError(t, ps.Write(metrics)) for _, testM := range testMetrics { verifyRawMetricPublished(t, testM.m, topic.published) @@ -133,14 +119,12 @@ func TestPubSub_WriteBase64Single(t *testing.T) { settings.CountThreshold = 1 ps, topic, metrics := getTestResources(t, settings, testMetrics) ps.Base64Data = true + topic.Base64Data = true - err := ps.Write(metrics) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } + require.NoError(t, ps.Write(metrics)) for _, testM := range testMetrics { - verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */) + verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */, false /* gzipEncoded */) } } @@ -155,24 +139,64 @@ func TestPubSub_Error(t *testing.T) { ps, _, metrics := getTestResources(t, settings, testMetrics) err := ps.Write(metrics) - if err == nil { - t.Fatalf("expected error") + require.Error(t, err) + require.ErrorContains(t, err, errMockFail) +} + +func TestPubSub_WriteGzipSingle(t *testing.T) { + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error */}, + {testutil.TestMetric("value_2", "test"), false}, } - if err.Error() != errMockFail { - t.Fatalf("expected fake error, got %v", err) + + settings := pubsub.DefaultPublishSettings + settings.CountThreshold = 1 + ps, topic, metrics := getTestResources(t, settings, testMetrics) + topic.ContentEncoding = "gzip" + ps.ContentEncoding = "gzip" + var err error + ps.encoder, err = internal.NewContentEncoder(ps.ContentEncoding) + + require.NoError(t, err) + require.NoError(t, ps.Write(metrics)) + + for _, testM := range testMetrics { + verifyMetricPublished(t, testM.m, topic.published, false /* base64encoded */, true /* Gzipencoded */) + } +} + +func TestPubSub_WriteGzipAndBase64Single(t *testing.T) { + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error */}, + {testutil.TestMetric("value_2", "test"), false}, + } + + settings := pubsub.DefaultPublishSettings + settings.CountThreshold = 1 + ps, topic, metrics := getTestResources(t, settings, testMetrics) + topic.ContentEncoding = "gzip" + topic.Base64Data = true + ps.ContentEncoding = "gzip" + ps.Base64Data = true + var err error + ps.encoder, err = internal.NewContentEncoder(ps.ContentEncoding) + + require.NoError(t, err) + require.NoError(t, ps.Write(metrics)) + + for _, testM := range testMetrics { + verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */, true /* Gzipencoded */) } } func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message { - return verifyMetricPublished(t, m, published, false) + return verifyMetricPublished(t, m, published, false, false) } -func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message { +func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool, gzipEncoded bool) *pubsub.Message { p := influx.Parser{} - err := p.Init() - if err != nil { - t.Fatalf("unexpected parsing error: %v", err) - } + require.NoError(t, p.Init()) + v, _ := m.GetField("value") psMsg, ok := published[v.(string)] if !ok { @@ -180,8 +204,18 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string } data := psMsg.Data + + if gzipEncoded { + decoder, _ := internal.NewContentDecoder("gzip") + var err error + data, err = decoder.Decode(data, internal.DefaultMaxDecompressionSize) + if err != nil { + t.Fatalf("Unable to decode expected gzip encoded message: %s", err) + } + } + if base64Encoded { - v, err := base64.StdEncoding.DecodeString(string(psMsg.Data)) + v, err := base64.StdEncoding.DecodeString(string(data)) if err != nil { t.Fatalf("Unable to decode expected base64-encoded message: %s", err) } @@ -190,7 +224,7 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string parsed, err := p.Parse(data) if err != nil { - t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data)) + t.Fatalf("could not parse influxdb metric from published message: %s", string(data)) } if len(parsed) > 1 { t.Fatalf("expected only one influxdb metric per published message, got %d", len(published)) diff --git a/plugins/outputs/cloud_pubsub/sample.conf b/plugins/outputs/cloud_pubsub/sample.conf index 676b6f6d7c1a2..4f259163eedd6 100644 --- a/plugins/outputs/cloud_pubsub/sample.conf +++ b/plugins/outputs/cloud_pubsub/sample.conf @@ -7,6 +7,10 @@ ## Required. Name of PubSub topic to publish metrics to. topic = "my-topic" + ## Content encoding for message payloads, can be set to "gzip" or + ## "identity" to apply no encoding. + # content_encoding = "identity" + ## Required. Data format to consume. ## Each data format has its own unique set of configuration options. ## Read more about them here: diff --git a/plugins/outputs/cloud_pubsub/topic_stubbed.go b/plugins/outputs/cloud_pubsub/topic_stubbed.go index 2fde64271effb..f57105658f19e 100644 --- a/plugins/outputs/cloud_pubsub/topic_stubbed.go +++ b/plugins/outputs/cloud_pubsub/topic_stubbed.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" serializer "github.com/influxdata/telegraf/plugins/serializers/influx" @@ -49,6 +50,9 @@ type ( ReturnErr map[string]bool parsers.Parser *testing.T + Base64Data bool + ContentEncoding string + MaxDecompressionSize int64 stopped bool pLock sync.Mutex @@ -68,9 +72,11 @@ func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []te metrics := make([]telegraf.Metric, 0, len(testM)) t := &stubTopic{ - T: tT, - ReturnErr: make(map[string]bool), - published: make(map[string]*pubsub.Message), + T: tT, + ReturnErr: make(map[string]bool), + published: make(map[string]*pubsub.Message), + ContentEncoding: "identity", + MaxDecompressionSize: internal.DefaultMaxDecompressionSize, } for _, tm := range testM { @@ -89,7 +95,11 @@ func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []te PublishByteThreshold: settings.ByteThreshold, PublishNumGoroutines: settings.NumGoroutines, PublishTimeout: config.Duration(settings.Timeout), + ContentEncoding: "identity", } + + require.NoError(tT, ps.Init()) + ps.encoder, _ = internal.NewContentEncoder(ps.ContentEncoding) ps.SetSerializer(s) return ps, t, metrics @@ -185,17 +195,22 @@ func (t *stubTopic) parseIDs(msg *pubsub.Message) []string { p := influx.Parser{} err := p.Init() require.NoError(t, err) - metrics, err := p.Parse(msg.Data) + + decoder, _ := internal.NewContentDecoder(t.ContentEncoding) + d, err := decoder.Decode(msg.Data, t.MaxDecompressionSize) if err != nil { - // Just attempt to base64-decode first before returning error. - d, err := base64.StdEncoding.DecodeString(string(msg.Data)) - if err != nil { - t.Errorf("unable to base64-decode potential test message: %v", err) - } - metrics, err = p.Parse(d) + t.Errorf("unable to decode message: %v", err) + } + if t.Base64Data { + strData, err := base64.StdEncoding.DecodeString(string(d)) if err != nil { - t.Fatalf("unexpected parsing error: %v", err) + t.Errorf("unable to base64 decode message: %v", err) } + d = strData + } + metrics, err := p.Parse(d) + if err != nil { + t.Fatalf("unexpected parsing error: %v", err) } ids := make([]string, 0, len(metrics))