-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate.rs
91 lines (84 loc) · 2.36 KB
/
create.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use crate::observable::Observable;
use crate::scheduler::Scheduler;
use log::trace;
use std::marker::PhantomData;
use std::sync::mpsc::Sender;
pub struct Create<I, Item> {
create_function: I,
_marker: PhantomData<Item>,
}
pub fn create<I, Item>(create_function: I) -> Create<I, Item>
where
I: FnMut(Sender<std::io::Result<Item>>),
{
Create {
create_function,
_marker: PhantomData,
}
}
impl<I, Item> Observable for Create<I, Item>
where
I: FnMut(Sender<std::io::Result<Item>>) + Send + 'static,
Item: Send + 'static,
{
type Item = Item;
fn actual_subscribe<O>(mut self, channel: Sender<std::io::Result<Self::Item>>, pool: O)
where
O: Scheduler,
{
pool.schedule(move || {
(self.create_function)(channel);
trace!("Create finished");
})
.forget();
}
}
#[cfg(test)]
mod test {
use crate::create::create;
use crate::observable::Observable;
use crate::observer::Observer;
use futures::executor::ThreadPool;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
#[test]
fn it_creates() {
let collector = Arc::new(AtomicI32::new(0));
let collector_c = collector.clone();
let pool = ThreadPool::new().unwrap();
let handle = create(|sender| {
sender.next(1).unwrap();
sender.next(2).unwrap();
sender.next(3).unwrap();
})
.subscribe(
move |v| {
collector.fetch_add(v, Ordering::Relaxed);
},
pool,
);
futures::executor::block_on(handle);
assert_eq!(collector_c.load(Ordering::Relaxed), 6);
}
#[test]
fn it_can_mut_access_external_state() {
let collector = Arc::new(AtomicI32::new(0));
let collector_c = collector.clone();
let mut _test = 0;
let handle = create(move |sender| {
sender.next(1).unwrap();
sender.next(2).unwrap();
sender.next(3).unwrap();
let _test2 = &mut _test;
assert_eq!(*_test2, 0);
})
.subscribe(
move |v| {
collector.fetch_add(v, Ordering::Relaxed);
},
ThreadPool::new().unwrap(),
);
futures::executor::block_on(handle);
assert_eq!(collector_c.load(Ordering::Relaxed), 6);
}
}