Skip to content

Commit

Permalink
Limited the bxl executing threads
Browse files Browse the repository at this point in the history
Summary:
d16r reported that when bxl resolves a lot of anon targets, bxl gets stuck. This is because we use `block_in_place` to eval bxl, dynamic targets, and anon targets, and we also call `block_on` for each API that accesses dice, like `query`, `configured_target_node`, etc. When handling a lot of anon targets, this exceeds the tokio thread pool limit. In d16r's case, bxl gets stuck when it creates 511 anon targets.

 {F1974276542}

This diff limits the number of threads evaluating anon targets and dynamic outputs in bxl by using a semaphore.

Reviewed By: JakobDegen

Differential Revision: D68041699

fbshipit-source-id: f8f98a1e7f35baf2278dfc8f74fe5b604fc86e24
  • Loading branch information
Nero5023 authored and facebook-github-bot committed Jan 11, 2025
1 parent e324b9b commit 2b3b02d
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 6 deletions.
1 change: 1 addition & 0 deletions app/buck2_bxl/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rust_library(
"fbsource//third-party/rust:indexmap",
"fbsource//third-party/rust:itertools",
"fbsource//third-party/rust:num-bigint",
"fbsource//third-party/rust:once_cell",
"fbsource//third-party/rust:serde",
"fbsource//third-party/rust:serde_json",
"fbsource//third-party/rust:tokio",
Expand Down
1 change: 1 addition & 0 deletions app/buck2_bxl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ buck2_query = { workspace = true }
buck2_query_parser = { workspace = true }
buck2_server_ctx = { workspace = true }
buck2_util = { workspace = true }
once_cell = { workspace = true }

[dev-dependencies]
ctor = { workspace = true }
Expand Down
35 changes: 33 additions & 2 deletions app/buck2_bxl/src/bxl/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
*/

use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;

use buck2_build_api::bxl::result::BxlResult;
use buck2_build_api::bxl::types::BxlFunctionLabel;
Expand Down Expand Up @@ -51,6 +53,7 @@ use dice::DiceComputations;
use dice::DiceTransaction;
use dupe::Dupe;
use itertools::Itertools;
use once_cell::sync::Lazy;
use starlark::environment::Module;
use starlark::eval::Evaluator;
use starlark::values::structs::AllocStruct;
Expand All @@ -60,6 +63,7 @@ use starlark::values::UnpackValue;
use starlark::values::ValueOfUnchecked;
use starlark::values::ValueTyped;
use starlark_map::ordered_map::OrderedMap;
use tokio::sync::Semaphore;

use crate::bxl::key::BxlKey;
use crate::bxl::starlark_defs::bxl_function::FrozenBxlFunction;
Expand All @@ -71,6 +75,31 @@ use crate::bxl::starlark_defs::context::BxlContextCoreData;
use crate::bxl::starlark_defs::eval_extra::BxlEvalExtra;
use crate::bxl::starlark_defs::functions::BxlErrorWithoutStacktrace;

/// Shared, lazily-initialized executor used to cap the number of concurrent bxl evaluations.
pub(crate) static LIMITED_EXECUTOR: Lazy<Arc<LimitedExecutor>> = Lazy::new(|| {
    // Tokio's default thread limit is 512. Use 500 here to leave some room for other things.
    let executor = LimitedExecutor::new(500);
    Arc::new(executor)
});

/// A limited executor that can be used to limit the number of concurrent bxl execution threads.
///
/// Concurrency is bounded by a semaphore: `execute` holds one permit for as long as its task
/// is running, so at most `limit` (the value passed to `new`) tasks make progress at once.
pub(crate) struct LimitedExecutor {
// Semaphore created with `limit` permits in `new`; one permit is held per running task.
semaphore: Arc<Semaphore>,
}

impl LimitedExecutor {
    /// Creates an executor that allows at most `limit` tasks to run at the same time.
    fn new(limit: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(limit)),
        }
    }

    /// Waits for a free permit, then drives `task` to completion and returns its output.
    ///
    /// The permit is held for the whole lifetime of `task` and is released when this
    /// future finishes or is dropped.
    ///
    /// NOTE(review): if `task` itself awaits work that also goes through `execute`, all
    /// permits could end up held by waiting tasks — confirm nesting depth is bounded.
    ///
    /// # Panics
    ///
    /// Panics if the semaphore has been closed. This type never closes its semaphore, so
    /// hitting this indicates a bug.
    pub(crate) async fn execute<F, T>(&self, task: F) -> T
    where
        F: Future<Output = T>,
    {
        // `acquire` only fails when the semaphore is closed, which we never do.
        let _permit = self
            .semaphore
            .acquire()
            .await
            .expect("LimitedExecutor never closes its semaphore");
        task.await
    }
}

