Skip to content

Commit

Permalink
Merge remote-tracking branch 'brson/futures'
Browse files Browse the repository at this point in the history
  • Loading branch information
brson committed Oct 25, 2012
2 parents d82ddc2 + d29962f commit b2d5acd
Show file tree
Hide file tree
Showing 24 changed files with 66 additions and 122 deletions.
1 change: 0 additions & 1 deletion src/libcore/core.rc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ pub mod task {
pub mod spawn;
pub mod rt;
}
pub mod future;
pub mod pipes;

// Runtime and language-primitive support
Expand Down
14 changes: 9 additions & 5 deletions src/libcore/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,16 +581,20 @@ pub mod tests {

for uint::range(0, num_tasks) |_i| {
let total = total.clone();
futures.push(future::spawn(|move total| {
let (chan, port) = pipes::stream();
futures.push(move port);

do task::spawn |move total, move chan| {
for uint::range(0, count) |_i| {
do total.with |count| {
**count += 1;
}
}
}));
chan.send(());
}
};

for futures.each |f| { f.get() }
for futures.each |f| { f.recv() }

do total.with |total| {
assert **total == num_tasks * count
Expand Down Expand Up @@ -642,7 +646,7 @@ pub mod tests {
// Have to get rid of our reference before blocking.
{ let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
let res = option::swap_unwrap(&mut res);
future::get(&res);
res.recv();
}

#[test] #[should_fail] #[ignore(cfg(windows))]
Expand All @@ -657,7 +661,7 @@ pub mod tests {
}
assert unwrap_exclusive(move x) == ~~"hello";
let res = option::swap_unwrap(&mut res);
future::get(&res);
res.recv();
}

#[test] #[ignore(cfg(windows))]
Expand Down
82 changes: 15 additions & 67 deletions src/libcore/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use cmp::Eq;
use result::Result;
use pipes::{stream, Chan, Port};
use local_data_priv::{local_get, local_set};
use util::replace;

use rt::task_id;
use rt::rust_task;
Expand Down Expand Up @@ -72,25 +73,6 @@ impl TaskResult : Eq {
pure fn ne(other: &TaskResult) -> bool { !self.eq(other) }
}

/// A message type for notifying of task lifecycle events
pub enum Notification {
/// Sent when a task exits with the task handle and result
Exit(Task, TaskResult)
}

impl Notification : cmp::Eq {
pure fn eq(other: &Notification) -> bool {
match self {
Exit(e0a, e1a) => {
match (*other) {
Exit(e0b, e1b) => e0a == e0b && e1a == e1b
}
}
}
}
pure fn ne(other: &Notification) -> bool { !self.eq(other) }
}

/// Scheduler modes
pub enum SchedMode {
/// All tasks run in the same OS thread
Expand Down Expand Up @@ -200,7 +182,7 @@ pub type SchedOpts = {
pub type TaskOpts = {
linked: bool,
supervised: bool,
mut notify_chan: Option<Chan<Notification>>,
mut notify_chan: Option<Chan<TaskResult>>,
sched: Option<SchedOpts>,
};

Expand Down Expand Up @@ -246,11 +228,7 @@ priv impl TaskBuilder {
fail ~"Cannot copy a task_builder"; // Fake move mode on self
}
self.consumed = true;
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
Expand All @@ -271,11 +249,7 @@ impl TaskBuilder {
* the other will not be killed.
*/
fn unlinked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: false,
Expand All @@ -293,11 +267,7 @@ impl TaskBuilder {
* the child.
*/
fn supervised() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: false,
Expand All @@ -314,11 +284,7 @@ impl TaskBuilder {
* other will be killed.
*/
fn linked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: true,
Expand Down Expand Up @@ -348,7 +314,7 @@ impl TaskBuilder {
* # Failure
* Fails if a future_result was already set for this task.
*/
fn future_result(blk: fn(v: future::Future<TaskResult>)) -> TaskBuilder {
fn future_result(blk: fn(v: Port<TaskResult>)) -> TaskBuilder {
// FIXME (#3725): Once linked failure and notification are
// handled in the library, I can imagine implementing this by just
// registering an arbitrary number of task::on_exit handlers and
Expand All @@ -359,13 +325,9 @@ impl TaskBuilder {
}
// Construct the future and give it to the caller.
let (notify_pipe_ch, notify_pipe_po) = stream::<Notification>();
let (notify_pipe_ch, notify_pipe_po) = stream::<TaskResult>();
blk(do future::from_fn |move notify_pipe_po| {
match notify_pipe_po.recv() {
Exit(_, result) => result
}
});
blk(move notify_pipe_po);
// Reconfigure self to use a notify channel.
TaskBuilder({
Expand All @@ -381,11 +343,7 @@ impl TaskBuilder {
}
/// Configure a custom scheduler mode for the task.
fn sched_mode(mode: SchedMode) -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
Expand All @@ -412,11 +370,7 @@ impl TaskBuilder {
*/
fn add_wrapper(wrapper: fn@(v: fn~()) -> fn~()) -> TaskBuilder {
let prev_gen_body = self.gen_body;
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
Expand Down Expand Up @@ -447,13 +401,7 @@ impl TaskBuilder {
* must be greater than zero.
*/
fn spawn(f: fn~()) {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
let swapped_notify_chan =
option::swap_unwrap(&mut self.opts.notify_chan);
Some(move swapped_notify_chan)
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
let x = self.consume();
let opts = {
linked: x.opts.linked,
Expand Down Expand Up @@ -532,7 +480,7 @@ impl TaskBuilder {
do fr_task_builder.spawn |move f| {
comm::send(ch, f());
}
match future::get(&option::unwrap(move result)) {
match option::unwrap(move result).recv() {
Success => result::Ok(comm::recv(po)),
Failure => result::Err(())
}
Expand Down Expand Up @@ -949,14 +897,14 @@ fn test_add_wrapper() {
fn test_future_result() {
let mut result = None;
do task().future_result(|+r| { result = Some(move r); }).spawn { }
assert future::get(&option::unwrap(move result)) == Success;
assert option::unwrap(move result).recv() == Success;

result = None;
do task().future_result(|+r|
{ result = Some(move r); }).unlinked().spawn {
fail;
}
assert future::get(&option::unwrap(move result)) == Failure;
assert option::unwrap(move result).recv() == Failure;
}

#[test] #[should_fail] #[ignore(cfg(windows))]
Expand Down
22 changes: 8 additions & 14 deletions src/libcore/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,15 @@ fn TCB(me: *rust_task, tasks: TaskGroupArc, ancestors: AncestorList,
}

struct AutoNotify {
notify_chan: Chan<Notification>,
notify_chan: Chan<TaskResult>,
mut failed: bool,
drop {
let result = if self.failed { Failure } else { Success };
self.notify_chan.send(Exit(get_task(), result));
self.notify_chan.send(result);
}
}

fn AutoNotify(chan: Chan<Notification>) -> AutoNotify {
fn AutoNotify(chan: Chan<TaskResult>) -> AutoNotify {
AutoNotify {
notify_chan: move chan,
failed: true // Un-set above when taskgroup successfully made.
Expand Down Expand Up @@ -532,7 +532,7 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
// (4) ...and runs the provided body function.
fn make_child_wrapper(child: *rust_task, child_arc: TaskGroupArc,
ancestors: AncestorList, is_main: bool,
notify_chan: Option<Chan<Notification>>,
notify_chan: Option<Chan<TaskResult>>,
f: fn~()) -> fn~() {
let child_data = ~mut Some((move child_arc, move ancestors));
return fn~(move notify_chan, move child_data, move f) {
Expand Down Expand Up @@ -660,36 +660,30 @@ fn test_spawn_raw_unsupervise() {
#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_success() {
let (task_ch, task_po) = pipes::stream();
let (notify_ch, notify_po) = pipes::stream();

let opts = {
notify_chan: Some(move notify_ch),
.. default_task_opts()
};
do spawn_raw(move opts) |move task_ch| {
task_ch.send(get_task());
do spawn_raw(move opts) {
}
let task_ = task_po.recv();
assert notify_po.recv() == Exit(task_, Success);
assert notify_po.recv() == Success;
}

#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_failure() {
// New bindings for these
let (task_ch, task_po) = pipes::stream();
let (notify_ch, notify_po) = pipes::stream();

let opts = {
linked: false,
notify_chan: Some(move notify_ch),
.. default_task_opts()
};
do spawn_raw(move opts) |move task_ch| {
task_ch.send(get_task());
do spawn_raw(move opts) {
fail;
}
let task_ = task_po.recv();
assert notify_po.recv() == Exit(task_, Failure);
assert notify_po.recv() == Failure;
}
2 changes: 1 addition & 1 deletion src/libstd/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ mod tests {
}

// Wait for children to pass their asserts
for vec::each(children) |r| { future::get(r); }
for vec::each(children) |r| { r.recv(); }

// Wait for writer to finish
p.recv();
Expand Down
28 changes: 14 additions & 14 deletions src/libcore/future.rs → src/libstd/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use either::Either;
use pipes::recv;
use pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one};
use cast::copy_lifetime;

#[doc = "The future type"]
Expand Down Expand Up @@ -67,7 +67,7 @@ pub fn from_value<A>(val: A) -> Future<A> {
Future {state: Forced(~(move val))}
}

pub fn from_port<A:Send>(port: future_pipe::client::waiting<A>) ->
pub fn from_port<A:Send>(port: PortOne<A>) ->
Future<A> {
/*!
* Create a future from a port
Expand All @@ -82,7 +82,7 @@ pub fn from_port<A:Send>(port: future_pipe::client::waiting<A>) ->
port_ <-> *port;
let port = option::unwrap(move port_);
match recv(move port) {
future_pipe::completed(move data) => move data
oneshot::send(move data) => move data
}
}
}
Expand All @@ -107,9 +107,15 @@ pub fn spawn<A:Send>(blk: fn~() -> A) -> Future<A> {
* value of the future.
*/

from_port(pipes::spawn_service_recv(future_pipe::init, |move blk, ch| {
future_pipe::server::completed(move ch, blk());
}))
let (chan, port) = oneshot::init();

let chan = ~mut Some(move chan);
do task::spawn |move blk, move chan| {
let chan = option::swap_unwrap(&mut *chan);
send_one(move chan, blk());
}

return from_port(move port);
}

pub fn get_ref<A>(future: &r/Future<A>) -> &r/A {
Expand Down Expand Up @@ -162,12 +168,6 @@ pub fn with<A,B>(future: &Future<A>, blk: fn((&A)) -> B) -> B {
blk(get_ref(future))
}

proto! future_pipe (
waiting:recv<T:Send> {
completed(T) -> !
}
)

#[allow(non_implicitly_copyable_typarams)]
pub mod test {
#[test]
Expand All @@ -178,8 +178,8 @@ pub mod test {

#[test]
pub fn test_from_port() {
let (po, ch) = future_pipe::init();
future_pipe::server::completed(move ch, ~"whale");
let (ch, po) = oneshot::init();
send_one(move ch, ~"whale");
let f = from_port(move po);
assert get(&f) == ~"whale";
}
Expand Down
1 change: 1 addition & 0 deletions src/libstd/std.rc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub mod cell;
pub mod sync;
pub mod arc;
pub mod comm;
pub mod future;

// Collections

Expand Down
Loading

0 comments on commit b2d5acd

Please sign in to comment.