[internal] Make GIL usage explicit for session_poll_workunits (#13609)
Prework for #13526. We were re-acquiring the GIL directly inside `py.allow_threads(||)`. Rust-cpython allows that, but PyO3 does not. (A benefit of PyO3 is that it makes it much easier to reason about when the GIL is held and when it is not.)
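
To illustrate the pattern being removed, here is a minimal standalone sketch against the rust-cpython `cpython` crate. `poll_sketch`, `do_pure_rust_work`, and `build_python_result` are made-up stand-ins; the real occurrence is in `session_poll_workunits`, shown in the diff below.

```rust
use cpython::Python;

// Sketch only: the helpers below stand in for the real engine logic.
fn poll_sketch(py: Python) {
  // Release the GIL for the Rust-only work...
  py.allow_threads(|| {
    do_pure_rust_work();

    // ...but then re-acquire it *inside* the `allow_threads` closure to build
    // Python values. rust-cpython tolerates this nesting; the PyO3 migration
    // (#13526) needs it gone, so this change hoists the Python work out instead.
    let gil = Python::acquire_gil();
    build_python_result(gil.python());
  });
}

fn do_pure_rust_work() {}
fn build_python_result(_py: Python) {}
```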

We can easily work around this by replacing `WorkunitStore::with_latest_workunits` with `latest_workunits`, which returns the owned `Vec<Workunit>`s directly instead of lending `&[Workunit]` slices to a callback. No new clones happen: we were already building these `Vec`s; now we simply move them out to the caller rather than handing out references inside a closure.

That change lets us operate on the `Vec<Workunit>`s safely outside of `py.allow_threads(||)` when converting them to Python values at the FFI boundary.
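
Condensed, the new shape of `session_poll_workunits` looks like this (an excerpt of the diff below rather than a standalone example; the `with_scheduler`/`with_session` scaffolding and imports are elided):

```rust
// 1. Release the GIL only while draining the workunit store; get owned Vecs back.
let (started, completed) =
  py.allow_threads(|| session.workunit_store().latest_workunits(py_level.into()));

// 2. With the GIL explicitly held again (`py`), convert the owned workunits into
//    Python values and hand them across the FFI boundary.
let started_val = core.executor.block_on(workunits_to_py_tuple_value(
  py, started, &scheduler.core, session,
))?;
let completed_val = core.executor.block_on(workunits_to_py_tuple_value(
  py, completed, &scheduler.core, session,
))?;
Ok(externs::store_tuple(py, vec![started_val, completed_val]).into())
```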

## Benchmark

These benchmarks use the Toolchain BuildSense plugin and the anonymous telemetry streaming workunit handlers.

### No pantsd

Before:

```
❯ hyperfine -w 1 -r 10 './pants -ldebug --no-pantsd list ::'
  Time (mean ± σ):      4.611 s ±  0.206 s    [User: 2.940 s, System: 0.474 s]
  Range (min … max):    4.419 s …  5.153 s    10 runs
```

After:

```
❯ hyperfine -w 1 -r 10 './pants -ldebug --no-pantsd list ::'
  Time (mean ± σ):      4.655 s ±  0.113 s    [User: 2.975 s, System: 0.509 s]
  Range (min … max):    4.430 s …  4.827 s    10 runs
```

### Pantsd

Before:

```
❯ hyperfine -w 1 -r 10 './pants -ldebug --concurrent --pantsd list ::'
  Time (mean ± σ):      2.641 s ±  0.021 s    [User: 2.026 s, System: 0.323 s]
  Range (min … max):    2.602 s …  2.670 s    10 runs
```

After:

```
❯ hyperfine -w 1 -r 10 './pants -ldebug --concurrent --pantsd list ::'
  Time (mean ± σ):      2.651 s ±  0.023 s    [User: 2.036 s, System: 0.334 s]
  Range (min … max):    2.611 s …  2.693 s    10 runs
```

[ci skip-build-wheels]
Eric-Arellano authored Nov 14, 2021
1 parent 66673f0 commit 996ff41
Showing 3 changed files with 84 additions and 104 deletions.
13 changes: 6 additions & 7 deletions src/rust/engine/process_execution/src/remote_tests.rs

```diff
@@ -1760,13 +1760,12 @@ async fn remote_workunits_are_stored() {
     .await
     .unwrap();
 
-  let got_workunit_items: HashSet<String> =
-    workunit_store.with_latest_workunits(log::Level::Trace, |_, completed| {
-      completed
-        .iter()
-        .map(|workunit| workunit.name.clone())
-        .collect()
-    });
+  let got_workunit_items: HashSet<String> = workunit_store
+    .latest_workunits(log::Level::Trace)
+    .1
+    .into_iter()
+    .map(|workunit| workunit.name)
+    .collect();
 
   let wanted_workunit_items = hashset! {
     String::from("remote execution action scheduling"),
```

46 changes: 20 additions & 26 deletions src/rust/engine/src/externs/interface.rs

```diff
@@ -1117,19 +1117,19 @@ async fn workunit_to_py_value(
   externs::store_dict(py, dict_entries)
 }
 
-async fn workunits_to_py_tuple_value<'a>(
-  workunits: impl Iterator<Item = &'a Workunit>,
+async fn workunits_to_py_tuple_value(
+  py: Python<'_>,
+  workunits: Vec<Workunit>,
   core: &Arc<Core>,
   session: &Session,
 ) -> CPyResult<Value> {
   let mut workunit_values = Vec::new();
   for workunit in workunits {
-    let py_value = workunit_to_py_value(workunit, core, session).await?;
+    let py_value = workunit_to_py_value(&workunit, core, session).await?;
     workunit_values.push(py_value);
   }
 
-  let gil = Python::acquire_gil();
-  Ok(externs::store_tuple(gil.python(), workunit_values))
+  Ok(externs::store_tuple(py, workunit_values))
 }
 
 fn session_poll_workunits(
@@ -1144,28 +1144,22 @@ fn session_poll_workunits(
   with_scheduler(py, scheduler_ptr, |scheduler| {
     with_session(py, session_ptr, |session| {
       let core = scheduler.core.clone();
-      py.allow_threads(|| {
-        session
-          .workunit_store()
-          .with_latest_workunits(py_level.into(), |started, completed| {
-            let mut started_iter = started.iter();
-            let started = core.executor.block_on(workunits_to_py_tuple_value(
-              &mut started_iter,
-              &scheduler.core,
-              session,
-            ))?;
-
-            let mut completed_iter = completed.iter();
-            let completed = core.executor.block_on(workunits_to_py_tuple_value(
-              &mut completed_iter,
-              &scheduler.core,
-              session,
-            ))?;
+      let (started, completed) =
+        py.allow_threads(|| session.workunit_store().latest_workunits(py_level.into()));
 
-            let gil = Python::acquire_gil();
-            Ok(externs::store_tuple(gil.python(), vec![started, completed]).into())
-          })
-      })
+      let started_val = core.executor.block_on(workunits_to_py_tuple_value(
+        py,
+        started,
+        &scheduler.core,
+        session,
+      ))?;
+      let completed_val = core.executor.block_on(workunits_to_py_tuple_value(
+        py,
+        completed,
+        &scheduler.core,
+        session,
+      ))?;
+      Ok(externs::store_tuple(py, vec![started_val, completed_val]).into())
     })
   })
 }
```

129 changes: 58 additions & 71 deletions src/rust/engine/workunit_store/src/lib.rs

```diff
@@ -257,84 +257,76 @@ impl StreamingWorkunitData {
     }
   }
 
-  pub fn with_latest_workunits<F, T>(&mut self, max_verbosity: log::Level, f: F) -> T
-  where
-    F: FnOnce(&[Workunit], &[Workunit]) -> T,
-  {
+  pub fn latest_workunits(&mut self, max_verbosity: log::Level) -> (Vec<Workunit>, Vec<Workunit>) {
     let should_emit = |workunit: &Workunit| -> bool { workunit.metadata.level <= max_verbosity };
+    let mut started_messages = vec![];
+    let mut completed_messages = vec![];
 
-    let (started_workunits, completed_workunits) = {
-      let mut started_messages = vec![];
-      let mut completed_messages = vec![];
-
-      {
-        let receiver = self.msg_rx.lock();
-        while let Ok(msg) = receiver.try_recv() {
-          match msg {
-            StoreMsg::Started(started) => started_messages.push(started),
-            StoreMsg::Completed(span, metadata, time, new_counters) => {
-              completed_messages.push((span, metadata, time, new_counters))
-            }
-            StoreMsg::Canceled(..) => (),
-          }
-        }
-      }
-
-      let mut workunit_records = self.workunit_records.lock();
-      let mut started_workunits: Vec<Workunit> = vec![];
-      for mut started in started_messages.into_iter() {
-        let span_id = started.span_id;
-        workunit_records.insert(span_id, started.clone());
-
-        if should_emit(&started) {
-          started.parent_id =
-            first_matched_parent(&workunit_records, started.parent_id, |_| false, should_emit);
-          started_workunits.push(started);
-        }
-      }
-
-      let mut completed_workunits: Vec<Workunit> = vec![];
-      for (span_id, new_metadata, end_time, new_counters) in completed_messages.into_iter() {
-        match workunit_records.entry(span_id) {
-          Entry::Vacant(_) => {
-            log::warn!("No previously-started workunit found for id: {}", span_id);
-            continue;
-          }
-          Entry::Occupied(o) => {
-            let (span_id, mut workunit) = o.remove_entry();
-            let time_span = match workunit.state {
-              WorkunitState::Completed { .. } => {
-                log::warn!("Workunit {} was already completed", span_id);
-                continue;
-              }
-              WorkunitState::Started { start_time, .. } => {
-                TimeSpan::from_start_and_end_systemtime(&start_time, &end_time)
-              }
-            };
-            let new_state = WorkunitState::Completed { time_span };
-            workunit.state = new_state;
-            if let Some(metadata) = new_metadata {
-              workunit.metadata = metadata;
-            }
-            workunit.counters = new_counters;
-            workunit_records.insert(span_id, workunit.clone());
-
-            if should_emit(&workunit) {
-              workunit.parent_id = first_matched_parent(
-                &workunit_records,
-                workunit.parent_id,
-                |_| false,
-                should_emit,
-              );
-              completed_workunits.push(workunit);
-            }
-          }
-        }
-      }
-      (started_workunits, completed_workunits)
-    };
-
-    f(&started_workunits, &completed_workunits)
+    {
+      let receiver = self.msg_rx.lock();
+      while let Ok(msg) = receiver.try_recv() {
+        match msg {
+          StoreMsg::Started(started) => started_messages.push(started),
+          StoreMsg::Completed(span, metadata, time, new_counters) => {
+            completed_messages.push((span, metadata, time, new_counters))
+          }
+          StoreMsg::Canceled(..) => (),
+        }
+      }
+    }
+
+    let mut workunit_records = self.workunit_records.lock();
+    let mut started_workunits: Vec<Workunit> = vec![];
+    for mut started in started_messages.into_iter() {
+      let span_id = started.span_id;
+      workunit_records.insert(span_id, started.clone());
+
+      if should_emit(&started) {
+        started.parent_id =
+          first_matched_parent(&workunit_records, started.parent_id, |_| false, should_emit);
+        started_workunits.push(started);
+      }
+    }
+
+    let mut completed_workunits: Vec<Workunit> = vec![];
+    for (span_id, new_metadata, end_time, new_counters) in completed_messages.into_iter() {
+      match workunit_records.entry(span_id) {
+        Entry::Vacant(_) => {
+          log::warn!("No previously-started workunit found for id: {}", span_id);
+          continue;
+        }
+        Entry::Occupied(o) => {
+          let (span_id, mut workunit) = o.remove_entry();
+          let time_span = match workunit.state {
+            WorkunitState::Completed { .. } => {
+              log::warn!("Workunit {} was already completed", span_id);
+              continue;
+            }
+            WorkunitState::Started { start_time, .. } => {
+              TimeSpan::from_start_and_end_systemtime(&start_time, &end_time)
+            }
+          };
+          let new_state = WorkunitState::Completed { time_span };
+          workunit.state = new_state;
+          if let Some(metadata) = new_metadata {
+            workunit.metadata = metadata;
+          }
+          workunit.counters = new_counters;
+          workunit_records.insert(span_id, workunit.clone());
+
+          if should_emit(&workunit) {
+            workunit.parent_id = first_matched_parent(
+              &workunit_records,
+              workunit.parent_id,
+              |_| false,
+              should_emit,
+            );
+            completed_workunits.push(workunit);
+          }
+        }
+      }
+    }
+    (started_workunits, completed_workunits)
   }
 }
 
@@ -729,13 +721,8 @@ impl WorkunitStore {
     self.complete_workunit_impl(workunit, end_time);
   }
 
-  pub fn with_latest_workunits<F, T>(&mut self, max_verbosity: log::Level, f: F) -> T
-  where
-    F: FnOnce(&[Workunit], &[Workunit]) -> T,
-  {
-    self
-      .streaming_workunit_data
-      .with_latest_workunits(max_verbosity, f)
+  pub fn latest_workunits(&mut self, max_verbosity: log::Level) -> (Vec<Workunit>, Vec<Workunit>) {
+    self.streaming_workunit_data.latest_workunits(max_verbosity)
   }
 
   ///
```
