Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jupyter): send binary data with Deno.jupyter.broadcast #20755

Merged
merged 8 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/js/40_jupyter.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ function enableJupyter() {
} = core.ensureFastOps();

globalThis.Deno.jupyter = {
async broadcast(msgType, content, { metadata = {} } = {}) {
await op_jupyter_broadcast(msgType, content, metadata);
async broadcast(msgType, content, { metadata = {}, buffers = [] } = {}) {
await op_jupyter_broadcast(msgType, content, metadata, buffers);
},
};
}
Expand Down
2 changes: 2 additions & 0 deletions cli/ops/jupyter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub async fn op_jupyter_broadcast(
#[string] message_type: String,
#[serde] content: serde_json::Value,
#[serde] metadata: serde_json::Value,
#[serde] buffers: Vec<deno_core::JsBuffer>,
) -> Result<(), AnyError> {
let (iopub_socket, last_execution_request) = {
let s = state.borrow();
Expand All @@ -54,6 +55,7 @@ pub async fn op_jupyter_broadcast(
.new_message(&message_type)
.with_content(content)
.with_metadata(metadata)
.with_buffers(buffers.into_iter().map(|b| b.into()).collect())
.send(&mut *iopub_socket.lock().await)
.await?;
}
Expand Down
41 changes: 40 additions & 1 deletion cli/tests/testdata/jupyter/integration_test.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -719,10 +719,49 @@
"});"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "6e9b530f-554d-4ef7-a5d6-69432283fd40",
"metadata": {},
"outputs": [],
"source": [
"// Smoke test: Send example Jupyter Widgets messages with \"extra\" context.\n",
"// No return because we don't have a front-end widget to get the message from.\n",
"await Deno.jupyter.broadcast(\n",
" \"comm_open\",\n",
" {\n",
" \"comm_id\": \"foo\",\n",
" \"target_name\": \"jupyter.widget\",\n",
" \"data\": {\n",
" \"state\": {},\n",
" },\n",
" },\n",
" {\n",
" \"metadata\": { \"version\": \"2.1.0\" },\n",
" },\n",
");\n",
"\n",
"await Deno.jupyter.broadcast(\n",
" \"comm_msg\",\n",
" {\n",
" \"comm_id\": \"foo\",\n",
" \"data\": {\n",
" \"method\": \"update\",\n",
" \"state\": { \"answer\": null },\n",
" \"buffer_paths\": [[\"answer\"]]\n",
" },\n",
" },\n",
" {\n",
" \"buffers\": [new Uint8Array([42])],\n",
" },\n",
");"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0181f28e",
"id": "f678313e-06c6-4fb8-a4ef-54a417129a82",
"metadata": {},
"outputs": [],
"source": []
Expand Down
58 changes: 36 additions & 22 deletions cli/tools/jupyter/jupyter_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub(crate) struct JupyterMessage {
parent_header: serde_json::Value,
metadata: serde_json::Value,
content: serde_json::Value,
buffers: Vec<Bytes>,
}

const DELIMITER: &[u8] = b"<IDS|MSG>";
Expand All @@ -146,6 +147,11 @@ impl JupyterMessage {
parent_header: serde_json::from_slice(&raw_message.jparts[1])?,
metadata: serde_json::from_slice(&raw_message.jparts[2])?,
content: serde_json::from_slice(&raw_message.jparts[3])?,
buffers: if raw_message.jparts.len() > 4 {
raw_message.jparts[4..].to_vec()
} else {
vec![]
},
})
}

Expand Down Expand Up @@ -179,6 +185,7 @@ impl JupyterMessage {
parent_header: self.header.clone(),
metadata: json!({}),
content: json!({}),
buffers: vec![],
}
}

Expand Down Expand Up @@ -214,36 +221,43 @@ impl JupyterMessage {
self
}

pub(crate) fn with_buffers(mut self, buffers: Vec<Bytes>) -> JupyterMessage {
self.buffers = buffers;
self
}

pub(crate) async fn send<S: zeromq::SocketSend>(
&self,
connection: &mut Connection<S>,
) -> Result<(), AnyError> {
// If performance is a concern, we can probably avoid the clone and to_vec calls with a bit
// of refactoring.
let mut jparts: Vec<Bytes> = vec![
serde_json::to_string(&self.header)
.unwrap()
.as_bytes()
.to_vec()
.into(),
serde_json::to_string(&self.parent_header)
.unwrap()
.as_bytes()
.to_vec()
.into(),
serde_json::to_string(&self.metadata)
.unwrap()
.as_bytes()
.to_vec()
.into(),
serde_json::to_string(&self.content)
.unwrap()
.as_bytes()
.to_vec()
.into(),
];
jparts.extend_from_slice(&self.buffers);
let raw_message = RawMessage {
zmq_identities: self.zmq_identities.clone(),
jparts: vec![
serde_json::to_string(&self.header)
.unwrap()
.as_bytes()
.to_vec()
.into(),
serde_json::to_string(&self.parent_header)
.unwrap()
.as_bytes()
.to_vec()
.into(),
serde_json::to_string(&self.metadata)
.unwrap()
.as_bytes()
.to_vec()
.into(),
serde_json::to_string(&self.content)
.unwrap()
.as_bytes()
.to_vec()
.into(),
],
jparts,
};
raw_message.send(connection).await
}
Expand Down
1 change: 1 addition & 0 deletions cli/tsc/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,7 @@ declare namespace Deno {
content: Record<string, unknown>,
extra?: {
metadata?: Record<string, unknown>;
buffers?: Uint8Array[];
},
): Promise<void>;
}
Expand Down