Skip to content

Commit

Permalink
cleanup emulation
Browse files Browse the repository at this point in the history
  • Loading branch information
feschber committed Nov 8, 2024
1 parent 9837e3d commit 2be9fbe
Showing 1 changed file with 106 additions and 108 deletions.
214 changes: 106 additions & 108 deletions src/emulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,13 @@ impl Emulation {
let emulation_proxy = EmulationProxy::new(backend);
let (request_tx, request_rx) = channel();
let (event_tx, event_rx) = channel();
let task = spawn_local(Self::run(listener, emulation_proxy, request_rx, event_tx));
let emulation_task = ListenTask {
listener,
emulation_proxy,
request_rx,
event_tx,
};
let task = spawn_local(emulation_task.run());
Self {
task,
request_tx,
Expand Down Expand Up @@ -90,17 +96,32 @@ impl Emulation {
self.event_rx.recv().await.expect("channel closed")
}

async fn run(
mut listener: LanMouseListener,
mut emulation_proxy: EmulationProxy,
mut request_rx: Receiver<EmulationRequest>,
event_tx: Sender<EmulationEvent>,
) {
/// wait for termination
pub(crate) async fn terminate(&mut self) {
log::debug!("terminating emulation");
self.request_tx
.send(EmulationRequest::Terminate)
.expect("channel closed");
if let Err(e) = (&mut self.task).await {
log::warn!("{e}");
}
}
}

struct ListenTask {
listener: LanMouseListener,
emulation_proxy: EmulationProxy,
request_rx: Receiver<EmulationRequest>,
event_tx: Sender<EmulationEvent>,
}

impl ListenTask {
async fn run(mut self) {
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut last_response = HashMap::new();
loop {
select! {
e = listener.next() => {
e = self.listener.next() => {
let (event, addr) = match e {
Some(e) => e,
None => break,
Expand All @@ -109,43 +130,43 @@ impl Emulation {
last_response.insert(addr, Instant::now());
match event {
ProtoEvent::Enter(pos) => {
if let Some(fingerprint) = listener.get_certificate_fingerprint(addr).await {
if let Some(fingerprint) = self.listener.get_certificate_fingerprint(addr).await {
log::info!("releasing capture: {addr} entered this device");
event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed");
listener.reply(addr, ProtoEvent::Ack(0)).await;
event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed");
self.event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed");
self.listener.reply(addr, ProtoEvent::Ack(0)).await;
self.event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed");
}
}
ProtoEvent::Leave(_) => {
emulation_proxy.release_keys(addr);
listener.reply(addr, ProtoEvent::Ack(0)).await;
self.emulation_proxy.release_keys(addr);
self.listener.reply(addr, ProtoEvent::Ack(0)).await;
}
ProtoEvent::Input(event) => emulation_proxy.consume(event, addr),
ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(emulation_proxy.emulation_active.get())).await,
ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr),
ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong(self.emulation_proxy.emulation_active.get())).await,
_ => {}
}
}
event = emulation_proxy.event() => {
event_tx.send(event).expect("channel closed");
event = self.emulation_proxy.event() => {
self.event_tx.send(event).expect("channel closed");
}
request = request_rx.recv() => match request.expect("channel closed") {
request = self.request_rx.recv() => match request.expect("channel closed") {
// reenable emulation
EmulationRequest::Reenable => emulation_proxy.reenable(),
EmulationRequest::Reenable => self.emulation_proxy.reenable(),
// notify the other end that we hit a barrier (should release capture)
EmulationRequest::Release(addr) => listener.reply(addr, ProtoEvent::Leave(0)).await,
EmulationRequest::Release(addr) => self.listener.reply(addr, ProtoEvent::Leave(0)).await,
EmulationRequest::ChangePort(port) => {
listener.request_port_change(port);
let result = listener.port_changed().await;
event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed");
self.listener.request_port_change(port);
let result = self.listener.port_changed().await;
self.event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed");
}
EmulationRequest::Terminate => break,
},
_ = interval.tick() => {
last_response.retain(|&addr,instant| {
if instant.elapsed() > Duration::from_secs(5) {
log::warn!("releasing keys: {addr} not responding!");
emulation_proxy.release_keys(addr);
event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed");
self.emulation_proxy.release_keys(addr);
self.event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed");
false
} else {
true
Expand All @@ -154,19 +175,8 @@ impl Emulation {
}
}
}
listener.terminate().await;
emulation_proxy.terminate().await;
}

/// wait for termination
pub(crate) async fn terminate(&mut self) {
log::debug!("terminating emulation");
self.request_tx
.send(EmulationRequest::Terminate)
.expect("channel closed");
if let Err(e) = (&mut self.task).await {
log::warn!("{e}");
}
self.listener.terminate().await;
self.emulation_proxy.terminate().await;
}
}

Expand All @@ -193,12 +203,15 @@ impl EmulationProxy {
let (event_tx, event_rx) = channel();
let emulation_active = Rc::new(Cell::new(false));
let exit_requested = Rc::new(Cell::new(false));
let task = spawn_local(Self::emulation_task(
let emulation_task = EmulationTask {
backend,
exit_requested.clone(),
exit_requested: exit_requested.clone(),
request_rx,
event_tx,
));
handles: Default::default(),
next_id: 0,
};
let task = spawn_local(emulation_task.run());
Self {
emulation_active,
exit_requested,
Expand Down Expand Up @@ -234,32 +247,42 @@ impl EmulationProxy {
.expect("channel closed");
}

async fn emulation_task(
backend: Option<input_emulation::Backend>,
exit_requested: Rc<Cell<bool>>,
mut request_rx: Receiver<ProxyRequest>,
event_tx: Sender<EmulationEvent>,
) {
let mut handles = HashMap::new();
let mut next_id = 0;
fn reenable(&self) {
self.request_tx
.send(ProxyRequest::Reenable)
.expect("channel closed");
}

async fn terminate(&mut self) {
self.exit_requested.replace(true);
self.request_tx
.send(ProxyRequest::Terminate)
.expect("channel closed");
let _ = (&mut self.task).await;
}
}

struct EmulationTask {
backend: Option<input_emulation::Backend>,
exit_requested: Rc<Cell<bool>>,
request_rx: Receiver<ProxyRequest>,
event_tx: Sender<EmulationEvent>,
handles: HashMap<SocketAddr, EmulationHandle>,
next_id: EmulationHandle,
}

impl EmulationTask {
async fn run(mut self) {
loop {
if let Err(e) = Self::do_emulation(
backend,
&mut handles,
&mut next_id,
&mut request_rx,
&event_tx,
)
.await
{
if let Err(e) = self.do_emulation().await {
log::warn!("input emulation exited: {e}");
}
if exit_requested.get() {
if self.exit_requested.get() {
break;
}
// wait for reenable request
loop {
match request_rx.recv().await.expect("channel closed") {
match self.request_rx.recv().await.expect("channel closed") {
ProxyRequest::Reenable => break,
ProxyRequest::Terminate => return,
ProxyRequest::Input(..) => { /* emulation inactive => ignore */ }
Expand All @@ -269,79 +292,68 @@ impl EmulationProxy {
}
}

async fn do_emulation(
backend: Option<input_emulation::Backend>,
handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle,
request_rx: &mut Receiver<ProxyRequest>,
event_tx: &Sender<EmulationEvent>,
) -> Result<(), InputEmulationError> {
async fn do_emulation(&mut self) -> Result<(), InputEmulationError> {
log::info!("creating input emulation ...");
let mut emulation = tokio::select! {
r = InputEmulation::new(backend) => r?,
r = InputEmulation::new(self.backend) => r?,
// allow termination event while requesting input emulation
_ = wait_for_termination(request_rx) => return Ok(()),
_ = wait_for_termination(&mut self.request_rx) => return Ok(()),
};

// used to send enabled and disabled events
let _emulation_guard = DropGuard::new(
event_tx,
self.event_tx.clone(),
EmulationEvent::EmulationEnabled,
EmulationEvent::EmulationDisabled,
);

// create active handles
if let Err(e) =
Self::create_clients(&mut emulation, handles.values().copied(), request_rx).await
{
if let Err(e) = self.create_clients(&mut emulation).await {
emulation.terminate().await;
return Err(e);
}

let res = Self::do_emulation_session(&mut emulation, handles, next_id, request_rx).await;
let res = self.do_emulation_session(&mut emulation).await;
// FIXME replace with async drop when stabilized
emulation.terminate().await;
res
}

async fn create_clients(
&mut self,
emulation: &mut InputEmulation,
handles: impl Iterator<Item = EmulationHandle>,
request_rx: &mut Receiver<ProxyRequest>,
) -> Result<(), InputEmulationError> {
for handle in handles {
for handle in self.handles.values() {
tokio::select! {
_ = emulation.create(handle) => {},
_ = wait_for_termination(request_rx) => return Ok(()),
_ = emulation.create(*handle) => {},
_ = wait_for_termination(&mut self.request_rx) => return Ok(()),
}
}
Ok(())
}

async fn do_emulation_session(
&mut self,
emulation: &mut InputEmulation,
handles: &mut HashMap<SocketAddr, EmulationHandle>,
next_id: &mut EmulationHandle,
rx: &mut Receiver<ProxyRequest>,
) -> Result<(), InputEmulationError> {
loop {
tokio::select! {
e = rx.recv() => match e.expect("channel closed") {
e = self.request_rx.recv() => match e.expect("channel closed") {
ProxyRequest::Input(event, addr) => {
let handle = match handles.get(&addr) {
let handle = match self.handles.get(&addr) {
Some(&handle) => handle,
None => {
let handle = *next_id;
*next_id += 1;
let handle = self.next_id;
self.next_id += 1;
emulation.create(handle).await;
handles.insert(addr, handle);
self.handles.insert(addr, handle);
handle
}
};
emulation.consume(event, handle).await?;
},
ProxyRequest::ReleaseKeys(addr) => {
if let Some(&handle) = handles.get(&addr) {
if let Some(&handle) = self.handles.get(&addr) {
emulation.release_keys(handle).await?
}
}
Expand All @@ -351,20 +363,6 @@ impl EmulationProxy {
}
}
}

fn reenable(&self) {
self.request_tx
.send(ProxyRequest::Reenable)
.expect("channel closed");
}

async fn terminate(&mut self) {
self.exit_requested.replace(true);
self.request_tx
.send(ProxyRequest::Terminate)
.expect("channel closed");
let _ = (&mut self.task).await;
}
}

fn to_ipc_pos(pos: Position) -> lan_mouse_ipc::Position {
Expand All @@ -387,20 +385,20 @@ async fn wait_for_termination(rx: &mut Receiver<ProxyRequest>) {
}
}

struct DropGuard<'a, T> {
tx: &'a Sender<T>,
struct DropGuard<T> {
tx: Sender<T>,
on_drop: Option<T>,
}

impl<'a, T> DropGuard<'a, T> {
fn new(tx: &'a Sender<T>, on_new: T, on_drop: T) -> Self {
impl<T> DropGuard<T> {
fn new(tx: Sender<T>, on_new: T, on_drop: T) -> Self {
tx.send(on_new).expect("channel closed");
let on_drop = Some(on_drop);
Self { tx, on_drop }
}
}

impl<'a, T> Drop for DropGuard<'a, T> {
impl<T> Drop for DropGuard<T> {
fn drop(&mut self) {
self.tx
.send(self.on_drop.take().expect("item"))
Expand Down

0 comments on commit 2be9fbe

Please sign in to comment.