Skip to content

Commit

Permalink
feat: Migrate influx and influx_upstream parsers to new style
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Spaink committed Jun 30, 2022
1 parent 3f9370e commit 130975b
Show file tree
Hide file tree
Showing 27 changed files with 241 additions and 159 deletions.
5 changes: 3 additions & 2 deletions plugins/common/shim/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
)

// AddOutput adds the input to the shim. Later calls to Run() will run this.
Expand All @@ -23,7 +23,8 @@ func (s *Shim) AddOutput(output telegraf.Output) error {
}

func (s *Shim) RunOutput() error {
parser, err := parsers.NewInfluxParser()
parser := influx.Parser{}
err := parser.Init()
if err != nil {
return fmt.Errorf("Failed to create new parser: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions plugins/common/shim/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
)

Expand Down Expand Up @@ -53,7 +53,8 @@ func testSendAndReceive(t *testing.T, fieldKey string, fieldValue string) {
}()

serializer, _ := serializers.NewInfluxSerializer()
parser, _ := parsers.NewInfluxParser()
parser := influx.Parser{}
require.NoError(t, parser.Init())

m := metric.New("thing",
map[string]string{
Expand Down
17 changes: 11 additions & 6 deletions plugins/inputs/cloud_pubsub/cloud_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)

Expand All @@ -19,7 +19,8 @@ const (
func TestRunParse(t *testing.T) {
subID := "sub-run-parse"

testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())

sub := &stubSub{
id: subID,
Expand Down Expand Up @@ -63,7 +64,8 @@ func TestRunParse(t *testing.T) {
func TestRunBase64(t *testing.T) {
subID := "sub-run-base64"

testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())

sub := &stubSub{
id: subID,
Expand Down Expand Up @@ -107,7 +109,8 @@ func TestRunBase64(t *testing.T) {
func TestRunInvalidMessages(t *testing.T) {
subID := "sub-invalid-messages"

testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())

sub := &stubSub{
id: subID,
Expand Down Expand Up @@ -154,7 +157,8 @@ func TestRunOverlongMessages(t *testing.T) {

acc := &testutil.Accumulator{}

testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())

sub := &stubSub{
id: subID,
Expand Down Expand Up @@ -201,7 +205,8 @@ func TestRunErrorInSubscriber(t *testing.T) {

acc := &testutil.Accumulator{}

testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())

sub := &stubSub{
id: subID,
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers"
_ "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)

Expand Down
10 changes: 5 additions & 5 deletions plugins/inputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
Expand All @@ -42,8 +42,8 @@ func TestSettingConfigWorks(t *testing.T) {
}

func TestExternalInputWorks(t *testing.T) {
influxParser, err := parsers.NewInfluxParser()
require.NoError(t, err)
influxParser := &influx.Parser{}
require.NoError(t, influxParser.Init())

exe, err := os.Executable()
require.NoError(t, err)
Expand Down Expand Up @@ -76,8 +76,8 @@ func TestExternalInputWorks(t *testing.T) {
}

func TestParsesLinesContainingNewline(t *testing.T) {
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())

metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
Expand Down
17 changes: 11 additions & 6 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -46,7 +46,8 @@ var (
)

func newTestHTTPListenerV2() *HTTPListenerV2 {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
_ = parser.Init()

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Expand All @@ -70,7 +71,8 @@ func newTestHTTPAuthListener() *HTTPListenerV2 {
}

func newTestHTTPSListenerV2() *HTTPListenerV2 {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
_ = parser.Init()

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Expand Down Expand Up @@ -109,7 +111,8 @@ func createURL(listener *HTTPListenerV2, scheme string, path string, rawquery st
}

func TestInvalidListenerConfig(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
require.NoError(t, parser.Init())

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Expand Down Expand Up @@ -319,7 +322,8 @@ func TestWriteHTTPNoNewline(t *testing.T) {
}

func TestWriteHTTPExactMaxBodySize(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
require.NoError(t, parser.Init())

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Expand All @@ -344,7 +348,8 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
}

func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
require.NoError(t, parser.Init())

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Expand Down
17 changes: 13 additions & 4 deletions plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,12 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
var metrics []telegraf.Metric
var err error
if h.ParserType == "upstream" {
parser := influx_upstream.NewParser()
parser := influx_upstream.Parser{}
err = parser.Init()
if err != ErrEOF && err != nil {
h.Log.Debugf("Error initializing parser: %v", err.Error())
return
}
parser.SetTimeFunc(influx_upstream.TimeFunc(h.timeFunc))

if precisionStr != "" {
Expand All @@ -259,13 +264,17 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {

metrics, err = parser.Parse(bytes)
} else {
metricHandler := influx.NewMetricHandler()
parser := influx.NewParser(metricHandler)
parser := influx.Parser{}
err = parser.Init()
if err != ErrEOF && err != nil {
h.Log.Debugf("Error initializing parser: %v", err.Error())
return
}
parser.SetTimeFunc(h.timeFunc)

if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
metricHandler.SetTimePrecision(precision)
parser.SetTimePrecision(precision)
}

metrics, err = parser.Parse(bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -43,8 +43,9 @@ func TestReadsMetricsFromKafkaIntegration(t *testing.T) {
PointBuffer: 100000,
Offset: "oldest",
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.SetParser(parser)

// Verify that we can now gather the sent message
var acc testutil.Accumulator
Expand Down
23 changes: 13 additions & 10 deletions plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

"github.com/Shopify/sarama"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"

Expand Down Expand Up @@ -44,9 +44,10 @@ func TestRunParser(t *testing.T) {
k.acc = &acc
defer close(k.done)

var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.parser = parser

go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)
Expand All @@ -61,9 +62,10 @@ func TestRunParserInvalidMsg(t *testing.T) {
k.acc = &acc
defer close(k.done)

var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.parser = parser

go k.receiver()
in <- saramaMsg(invalidMsg)
acc.WaitError(1)
Expand Down Expand Up @@ -95,9 +97,10 @@ func TestRunParserAndGather(t *testing.T) {
k.acc = &acc
defer close(k.done)

var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.parser = parser

go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)
Expand Down
7 changes: 4 additions & 3 deletions plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -438,11 +439,11 @@ func TestTopicTag(t *testing.T) {
plugin.TopicTag = tt.topicTag()
plugin.TopicParsing = tt.topicParsing

parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)

err = plugin.Init()
err := plugin.Init()
require.Equal(t, tt.expectedError, err)
if tt.expectedError != nil {
return
Expand Down
5 changes: 3 additions & 2 deletions plugins/inputs/nsq_consumer/nsq_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -49,7 +49,8 @@ func TestReadsMetricsFromNSQ(t *testing.T) {
Nsqd: []string{"127.0.0.1:4155"},
}

p, _ := parsers.NewInfluxParser()
p := &influx.Parser{}
require.NoError(t, p.Init())
consumer.SetParser(p)
var acc testutil.Accumulator
require.Len(t, acc.Metrics, 0, "There should not be any points")
Expand Down
4 changes: 3 additions & 1 deletion plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
)

// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
Expand Down Expand Up @@ -389,7 +390,8 @@ func (sl *SocketListener) Stop() {
}

func newSocketListener() *SocketListener {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
_ = parser.Init()

return &SocketListener{
Parser: parser,
Expand Down
Loading

0 comments on commit 130975b

Please sign in to comment.