From 6ce595a65e79e9f7fb2201b299bcb5bcc23c7e22 Mon Sep 17 00:00:00 2001 From: Nika Layzell Date: Thu, 1 Jul 2021 15:13:42 -0400 Subject: [PATCH] proc_macro: add an optimized CrossThread execution strategy, and a debug flag to use it This new strategy supports avoiding waiting for a reply for noblock messages. This strategy requires using a channel-like approach (similar to the previous CrossThread1 approach). This new CrossThread execution strategy takes a type parameter for the channel to use, allowing rustc to use a more efficient channel which the proc_macro crate could not declare as a dependency. --- compiler/rustc_expand/src/proc_macro.rs | 52 +++++--- compiler/rustc_session/src/options.rs | 2 + library/proc_macro/src/bridge/client.rs | 6 +- library/proc_macro/src/bridge/server.rs | 169 +++++++++++++++--------- 4 files changed, 141 insertions(+), 88 deletions(-) diff --git a/compiler/rustc_expand/src/proc_macro.rs b/compiler/rustc_expand/src/proc_macro.rs index 494b3fb61ee97..a23d28bf81f7c 100644 --- a/compiler/rustc_expand/src/proc_macro.rs +++ b/compiler/rustc_expand/src/proc_macro.rs @@ -12,7 +12,11 @@ use rustc_parse::parser::ForceCollect; use rustc_span::def_id::CrateNum; use rustc_span::{Span, DUMMY_SP}; -const EXEC_STRATEGY: pm::bridge::server::SameThread = pm::bridge::server::SameThread; +fn exec_strategy(ecx: &ExtCtxt<'_>) -> impl pm::bridge::server::ExecutionStrategy { + >>::new( + ecx.sess.opts.debugging_opts.proc_macro_cross_thread, + ) +} pub struct BangProcMacro { pub client: pm::bridge::client::Client pm::TokenStream>, @@ -27,14 +31,16 @@ impl base::ProcMacro for BangProcMacro { input: TokenStream, ) -> Result { let server = proc_macro_server::Rustc::new(ecx, self.krate); - self.client.run(&EXEC_STRATEGY, server, input, ecx.ecfg.proc_macro_backtrace).map_err(|e| { - let mut err = ecx.struct_span_err(span, "proc macro panicked"); - if let Some(s) = e.as_str() { - err.help(&format!("message: {}", s)); - } - err.emit(); - ErrorReported - }) + self.client.run(&exec_strategy(ecx), server, input, ecx.ecfg.proc_macro_backtrace).map_err( + |e| { + let mut err = ecx.struct_span_err(span, "proc macro panicked"); + if let Some(s) = e.as_str() { + err.help(&format!("message: {}", s)); + } + err.emit(); + ErrorReported + }, + ) } } @@ -53,7 +59,7 @@ impl base::AttrProcMacro for AttrProcMacro { ) -> Result { let server = proc_macro_server::Rustc::new(ecx, self.krate); self.client - .run(&EXEC_STRATEGY, server, annotation, annotated, ecx.ecfg.proc_macro_backtrace) + .run(&exec_strategy(ecx), server, annotation, annotated, ecx.ecfg.proc_macro_backtrace) .map_err(|e| { let mut err = ecx.struct_span_err(span, "custom attribute panicked"); if let Some(s) = e.as_str() { @@ -102,18 +108,22 @@ impl MultiItemModifier for ProcMacroDerive { }; let server = proc_macro_server::Rustc::new(ecx, self.krate); - let stream = - match self.client.run(&EXEC_STRATEGY, server, input, ecx.ecfg.proc_macro_backtrace) { - Ok(stream) => stream, - Err(e) => { - let mut err = ecx.struct_span_err(span, "proc-macro derive panicked"); - if let Some(s) = e.as_str() { - err.help(&format!("message: {}", s)); - } - err.emit(); - return ExpandResult::Ready(vec![]); + let stream = match self.client.run( + &exec_strategy(ecx), + server, + input, + ecx.ecfg.proc_macro_backtrace, + ) { + Ok(stream) => stream, + Err(e) => { + let mut err = ecx.struct_span_err(span, "proc-macro derive panicked"); + if let Some(s) = e.as_str() { + err.help(&format!("message: {}", s)); } - }; + err.emit(); + return ExpandResult::Ready(vec![]); + } + }; let error_count_before = ecx.sess.parse_sess.span_diagnostic.err_count(); let mut parser = diff --git a/compiler/rustc_session/src/options.rs b/compiler/rustc_session/src/options.rs index 4c40d0c367eca..04f1f063a0126 100644 --- a/compiler/rustc_session/src/options.rs +++ b/compiler/rustc_session/src/options.rs @@ -1207,6 +1207,8 @@ options! { "print layout information for each type encountered (default: no)"), proc_macro_backtrace: bool = (false, parse_bool, [UNTRACKED], "show backtraces for panics during proc-macro execution (default: no)"), + proc_macro_cross_thread: bool = (false, parse_bool, [UNTRACKED], + "run proc-macro code on a separate thread (default: no)"), profile: bool = (false, parse_bool, [TRACKED], "insert profiling code (default: no)"), profile_closures: bool = (false, parse_no_flag, [UNTRACKED], diff --git a/library/proc_macro/src/bridge/client.rs b/library/proc_macro/src/bridge/client.rs index 9d41497fb28c9..ee40a92d9ef3b 100644 --- a/library/proc_macro/src/bridge/client.rs +++ b/library/proc_macro/src/bridge/client.rs @@ -312,7 +312,11 @@ macro_rules! client_send_impl { b = bridge.dispatch.call(b); - let r = Result::<(), PanicMessage>::decode(&mut &b[..], &mut ()); + let r = if b.len() > 0 { + Result::<(), PanicMessage>::decode(&mut &b[..], &mut ()) + } else { + Ok(()) + }; bridge.cached_buffer = b; diff --git a/library/proc_macro/src/bridge/server.rs b/library/proc_macro/src/bridge/server.rs index 3b9210ea55d29..8f64d860fe41d 100644 --- a/library/proc_macro/src/bridge/server.rs +++ b/library/proc_macro/src/bridge/server.rs @@ -2,6 +2,8 @@ use super::*; +use std::marker::PhantomData; + // FIXME(eddyb) generate the definition of `HandleStore` in `server.rs`. use super::client::HandleStore; @@ -174,6 +176,50 @@ pub trait ExecutionStrategy { ) -> Buffer; } +pub struct MaybeCrossThread

{ + cross_thread: bool, + marker: PhantomData

, +} + +impl

MaybeCrossThread

{ + pub const fn new(cross_thread: bool) -> Self { + MaybeCrossThread { cross_thread, marker: PhantomData } + } +} + +impl

ExecutionStrategy for MaybeCrossThread

+where + P: MessagePipe> + Send + 'static, +{ + fn run_bridge_and_client( + &self, + dispatcher: &mut impl DispatcherTrait, + input: Buffer, + run_client: extern "C" fn(BridgeConfig<'_>, D) -> Buffer, + client_data: D, + force_show_panics: bool, + ) -> Buffer { + if self.cross_thread { + >::new().run_bridge_and_client( + dispatcher, + input, + run_client, + client_data, + force_show_panics, + ) + } else { + SameThread.run_bridge_and_client( + dispatcher, + input, + run_client, + client_data, + force_show_panics, + ) + } + } +} + +#[derive(Default)] pub struct SameThread; impl ExecutionStrategy for SameThread { @@ -194,12 +240,18 @@ impl ExecutionStrategy for SameThread { } } -// NOTE(eddyb) Two implementations are provided, the second one is a bit -// faster but neither is anywhere near as fast as same-thread execution. +pub struct CrossThread

