From 2be9fbe2a4d0b86059f16504ab5df201bbec4d6e Mon Sep 17 00:00:00 2001 From: Ferdinand Schober Date: Fri, 8 Nov 2024 17:49:47 +0100 Subject: [PATCH] cleanup emulation --- src/emulation.rs | 214 +++++++++++++++++++++++------------------------ 1 file changed, 106 insertions(+), 108 deletions(-) diff --git a/src/emulation.rs b/src/emulation.rs index 6deb34da..914a9cbf 100644 --- a/src/emulation.rs +++ b/src/emulation.rs @@ -60,7 +60,13 @@ impl Emulation { let emulation_proxy = EmulationProxy::new(backend); let (request_tx, request_rx) = channel(); let (event_tx, event_rx) = channel(); - let task = spawn_local(Self::run(listener, emulation_proxy, request_rx, event_tx)); + let emulation_task = ListenTask { + listener, + emulation_proxy, + request_rx, + event_tx, + }; + let task = spawn_local(emulation_task.run()); Self { task, request_tx, @@ -90,17 +96,32 @@ impl Emulation { self.event_rx.recv().await.expect("channel closed") } - async fn run( - mut listener: LanMouseListener, - mut emulation_proxy: EmulationProxy, - mut request_rx: Receiver, - event_tx: Sender, - ) { + /// wait for termination + pub(crate) async fn terminate(&mut self) { + log::debug!("terminating emulation"); + self.request_tx + .send(EmulationRequest::Terminate) + .expect("channel closed"); + if let Err(e) = (&mut self.task).await { + log::warn!("{e}"); + } + } +} + +struct ListenTask { + listener: LanMouseListener, + emulation_proxy: EmulationProxy, + request_rx: Receiver, + event_tx: Sender, +} + +impl ListenTask { + async fn run(mut self) { let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut last_response = HashMap::new(); loop { select! { - e = listener.next() => { + e = self.listener.next() => { let (event, addr) = match e { Some(e) => e, None => break, @@ -109,34 +130,34 @@ impl Emulation { last_response.insert(addr, Instant::now()); match event { ProtoEvent::Enter(pos) => { - if let Some(fingerprint) = listener.get_certificate_fingerprint(addr).await { + if let Some(fingerprint) = self.listener.get_certificate_fingerprint(addr).await { log::info!("releasing capture: {addr} entered this device"); - event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed"); - listener.reply(addr, ProtoEvent::Ack(0)).await; - event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed"); + self.event_tx.send(EmulationEvent::ReleaseNotify).expect("channel closed"); + self.listener.reply(addr, ProtoEvent::Ack(0)).await; + self.event_tx.send(EmulationEvent::Connected{addr, pos: to_ipc_pos(pos), fingerprint}).expect("channel closed"); } } ProtoEvent::Leave(_) => { - emulation_proxy.release_keys(addr); - listener.reply(addr, ProtoEvent::Ack(0)).await; + self.emulation_proxy.release_keys(addr); + self.listener.reply(addr, ProtoEvent::Ack(0)).await; } - ProtoEvent::Input(event) => emulation_proxy.consume(event, addr), - ProtoEvent::Ping => listener.reply(addr, ProtoEvent::Pong(emulation_proxy.emulation_active.get())).await, + ProtoEvent::Input(event) => self.emulation_proxy.consume(event, addr), + ProtoEvent::Ping => self.listener.reply(addr, ProtoEvent::Pong(self.emulation_proxy.emulation_active.get())).await, _ => {} } } - event = emulation_proxy.event() => { - event_tx.send(event).expect("channel closed"); + event = self.emulation_proxy.event() => { + self.event_tx.send(event).expect("channel closed"); } - request = request_rx.recv() => match request.expect("channel closed") { + request = self.request_rx.recv() => match request.expect("channel closed") { // reenable emulation - EmulationRequest::Reenable => emulation_proxy.reenable(), + EmulationRequest::Reenable => self.emulation_proxy.reenable(), // notify the other end that we hit a barrier (should release capture) - EmulationRequest::Release(addr) => listener.reply(addr, ProtoEvent::Leave(0)).await, + EmulationRequest::Release(addr) => self.listener.reply(addr, ProtoEvent::Leave(0)).await, EmulationRequest::ChangePort(port) => { - listener.request_port_change(port); - let result = listener.port_changed().await; - event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed"); + self.listener.request_port_change(port); + let result = self.listener.port_changed().await; + self.event_tx.send(EmulationEvent::PortChanged(result)).expect("channel closed"); } EmulationRequest::Terminate => break, }, @@ -144,8 +165,8 @@ impl Emulation { last_response.retain(|&addr,instant| { if instant.elapsed() > Duration::from_secs(5) { log::warn!("releasing keys: {addr} not responding!"); - emulation_proxy.release_keys(addr); - event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed"); + self.emulation_proxy.release_keys(addr); + self.event_tx.send(EmulationEvent::Disconnected { addr }).expect("channel closed"); false } else { true @@ -154,19 +175,8 @@ impl Emulation { } } } - listener.terminate().await; - emulation_proxy.terminate().await; - } - - /// wait for termination - pub(crate) async fn terminate(&mut self) { - log::debug!("terminating emulation"); - self.request_tx - .send(EmulationRequest::Terminate) - .expect("channel closed"); - if let Err(e) = (&mut self.task).await { - log::warn!("{e}"); - } + self.listener.terminate().await; + self.emulation_proxy.terminate().await; } } @@ -193,12 +203,15 @@ impl EmulationProxy { let (event_tx, event_rx) = channel(); let emulation_active = Rc::new(Cell::new(false)); let exit_requested = Rc::new(Cell::new(false)); - let task = spawn_local(Self::emulation_task( + let emulation_task = EmulationTask { backend, - exit_requested.clone(), + exit_requested: exit_requested.clone(), request_rx, event_tx, - )); + handles: Default::default(), + next_id: 0, + }; + let task = spawn_local(emulation_task.run()); Self { emulation_active, exit_requested, @@ -234,32 +247,42 @@ impl EmulationProxy { .expect("channel closed"); } - async fn emulation_task( - backend: Option, - exit_requested: Rc>, - mut request_rx: Receiver, - event_tx: Sender, - ) { - let mut handles = HashMap::new(); - let mut next_id = 0; + fn reenable(&self) { + self.request_tx + .send(ProxyRequest::Reenable) + .expect("channel closed"); + } + + async fn terminate(&mut self) { + self.exit_requested.replace(true); + self.request_tx + .send(ProxyRequest::Terminate) + .expect("channel closed"); + let _ = (&mut self.task).await; + } +} + +struct EmulationTask { + backend: Option, + exit_requested: Rc>, + request_rx: Receiver, + event_tx: Sender, + handles: HashMap, + next_id: EmulationHandle, +} + +impl EmulationTask { + async fn run(mut self) { loop { - if let Err(e) = Self::do_emulation( - backend, - &mut handles, - &mut next_id, - &mut request_rx, - &event_tx, - ) - .await - { + if let Err(e) = self.do_emulation().await { log::warn!("input emulation exited: {e}"); } - if exit_requested.get() { + if self.exit_requested.get() { break; } // wait for reenable request loop { - match request_rx.recv().await.expect("channel closed") { + match self.request_rx.recv().await.expect("channel closed") { ProxyRequest::Reenable => break, ProxyRequest::Terminate => return, ProxyRequest::Input(..) => { /* emulation inactive => ignore */ } @@ -269,79 +292,68 @@ impl EmulationProxy { } } - async fn do_emulation( - backend: Option, - handles: &mut HashMap, - next_id: &mut EmulationHandle, - request_rx: &mut Receiver, - event_tx: &Sender, - ) -> Result<(), InputEmulationError> { + async fn do_emulation(&mut self) -> Result<(), InputEmulationError> { log::info!("creating input emulation ..."); let mut emulation = tokio::select! { - r = InputEmulation::new(backend) => r?, + r = InputEmulation::new(self.backend) => r?, // allow termination event while requesting input emulation - _ = wait_for_termination(request_rx) => return Ok(()), + _ = wait_for_termination(&mut self.request_rx) => return Ok(()), }; // used to send enabled and disabled events let _emulation_guard = DropGuard::new( - event_tx, + self.event_tx.clone(), EmulationEvent::EmulationEnabled, EmulationEvent::EmulationDisabled, ); // create active handles - if let Err(e) = - Self::create_clients(&mut emulation, handles.values().copied(), request_rx).await - { + if let Err(e) = self.create_clients(&mut emulation).await { emulation.terminate().await; return Err(e); } - let res = Self::do_emulation_session(&mut emulation, handles, next_id, request_rx).await; + let res = self.do_emulation_session(&mut emulation).await; // FIXME replace with async drop when stabilized emulation.terminate().await; res } async fn create_clients( + &mut self, emulation: &mut InputEmulation, - handles: impl Iterator, - request_rx: &mut Receiver, ) -> Result<(), InputEmulationError> { - for handle in handles { + for handle in self.handles.values() { tokio::select! { - _ = emulation.create(handle) => {}, - _ = wait_for_termination(request_rx) => return Ok(()), + _ = emulation.create(*handle) => {}, + _ = wait_for_termination(&mut self.request_rx) => return Ok(()), } } Ok(()) } async fn do_emulation_session( + &mut self, emulation: &mut InputEmulation, - handles: &mut HashMap, - next_id: &mut EmulationHandle, - rx: &mut Receiver, ) -> Result<(), InputEmulationError> { loop { tokio::select! { - e = rx.recv() => match e.expect("channel closed") { + e = self.request_rx.recv() => match e.expect("channel closed") { ProxyRequest::Input(event, addr) => { - let handle = match handles.get(&addr) { + let handle = match self.handles.get(&addr) { Some(&handle) => handle, None => { - let handle = *next_id; - *next_id += 1; + let handle = self.next_id; + self.next_id += 1; emulation.create(handle).await; - handles.insert(addr, handle); + self.handles.insert(addr, handle); handle } }; emulation.consume(event, handle).await?; }, ProxyRequest::ReleaseKeys(addr) => { - if let Some(&handle) = handles.get(&addr) { + if let Some(&handle) = self.handles.get(&addr) { emulation.release_keys(handle).await? } } @@ -351,20 +363,6 @@ impl EmulationProxy { } } } - - fn reenable(&self) { - self.request_tx - .send(ProxyRequest::Reenable) - .expect("channel closed"); - } - - async fn terminate(&mut self) { - self.exit_requested.replace(true); - self.request_tx - .send(ProxyRequest::Terminate) - .expect("channel closed"); - let _ = (&mut self.task).await; - } } fn to_ipc_pos(pos: Position) -> lan_mouse_ipc::Position { @@ -387,20 +385,20 @@ async fn wait_for_termination(rx: &mut Receiver) { } } -struct DropGuard<'a, T> { - tx: &'a Sender, +struct DropGuard { + tx: Sender, on_drop: Option, } -impl<'a, T> DropGuard<'a, T> { - fn new(tx: &'a Sender, on_new: T, on_drop: T) -> Self { +impl DropGuard { + fn new(tx: Sender, on_new: T, on_drop: T) -> Self { tx.send(on_new).expect("channel closed"); let on_drop = Some(on_drop); Self { tx, on_drop } } } -impl<'a, T> Drop for DropGuard<'a, T> { +impl Drop for DropGuard { fn drop(&mut self) { self.tx .send(self.on_drop.take().expect("item"))