From d06372f6b14d9375f25f7793ecce61afbe60dbc4 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 18 Jul 2024 00:21:09 +0200 Subject: [PATCH] fix collectibles counting (vercel/turbo#8778) ### Description fix a bunch of cases where collectibles were not correctly counted ### Testing Instructions --- .../turbo-tasks-memory/src/memory_backend.rs | 7 +- .../src/memory_backend_with_pg.rs | 8 +- crates/turbo-tasks-memory/src/task.rs | 4 +- .../src/task/aggregation.rs | 59 ++++++------ .../tests/recompute_collectibles.rs | 92 +++++++++++++++++++ crates/turbo-tasks-testing/Cargo.toml | 1 + crates/turbo-tasks-testing/src/lib.rs | 7 +- crates/turbo-tasks/src/backend.rs | 9 +- crates/turbo-tasks/src/manager.rs | 13 ++- 9 files changed, 151 insertions(+), 49 deletions(-) create mode 100644 crates/turbo-tasks-memory/tests/recompute_collectibles.rs diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 624f6fe764e50..249bf4b8a2ca3 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -13,7 +13,6 @@ use std::{ }; use anyhow::{bail, Result}; -use auto_hash_map::AutoMap; use dashmap::{mapref::entry::Entry, DashMap}; use rustc_hash::FxHasher; use tokio::task::futures::TaskLocalFuture; @@ -21,8 +20,8 @@ use tracing::trace_span; use turbo_prehash::{BuildHasherExt, PassThroughHash, PreHashed}; use turbo_tasks::{ backend::{ - Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec, - TransientTaskType, + Backend, BackendJobId, CellContent, PersistentTaskType, TaskCollectiblesMap, + TaskExecutionSpec, TransientTaskType, }, event::EventListener, util::{IdFactoryWithReuse, NoMoveVec}, @@ -470,7 +469,7 @@ impl Backend for MemoryBackend { trait_id: TraitTypeId, reader: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> AutoMap { + ) -> TaskCollectiblesMap { Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id)); Task::read_collectibles(id, trait_id, reader, self, turbo_tasks) } diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index 19ee36169a778..3c2c5e8c91983 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -13,13 +13,13 @@ use std::{ }; use anyhow::{anyhow, Result}; -use auto_hash_map::{AutoMap, AutoSet}; +use auto_hash_map::AutoSet; use concurrent_queue::ConcurrentQueue; use dashmap::{mapref::entry::Entry, DashMap, DashSet}; use turbo_tasks::{ backend::{ - Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec, - TransientTaskType, + Backend, BackendJobId, CellContent, PersistentTaskType, TaskCollectiblesMap, + TaskExecutionSpec, TransientTaskType, }, event::{Event, EventListener}, persisted_graph::{ @@ -1446,7 +1446,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ _trait_id: TraitTypeId, _reader: TaskId, _turbo_tasks: &dyn TurboTasksBackendApi>, - ) -> AutoMap { + ) -> TaskCollectiblesMap { todo!() } diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index af0b69892707c..81eef31114c1f 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -21,7 +21,7 @@ use tokio::task_local; use tracing::Span; use turbo_prehash::PreHashed; use turbo_tasks::{ - backend::{PersistentTaskType, TaskExecutionSpec}, + backend::{PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec}, event::{Event, EventListener}, get_invalidator, registry, CellId, Invalidator, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, ValueTypeId, @@ -1573,7 +1573,7 @@ impl Task { reader: TaskId, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> AutoMap { + ) -> TaskCollectiblesMap { let aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); let mut aggregation_data = aggregation_context.aggregation_data(id); aggregation_data.read_collectibles(trait_type, reader) diff --git a/crates/turbo-tasks-memory/src/task/aggregation.rs b/crates/turbo-tasks-memory/src/task/aggregation.rs index f4188b2a0caeb..581768a7a753a 100644 --- a/crates/turbo-tasks-memory/src/task/aggregation.rs +++ b/crates/turbo-tasks-memory/src/task/aggregation.rs @@ -10,7 +10,10 @@ use auto_hash_map::{map::Entry, AutoMap}; use either::Either; use parking_lot::Mutex; use rustc_hash::FxHasher; -use turbo_tasks::{event::Event, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi}; +use turbo_tasks::{ + backend::TaskCollectiblesMap, event::Event, RawVc, TaskId, TaskIdSet, TraitTypeId, + TurboTasksBackendApi, +}; use super::{ meta_state::{FullTaskWriteGuard, TaskMetaStateWriteGuard}, @@ -32,7 +35,7 @@ pub enum RootType { #[derive(Debug, Default)] pub struct CollectiblesInfo { - collectibles: AutoMap, + collectibles: TaskCollectiblesMap, dependent_tasks: TaskIdSet, } @@ -90,8 +93,8 @@ impl Aggregated { ) { if let Entry::Occupied(mut entry) = self.collectibles.entry(trait_type) { let info = entry.get_mut(); - info.dependent_tasks.remove(&reader); - if info.is_unset() { + let removed = info.dependent_tasks.remove(&reader); + if removed && info.is_unset() { entry.remove(); } } @@ -101,7 +104,7 @@ impl Aggregated { &mut self, trait_type: TraitTypeId, reader: TaskId, - ) -> AutoMap { + ) -> TaskCollectiblesMap { match self.collectibles.entry(trait_type) { Entry::Occupied(mut e) => { let info = e.get_mut(); @@ -293,7 +296,7 @@ impl<'a> AggregationContext for TaskAggregationContext<'a> { } Entry::Vacant(e) => { let mut collectibles_info = CollectiblesInfo::default(); - update_count_entry(collectibles_info.collectibles.entry(collectible), count); + collectibles_info.collectibles.insert(collectible, count); e.insert(collectibles_info); } } @@ -489,8 +492,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { change.dirty_tasks_update.push((self.id, 1)); } if let Some(collectibles) = guard.collectibles.as_ref() { - for (&(trait_type_id, collectible), _) in collectibles.iter() { - change.collectibles.push((trait_type_id, collectible, 1)); + for (&(trait_type_id, collectible), count) in collectibles.iter() { + change + .collectibles + .push((trait_type_id, collectible, *count)); } } if let TaskStateType::InProgress(box InProgressState { @@ -499,8 +504,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { }) = &guard.state_type { if let Some(collectibles) = outdated_collectibles.as_ref() { - for (&(trait_type_id, collectible), _) in collectibles.iter() { - change.collectibles.push((trait_type_id, collectible, 1)); + for (&(trait_type_id, collectible), count) in collectibles.iter() { + change + .collectibles + .push((trait_type_id, collectible, *count)); } } } @@ -541,8 +548,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { change.dirty_tasks_update.push((self.id, -1)); } if let Some(collectibles) = guard.collectibles.as_ref() { - for (&(trait_type_id, collectible), _) in collectibles.iter() { - change.collectibles.push((trait_type_id, collectible, -1)); + for (&(trait_type_id, collectible), count) in collectibles.iter() { + change + .collectibles + .push((trait_type_id, collectible, -count)); } } if let TaskStateType::InProgress(box InProgressState { @@ -551,8 +560,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { }) = &guard.state_type { if let Some(collectibles) = outdated_collectibles.as_ref() { - for (&(trait_type_id, collectible), _) in collectibles.iter() { - change.collectibles.push((trait_type_id, collectible, -1)); + for (&(trait_type_id, collectible), count) in collectibles.iter() { + change + .collectibles + .push((trait_type_id, collectible, -*count)); } } } @@ -588,19 +599,13 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { { data.unfinished_tasks = unfinished_tasks_update.into_iter().collect(); } - data.dirty_tasks = dirty_tasks_update.into_iter().collect(); - data.collectibles = collectibles - .into_iter() - .map(|(trait_type_id, collectible, count)| { - ( - trait_type_id, - CollectiblesInfo { - collectibles: [(collectible, count)].iter().cloned().collect(), - dependent_tasks: TaskIdSet::default(), - }, - ) - }) - .collect(); + for (t, n) in dirty_tasks_update.into_iter() { + data.dirty_tasks.insert(t, n); + } + for (trait_type_id, collectible, count) in collectibles.into_iter() { + let info = data.collectibles.entry(trait_type_id).or_default(); + update_count_entry(info.collectibles.entry(collectible), count); + } } data } diff --git a/crates/turbo-tasks-memory/tests/recompute_collectibles.rs b/crates/turbo-tasks-memory/tests/recompute_collectibles.rs new file mode 100644 index 0000000000000..7771df738fd15 --- /dev/null +++ b/crates/turbo-tasks-memory/tests/recompute_collectibles.rs @@ -0,0 +1,92 @@ +#![feature(arbitrary_self_types)] + +use anyhow::{bail, Result}; +use turbo_tasks::{emit, CollectiblesSource, RcStr, State, ValueToString, Vc}; +use turbo_tasks_testing::{register, run}; + +register!(); + +#[tokio::test] +async fn recompute() { + run! { + let input = ChangingInput { + state: State::new(1), + }.cell(); + let output = compute(input, 100); + let read = output.await?; + assert_eq!(read.value, 42); + assert_eq!(read.collectible, "1"); + + for i in 2..100 { + input.await?.state.set(i); + let read = output.strongly_consistent().await?; + assert_eq!(read.value, 42); + assert_eq!(read.collectible, i.to_string()); + } + } +} + +#[turbo_tasks::value] +struct ChangingInput { + state: State, +} + +#[turbo_tasks::value] +struct Output { + value: u32, + collectible: String, +} + +#[turbo_tasks::value] +struct Collectible { + value: u32, +} + +#[turbo_tasks::value_impl] +impl ValueToString for Collectible { + #[turbo_tasks::function] + fn to_string(&self) -> Vc { + Vc::cell(self.value.to_string().into()) + } +} + +#[turbo_tasks::function] +fn inner_compute(input: Vc) -> Vc { + inner_compute2(input, 1000) +} + +#[turbo_tasks::function] +async fn inner_compute2(input: Vc, innerness: u32) -> Result> { + if innerness > 0 { + return Ok(inner_compute2(input, innerness - 1)); + } + let collectible: Vc> = Vc::upcast( + Collectible { + value: *input.await?.state.get(), + } + .cell(), + ); + emit(collectible); + + Ok(Vc::cell(42)) +} + +#[turbo_tasks::function] +async fn compute(input: Vc, innerness: u32) -> Result> { + if innerness > 0 { + return Ok(compute(input, innerness - 1)); + } + let operation = inner_compute(input); + let value = *operation.await?; + let collectibles = operation.peek_collectibles::>(); + if collectibles.len() != 1 { + bail!("expected 1 collectible, found {}", collectibles.len()); + } + let first = *collectibles.iter().next().unwrap(); + let collectible = first.to_string().await?; + Ok(Output { + value, + collectible: collectible.to_string(), + } + .cell()) +} diff --git a/crates/turbo-tasks-testing/Cargo.toml b/crates/turbo-tasks-testing/Cargo.toml index 82b185512083c..71f1f31d9c6fc 100644 --- a/crates/turbo-tasks-testing/Cargo.toml +++ b/crates/turbo-tasks-testing/Cargo.toml @@ -17,5 +17,6 @@ anyhow = { workspace = true } auto-hash-map = { workspace = true } futures = { workspace = true } lazy_static = { workspace = true } +rustc-hash = { workspace = true } tokio = { workspace = true } turbo-tasks = { workspace = true } diff --git a/crates/turbo-tasks-testing/src/lib.rs b/crates/turbo-tasks-testing/src/lib.rs index 7224e7db52fc9..5088018b5daca 100644 --- a/crates/turbo-tasks-testing/src/lib.rs +++ b/crates/turbo-tasks-testing/src/lib.rs @@ -13,10 +13,9 @@ use std::{ }; use anyhow::{anyhow, Result}; -use auto_hash_map::AutoMap; use futures::FutureExt; use turbo_tasks::{ - backend::CellContent, + backend::{CellContent, TaskCollectiblesMap}, event::{Event, EventListener}, registry, test_helpers::with_turbo_tasks_for_testing, @@ -244,12 +243,12 @@ impl TurboTasksApi for VcStorage { fn unemit_collectibles( &self, _trait_type: turbo_tasks::TraitTypeId, - _collectibles: &AutoMap, + _collectibles: &TaskCollectiblesMap, ) { unimplemented!() } - fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> AutoMap { + fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap { unimplemented!() } diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index 69aa087822de6..b5c8c8eabe415 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -1,9 +1,9 @@ use std::{ any::Any, borrow::Cow, - fmt, - fmt::{Debug, Display, Write}, + fmt::{self, Debug, Display, Write}, future::Future, + hash::BuildHasherDefault, mem::take, pin::Pin, sync::Arc, @@ -12,6 +12,7 @@ use std::{ use anyhow::{anyhow, bail, Result}; use auto_hash_map::AutoMap; +use rustc_hash::FxHasher; use serde::{Deserialize, Serialize}; use tracing::Span; @@ -276,6 +277,8 @@ impl CellContent { } } +pub type TaskCollectiblesMap = AutoMap, 1>; + pub trait Backend: Sync + Send { #[allow(unused_variables)] fn initialize(&mut self, task_id_provider: &dyn TaskIdProvider) {} @@ -388,7 +391,7 @@ pub trait Backend: Sync + Send { trait_id: TraitTypeId, reader: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> AutoMap; + ) -> TaskCollectiblesMap; fn emit_collectible( &self, diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index d6ff7ee10dfc7..2738a633e3948 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -24,7 +24,10 @@ use tracing::{info_span, instrument, trace_span, Instrument, Level}; use turbo_tasks_malloc::TurboMalloc; use crate::{ - backend::{Backend, CellContent, PersistentTaskType, TaskExecutionSpec, TransientTaskType}, + backend::{ + Backend, CellContent, PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec, + TransientTaskType, + }, capture_future::{self, CaptureFuture}, event::{Event, EventListener}, id::{BackendJobId, FunctionId, TraitTypeId}, @@ -109,11 +112,11 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { index: CellId, ) -> Result>; - fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> AutoMap; + fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap; fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc); fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32); - fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &AutoMap); + fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap); /// INVALIDATION: Be careful with this, it will not track dependencies, so /// using it could break cache invalidation. @@ -1010,7 +1013,7 @@ impl TurboTasksApi for TurboTasks { .try_read_own_task_cell_untracked(current_task, index, self) } - fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> AutoMap { + fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap { self.backend.read_task_collectibles( task, trait_id, @@ -1038,7 +1041,7 @@ impl TurboTasksApi for TurboTasks { ); } - fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &AutoMap) { + fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) { for (&collectible, &count) in collectibles { if count > 0 { self.backend.unemit_collectible(