Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Remove task_pool parameter from par_for_each(_mut) #4705

Closed
wants to merge 11 commits into from
8 changes: 4 additions & 4 deletions benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bevy_ecs::prelude::*;
use bevy_tasks::TaskPool;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use glam::*;

#[derive(Component, Copy, Clone)]
Expand Down Expand Up @@ -29,8 +29,8 @@ impl Benchmark {
)
}));

fn sys(task_pool: Res<TaskPool>, mut query: Query<(&mut Position, &mut Transform)>) {
query.par_for_each_mut(&task_pool, 128, |(mut pos, mut mat)| {
fn sys(mut query: Query<(&mut Position, &mut Transform)>) {
query.par_for_each_mut(128, |(mut pos, mut mat)| {
for _ in 0..100 {
mat.0 = mat.0.inverse();
}
Expand All @@ -39,7 +39,7 @@ impl Benchmark {
});
}

world.insert_resource(TaskPool::default());
world.insert_resource(ComputeTaskPool(TaskPool::default()));
let mut system = IntoSystem::into_system(sys);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
Expand Down
10 changes: 4 additions & 6 deletions crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod tests {
query::{Added, ChangeTrackers, Changed, FilteredAccess, With, Without, WorldQuery},
world::{Mut, World},
};
use bevy_tasks::TaskPool;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use std::{
any::TypeId,
sync::{
Expand Down Expand Up @@ -373,7 +373,7 @@ mod tests {
#[test]
fn par_for_each_dense() {
let mut world = World::new();
let task_pool = TaskPool::default();
world.insert_resource(ComputeTaskPool(TaskPool::default()));
let e1 = world.spawn().insert(A(1)).id();
let e2 = world.spawn().insert(A(2)).id();
let e3 = world.spawn().insert(A(3)).id();
Expand All @@ -382,7 +382,7 @@ mod tests {
let results = Arc::new(Mutex::new(Vec::new()));
world
.query::<(Entity, &A)>()
.par_for_each(&world, &task_pool, 2, |(e, &A(i))| {
.par_for_each(&world, 2, |(e, &A(i))| {
results.lock().unwrap().push((e, i));
});
results.lock().unwrap().sort();
Expand All @@ -395,8 +395,7 @@ mod tests {
#[test]
fn par_for_each_sparse() {
let mut world = World::new();

let task_pool = TaskPool::default();
world.insert_resource(ComputeTaskPool(TaskPool::default()));
let e1 = world.spawn().insert(SparseStored(1)).id();
let e2 = world.spawn().insert(SparseStored(2)).id();
let e3 = world.spawn().insert(SparseStored(3)).id();
Expand All @@ -405,7 +404,6 @@ mod tests {
let results = Arc::new(Mutex::new(Vec::new()));
world.query::<(Entity, &SparseStored)>().par_for_each(
&world,
&task_pool,
2,
|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)),
);
Expand Down
28 changes: 17 additions & 11 deletions crates/bevy_ecs/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
storage::TableId,
world::{World, WorldId},
};
use bevy_tasks::TaskPool;
use bevy_tasks::ComputeTaskPool;
use fixedbitset::FixedBitSet;
use std::fmt;

Expand Down Expand Up @@ -683,15 +683,17 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
);
}

/// Runs `func` on each query result in parallel using the given `task_pool`.
/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries, see [`Self::par_for_each_mut`] for
/// write-queries.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
#[inline]
pub fn par_for_each<'w, FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
) {
Expand All @@ -700,7 +702,6 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
self.update_archetypes(world);
self.par_for_each_unchecked_manual::<ROQueryFetch<Q>, FN>(
world,
task_pool,
batch_size,
func,
world.last_change_tick(),
Expand All @@ -709,12 +710,14 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
}
}

/// Runs `func` on each query result in parallel using the given `task_pool`.
/// Runs `func` on each query result in parallel.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
#[inline]
pub fn par_for_each_mut<'w, FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w mut World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
) {
Expand All @@ -723,7 +726,6 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
self.update_archetypes(world);
self.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
world,
task_pool,
batch_size,
func,
world.last_change_tick(),
Expand All @@ -732,10 +734,13 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
}
}

/// Runs `func` on each query result in parallel using the given `task_pool`.
/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
///
/// # Safety
///
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
Expand All @@ -744,14 +749,12 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
pub unsafe fn par_for_each_unchecked<'w, FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
) {
self.update_archetypes(world);
self.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
world,
task_pool,
batch_size,
func,
world.last_change_tick(),
Expand Down Expand Up @@ -827,6 +830,9 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
/// the current change tick are given. This is faster than the equivalent
/// iter() method, but cannot be chained like a normal [`Iterator`].
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
james7132 marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Safety
///
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
Expand All @@ -840,12 +846,12 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
>(
&self,
world: &'w World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
last_change_tick: u32,
change_tick: u32,
) {
let task_pool = world.resource::<ComputeTaskPool>().clone();
james7132 marked this conversation as resolved.
Show resolved Hide resolved
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
task_pool.scope(|scope| {
Expand Down
22 changes: 13 additions & 9 deletions crates/bevy_ecs/src/system/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
},
world::{Mut, World},
};
use bevy_tasks::TaskPool;
use std::{any::TypeId, fmt::Debug};

/// Provides scoped access to components in a [`World`].
Expand Down Expand Up @@ -493,7 +492,7 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
};
}

/// Runs `f` on each query result in parallel using the given [`TaskPool`].
/// Runs `f` on each query result in parallel using the [`World`]'s [`ComputeTaskPool`].
///
/// This can only be called for immutable data, see [`Self::par_for_each_mut`] for
/// mutable access.
Expand All @@ -502,21 +501,24 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
///
/// The items in the query get sorted into batches.
/// Internally, this function spawns a group of futures that each take on a `batch_size` sized section of the items (or less if the division is not perfect).
/// Then, the tasks in the [`TaskPool`] work through these futures.
/// Then, the tasks in the [`ComputeTaskPool`] work through these futures.
///
/// You can use this value to tune between maximum multithreading ability (many small batches) and minimum parallelization overhead (few big batches).
/// Rule of thumb: If the function body is (mostly) computationally expensive but there are not many items, a small batch size (=more batches) may help to even out the load.
/// If the body is computationally cheap and you have many items, a large batch size (=fewer batches) avoids spawning additional futures that don't help to even out the load.
///
/// # Arguments
///
///* `task_pool` - The [`TaskPool`] to use
///* `batch_size` - The number of batches to spawn
///* `f` - The function to run on each item in the query
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
///
/// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool
#[inline]
pub fn par_for_each<'this>(
&'this self,
task_pool: &TaskPool,
batch_size: usize,
f: impl Fn(ROQueryItem<'this, Q>) + Send + Sync + Clone,
) {
Expand All @@ -526,7 +528,6 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
self.state
.par_for_each_unchecked_manual::<ROQueryFetch<Q>, _>(
self.world,
task_pool,
batch_size,
f,
self.last_change_tick,
Expand All @@ -535,12 +536,16 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
};
}

/// Runs `f` on each query result in parallel using the given [`TaskPool`].
/// Runs `f` on each query result in parallel using the [`World`]'s [`ComputeTaskPool`].
/// See [`Self::par_for_each`] for more details.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
///
/// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool
#[inline]
pub fn par_for_each_mut<'a, FN: Fn(QueryItem<'a, Q>) + Send + Sync + Clone>(
&'a mut self,
task_pool: &TaskPool,
batch_size: usize,
f: FN,
) {
Expand All @@ -550,7 +555,6 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
self.state
.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
self.world,
task_pool,
batch_size,
f,
self.last_change_tick,
Expand Down
16 changes: 6 additions & 10 deletions examples/ecs/parallel_query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bevy::{prelude::*, tasks::prelude::*};
use bevy::prelude::*;
use rand::random;

#[derive(Component, Deref)]
Expand All @@ -21,26 +21,22 @@ fn spawn_system(mut commands: Commands, asset_server: Res<AssetServer>) {
}

// Move sprites according to their velocity
fn move_system(pool: Res<ComputeTaskPool>, mut sprites: Query<(&mut Transform, &Velocity)>) {
fn move_system(mut sprites: Query<(&mut Transform, &Velocity)>) {
// Compute the new location of each sprite in parallel on the
// ComputeTaskPool using batches of 32 sprites
//
// This example is only for demonstrative purposes. Using a
// This example is only for demonstrative purposes. Using a
// ParallelIterator for an inexpensive operation like addition on only 128
// elements will not typically be faster than just using a normal Iterator.
// See the ParallelIterator documentation for more information on when
// to use or not use ParallelIterator over a normal Iterator.
sprites.par_for_each_mut(&pool, 32, |(mut transform, velocity)| {
sprites.par_for_each_mut(32, |(mut transform, velocity)| {
transform.translation += velocity.extend(0.0);
});
}

// Bounce sprites outside the window
fn bounce_system(
pool: Res<ComputeTaskPool>,
windows: Res<Windows>,
mut sprites: Query<(&Transform, &mut Velocity)>,
) {
fn bounce_system(windows: Res<Windows>, mut sprites: Query<(&Transform, &mut Velocity)>) {
let window = windows.primary();
let width = window.width();
let height = window.height();
Expand All @@ -51,7 +47,7 @@ fn bounce_system(
sprites
// Batch size of 32 is chosen to limit the overhead of
// ParallelIterator, since negating a vector is very inexpensive.
.par_for_each_mut(&pool, 32, |(transform, mut v)| {
.par_for_each_mut(32, |(transform, mut v)| {
if !(left < transform.translation.x
&& transform.translation.x < right
&& bottom < transform.translation.y
Expand Down