-
Notifications
You must be signed in to change notification settings - Fork 133
/
Copy pathtokio_adv_server_cr.rs
142 lines (124 loc) · 5.24 KB
/
tokio_adv_server_cr.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// More advanced server example, tokio + crossroads version
// This is supposed to look like a D-Bus service that allows the user to manipulate storage devices.
use dbus_tokio::connection;
use futures::future;
use tokio::time::sleep;
use dbus::channel::{MatchingReceiver, Sender};
use dbus::message::MatchRule;
use dbus::nonblock::SyncConnection;
use std::time::Duration;
use dbus::{Path, Message};
use dbus_crossroads::{MethodErr, Crossroads, IfaceToken, IfaceBuilder};
use std::sync::{Arc, Mutex};
/// Our storage device: the per-object state stored in the Crossroads tree and exposed
/// through the "com.example.dbus.rs.device" interface.
#[derive(Debug)]
struct Device {
/// Human-readable text served by the read-only "description" property.
description: String,
/// D-Bus object path the device is registered under (e.g. "/Device3"); also used to look
/// the device up again once a background check has finished.
path: Path<'static>,
/// Whether the device is online; readable and writable through the "online" property.
online: bool,
/// Whether a check is in progress; set by the "check" method and cleared when the
/// simulated check completes.
checking: bool,
}
impl Device {
// Creates a "test" device (not a real one, since this is an example).
fn new_bogus(index: i32) -> Device {
Device {
description: format!("This is device {}, which is {}.", index,
["totally awesome", "really fancy", "still going strong"][(index as usize) % 3]),
path: format!("/Device{}", index).into(),
online: index % 2 == 0,
checking: false,
}
}
}
/// Registers the "com.example.dbus.rs.device" interface on the shared Crossroads instance
/// and returns the token that identifies it.
///
/// The interface consists of:
/// * `online` - property with a getter and a setter; it cannot be set to `true` while a
///   check is running.
/// * `checking` - property with a getter only.
/// * `description` - property with a getter only.
/// * `check` - method without input or output arguments that starts a simulated
///   15 second device check on a background task.
/// * `CheckComplete` - signal sent by that background task once the check is over.
///
/// `cr` is locked while the interface is being registered. A clone of `cr`, together with
/// `conn`, is kept by the `check` handler so that the background task can reach the device
/// again and send the signal.
///
/// # Panics
///
/// Panics if the mutex around the Crossroads instance is poisoned. The background task
/// additionally panics if the checked device can no longer be found under its path.
fn register_iface(cr: &Arc<Mutex<Crossroads>>, conn: Arc<SyncConnection>) -> IfaceToken<Device> {
    // Owned handle for the "check" handler; the guard below only borrows `cr`.
    let shared_cr = Arc::clone(cr);
    let mut registry = cr.lock().unwrap();

    registry.register("com.example.dbus.rs.device", |b: &mut IfaceBuilder<Device>| {
        // "online" supports both reading and writing.
        b.property("online")
            .get(|_, dev| Ok(dev.online))
            .set(|_, dev, requested| {
                // A device that is being checked must not be brought online.
                if dev.checking && requested {
                    return Err(MethodErr::failed(&"Device currently under check, cannot bring online"));
                }
                dev.online = requested;
                Ok(Some(requested))
            });

        // "checking" only gets a getter, so clients cannot write it.
        b.property("checking")
            .emits_changed_false()
            .get(|_, dev| Ok(dev.checking));

        // "description" likewise only gets a getter.
        b.property("description")
            .emits_changed_const()
            .get(|_, dev| Ok(dev.description.clone()));

        // "check" takes no input arguments and returns nothing.
        b.method("check", (), (), move |_, dev, _: ()| {
            // Refuse to start unless the device is idle and offline.
            match (dev.checking, dev.online) {
                (true, _) => {
                    return Err(MethodErr::failed(&"Device currently under check, cannot start another check"));
                }
                (false, true) => {
                    return Err(MethodErr::failed(&"Device is currently online, cannot start check"));
                }
                (false, false) => {}
            }
            dev.checking = true;

            // The handler can run many times, so every task gets its own clones.
            let path = dev.path.clone();
            let task_cr = shared_cr.clone();
            let task_conn = conn.clone();
            tokio::spawn(async move {
                // Stand-in for the actual work: a check that takes 15 seconds.
                sleep(Duration::from_secs(15)).await;

                // The `dev` borrow from the handler is long gone by now, so find the device
                // again by its path in the shared tree. The lock is taken only after the
                // sleep, so the guard is never held across an await point.
                let mut tree = task_cr.lock().unwrap();
                let checked: &mut Device = tree.data_mut(&path).unwrap();
                checked.checking = false;

                // Tell listeners the check is done; a failed send is deliberately ignored.
                let signal = Message::signal(&path, &"com.example.dbus.rs.device".into(), &"CheckComplete".into());
                let _ = task_conn.send(signal);
            });
            Ok(())
        });

        // Declare the signal emitted when a check completes.
        b.signal::<(), _>("CheckComplete", ());
    })
}
/// Runs the example service: registers ten bogus devices on the session bus under the name
/// "com.example.dbus.rs.advancedserverexample" and then serves method calls forever.
///
/// # Errors
///
/// Returns an error if connecting to the session bus or requesting the bus name fails.
///
/// # Panics
///
/// Panics if the Crossroads mutex is poisoned or if handling an incoming message fails.
/// The spawned connection task panics if the D-Bus connection is lost.
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connecting to the session bus is a blocking operation, unfortunately.
    let (resource, bus) = connection::new_session_sync()?;

    // The Crossroads instance lives behind an Arc/Mutex because it is also modified outside
    // of message handling, i.e. when a device check completes.
    let crossroads = Arc::new(Mutex::new(Crossroads::new()));

    // Build and register the "com.example.dbus.rs.device" interface.
    let token = register_iface(&crossroads, bus.clone());

    // Populate the tree with devices; the guard is scoped so the lock is released right after.
    {
        let mut tree = crossroads.lock().unwrap();
        for device in (0..10).map(Device::new_bogus) {
            let path = device.path.clone();
            tree.insert(path, &[token], device);
        }
    }

    // Route every incoming method call through the Crossroads instance.
    bus.start_receive(
        MatchRule::new_method_call(),
        Box::new(move |msg, conn| {
            let mut tree = crossroads.lock().unwrap();
            tree.handle_message(msg, conn).unwrap();
            true
        }),
    );

    // The resource is a task that should be spawned onto a tokio compatible reactor ASAP.
    // If it ever finishes, the connection to D-Bus has been lost.
    //
    // To shut down the connection, both call _handle.abort() and drop the connection.
    let _handle = tokio::spawn(async {
        let err = resource.await;
        panic!("Lost connection to D-Bus: {}", err);
    });

    bus.request_name("com.example.dbus.rs.advancedserverexample", false, true, false).await?;

    // Keep serving until the process is terminated.
    future::pending::<()>().await;
    unreachable!()
}