diff --git a/js/modules/k6/marshalling_test.go b/js/modules/k6/marshalling_test.go index 8c4ca3ae3a8..7251f477c94 100644 --- a/js/modules/k6/marshalling_test.go +++ b/js/modules/k6/marshalling_test.go @@ -54,7 +54,7 @@ func TestSetupDataMarshalling(t *testing.T) { export function setup() { let res = http.get("HTTPBIN_URL/html"); let html_selection = html.parseHTML(res.body); - let ws_res = ws.connect("ws://HTTPBIN_DOMAIN:HTTPBIN_PORT/ws-echo", function(socket){ + let ws_res = ws.connect("WSBIN_URL/ws-echo", function(socket){ socket.on("open", function() { socket.send("test") }) diff --git a/js/modules/k6/ws/ws.go b/js/modules/k6/ws/ws.go index 87506004309..8f3a82a208a 100644 --- a/js/modules/k6/ws/ws.go +++ b/js/modules/k6/ws/ws.go @@ -201,6 +201,7 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP // Run the user-provided set up function if _, err := setupFn(goja.Undefined(), rt.ToValue(&socket)); err != nil { + _ = socket.closeConnection(websocket.CloseGoingAway) return nil, err } diff --git a/js/modules/k6/ws/ws_test.go b/js/modules/k6/ws/ws_test.go index eae1320a545..9d21fec27fc 100644 --- a/js/modules/k6/ws/ws_test.go +++ b/js/modules/k6/ws/ws_test.go @@ -22,18 +22,13 @@ package ws import ( "context" "crypto/tls" - "fmt" - "net" "strconv" - "strings" "testing" - "time" "github.com/dop251/goja" "github.com/loadimpact/k6/js/common" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/metrics" - "github.com/loadimpact/k6/lib/netext" "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/stats" "github.com/stretchr/testify/assert" @@ -85,31 +80,27 @@ func assertMetricEmitted(t *testing.T, metric *stats.Metric, sampleContainers [] assert.True(t, seenMetric, "url %s didn't emit %s", url, metric.Name) } -func makeWsProto(s string) string { - return "ws" + strings.TrimPrefix(s, "http") -} - func TestSession(t *testing.T) { //TODO: split and paralelize tests + t.Parallel() + tb := testutils.NewHTTPMultiBin(t) + defer tb.Cleanup() + sr := tb.Replacer.Replace root, err := lib.NewGroup("", nil) assert.NoError(t, err) rt := goja.New() rt.SetFieldNameMapper(common.FieldNameMapper{}) - dialer := netext.NewDialer(net.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 60 * time.Second, - DualStack: true, - }) samples := make(chan stats.SampleContainer, 1000) state := &lib.State{ Group: root, - Dialer: dialer, + Dialer: tb.Dialer, Options: lib.Options{ SystemTags: lib.GetTagSet("url", "proto", "status", "subproto"), }, - Samples: samples, + Samples: samples, + TLSConfig: tb.TLSClientConfig, } ctx := context.Background() @@ -119,45 +110,45 @@ func TestSession(t *testing.T) { rt.Set("ws", common.Bind(rt, New(), &ctx)) t.Run("connect_ws", func(t *testing.T) { - _, err := common.RunString(rt, ` - let res = ws.connect("ws://demos.kaazing.com/echo", function(socket){ + _, err := common.RunString(rt, sr(` + let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ socket.close() }); if (res.status != 101) { throw new Error("connection failed with status: " + res.status); } - `) + `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", "ws://demos.kaazing.com/echo", 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") t.Run("connect_wss", func(t *testing.T) { - _, err := common.RunString(rt, ` - let res = ws.connect("wss://demos.kaazing.com/echo", function(socket){ + _, err := common.RunString(rt, sr(` + let res = ws.connect("WSSBIN_URL/ws-echo", function(socket){ socket.close() }); if (res.status != 101) { throw new Error("TLS connection failed with status: " + res.status); } - `) + `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", "wss://demos.kaazing.com/echo", 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), 101, "") t.Run("open", func(t *testing.T) { - _, err := common.RunString(rt, ` + _, err := common.RunString(rt, sr(` let opened = false; - let res = ws.connect("ws://demos.kaazing.com/echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ socket.on("open", function() { opened = true; socket.close() }) }); if (!opened) { throw new Error ("open event not fired"); } - `) + `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", "ws://demos.kaazing.com/echo", 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") t.Run("send_receive", func(t *testing.T) { - _, err := common.RunString(rt, ` - let res = ws.connect("ws://demos.kaazing.com/echo", function(socket){ + _, err := common.RunString(rt, sr(` + let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ socket.on("open", function() { socket.send("test") }) @@ -168,35 +159,35 @@ func TestSession(t *testing.T) { socket.close() }); }); - `) + `)) assert.NoError(t, err) }) samplesBuf := stats.GetBufferedSamples(samples) - assertSessionMetricsEmitted(t, samplesBuf, "", "ws://demos.kaazing.com/echo", 101, "") - assertMetricEmitted(t, metrics.WSMessagesSent, samplesBuf, "ws://demos.kaazing.com/echo") - assertMetricEmitted(t, metrics.WSMessagesReceived, samplesBuf, "ws://demos.kaazing.com/echo") + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "") + assertMetricEmitted(t, metrics.WSMessagesSent, samplesBuf, sr("WSBIN_URL/ws-echo")) + assertMetricEmitted(t, metrics.WSMessagesReceived, samplesBuf, sr("WSBIN_URL/ws-echo")) t.Run("interval", func(t *testing.T) { - _, err := common.RunString(rt, ` + _, err := common.RunString(rt, sr(` let counter = 0; - let res = ws.connect("ws://demos.kaazing.com/echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ socket.setInterval(function () { counter += 1; if (counter > 2) { socket.close(); } }, 100); }); if (counter < 3) {throw new Error ("setInterval should have been called at least 3 times, counter=" + counter);} - `) + `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", "ws://demos.kaazing.com/echo", 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") t.Run("timeout", func(t *testing.T) { - _, err := common.RunString(rt, ` + _, err := common.RunString(rt, sr(` let start = new Date().getTime(); let ellapsed = new Date().getTime() - start; - let res = ws.connect("ws://demos.kaazing.com/echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ socket.setTimeout(function () { ellapsed = new Date().getTime() - start; socket.close(); @@ -205,15 +196,15 @@ func TestSession(t *testing.T) { if (ellapsed > 3000 || ellapsed < 500) { throw new Error ("setTimeout occurred after " + ellapsed + "ms, expected 500 doc.find('p').each("wat"), () => doc.find('p').map(), () => doc.find('p').map("wat"), - () => ws.connect("ws://HTTPBIN_IP:HTTPBIN_PORT/ws-echo"), + () => ws.connect("WSBIN_URL/ws-echo"), ]; testCases.forEach(function(fn, idx) { diff --git a/lib/testutils/httpmultibin.go b/lib/testutils/httpmultibin.go index d6e6b82f6a8..8eaa920e6e2 100644 --- a/lib/testutils/httpmultibin.go +++ b/lib/testutils/httpmultibin.go @@ -100,31 +100,30 @@ type jsonBody struct { Compression string `json:"compression"` } -func getWebsocketEchoHandler(t testing.TB) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - t.Logf("[%p %s] Upgrading to websocket connection...", req, req.URL) - conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) - if !assert.NoError(t, err) { - return - } - t.Logf("[%p %s] Upgraded...", req, req.URL) +func websocketEchoHandler(w http.ResponseWriter, req *http.Request) { + conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) + if err != nil { + return + } + for { mt, message, err := conn.ReadMessage() - t.Logf("[%p %s] Read message '%s' of type %d (error '%v')", req, req.URL, message, mt, err) - assert.NoError(t, err) - assert.NoError(t, conn.WriteMessage(mt, message)) - assert.NoError(t, conn.Close()) - t.Logf("[%p %s] Wrote back message '%s' of type %d and closed the connection", req, req.URL, message, mt) - }) -} -func getWebsocketCloserHandler(t testing.TB) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) - if !assert.NoError(t, err) { - return + if err != nil { + break } - assert.NoError(t, conn.Close()) - }) + err = conn.WriteMessage(mt, message) + if err != nil { + break + } + } +} + +func websocketCloserHandler(w http.ResponseWriter, req *http.Request) { + conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) + if err != nil { + return + } + _ = conn.Close() } func writeJSON(w io.Writer, v interface{}) error { @@ -190,8 +189,8 @@ func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin { // Create a http.ServeMux and set the httpbin handler as the default mux := http.NewServeMux() mux.Handle("/brotli", getEncodedHandler(t, httpext.CompressionTypeBr)) - mux.Handle("/ws-echo", getWebsocketEchoHandler(t)) - mux.Handle("/ws-close", getWebsocketCloserHandler(t)) + mux.HandleFunc("/ws-echo", websocketEchoHandler) + mux.HandleFunc("/ws-close", websocketCloserHandler) mux.Handle("/zstd", getEncodedHandler(t, httpext.CompressionTypeZstd)) mux.Handle("/zstd-br", getZstdBrHandler(t)) mux.Handle("/", httpbin.New().Handler()) @@ -238,11 +237,13 @@ func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin { "HTTPBIN_IP_URL", httpSrv.URL, "HTTPBIN_DOMAIN", httpDomain, "HTTPBIN_URL", fmt.Sprintf("http://%s:%s", httpDomain, httpURL.Port()), + "WSBIN_URL", fmt.Sprintf("ws://%s:%s", httpDomain, httpURL.Port()), "HTTPBIN_IP", httpIP.String(), "HTTPBIN_PORT", httpURL.Port(), "HTTPSBIN_IP_URL", httpsSrv.URL, "HTTPSBIN_DOMAIN", httpsDomain, "HTTPSBIN_URL", fmt.Sprintf("https://%s:%s", httpsDomain, httpsURL.Port()), + "WSSBIN_URL", fmt.Sprintf("wss://%s:%s", httpsDomain, httpsURL.Port()), "HTTPSBIN_IP", httpsIP.String(), "HTTPSBIN_PORT", httpsURL.Port(), ),