diff --git a/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go b/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go index fb1055eed5037..77998989872c2 100644 --- a/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go +++ b/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go @@ -3,25 +3,27 @@ package elasticsearch_query import ( "bufio" "context" + "fmt" "os" "strconv" "strings" - "sync" "testing" "time" + "github.com/docker/go-connections/nat" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" elastic5 "gopkg.in/olivere/elastic.v5" ) -var ( - testindex = "test-elasticsearch_query-" + strconv.Itoa(int(time.Now().Unix())) - setupOnce sync.Once +const ( + servicePort = "9200" + testindex = "test-elasticsearch" ) type esAggregationQueryTest struct { @@ -503,7 +505,7 @@ var testEsAggregationData = []esAggregationQueryTest{ }, } -func setupIntegrationTest() error { +func setupIntegrationTest(t *testing.T) (testutil.Container, error) { type nginxlog struct { IPaddress string `json:"IP"` Timestamp time.Time `json:"@timestamp"` @@ -515,15 +517,32 @@ func setupIntegrationTest() error { ResponseTime float64 `json:"response_time"` } + container := testutil.Container{ + Image: "elasticsearch:6.8.23", + ExposedPorts: []string{servicePort}, + Env: map[string]string{ + "discovery.type": "single-node", + }, + WaitingFor: wait.ForAll( + wait.ForLog("] mode [basic] - valid"), + wait.ForListeningPort(nat.Port(servicePort)), + ), + } + err := container.Start() + require.NoError(t, err, "failed to start container") + + url := fmt.Sprintf( + "http://%s:%s", container.Address, container.Ports[servicePort], + ) e := &ElasticsearchQuery{ - URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"}, + URLs: []string{url}, Timeout: config.Duration(time.Second * 30), Log: testutil.Logger{}, } - err := e.connectToES() + err = e.connectToES() if err != nil { - return err + return container, err } bulkRequest := e.esClient.Bulk() @@ -531,7 +550,7 @@ func setupIntegrationTest() error { // populate elasticsearch with nginx_logs test data file file, err := os.Open("testdata/nginx_logs") if err != nil { - return err + return container, err } defer file.Close() @@ -560,22 +579,22 @@ func setupIntegrationTest() error { Doc(logline)) } if scanner.Err() != nil { - return err + return container, err } _, err = bulkRequest.Do(context.Background()) if err != nil { - return err + return container, err } // force elastic to refresh indexes to get new batch data ctx := context.Background() _, err = e.esClient.Refresh().Do(ctx) if err != nil { - return err + return container, err } - return nil + return container, nil } func TestElasticsearchQuery(t *testing.T) { @@ -583,19 +602,22 @@ func TestElasticsearchQuery(t *testing.T) { t.Skip("Skipping integration test in short mode") } - setupOnce.Do(func() { - err := setupIntegrationTest() - require.NoError(t, err) - }) + container, err := setupIntegrationTest(t) + require.NoError(t, err) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() var acc testutil.Accumulator e := &ElasticsearchQuery{ - URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"}, + URLs: []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + }, Timeout: config.Duration(time.Second * 30), Log: testutil.Logger{}, } - err := e.connectToES() + err = e.connectToES() require.NoError(t, err) var aggs []esAggregation @@ -641,10 +663,11 @@ func TestElasticsearchQuery_getMetricFields(t *testing.T) { t.Skip("Skipping integration test in short mode") } - setupOnce.Do(func() { - err := setupIntegrationTest() - require.NoError(t, err) - }) + container, err := setupIntegrationTest(t) + require.NoError(t, err) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() type args struct { ctx context.Context @@ -652,12 +675,14 @@ func TestElasticsearchQuery_getMetricFields(t *testing.T) { } e := &ElasticsearchQuery{ - URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"}, + URLs: []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + }, Timeout: config.Duration(time.Second * 30), Log: testutil.Logger{}, } - err := e.connectToES() + err = e.connectToES() require.NoError(t, err) type test struct { diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index fe4e13c9ddace..e9b919d1e8d52 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -2,6 +2,7 @@ package elasticsearch import ( "context" + "fmt" "math" "net/http" "net/http/httptest" @@ -9,18 +10,47 @@ import ( "testing" "time" + "github.com/docker/go-connections/nat" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" ) +const servicePort = "9200" + +func launchTestContainer(t *testing.T) *testutil.Container { + container := testutil.Container{ + Image: "elasticsearch:6.8.23", + ExposedPorts: []string{servicePort}, + Env: map[string]string{ + "discovery.type": "single-node", + }, + WaitingFor: wait.ForAll( + wait.ForLog("] mode [basic] - valid"), + wait.ForListeningPort(nat.Port(servicePort)), + ), + } + err := container.Start() + require.NoError(t, err, "failed to start container") + + return &container +} + func TestConnectAndWriteIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } e := &Elasticsearch{ URLs: urls, @@ -49,7 +79,14 @@ func TestConnectAndWriteMetricWithNaNValueEmpty(t *testing.T) { t.Skip("Skipping integration test in short mode") } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } e := &Elasticsearch{ URLs: urls, @@ -85,7 +122,14 @@ func TestConnectAndWriteMetricWithNaNValueNone(t *testing.T) { t.Skip("Skipping integration test in short mode") } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } e := &Elasticsearch{ URLs: urls, @@ -122,7 +166,14 @@ func TestConnectAndWriteMetricWithNaNValueDrop(t *testing.T) { t.Skip("Skipping integration test in short mode") } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } e := &Elasticsearch{ URLs: urls, @@ -181,7 +232,14 @@ func TestConnectAndWriteMetricWithNaNValueReplacement(t *testing.T) { }, } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } for _, test := range tests { e := &Elasticsearch{ @@ -224,7 +282,14 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } ctx := context.Background() @@ -248,7 +313,14 @@ func TestTemplateManagementIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } e := &Elasticsearch{ URLs: urls, @@ -276,7 +348,14 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } e := &Elasticsearch{ URLs: urls,