Skip to content

Commit

Permalink
fix(inputs.socket_listener): Fix race in tests (#13300)
Browse files Browse the repository at this point in the history
(cherry picked from commit 759691a)
  • Loading branch information
srebhan authored and powersj committed May 22, 2023
1 parent 7a2c853 commit 2c7df59
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions plugins/inputs/socket_listener/socket_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -182,22 +181,48 @@ func TestSocketListener(t *testing.T) {
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics())
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
})
}
}

if sl, ok := plugin.listener.(*streamListener); ok {
require.NotEmpty(t, sl.connections)
}
func TestSocketListenerStream(t *testing.T) {
plugin := &SocketListener{
Log: testutil.Logger{},
ServiceAddress: "tcp://127.0.0.1:0",
ReadBufferSize: 1024,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)

plugin.Stop()
// Start the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

if _, ok := plugin.listener.(*streamListener); ok {
// Verify that plugin.Stop() closed the client's connection
_ = client.SetReadDeadline(time.Now().Add(time.Second))
buf := []byte{1}
_, err = client.Read(buf)
require.Equal(t, err, io.EOF)
}
})
}
addr := plugin.listener.addr()

// Create a noop client
client, err := createClient(plugin.ServiceAddress, addr, nil)
require.NoError(t, err)

_, err = client.Write([]byte("test value=42i\n"))
require.NoError(t, err)

require.Eventually(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "did not receive metric")

// This has to be a stream-listener...
listener, ok := plugin.listener.(*streamListener)
require.True(t, ok)
listener.Lock()
conns := len(listener.connections)
listener.Unlock()
require.NotZero(t, conns)
}

func TestCases(t *testing.T) {
Expand Down

0 comments on commit 2c7df59

Please sign in to comment.