(PhantomData

); -pub struct CrossThread1; +impl

CrossThread

{ + pub const fn new() -> Self { + CrossThread(PhantomData) + } +} -impl ExecutionStrategy for CrossThread1 { +impl

ExecutionStrategy for CrossThread

+where + P: MessagePipe> + Send + 'static, +{ fn run_bridge_and_client( &self, dispatcher: &mut impl DispatcherTrait, @@ -208,15 +260,18 @@ impl ExecutionStrategy for CrossThread1 { client_data: D, force_show_panics: bool, ) -> Buffer { - use std::sync::mpsc::channel; - - let (req_tx, req_rx) = channel(); - let (res_tx, res_rx) = channel(); + let (mut server, mut client) = P::new(); let join_handle = thread::spawn(move || { - let mut dispatch = |b| { - req_tx.send(b).unwrap(); - res_rx.recv().unwrap() + let mut dispatch = |b: Buffer| -> Buffer { + let method_tag = api_tags::Method::decode(&mut &b[..], &mut ()); + client.send(b); + + if method_tag.should_wait() { + client.recv().expect("server died while client waiting for reply") + } else { + Buffer::new() + } }; run_client( @@ -225,73 +280,55 @@ impl ExecutionStrategy for CrossThread1 { ) }); - for b in req_rx { - res_tx.send(dispatcher.dispatch(b)).unwrap(); + while let Some(b) = server.recv() { + let method_tag = api_tags::Method::decode(&mut &b[..], &mut ()); + let b = dispatcher.dispatch(b); + + if method_tag.should_wait() { + server.send(b); + } else if let Err(err) = >::decode(&mut &b[..], &mut ()) { + panic::resume_unwind(err.into()); + } } join_handle.join().unwrap() } } -pub struct CrossThread2; - -impl ExecutionStrategy for CrossThread2 { - fn run_bridge_and_client( - &self, - dispatcher: &mut impl DispatcherTrait, - input: Buffer, - run_client: extern "C" fn(BridgeConfig<'_>, D) -> Buffer, - client_data: D, - force_show_panics: bool, - ) -> Buffer { - use std::sync::{Arc, Mutex}; - - enum State { - Req(T), - Res(T), - } - - let mut state = Arc::new(Mutex::new(State::Res(Buffer::new()))); +/// A message pipe used for communicating between server and client threads. +pub trait MessagePipe: Sized { + /// Create a new pair of endpoints for the message pipe. + fn new() -> (Self, Self); - let server_thread = thread::current(); - let state2 = state.clone(); - let join_handle = thread::spawn(move || { - let mut dispatch = |b| { - *state2.lock().unwrap() = State::Req(b); - server_thread.unpark(); - loop { - thread::park(); - if let State::Res(b) = &mut *state2.lock().unwrap() { - break b.take(); - } - } - }; + /// Send a message to the other endpoint of this pipe. + fn send(&mut self, value: T); - let r = run_client( - BridgeConfig { input, dispatch: (&mut dispatch).into(), force_show_panics }, - client_data, - ); + /// Receive a message from the other endpoint of this pipe. + /// + /// Returns `None` if the other end of the pipe has been destroyed, and no + /// message was received. + fn recv(&mut self) -> Option; +} - // Wake up the server so it can exit the dispatch loop. - drop(state2); - server_thread.unpark(); +/// Implementation of `MessagePipe` using `std::sync::mpsc` +pub struct StdMessagePipe { + tx: std::sync::mpsc::Sender, + rx: std::sync::mpsc::Receiver, +} - r - }); +impl MessagePipe for StdMessagePipe { + fn new() -> (Self, Self) { + let (tx1, rx1) = std::sync::mpsc::channel(); + let (tx2, rx2) = std::sync::mpsc::channel(); + (StdMessagePipe { tx: tx1, rx: rx2 }, StdMessagePipe { tx: tx2, rx: rx1 }) + } - // Check whether `state2` was dropped, to know when to stop. - while Arc::get_mut(&mut state).is_none() { - thread::park(); - let mut b = match &mut *state.lock().unwrap() { - State::Req(b) => b.take(), - _ => continue, - }; - b = dispatcher.dispatch(b.take()); - *state.lock().unwrap() = State::Res(b); - join_handle.thread().unpark(); - } + fn send(&mut self, v: T) { + self.tx.send(v).unwrap(); + } - join_handle.join().unwrap() + fn recv(&mut self) -> Option { + self.rx.recv().ok() } }