From b8174670a9239d7b37e6baa6e11a9b61427e1180 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 2 Jul 2024 12:03:19 -0700 Subject: [PATCH] Store aggregate read/execute count statistics (vercel/turbo#8286) ### Why? I want to determine percent "cache hit" rates for tasks. Tasks with very low task hit rates should likely have their annotations removed. Eventually, we might be able to use this information in a more automated way, by leaving the annotation in, but skipping the caching for low-cache-hit tasks. ### What? This implementation only logs persistent tasks, which should compromise all or the majority of tasks we care about for memory usage. The implementation should bail out quickly if caching is disabled, so it should be okay to leave in release builds, which is important for making it easy to gather statistics from willing users. ### Testing Run included unit tests! This is used as part of https://github.com/vercel/next.js/compare/canary...bgw/cache-hit-stats --- crates/turbo-tasks-memory/Cargo.toml | 4 +- crates/turbo-tasks-memory/src/lib.rs | 2 + .../turbo-tasks-memory/src/memory_backend.rs | 52 ++++ .../turbo-tasks-memory/src/task_statistics.rs | 86 ++++++ .../tests/task_statistics.rs | 272 ++++++++++++++++++ crates/turbo-tasks/src/backend.rs | 96 ++++--- 6 files changed, 474 insertions(+), 38 deletions(-) create mode 100644 crates/turbo-tasks-memory/src/task_statistics.rs create mode 100644 crates/turbo-tasks-memory/tests/task_statistics.rs diff --git a/crates/turbo-tasks-memory/Cargo.toml b/crates/turbo-tasks-memory/Cargo.toml index 150123a239aeb..1ab1dcef38085 100644 --- a/crates/turbo-tasks-memory/Cargo.toml +++ b/crates/turbo-tasks-memory/Cargo.toml @@ -25,6 +25,7 @@ parking_lot = { workspace = true } priority-queue = "1.3.0" ref-cast = "1.0.20" rustc-hash = { workspace = true } +serde = { workspace = true } smallvec = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } @@ -38,8 +39,9 @@ criterion = { workspace = true, features = ["async_tokio"] } lazy_static = { workspace = true } loom = "0.7.2" rand = { workspace = true, features = ["small_rng"] } +regex = { workspace = true } rstest = { workspace = true } -serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } turbo-tasks-testing = { workspace = true } diff --git a/crates/turbo-tasks-memory/src/lib.rs b/crates/turbo-tasks-memory/src/lib.rs index f1bf8e8ae7bfe..6dbce8623bc1c 100644 --- a/crates/turbo-tasks-memory/src/lib.rs +++ b/crates/turbo-tasks-memory/src/lib.rs @@ -17,6 +17,8 @@ mod memory_backend; mod memory_backend_with_pg; mod output; mod task; +mod task_statistics; pub use memory_backend::MemoryBackend; pub use memory_backend_with_pg::MemoryBackendWithPersistedGraph; +pub use task_statistics::{TaskStatistics, TaskStatisticsApi}; diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 08908a544c856..64fe7cb9182e8 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -34,6 +34,7 @@ use crate::{ gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY}, output::Output, task::{Task, DEPENDENCIES_TO_TRACK}, + task_statistics::TaskStatisticsApi, }; fn prehash_task_type(task_type: PersistentTaskType) -> PreHashed { @@ -49,6 +50,7 @@ pub struct MemoryBackend { memory_limit: usize, gc_queue: Option, idle_gc_active: AtomicBool, + task_statistics: TaskStatisticsApi, } impl Default for MemoryBackend { @@ -71,6 +73,7 @@ impl MemoryBackend { memory_limit, gc_queue: (memory_limit != usize::MAX).then(GcQueue::new), idle_gc_active: AtomicBool::new(false), + task_statistics: TaskStatisticsApi::default(), } } @@ -231,6 +234,10 @@ impl MemoryBackend { }); } } + + pub fn task_statistics(&self) -> &TaskStatisticsApi { + &self.task_statistics + } } impl Backend for MemoryBackend { @@ -529,8 +536,53 @@ impl Backend for MemoryBackend { self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks) { // fast pass without creating a new task + self.task_statistics().map(|stats| match &*task_type { + PersistentTaskType::ResolveNative(function_id, ..) + | PersistentTaskType::Native(function_id, ..) => { + stats.increment_cache_hit(*function_id); + } + PersistentTaskType::ResolveTrait(trait_type, name, inputs) => { + // HACK: Resolve the first argument (`self`) in order to attribute the cache hit + // to the concrete trait implementation, rather than the dynamic trait method. + // This ensures cache hits and misses are both attributed to the same thing. + // + // Because this task already resolved, in most cases `self` should either be + // resolved, or already in the process of being resolved. + // + // However, `self` could become unloaded due to cache eviction, and this might + // trigger an otherwise unnecessary re-evalutation. + // + // This is a potentially okay trade-off as long as we don't log statistics by + // default. The alternative would be to store function ids on completed + // ResolveTrait tasks. + let trait_type = *trait_type; + let name = name.clone(); + let this = inputs + .first() + .cloned() + .expect("No arguments for trait call"); + let stats = Arc::clone(stats); + turbo_tasks.run_once(Box::pin(async move { + let function_id = + PersistentTaskType::resolve_trait_method(trait_type, name, this) + .await?; + stats.increment_cache_hit(function_id); + Ok(()) + })); + } + }); task } else { + self.task_statistics().map(|stats| match &*task_type { + PersistentTaskType::Native(function_id, ..) => { + stats.increment_cache_miss(*function_id); + } + PersistentTaskType::ResolveTrait(..) | PersistentTaskType::ResolveNative(..) => { + // these types re-execute themselves as `Native` after + // resolving their arguments, skip counting their + // executions here to avoid double-counting + } + }); // It's important to avoid overallocating memory as this will go into the task // cache and stay there forever. We can to be as small as possible. let (task_type_hash, mut task_type) = PreHashed::into_parts(task_type); diff --git a/crates/turbo-tasks-memory/src/task_statistics.rs b/crates/turbo-tasks-memory/src/task_statistics.rs new file mode 100644 index 0000000000000..6df3abc0ffc3a --- /dev/null +++ b/crates/turbo-tasks-memory/src/task_statistics.rs @@ -0,0 +1,86 @@ +use std::{ + hash::BuildHasherDefault, + sync::{Arc, OnceLock}, +}; + +use dashmap::DashMap; +use rustc_hash::FxHasher; +use serde::{ser::SerializeMap, Serialize, Serializer}; +use turbo_tasks::{registry, FunctionId}; + +/// An API for optionally enabling, updating, and reading aggregated statistics. +#[derive(Default)] +pub struct TaskStatisticsApi { + inner: OnceLock>, +} + +impl TaskStatisticsApi { + pub fn enable(&self) -> &Arc { + self.inner.get_or_init(|| { + Arc::new(TaskStatistics { + inner: DashMap::with_hasher(Default::default()), + }) + }) + } + + pub fn is_enabled(&self) -> bool { + self.inner.get().is_some() + } + + // Calls `func` if statistics have been enabled (via + // [`TaskStatisticsApi::enable`]). + pub fn map(&self, func: impl FnOnce(&Arc) -> T) -> Option { + self.get().map(func) + } + + // Calls `func` if statistics have been enabled (via + // [`TaskStatisticsApi::enable`]). + pub fn get(&self) -> Option<&Arc> { + self.inner.get() + } +} + +/// A type representing the enabled state of [`TaskStatisticsApi`]. Implements +/// [`serde::Serialize`]. +pub struct TaskStatistics { + inner: DashMap>, +} + +impl TaskStatistics { + pub(crate) fn increment_cache_hit(&self, function_id: FunctionId) { + self.with_task_type_statistics(function_id, |stats| stats.cache_hit += 1) + } + + pub(crate) fn increment_cache_miss(&self, function_id: FunctionId) { + self.with_task_type_statistics(function_id, |stats| stats.cache_miss += 1) + } + + fn with_task_type_statistics( + &self, + task_function_id: FunctionId, + func: impl Fn(&mut TaskFunctionStatistics), + ) { + func(self.inner.entry(task_function_id).or_default().value_mut()) + } +} + +/// Statistics for an individual function. +#[derive(Default, Serialize)] +struct TaskFunctionStatistics { + cache_hit: u32, + cache_miss: u32, +} + +impl Serialize for TaskStatistics { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.inner.len()))?; + for entry in &self.inner { + let key = registry::get_function_global_name(*entry.key()); + map.serialize_entry(key, entry.value())?; + } + map.end() + } +} diff --git a/crates/turbo-tasks-memory/tests/task_statistics.rs b/crates/turbo-tasks-memory/tests/task_statistics.rs new file mode 100644 index 0000000000000..98c1dae4e8be7 --- /dev/null +++ b/crates/turbo-tasks-memory/tests/task_statistics.rs @@ -0,0 +1,272 @@ +#![feature(arbitrary_self_types)] + +use std::{ + future::{Future, IntoFuture}, + sync::Arc, +}; + +use anyhow::Result; +use once_cell::sync::Lazy; +use regex::Regex; +use serde_json::json; +use turbo_tasks::{TurboTasks, Vc}; +use turbo_tasks_memory::MemoryBackend; +use turbo_tasks_testing::register; + +register!(); + +#[tokio::test] +async fn test_simple_task() { + run_with_tt(|tt| async move { + for i in 0..10 { + double(i).await.unwrap(); + // use cached results + double(i).await.unwrap(); + } + for i in 0..5 { + double(i).await.unwrap(); + } + assert_eq!( + stats_json(&tt), + json!({ + "turbo-tasks-memory::::double": { + "cache_miss": 10, + "cache_hit": 15, + }, + }) + ); + }) + .await; +} + +#[tokio::test] +async fn test_await_same_vc_multiple_times() { + run_with_tt(|tt| async move { + let dvc = double(0); + // this is awaited multiple times, but only resolved once + tokio::try_join!(dvc.into_future(), dvc.into_future()).unwrap(); + dvc.await.unwrap(); + assert_eq!( + stats_json(&tt), + json!({ + "turbo-tasks-memory::::double": { + "cache_miss": 1, + "cache_hit": 0, + }, + }) + ); + }) + .await; +} + +#[tokio::test] +async fn test_vc_receiving_task() { + run_with_tt(|tt| async move { + for i in 0..10 { + let dvc = double(i); + double_vc(dvc).await.unwrap(); + // use cached results + double_vc(dvc).await.unwrap(); + } + for i in 0..5 { + let dvc = double(i); + double_vc(dvc).await.unwrap(); + } + assert_eq!( + stats_json(&tt), + json!({ + "turbo-tasks-memory::::double": { + "cache_miss": 10, + "cache_hit": 5, + }, + "turbo-tasks-memory::::double_vc": { + "cache_miss": 10, + "cache_hit": 15, + }, + }) + ); + }) + .await; +} + +#[tokio::test] +async fn test_trait_methods() { + run_with_tt(|tt| async move { + for i in 0..10 { + let wvc = wrap(i); + tokio::try_join!(wvc.double().into_future(), wvc.double().into_future()).unwrap(); + tokio::try_join!(wvc.double_vc().into_future(), wvc.double_vc().into_future()).unwrap(); + } + // use cached results + for i in 0..5 { + let wvc = wrap(i); + wvc.double().await.unwrap(); + wvc.double_vc().await.unwrap(); + } + assert_eq!( + stats_json(&tt), + json!({ + "turbo-tasks-memory::::wrap": { + "cache_miss": 10, + "cache_hit": 5, + }, + "turbo-tasks-memory::::WrappedU64::Doublable::double": { + "cache_miss": 10, + "cache_hit": 15, + }, + "turbo-tasks-memory::::WrappedU64::Doublable::double_vc": { + "cache_miss": 10, + "cache_hit": 15, + }, + }) + ); + }) + .await; +} + +#[tokio::test] +async fn test_dyn_trait_methods() { + run_with_tt(|tt| async move { + for i in 0..10 { + let wvc: Vc> = Vc::upcast(wrap(i)); + let _ = tokio::try_join!(wvc.double().resolve(), wvc.double().resolve()).unwrap(); + let _ = tokio::try_join!(wvc.double_vc().resolve(), wvc.double_vc().resolve()).unwrap(); + } + // use cached results + for i in 0..5 { + let wvc: Vc> = Vc::upcast(wrap(i)); + let _ = wvc.double().resolve().await.unwrap(); + let _ = wvc.double_vc().resolve().await.unwrap(); + } + // use cached results without dynamic dispatch + for i in 0..2 { + let wvc = wrap(i); + let _ = wvc.double().await.unwrap(); + let _ = wvc.double_vc().await.unwrap(); + } + assert_eq!( + stats_json(&tt), + json!({ + "turbo-tasks-memory::::wrap": { + "cache_miss": 10, + "cache_hit": 7, + }, + "turbo-tasks-memory::::WrappedU64::Doublable::double": { + "cache_miss": 10, + "cache_hit": 17, + }, + "turbo-tasks-memory::::WrappedU64::Doublable::double_vc": { + "cache_miss": 10, + "cache_hit": 17, + }, + }) + ); + }) + .await; +} + +// creates Vcs, but doesn't ever execute them +#[tokio::test] +async fn test_no_execution() { + run_with_tt(|tt| async move { + // don't await this! + let _ = wrap_vc(double_vc(double(123))).double().double_vc(); + assert_eq!( + stats_json(&tt), + json!({ + "turbo-tasks-memory::::double": { + "cache_miss": 1, + "cache_hit": 0, + }, + }) + ); + }) + .await; +} + +// Internally, this function uses `PersistentTaskType::Native`. +#[turbo_tasks::function] +fn double(val: u64) -> Vc { + Vc::cell(val * 2) +} + +// Internally, this function uses `PersistentTaskType::ResolveNative`. +#[turbo_tasks::function] +async fn double_vc(val: Vc) -> Result> { + let val = *val.await?; + Ok(Vc::cell(val * 2)) +} + +#[turbo_tasks::value] +struct WrappedU64(u64); + +#[turbo_tasks::function] +fn wrap(val: u64) -> Vc { + WrappedU64(val).cell() +} + +#[turbo_tasks::function] +async fn wrap_vc(val: Vc) -> Result> { + Ok(WrappedU64(*val.await?).cell()) +} + +#[turbo_tasks::value_trait] +pub trait Doublable { + fn double(&self) -> Vc; + fn double_vc(self: Vc) -> Vc; +} + +#[turbo_tasks::value_impl] +impl Doublable for WrappedU64 { + #[turbo_tasks::function] + fn double(&self) -> Vc { + WrappedU64(self.0 * 2).cell() + } + + #[turbo_tasks::function] + async fn double_vc(self: Vc) -> Result> { + let val = self.await?.0; + Ok(WrappedU64(val * 2).cell()) + } +} + +#[turbo_tasks::function] +async fn fail(val: u64) -> Result> { + anyhow::bail!("failed using {val}"); +} + +async fn run_with_tt(func: impl FnOnce(Arc>) -> Fut) +where + Fut: Future + Send + 'static, +{ + *REGISTER; + let tt = TurboTasks::new(MemoryBackend::default()); + tt.backend().task_statistics().enable(); + let fut = func(Arc::clone(&tt)); + tt.run_once(async move { + fut.await; + Ok(()) + }) + .await + .unwrap(); +} + +fn stats_json(tt: &TurboTasks) -> serde_json::Value { + remove_hashes(serde_json::to_value(tt.backend().task_statistics().get()).unwrap()) +} + +// Global task identifiers can contain a hash of the crate and dependencies. +// Remove that so that we can compare against a stable value in tests. +fn remove_hashes(mut json: serde_json::Value) -> serde_json::Value { + static HASH_RE: Lazy = Lazy::new(|| Regex::new("@[^:]+").unwrap()); + match &mut json { + serde_json::Value::Object(map) => { + let old_map = std::mem::take(map); + for (k, v) in old_map { + map.insert(HASH_RE.replace(&k, "").into_owned(), v); + } + } + _ => unreachable!("expected object"), + }; + json +} diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index c18a5c31ceec0..92d94939edf7e 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -375,6 +375,18 @@ impl PersistentTaskType { Ok(turbo_tasks.native_call(fn_id, inputs)) } + pub async fn resolve_trait_method( + trait_type: TraitTypeId, + name: Cow<'static, str>, + this: ConcreteTaskInput, + ) -> Result { + Self::resolve_trait_method_from_value( + trait_type, + this.resolve().await?.resolve_to_value().await?, + name, + ) + } + pub async fn run_resolve_trait( trait_type: TraitTypeId, name: Cow<'static, str>, @@ -383,45 +395,55 @@ impl PersistentTaskType { ) -> Result { let mut resolved_inputs = Vec::with_capacity(inputs.len()); let mut iter = inputs.into_iter(); - if let Some(this) = iter.next() { - let this = this.resolve().await?; - let this_value = this.clone().resolve_to_value().await?; - match this_value.get_trait_method(trait_type, name) { - Ok(native_fn) => { - resolved_inputs.push(this); - for input in iter { - resolved_inputs.push(input) - } - Ok(turbo_tasks.dynamic_call(native_fn, resolved_inputs)) - } - Err(name) => { - if !this_value.has_trait(trait_type) { - let traits = - this_value - .traits() - .iter() - .fold(String::new(), |mut out, t| { - let _ = write!(out, " {}", t); - out - }); - Err(anyhow!( - "{} doesn't implement {} (only{})", - this_value, - registry::get_trait(trait_type), - traits, - )) - } else { - Err(anyhow!( - "{} implements trait {}, but method {} is missing", - this_value, - registry::get_trait(trait_type), - name - )) - } + + let this = iter + .next() + .expect("No arguments for trait call") + .resolve() + .await?; + let this_value = this.clone().resolve_to_value().await?; + + let native_fn = Self::resolve_trait_method_from_value(trait_type, this_value, name)?; + resolved_inputs.push(this); + for input in iter { + resolved_inputs.push(input) + } + Ok(turbo_tasks.dynamic_call(native_fn, resolved_inputs)) + } + + /// Shared helper used by [`resolve_trait_method`] and + /// [`run_resolve_trait`]. + fn resolve_trait_method_from_value( + trait_type: TraitTypeId, + this_value: ConcreteTaskInput, + name: Cow<'static, str>, + ) -> Result { + match this_value.get_trait_method(trait_type, name) { + Ok(native_fn) => Ok(native_fn), + Err(name) => { + if !this_value.has_trait(trait_type) { + let traits = this_value + .traits() + .iter() + .fold(String::new(), |mut out, t| { + let _ = write!(out, " {}", t); + out + }); + Err(anyhow!( + "{} doesn't implement {} (only{})", + this_value, + registry::get_trait(trait_type), + traits, + )) + } else { + Err(anyhow!( + "{} implements trait {}, but method {} is missing", + this_value, + registry::get_trait(trait_type), + name + )) } } - } else { - panic!("No arguments for trait call"); } }