From 624a7f9d76d5f995441982287046f9a89db9c65e Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Fri, 15 Sep 2023 12:42:56 -0600 Subject: [PATCH] bump version with backpressure --- Cargo.lock | 14 +++++- Cargo.toml | 1 + cli/tests/testdata/run/websocket_test.ts | 2 +- cli/tests/unit/websocket_test.ts | 60 +++++++++++++++++++++++- ext/websocket/Cargo.toml | 3 +- ext/websocket/lib.rs | 21 ++++++--- test_util/src/lib.rs | 1 + tools/wpt.ts | 2 +- tools/wpt/runner.ts | 14 +++++- 9 files changed, 104 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1fcfb8b9b24aaa..da7ead0e2da557 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,9 +1781,10 @@ dependencies = [ "http", "hyper 0.14.27", "once_cell", + "rustls", + "rustls-tokio-stream", "serde", "tokio", - "tokio-rustls", ] [[package]] @@ -4526,6 +4527,17 @@ dependencies = [ "base64 0.21.4", ] +[[package]] +name = "rustls-tokio-stream" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2e42608a58a7a0cfc9d95a3be176d6bc3808ca066381b4f09f73d84e0d96c96" +dependencies = [ + "futures", + "rustls", + "tokio", +] + [[package]] name = "rustls-webpki" version = "0.101.6" diff --git a/Cargo.toml b/Cargo.toml index 81e0f388c85df9..8aecc850023ea6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,7 @@ ring = "=0.16.20" rusqlite = { version = "=0.29.0", features = ["unlock_notify", "bundled"] } rustls = "0.21.0" rustls-pemfile = "1.0.0" +rustls-tokio-stream = "0.2.2" rustls-webpki = "0.101.4" rustls-native-certs = "0.6.2" webpki-roots = "0.25.2" diff --git a/cli/tests/testdata/run/websocket_test.ts b/cli/tests/testdata/run/websocket_test.ts index 924738abe28087..41283e7741c4c9 100644 --- a/cli/tests/testdata/run/websocket_test.ts +++ b/cli/tests/testdata/run/websocket_test.ts @@ -163,7 +163,7 @@ Deno.test("websocket error", async () => { // Error message got changed because we don't use warp in test_util assertEquals( err.message, - "InvalidData: received corrupt message of type InvalidContentType", + "InvalidData: invalid data", ); promise1.resolve(); }; diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts index 11f0fd7dc314f0..4249eb4cee071b 100644 --- a/cli/tests/unit/websocket_test.ts +++ b/cli/tests/unit/websocket_test.ts @@ -21,7 +21,7 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() { const promise = deferred(); const ws = new WebSocket(new URL("ws://localhost:4242/")); assertEquals(ws.url, "ws://localhost:4242/"); - ws.onerror = () => fail(); + ws.onerror = (e) => promise.reject(e); ws.onopen = () => ws.close(); ws.onclose = () => { promise.resolve(); @@ -29,13 +29,69 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() { await promise; }); +// Ignored until split websocket +Deno.test({ ignore: true }, async function websocketSendLargePacket() { + const promise = deferred(); + const ws = new WebSocket(new URL("wss://localhost:4243/")); + assertEquals(ws.url, "wss://localhost:4243/"); + ws.onerror = (e) => promise.reject(e); + ws.onopen = () => { + ws.send("a".repeat(65000)); + }; + ws.onmessage = () => { + ws.close(); + }; + ws.onclose = () => { + promise.resolve(); + }; + await promise; +}); + +// Ignored until split websocket +Deno.test({ ignore: true }, async function websocketSendLargeBinaryPacket() { + const promise = deferred(); + const ws = new WebSocket(new URL("wss://localhost:4243/")); + assertEquals(ws.url, "wss://localhost:4243/"); + ws.onerror = (e) => promise.reject(e); + ws.onopen = () => { + ws.send(new Uint8Array(65000)); + }; + ws.onmessage = (msg) => { + console.log(msg); + ws.close(); + }; + ws.onclose = () => { + promise.resolve(); + }; + await promise; +}); + +// Ignored until split websocket +Deno.test({ ignore: true }, async function websocketSendLargeBlobPacket() { + const promise = deferred(); + const ws = new WebSocket(new URL("wss://localhost:4243/")); + assertEquals(ws.url, "wss://localhost:4243/"); + ws.onerror = (e) => promise.reject(e); + ws.onopen = () => { + ws.send(new Blob(["a".repeat(10)])); + }; + ws.onmessage = (msg) => { + console.log(msg); + ws.close(); + }; + ws.onclose = () => { + promise.resolve(); + }; + await promise; +}); + // https://github.com/denoland/deno/pull/17762 // https://github.com/denoland/deno/issues/17761 Deno.test(async function websocketPingPong() { const promise = deferred(); const ws = new WebSocket("ws://localhost:4245/"); assertEquals(ws.url, "ws://localhost:4245/"); - ws.onerror = () => fail(); + ws.onerror = (e) => promise.reject(e); ws.onmessage = (e) => { ws.send(e.data); }; diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml index a144d016390e5a..b7d95ba9a8e59e 100644 --- a/ext/websocket/Cargo.toml +++ b/ext/websocket/Cargo.toml @@ -22,6 +22,7 @@ fastwebsockets = { workspace = true, features = ["upgrade"] } http.workspace = true hyper = { workspace = true, features = ["backports"] } once_cell.workspace = true +rustls.workspace = true +rustls-tokio-stream.workspace = true serde.workspace = true tokio.workspace = true -tokio-rustls.workspace = true diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 83d553eebefb71..2cdbf1ceed9187 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -29,6 +29,9 @@ use http::Request; use http::Uri; use hyper::Body; use once_cell::sync::Lazy; +use rustls::RootCertStore; +use rustls::ServerName; +use rustls_tokio_stream::TlsStream; use serde::Serialize; use std::borrow::Cow; use std::cell::Cell; @@ -36,15 +39,13 @@ use std::cell::RefCell; use std::convert::TryFrom; use std::fmt; use std::future::Future; +use std::num::NonZeroUsize; use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::net::TcpStream; -use tokio_rustls::rustls::RootCertStore; -use tokio_rustls::rustls::ServerName; -use tokio_rustls::TlsConnector; use fastwebsockets::CloseCode; use fastwebsockets::FragmentCollector; @@ -280,11 +281,16 @@ where unsafely_ignore_certificate_errors, None, )?; - let tls_connector = TlsConnector::from(Arc::new(tls_config)); let dnsname = ServerName::try_from(domain.as_str()) .map_err(|_| invalid_hostname(domain))?; - let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; - handshake(cancel_resource, request, tls_socket).await? + let mut tls_connector = TlsStream::new_client_side( + tcp_socket, + tls_config.into(), + dnsname, + NonZeroUsize::new(65536), + ); + let _hs = tls_connector.handshake().await?; + handshake(cancel_resource, request, tls_connector).await? } _ => unreachable!(), }; @@ -629,7 +635,8 @@ pub async fn op_ws_next_event( let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; loop { - let val = match ws.read_frame().await { + let val = ws.read_frame().await; + let val = match val { Ok(val) => val, Err(err) => { // No message was received, socket closed while we waited. diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index b7106a2b39014b..b9d29aa40e93fd 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -342,6 +342,7 @@ async fn echo_websocket_handler( match frame.opcode { fastwebsockets::OpCode::Close => break, fastwebsockets::OpCode::Text | fastwebsockets::OpCode::Binary => { + println!("got frame! {}", frame.payload.len()); ws.write_frame(frame).await.unwrap(); } _ => {} diff --git a/tools/wpt.ts b/tools/wpt.ts index 07f6b6ba94e2a2..f81807a8e7787b 100755 --- a/tools/wpt.ts +++ b/tools/wpt.ts @@ -597,7 +597,7 @@ function reportVariation(result: TestResult, expectation: boolean | string[]) { const expectFail = expectation === false; const failReason = result.status !== 0 - ? "runner failed during test" + ? `runner failed during test: >>>>>>>\n${result.stderr}\n<<<<<<<\n` : "the event loop run out of tasks during the test"; console.log( `\nfile result: ${ diff --git a/tools/wpt/runner.ts b/tools/wpt/runner.ts index fb39ddfa49099f..99a297dc3a26e9 100644 --- a/tools/wpt/runner.ts +++ b/tools/wpt/runner.ts @@ -54,6 +54,7 @@ export interface TestResult { duration: number; status: number; stderr: string; + stdout: string; } export interface TestHarnessStatus { @@ -124,7 +125,7 @@ export async function runSingleTest( env: { NO_COLOR: "1", }, - stdout: "null", + stdout: "piped", stderr: "piped", }).spawn(); @@ -136,12 +137,22 @@ export async function runSingleTest( const lines = proc.stderr.pipeThrough(new TextDecoderStream()).pipeThrough( new TextLineStream(), ); + const stdoutLines = proc.stdout.pipeThrough(new TextDecoderStream()) + .pipeThrough( + new TextLineStream(), + ); interval = setInterval(() => { const passedTime = performance.now() - start; if (passedTime > timeout) { proc.kill("SIGINT"); } }, 1000); + let stdout = ""; + (async () => { + for await (const line of stdoutLines) { + stdout += line + "\n"; + } + })(); for await (const line of lines) { if (line.startsWith("{")) { const data = JSON.parse(line); @@ -167,6 +178,7 @@ export async function runSingleTest( duration, cases, stderr, + stdout, }; } finally { clearInterval(interval);