diff --git a/crates/bevy_tasks/src/iter.rs b/crates/bevy_tasks/src/iter.rs new file mode 100644 index 00000000000000..3a3d61d0d115bf --- /dev/null +++ b/crates/bevy_tasks/src/iter.rs @@ -0,0 +1,183 @@ +use crate::TaskPool; + +/// ParallelIterator closely emulates the std::iter::Iterator +/// interface. However, it uses bevy_task to compute batches in parallel. +pub trait ParallelIterator +where + B: Iterator + Send, + Self: Sized + Send, +{ + type Item; + + fn next_batch(&mut self) -> Option; + fn task_pool(&self) -> &TaskPool; + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } + + fn count(mut self) -> usize { + self.task_pool() + .clone() + .scope(|s| { + while let Some(batch) = self.next_batch() { + s.spawn(async move { batch.count() }) + } + }) + .iter() + .sum() + } + + fn last(mut self) -> Option { + let mut last_item = None; + loop { + match self.next_batch() { + Some(batch) => last_item = batch.last(), + None => break, + } + } + last_item + } + + // TODO: Optimize with size_hint on each batch + fn nth(mut self, n: usize) -> Option { + let mut i = 0; + while let Some(batch) = self.next_batch() { + for item in batch { + if i == n { + return Some(item); + } + i += 1; + } + } + None + } + + // TODO: Use IntoParallelIterator for U + fn chain(self, other: U) -> Chain + where + U: ParallelIterator, + { + Chain { + left: self, + right: other, + left_in_progress: true, + } + } + + // TODO: Use IntoParallelIterator for U + fn zip(self, other: U) -> Zip + where + B2: Iterator + Send, + U: ParallelIterator, + { + Zip { + left: self, + left_batch: None, + right: other, + right_batch: None, + } + } + + fn map(self, f: F) -> Map + where + F: FnMut(Self::Item) -> T + Send + Clone, + { + Map { iter: self, f } + } + + fn for_each(mut self, f: F) + where + F: FnMut(Self::Item) + Send + Clone + Sync, + { + self.task_pool().clone().scope(|s| { + while let Some(batch) = self.next_batch() { + let newf = f.clone(); + s.spawn(async move { + batch.for_each(newf); + }); + } + }); + } +} + +pub struct Chain { + left: T, + right: U, + left_in_progress: bool, +} + +impl ParallelIterator for Chain +where + B: Iterator + Send, + T: ParallelIterator, + U: ParallelIterator, +{ + type Item = T::Item; + + fn next_batch(&mut self) -> Option { + if self.left_in_progress { + match self.left.next_batch() { + b @ Some(_) => return b, + None => self.left_in_progress = false, + } + } + self.right.next_batch() + } + + fn task_pool(&self) -> &TaskPool { + if self.left_in_progress { + self.left.task_pool() + } else { + self.right.task_pool() + } + } +} + +pub struct Zip { + left: T, + left_batch: Option, + right: U, + right_batch: Option, +} + +impl ParallelIterator> for Zip +where + B1: Iterator + Send, + B2: Iterator + Send, + T: ParallelIterator, + U: ParallelIterator, +{ + type Item = (T::Item, U::Item); + + fn next_batch(&mut self) -> Option> { + unimplemented!() + } + + // TODO: not sure what to do with this + fn task_pool(&self) -> &TaskPool { + self.left.task_pool() + } +} + +pub struct Map { + iter: P, + f: F, +} + +impl ParallelIterator> for Map +where + B: Iterator + Send, + U: ParallelIterator, + F: FnMut(U::Item) -> T + Send + Clone, +{ + type Item = T; + + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| b.map(self.f.clone())) + } + + fn task_pool(&self) -> &TaskPool { + self.iter.task_pool() + } +} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 8ac79b3e1cf377..b3fa4087a8c22d 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -10,6 +10,9 @@ pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; mod usages; pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool}; +mod iter; +pub use iter::ParallelIterator; + pub mod prelude { pub use crate::{ slice::{ParallelSlice, ParallelSliceMut},