pub(crate) async fn eval(
ctx: &mut DiceComputations<'_>,
key: BxlKey,
Expand All @@ -84,6 +113,8 @@ pub(crate) async fn eval(

let dispatcher = ctx.per_transaction_data().get_dispatcher().dupe();

let limited_executor = LIMITED_EXECUTOR.clone();

let (_, futs) = unsafe {
// SAFETY: as long as we don't `forget` the return object from `scope_and_collect`, it is safe

Expand All @@ -95,13 +126,13 @@ pub(crate) async fn eval(
// to terminate.
scope_and_collect_with_dice(ctx, |ctx, s| {
s.spawn_cancellable(
eval_bxl_inner(
limited_executor.execute(eval_bxl_inner(
ctx,
dispatcher,
key,
profile_mode_or_instrumentation,
liveness,
),
)),
|| Err(buck2_error!([], "cancelled")),
)
})
Expand Down
7 changes: 5 additions & 2 deletions app/buck2_bxl/src/bxl/starlark_defs/context/anon_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use starlark::values::ValueTyped;
use starlark::values::ValueTypedComplex;
use starlark_map::ordered_map::OrderedMap;

use crate::bxl::eval::LIMITED_EXECUTOR;
use crate::bxl::key::BxlKey;
use crate::bxl::starlark_defs::context::BxlContext;
use crate::bxl::starlark_defs::context::BxlContextCoreData;
Expand Down Expand Up @@ -190,6 +191,8 @@ async fn eval_bxl_for_anon_target(
// future context, we need the future that it's attached to the cancellation context can
// yield and be polled. To ensure that, we have to spawn the future that then enters block_in_place

let limited_executor = LIMITED_EXECUTOR.clone();

let (_, futs) = unsafe {
// SAFETY: as long as we don't `forget` the return object from `scope_and_collect`, it is safe

Expand All @@ -201,14 +204,14 @@ async fn eval_bxl_for_anon_target(
// to terminate.
scope_and_collect_with_dice(dice, |dice, s| {
s.spawn_cancellable(
eval_bxl_for_anon_target_inner(
limited_executor.execute(eval_bxl_for_anon_target_inner(
dice,
anon_target,
global_cfg_options,
dependents_analyses,
execution_platform,
liveness,
),
)),
|| Err(buck2_error!([], "cancelled")),
)
})
Expand Down
7 changes: 5 additions & 2 deletions app/buck2_bxl/src/bxl/starlark_defs/context/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use starlark::values::typing::StarlarkCallableChecked;
use starlark::values::OwnedRefFrozenRef;
use starlark::values::ValueTyped;

use crate::bxl::eval::LIMITED_EXECUTOR;
use crate::bxl::key::BxlDynamicKey;
use crate::bxl::starlark_defs::context::starlark_async::BxlSafeDiceComputations;
use crate::bxl::starlark_defs::context::BxlContext;
Expand Down Expand Up @@ -107,6 +108,8 @@ pub(crate) async fn eval_bxl_for_dynamic_output<'v>(
print: EventDispatcherPrintHandler(dispatcher.dupe()),
};

let limited_executor = LIMITED_EXECUTOR.clone();

// Note: because we use `block_in_place`, that will prevent the inner future from being polled
// and yielded. So, for cancellation observers to work properly within the dice cancellable
// future context, we need the future that it's attached to the cancellation context can
Expand All @@ -122,7 +125,7 @@ pub(crate) async fn eval_bxl_for_dynamic_output<'v>(
// to terminate.
scope_and_collect_with_dice(dice_ctx, |dice_ctx, s| {
s.spawn_cancellable(
async move {
limited_executor.execute(async move {
with_starlark_eval_provider(
dice_ctx,
&mut StarlarkProfilerOpt::disabled(),
Expand All @@ -134,7 +137,7 @@ pub(crate) async fn eval_bxl_for_dynamic_output<'v>(
},
)
.await
},
}),
|| Err(buck2_error!([], "cancelled")),
)
})
Expand Down
23 changes: 23 additions & 0 deletions tests/core/bxl/test_anon_bxl_data/anon_bxl.bxl
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,26 @@ check_anon_ouput_artifact = bxl_main(
impl = _check_anon_ouput_artifact,
cli_args = {},
)

def _eval_lost_of_anon_bxl(ctx: bxl.Context):
    """Creates 2000 anon targets, resolves them, and prints the absolute path of each ensured `hello` output."""
    bxl_actions = ctx.bxl_actions().actions

    # One (rule, attrs) pair per anon target; `x` makes every target distinct.
    anon_specs = [(anon, {"x": idx}) for idx in range(0, 2000)]
    promise = bxl_actions.anon_targets(anon_specs).promise

    hello_outputs = [resolved[Info].hello for resolved in ctx.resolve(bxl_actions, promise)]

    for ensured_artifact in ctx.output.ensure_multiple(hello_outputs):
        ctx.output.print(ensured_artifact.abs_path())

# BXL entry point wrapping `_eval_lost_of_anon_bxl`; it takes no CLI arguments.
eval_lost_of_anon_bxl = bxl_main(
impl = _eval_lost_of_anon_bxl,
cli_args = {},
)

0 comments on commit 2b3b02d

Please sign in to comment.