diff --git a/app/buck2_bxl/BUCK b/app/buck2_bxl/BUCK index 61c6faee60fb3..1ec0863af8044 100644 --- a/app/buck2_bxl/BUCK +++ b/app/buck2_bxl/BUCK @@ -22,6 +22,7 @@ rust_library( "fbsource//third-party/rust:indexmap", "fbsource//third-party/rust:itertools", "fbsource//third-party/rust:num-bigint", + "fbsource//third-party/rust:once_cell", "fbsource//third-party/rust:serde", "fbsource//third-party/rust:serde_json", "fbsource//third-party/rust:tokio", diff --git a/app/buck2_bxl/Cargo.toml b/app/buck2_bxl/Cargo.toml index a9b5f19744229..ebb84d504569c 100644 --- a/app/buck2_bxl/Cargo.toml +++ b/app/buck2_bxl/Cargo.toml @@ -51,6 +51,7 @@ buck2_query = { workspace = true } buck2_query_parser = { workspace = true } buck2_server_ctx = { workspace = true } buck2_util = { workspace = true } +once_cell = { workspace = true } [dev-dependencies] ctor = { workspace = true } diff --git a/app/buck2_bxl/src/bxl/eval.rs b/app/buck2_bxl/src/bxl/eval.rs index 893206cdf52de..171c68cff2785 100644 --- a/app/buck2_bxl/src/bxl/eval.rs +++ b/app/buck2_bxl/src/bxl/eval.rs @@ -8,7 +8,9 @@ */ use std::cell::RefCell; +use std::future::Future; use std::rc::Rc; +use std::sync::Arc; use buck2_build_api::bxl::result::BxlResult; use buck2_build_api::bxl::types::BxlFunctionLabel; @@ -51,6 +53,7 @@ use dice::DiceComputations; use dice::DiceTransaction; use dupe::Dupe; use itertools::Itertools; +use once_cell::sync::Lazy; use starlark::environment::Module; use starlark::eval::Evaluator; use starlark::values::structs::AllocStruct; @@ -60,6 +63,7 @@ use starlark::values::UnpackValue; use starlark::values::ValueOfUnchecked; use starlark::values::ValueTyped; use starlark_map::ordered_map::OrderedMap; +use tokio::sync::Semaphore; use crate::bxl::key::BxlKey; use crate::bxl::starlark_defs::bxl_function::FrozenBxlFunction; @@ -71,6 +75,31 @@ use crate::bxl::starlark_defs::context::BxlContextCoreData; use crate::bxl::starlark_defs::eval_extra::BxlEvalExtra; use crate::bxl::starlark_defs::functions::BxlErrorWithoutStacktrace; +pub(crate) static LIMITED_EXECUTOR: Lazy> = Lazy::new(|| { + Arc::new(LimitedExecutor::new(500)) // Default working thread of tokio is 512 threads. We set it to 500 for here to leave some room for other things. +}); + +/// A limited executor that can be used to limit the number of concurrent bxl execution threads. +pub(crate) struct LimitedExecutor { + semaphore: Arc, +} + +impl LimitedExecutor { + fn new(limit: usize) -> Self { + Self { + semaphore: Arc::new(Semaphore::new(limit)), + } + } + + pub(crate) async fn execute(&self, task: F) -> T + where + F: Future, + { + let _permit = self.semaphore.acquire().await.unwrap(); + task.await + } +} + pub(crate) async fn eval( ctx: &mut DiceComputations<'_>, key: BxlKey, @@ -84,6 +113,8 @@ pub(crate) async fn eval( let dispatcher = ctx.per_transaction_data().get_dispatcher().dupe(); + let limited_executor = LIMITED_EXECUTOR.clone(); + let (_, futs) = unsafe { // SAFETY: as long as we don't `forget` the return object from `scope_and_collect`, it is safe @@ -95,13 +126,13 @@ pub(crate) async fn eval( // to terminate. scope_and_collect_with_dice(ctx, |ctx, s| { s.spawn_cancellable( - eval_bxl_inner( + limited_executor.execute(eval_bxl_inner( ctx, dispatcher, key, profile_mode_or_instrumentation, liveness, - ), + )), || Err(buck2_error!([], "cancelled")), ) }) diff --git a/app/buck2_bxl/src/bxl/starlark_defs/context/anon_target.rs b/app/buck2_bxl/src/bxl/starlark_defs/context/anon_target.rs index e807786f613e9..e2321cefa3e6e 100644 --- a/app/buck2_bxl/src/bxl/starlark_defs/context/anon_target.rs +++ b/app/buck2_bxl/src/bxl/starlark_defs/context/anon_target.rs @@ -68,6 +68,7 @@ use starlark::values::ValueTyped; use starlark::values::ValueTypedComplex; use starlark_map::ordered_map::OrderedMap; +use crate::bxl::eval::LIMITED_EXECUTOR; use crate::bxl::key::BxlKey; use crate::bxl::starlark_defs::context::BxlContext; use crate::bxl::starlark_defs::context::BxlContextCoreData; @@ -190,6 +191,8 @@ async fn eval_bxl_for_anon_target( // future context, we need the future that it's attached to the cancellation context can // yield and be polled. To ensure that, we have to spawn the future that then enters block_in_place + let limited_executor = LIMITED_EXECUTOR.clone(); + let (_, futs) = unsafe { // SAFETY: as long as we don't `forget` the return object from `scope_and_collect`, it is safe @@ -201,14 +204,14 @@ async fn eval_bxl_for_anon_target( // to terminate. scope_and_collect_with_dice(dice, |dice, s| { s.spawn_cancellable( - eval_bxl_for_anon_target_inner( + limited_executor.execute(eval_bxl_for_anon_target_inner( dice, anon_target, global_cfg_options, dependents_analyses, execution_platform, liveness, - ), + )), || Err(buck2_error!([], "cancelled")), ) }) diff --git a/app/buck2_bxl/src/bxl/starlark_defs/context/dynamic.rs b/app/buck2_bxl/src/bxl/starlark_defs/context/dynamic.rs index 29c164a371a0b..91399be54b208 100644 --- a/app/buck2_bxl/src/bxl/starlark_defs/context/dynamic.rs +++ b/app/buck2_bxl/src/bxl/starlark_defs/context/dynamic.rs @@ -56,6 +56,7 @@ use starlark::values::typing::StarlarkCallableChecked; use starlark::values::OwnedRefFrozenRef; use starlark::values::ValueTyped; +use crate::bxl::eval::LIMITED_EXECUTOR; use crate::bxl::key::BxlDynamicKey; use crate::bxl::starlark_defs::context::starlark_async::BxlSafeDiceComputations; use crate::bxl::starlark_defs::context::BxlContext; @@ -107,6 +108,8 @@ pub(crate) async fn eval_bxl_for_dynamic_output<'v>( print: EventDispatcherPrintHandler(dispatcher.dupe()), }; + let limited_executor = LIMITED_EXECUTOR.clone(); + // Note: because we use `block_in_place`, that will prevent the inner future from being polled // and yielded. So, for cancellation observers to work properly within the dice cancellable // future context, we need the future that it's attached to the cancellation context can @@ -122,7 +125,7 @@ pub(crate) async fn eval_bxl_for_dynamic_output<'v>( // to terminate. scope_and_collect_with_dice(dice_ctx, |dice_ctx, s| { s.spawn_cancellable( - async move { + limited_executor.execute(async move { with_starlark_eval_provider( dice_ctx, &mut StarlarkProfilerOpt::disabled(), @@ -134,7 +137,7 @@ pub(crate) async fn eval_bxl_for_dynamic_output<'v>( }, ) .await - }, + }), || Err(buck2_error!([], "cancelled")), ) }) diff --git a/tests/core/bxl/test_anon_bxl_data/anon_bxl.bxl b/tests/core/bxl/test_anon_bxl_data/anon_bxl.bxl index fb91597ec4755..0f837ad55379b 100644 --- a/tests/core/bxl/test_anon_bxl_data/anon_bxl.bxl +++ b/tests/core/bxl/test_anon_bxl_data/anon_bxl.bxl @@ -92,3 +92,26 @@ check_anon_ouput_artifact = bxl_main( impl = _check_anon_ouput_artifact, cli_args = {}, ) + +def _eval_lost_of_anon_bxl(ctx: bxl.Context): + actions = ctx.bxl_actions().actions + + promise = actions.anon_targets([( + anon, + { + "x": i, + }, + ) for i in range(0, 2000)]).promise + + outputs = [] + for anon_res in ctx.resolve(actions, promise): + outputs.append(anon_res[Info].hello) + + ensured = ctx.output.ensure_multiple(outputs) + for ensure in ensured: + ctx.output.print(ensure.abs_path()) + +eval_lost_of_anon_bxl = bxl_main( + impl = _eval_lost_of_anon_bxl, + cli_args = {}, +)