Skip to content

Commit

Permalink
bump version with backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Oct 27, 2023
1 parent 6e2abb2 commit 624a7f9
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 14 deletions.
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ ring = "=0.16.20"
rusqlite = { version = "=0.29.0", features = ["unlock_notify", "bundled"] }
rustls = "0.21.0"
rustls-pemfile = "1.0.0"
rustls-tokio-stream = "0.2.2"
rustls-webpki = "0.101.4"
rustls-native-certs = "0.6.2"
webpki-roots = "0.25.2"
Expand Down
2 changes: 1 addition & 1 deletion cli/tests/testdata/run/websocket_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ Deno.test("websocket error", async () => {
// Error message got changed because we don't use warp in test_util
assertEquals(
err.message,
"InvalidData: received corrupt message of type InvalidContentType",
"InvalidData: invalid data",
);
promise1.resolve();
};
Expand Down
60 changes: 58 additions & 2 deletions cli/tests/unit/websocket_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,77 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() {
const promise = deferred();
const ws = new WebSocket(new URL("ws://localhost:4242/"));
assertEquals(ws.url, "ws://localhost:4242/");
ws.onerror = () => fail();
ws.onerror = (e) => promise.reject(e);
ws.onopen = () => ws.close();
ws.onclose = () => {
promise.resolve();
};
await promise;
});

// Ignored until split websocket
Deno.test({ ignore: true }, async function websocketSendLargePacket() {
const promise = deferred();
const ws = new WebSocket(new URL("wss://localhost:4243/"));
assertEquals(ws.url, "wss://localhost:4243/");
ws.onerror = (e) => promise.reject(e);
ws.onopen = () => {
ws.send("a".repeat(65000));
};
ws.onmessage = () => {
ws.close();
};
ws.onclose = () => {
promise.resolve();
};
await promise;
});

// Ignored until split websocket
Deno.test({ ignore: true }, async function websocketSendLargeBinaryPacket() {
const promise = deferred();
const ws = new WebSocket(new URL("wss://localhost:4243/"));
assertEquals(ws.url, "wss://localhost:4243/");
ws.onerror = (e) => promise.reject(e);
ws.onopen = () => {
ws.send(new Uint8Array(65000));
};
ws.onmessage = (msg) => {
console.log(msg);
ws.close();
};
ws.onclose = () => {
promise.resolve();
};
await promise;
});

// Ignored until split websocket
Deno.test({ ignore: true }, async function websocketSendLargeBlobPacket() {
const promise = deferred();
const ws = new WebSocket(new URL("wss://localhost:4243/"));
assertEquals(ws.url, "wss://localhost:4243/");
ws.onerror = (e) => promise.reject(e);
ws.onopen = () => {
ws.send(new Blob(["a".repeat(10)]));
};
ws.onmessage = (msg) => {
console.log(msg);
ws.close();
};
ws.onclose = () => {
promise.resolve();
};
await promise;
});

// https://github.com/denoland/deno/pull/17762
// https://github.com/denoland/deno/issues/17761
Deno.test(async function websocketPingPong() {
const promise = deferred();
const ws = new WebSocket("ws://localhost:4245/");
assertEquals(ws.url, "ws://localhost:4245/");
ws.onerror = () => fail();
ws.onerror = (e) => promise.reject(e);
ws.onmessage = (e) => {
ws.send(e.data);
};
Expand Down
3 changes: 2 additions & 1 deletion ext/websocket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fastwebsockets = { workspace = true, features = ["upgrade"] }
http.workspace = true
hyper = { workspace = true, features = ["backports"] }
once_cell.workspace = true
rustls.workspace = true
rustls-tokio-stream.workspace = true
serde.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
21 changes: 14 additions & 7 deletions ext/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,23 @@ use http::Request;
use http::Uri;
use hyper::Body;
use once_cell::sync::Lazy;
use rustls::RootCertStore;
use rustls::ServerName;
use rustls_tokio_stream::TlsStream;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::Cell;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::fmt;
use std::future::Future;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::rustls::ServerName;
use tokio_rustls::TlsConnector;

use fastwebsockets::CloseCode;
use fastwebsockets::FragmentCollector;
Expand Down Expand Up @@ -280,11 +281,16 @@ where
unsafely_ignore_certificate_errors,
None,
)?;
let tls_connector = TlsConnector::from(Arc::new(tls_config));
let dnsname = ServerName::try_from(domain.as_str())
.map_err(|_| invalid_hostname(domain))?;
let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?;
handshake(cancel_resource, request, tls_socket).await?
let mut tls_connector = TlsStream::new_client_side(
tcp_socket,
tls_config.into(),
dnsname,
NonZeroUsize::new(65536),
);
let _hs = tls_connector.handshake().await?;
handshake(cancel_resource, request, tls_connector).await?
}
_ => unreachable!(),
};
Expand Down Expand Up @@ -629,7 +635,8 @@ pub async fn op_ws_next_event(

let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
loop {
let val = match ws.read_frame().await {
let val = ws.read_frame().await;
let val = match val {
Ok(val) => val,
Err(err) => {
// No message was received, socket closed while we waited.
Expand Down
1 change: 1 addition & 0 deletions test_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ async fn echo_websocket_handler(
match frame.opcode {
fastwebsockets::OpCode::Close => break,
fastwebsockets::OpCode::Text | fastwebsockets::OpCode::Binary => {
println!("got frame! {}", frame.payload.len());
ws.write_frame(frame).await.unwrap();
}
_ => {}
Expand Down
2 changes: 1 addition & 1 deletion tools/wpt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ function reportVariation(result: TestResult, expectation: boolean | string[]) {

const expectFail = expectation === false;
const failReason = result.status !== 0
? "runner failed during test"
? `runner failed during test: >>>>>>>\n${result.stderr}\n<<<<<<<\n`
: "the event loop run out of tasks during the test";
console.log(
`\nfile result: ${
Expand Down
14 changes: 13 additions & 1 deletion tools/wpt/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export interface TestResult {
duration: number;
status: number;
stderr: string;
stdout: string;
}

export interface TestHarnessStatus {
Expand Down Expand Up @@ -124,7 +125,7 @@ export async function runSingleTest(
env: {
NO_COLOR: "1",
},
stdout: "null",
stdout: "piped",
stderr: "piped",
}).spawn();

Expand All @@ -136,12 +137,22 @@ export async function runSingleTest(
const lines = proc.stderr.pipeThrough(new TextDecoderStream()).pipeThrough(
new TextLineStream(),
);
const stdoutLines = proc.stdout.pipeThrough(new TextDecoderStream())
.pipeThrough(
new TextLineStream(),
);
interval = setInterval(() => {
const passedTime = performance.now() - start;
if (passedTime > timeout) {
proc.kill("SIGINT");
}
}, 1000);
let stdout = "";
(async () => {
for await (const line of stdoutLines) {
stdout += line + "\n";
}
})();
for await (const line of lines) {
if (line.startsWith("{")) {
const data = JSON.parse(line);
Expand All @@ -167,6 +178,7 @@ export async function runSingleTest(
duration,
cases,
stderr,
stdout,
};
} finally {
clearInterval(interval);
Expand Down

0 comments on commit 624a7f9

Please sign in to comment.