diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 1cf8d81d61a49..7bdc63d7e2722 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -136,9 +136,8 @@ func TestSocketListener(t *testing.T) { } // Setup plugin according to test specification - logger := &testutil.CaptureLogger{} plugin := &SocketListener{ - Log: logger, + Log: &testutil.Logger{}, ServiceAddress: proto + "://" + serverAddr, ContentEncoding: tt.encoding, ReadBufferSize: tt.buffersize, @@ -190,27 +189,61 @@ func TestSocketListener(t *testing.T) { }, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics()) actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) + }) + } +} - if sl, ok := plugin.listener.(*streamListener); ok { - require.NotEmpty(t, sl.connections) - } +func TestSocketListenerStream(t *testing.T) { + logger := &testutil.CaptureLogger{} - plugin.Stop() + plugin := &SocketListener{ + Log: logger, + ServiceAddress: "tcp://127.0.0.1:0", + ReadBufferSize: 1024, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) - // Make sure we clear out old messages - logger.Clear() - if _, ok := plugin.listener.(*streamListener); ok { - // Verify that plugin.Stop() closed the client's connection - _ = client.SetReadDeadline(time.Now().Add(time.Second)) - buf := []byte{1} - _, err = client.Read(buf) - require.Equal(t, err, io.EOF) - } + // Start the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() - require.Empty(t, logger.Errors()) - require.Empty(t, logger.Warnings()) - }) - } + addr := plugin.listener.addr() + + // Create a noop client + client, err := createClient(plugin.ServiceAddress, addr, nil) + require.NoError(t, err) + + _, err = client.Write([]byte("test value=42i\n")) + require.NoError(t, err) + + require.Eventually(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= 1 + }, time.Second, 100*time.Millisecond, "did not receive metric") + + // This has to be a stream-listener... + listener, ok := plugin.listener.(*streamListener) + require.True(t, ok) + listener.Lock() + conns := len(listener.connections) + listener.Unlock() + require.NotZero(t, conns) + + plugin.Stop() + + // Verify that plugin.Stop() closed the client's connection + _ = client.SetReadDeadline(time.Now().Add(time.Second)) + buf := []byte{1} + _, err = client.Read(buf) + require.Equal(t, err, io.EOF) + + require.Empty(t, logger.Errors()) + require.Empty(t, logger.Warnings()) } func TestCases(t *testing.T) {