diff --git a/alvr/server/src/connection.rs b/alvr/server/src/connection.rs index e59b66964b..f07d18c944 100644 --- a/alvr/server/src/connection.rs +++ b/alvr/server/src/connection.rs @@ -59,87 +59,6 @@ fn align32(value: f32) -> u32 { ((value / 32.).floor() * 32.) as u32 } -// Alternate connection trials with manual IPs and clients discovered on the local network -pub fn handshake_loop() -> IntResult { - let mut welcome_socket = WelcomeSocket::new().map_err(to_int_e!())?; - - loop { - check_interrupt!(SHOULD_CONNECT_TO_CLIENTS.value()); - - let manual_client_ips = { - let connected_hostnames_lock = CONNECTED_CLIENT_HOSTNAMES.lock(); - let mut manual_client_ips = HashMap::new(); - for (hostname, connection_info) in SERVER_DATA_MANAGER.read().client_list() { - if !connected_hostnames_lock.contains(hostname) { - for ip in &connection_info.manual_ips { - manual_client_ips.insert(*ip, hostname.clone()); - } - } - } - manual_client_ips - }; - - if !manual_client_ips.is_empty() && try_connect(manual_client_ips).is_ok() { - thread::sleep(RETRY_CONNECT_MIN_INTERVAL); - continue; - } - - let discovery_config = SERVER_DATA_MANAGER - .read() - .settings() - .connection - .client_discovery - .clone(); - if let Switch::Enabled(config) = discovery_config { - let (client_hostname, client_ip) = match welcome_socket.recv_non_blocking() { - Ok(pair) => pair, - Err(e) => { - if let InterruptibleError::Other(e) = e { - warn!("UDP handshake listening error: {e}"); - } - - thread::sleep(RETRY_CONNECT_MIN_INTERVAL); - continue; - } - }; - - let trusted = { - let mut data_manager = SERVER_DATA_MANAGER.write(); - - data_manager.update_client_list( - client_hostname.clone(), - ClientListAction::AddIfMissing { - trusted: false, - manual_ips: vec![], - }, - ); - - if config.auto_trust_clients { - data_manager - .update_client_list(client_hostname.clone(), ClientListAction::Trust); - } - - data_manager - .client_list() - .get(&client_hostname) - .unwrap() - .trusted - }; - - // do not attempt connection if the client is already connected - if trusted && !CONNECTED_CLIENT_HOSTNAMES.lock().contains(&client_hostname) { - if let Err(e) = - try_connect([(client_ip, client_hostname.clone())].into_iter().collect()) - { - error!("Handshake error for {client_hostname}: {e}"); - } - } - } - - thread::sleep(RETRY_CONNECT_MIN_INTERVAL); - } -} - pub fn contruct_openvr_config() -> OpenvrConfig { let data_manager_lock = SERVER_DATA_MANAGER.read(); let old_config = data_manager_lock.session().openvr_config.clone(); @@ -280,6 +199,87 @@ pub fn contruct_openvr_config() -> OpenvrConfig { } } +// Alternate connection trials with manual IPs and clients discovered on the local network +pub fn handshake_loop() -> IntResult { + let mut welcome_socket = WelcomeSocket::new().map_err(to_int_e!())?; + + loop { + check_interrupt!(SHOULD_CONNECT_TO_CLIENTS.value()); + + let manual_client_ips = { + let connected_hostnames_lock = CONNECTED_CLIENT_HOSTNAMES.lock(); + let mut manual_client_ips = HashMap::new(); + for (hostname, connection_info) in SERVER_DATA_MANAGER.read().client_list() { + if !connected_hostnames_lock.contains(hostname) { + for ip in &connection_info.manual_ips { + manual_client_ips.insert(*ip, hostname.clone()); + } + } + } + manual_client_ips + }; + + if !manual_client_ips.is_empty() && try_connect(manual_client_ips).is_ok() { + thread::sleep(RETRY_CONNECT_MIN_INTERVAL); + continue; + } + + let discovery_config = SERVER_DATA_MANAGER + .read() + .settings() + .connection + .client_discovery + .clone(); + if let Switch::Enabled(config) = discovery_config { + let (client_hostname, client_ip) = match welcome_socket.recv_non_blocking() { + Ok(pair) => pair, + Err(e) => { + if let InterruptibleError::Other(e) = e { + warn!("UDP handshake listening error: {e}"); + } + + thread::sleep(RETRY_CONNECT_MIN_INTERVAL); + continue; + } + }; + + let trusted = { + let mut data_manager = SERVER_DATA_MANAGER.write(); + + data_manager.update_client_list( + client_hostname.clone(), + ClientListAction::AddIfMissing { + trusted: false, + manual_ips: vec![], + }, + ); + + if config.auto_trust_clients { + data_manager + .update_client_list(client_hostname.clone(), ClientListAction::Trust); + } + + data_manager + .client_list() + .get(&client_hostname) + .unwrap() + .trusted + }; + + // do not attempt connection if the client is already connected + if trusted && !CONNECTED_CLIENT_HOSTNAMES.lock().contains(&client_hostname) { + if let Err(e) = + try_connect([(client_ip, client_hostname.clone())].into_iter().collect()) + { + error!("Handshake error for {client_hostname}: {e}"); + } + } + } + + thread::sleep(RETRY_CONNECT_MIN_INTERVAL); + } +} + fn try_connect(mut client_ips: HashMap) -> IntResult { let runtime = Runtime::new().map_err(to_int_e!())?; @@ -435,7 +435,7 @@ fn try_connect(mut client_ips: HashMap) -> IntResult { .block_on(proto_socket.send(&client_config)) .map_err(to_int_e!())?; - let (mut control_sender, control_receiver) = proto_socket.split(); + let (mut control_sender, mut control_receiver) = proto_socket.split(); let mut new_openvr_config = contruct_openvr_config(); new_openvr_config.eye_resolution_width = stream_view_resolution.x; @@ -454,6 +454,35 @@ fn try_connect(mut client_ips: HashMap) -> IntResult { crate::notify_restart_driver(); } + runtime + .block_on(control_sender.send(&ServerControlPacket::StartStream)) + .map_err(to_int_e!())?; + + match runtime + .block_on(control_receiver.recv()) + .map_err(to_int_e!()) + { + Ok(ClientControlPacket::StreamReady) => (), + Ok(_) => { + return int_fmt_e!("Got unexpected packet waiting for stream ack"); + } + Err(e) => { + return int_fmt_e!("Error while waiting for stream ack: {e}"); + } + } + + *STATISTICS_MANAGER.lock() = Some(StatisticsManager::new( + settings.connection.statistics_history_size, + Duration::from_secs_f32(1.0 / fps), + if let Switch::Enabled(config) = &settings.headset.controllers { + config.steamvr_pipeline_frames + } else { + 0.0 + }, + )); + + *BITRATE_MANAGER.lock() = BitrateManager::new(settings.video.bitrate.history_size, fps); + CONNECTED_CLIENT_HOSTNAMES .lock() .insert(client_hostname.clone()); @@ -464,13 +493,6 @@ fn try_connect(mut client_ips: HashMap) -> IntResult { runtime.block_on({ let client_hostname = client_hostname.clone(); async move { - // this is a bridge between sync and async, skips the needs for a notifier - let shutdown_detector = async { - while SHOULD_CONNECT_TO_CLIENTS.value() { - time::sleep(Duration::from_secs(1)).await; - } - }; - tokio::select! { res = connection_pipeline( client_hostname, @@ -478,14 +500,11 @@ fn try_connect(mut client_ips: HashMap) -> IntResult { control_sender, control_receiver, streaming_caps.microphone_sample_rate, - fps, ) => { if let Err(e) = res { warn!("Connection interrupted: {e:?}"); } }, - _ = DISCONNECT_CLIENT_NOTIFIER.notified() => (), - _ = shutdown_detector => (), }; } }); @@ -505,69 +524,13 @@ fn try_connect(mut client_ips: HashMap) -> IntResult { Ok(()) } -// close stream on Drop (manual disconnection or execution canceling) -struct StreamCloseGuard { - is_streaming: Arc, - streaming_hostname: String, -} - -impl Drop for StreamCloseGuard { - fn drop(&mut self) { - self.is_streaming.set(false); - - SERVER_DATA_MANAGER.write().update_client_list( - self.streaming_hostname.clone(), - ClientListAction::SetConnectionState(ConnectionState::Disconnected), - ); - - *VIDEO_RECORDING_FILE.lock() = None; - - unsafe { crate::DeinitializeStreaming() }; - - let on_disconnect_script = SERVER_DATA_MANAGER - .read() - .settings() - .connection - .on_disconnect_script - .clone(); - if !on_disconnect_script.is_empty() { - info!("Running on disconnect script (disconnect): {on_disconnect_script}"); - if let Err(e) = Command::new(&on_disconnect_script) - .env("ACTION", "disconnect") - .spawn() - { - warn!("Failed to run disconnect script: {e}"); - } - } - } -} - async fn connection_pipeline( client_hostname: String, client_ip: IpAddr, control_sender: ControlSocketSender, mut control_receiver: ControlSocketReceiver, microphone_sample_rate: u32, - refresh_rate: f32, ) -> StrResult { - let control_sender = Arc::new(Mutex::new(control_sender)); - - control_sender - .lock() - .await - .send(&ServerControlPacket::StartStream) - .await?; - - match control_receiver.recv().await { - Ok(ClientControlPacket::StreamReady) => {} - Ok(_) => { - return fmt_e!("Got unexpected packet waiting for stream ack"); - } - Err(e) => { - return fmt_e!("Error while waiting for stream ack: {e}"); - } - } - let settings = SERVER_DATA_MANAGER.read().settings().clone(); let stream_socket = tokio::select! { @@ -585,50 +548,6 @@ async fn connection_pipeline( }; let stream_socket = Arc::new(stream_socket); - *STATISTICS_MANAGER.lock() = Some(StatisticsManager::new( - settings.connection.statistics_history_size, - Duration::from_secs_f32(1.0 / refresh_rate), - if let Switch::Enabled(config) = &settings.headset.controllers { - config.steamvr_pipeline_frames - } else { - 0.0 - }, - )); - - *BITRATE_MANAGER.lock() = - BitrateManager::new(settings.video.bitrate.history_size, refresh_rate); - - { - let on_connect_script = settings.connection.on_connect_script; - - if !on_connect_script.is_empty() { - info!("Running on connect script (connect): {on_connect_script}"); - if let Err(e) = Command::new(&on_connect_script) - .env("ACTION", "connect") - .spawn() - { - warn!("Failed to run connect script: {e}"); - } - } - } - - if settings.capture.save_video_stream { - crate::create_recording_file(); - } - - unsafe { crate::InitializeStreaming() }; - - let is_streaming = Arc::new(RelaxedAtomic::new(true)); - let _stream_guard = StreamCloseGuard { - is_streaming: Arc::clone(&is_streaming), - streaming_hostname: client_hostname.clone(), - }; - - SERVER_DATA_MANAGER.write().update_client_list( - client_hostname.clone(), - ClientListAction::SetConnectionState(ConnectionState::Streaming), - ); - let game_audio_loop: BoxFuture<_> = if let Switch::Enabled(config) = settings.audio.game_audio { let sender = stream_socket.request_stream(AUDIO).await?; Box::pin(async move { @@ -945,6 +864,8 @@ async fn connection_pipeline( } }; + let control_sender = Arc::new(Mutex::new(control_sender)); + let keepalive_loop = { let control_sender = Arc::clone(&control_sender); async move { @@ -977,125 +898,160 @@ async fn connection_pipeline( } }; - let control_loop = async move { - loop { - match control_receiver.recv().await { - Ok(ClientControlPacket::PlayspaceSync(packet)) => { - if !is_tracking_ref_only { - playspace_sync_sender.send(packet).ok(); - - let data_manager_lock = SERVER_DATA_MANAGER.read(); - let config = &data_manager_lock.settings().headset; - tracking_manager.lock().await.recenter( - config.position_recentering_mode, - config.rotation_recentering_mode, - ); - } - } - Ok(ClientControlPacket::RequestIdr) => { - if let Some(sender) = &*CONTROL_CHANNEL_SENDER.lock() { - if let Some(config) = &*DECODER_CONFIG.lock() { - sender - .send(ServerControlPacket::InitializeDecoder(config.clone())) - .ok(); + let control_loop = { + let client_hostname = client_hostname.clone(); + async move { + loop { + match control_receiver.recv().await { + Ok(ClientControlPacket::PlayspaceSync(packet)) => { + if !is_tracking_ref_only { + playspace_sync_sender.send(packet).ok(); + + let data_manager_lock = SERVER_DATA_MANAGER.read(); + let config = &data_manager_lock.settings().headset; + tracking_manager.lock().await.recenter( + config.position_recentering_mode, + config.rotation_recentering_mode, + ); } } - unsafe { crate::RequestIDR() } - } - Ok(ClientControlPacket::VideoErrorReport) => { - if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { - stats.report_packet_loss(); - } - unsafe { crate::VideoErrorReportReceive() }; - } - Ok(ClientControlPacket::ViewsConfig(config)) => unsafe { - crate::SetViewsConfig(FfiViewsConfig { - fov: [ - FfiFov { - left: config.fov[0].left, - right: config.fov[0].right, - up: config.fov[0].up, - down: config.fov[0].down, - }, - FfiFov { - left: config.fov[1].left, - right: config.fov[1].right, - up: config.fov[1].up, - down: config.fov[1].down, - }, - ], - ipd_m: config.ipd_m, - }); - }, - Ok(ClientControlPacket::Battery(packet)) => unsafe { - crate::SetBattery(packet.device_id, packet.gauge_value, packet.is_plugged); - - if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { - stats.report_battery( - packet.device_id, - packet.gauge_value, - packet.is_plugged, - ); + Ok(ClientControlPacket::RequestIdr) => { + if let Some(sender) = &*CONTROL_CHANNEL_SENDER.lock() { + if let Some(config) = &*DECODER_CONFIG.lock() { + sender + .send(ServerControlPacket::InitializeDecoder(config.clone())) + .ok(); + } + } + unsafe { crate::RequestIDR() } } - }, - Ok(ClientControlPacket::Buttons(entries)) => { - { - let data_manager_lock = SERVER_DATA_MANAGER.read(); - if data_manager_lock.settings().logging.log_button_presses { - alvr_events::send_event(EventType::Buttons( - entries - .iter() - .map(|e| ButtonEvent { - path: BUTTON_PATH_FROM_ID - .get(&e.path_id) - .cloned() - .unwrap_or_else(|| { - format!("Unknown (ID: {:#16x})", e.path_id) - }), - value: e.value, - }) - .collect(), - )); + Ok(ClientControlPacket::VideoErrorReport) => { + if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { + stats.report_packet_loss(); } + unsafe { crate::VideoErrorReportReceive() }; } + Ok(ClientControlPacket::ViewsConfig(config)) => unsafe { + crate::SetViewsConfig(FfiViewsConfig { + fov: [ + FfiFov { + left: config.fov[0].left, + right: config.fov[0].right, + up: config.fov[0].up, + down: config.fov[0].down, + }, + FfiFov { + left: config.fov[1].left, + right: config.fov[1].right, + up: config.fov[1].up, + down: config.fov[1].down, + }, + ], + ipd_m: config.ipd_m, + }); + }, + Ok(ClientControlPacket::Battery(packet)) => unsafe { + crate::SetBattery(packet.device_id, packet.gauge_value, packet.is_plugged); + + if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { + stats.report_battery( + packet.device_id, + packet.gauge_value, + packet.is_plugged, + ); + } + }, + Ok(ClientControlPacket::Buttons(entries)) => { + { + let data_manager_lock = SERVER_DATA_MANAGER.read(); + if data_manager_lock.settings().logging.log_button_presses { + alvr_events::send_event(EventType::Buttons( + entries + .iter() + .map(|e| ButtonEvent { + path: BUTTON_PATH_FROM_ID + .get(&e.path_id) + .cloned() + .unwrap_or_else(|| { + format!("Unknown (ID: {:#16x})", e.path_id) + }), + value: e.value, + }) + .collect(), + )); + } + } - for entry in entries { - let value = match entry.value { - ButtonValue::Binary(value) => FfiButtonValue { - type_: crate::FfiButtonType_BUTTON_TYPE_BINARY, - __bindgen_anon_1: crate::FfiButtonValue__bindgen_ty_1 { - binary: value.into(), + for entry in entries { + let value = match entry.value { + ButtonValue::Binary(value) => FfiButtonValue { + type_: crate::FfiButtonType_BUTTON_TYPE_BINARY, + __bindgen_anon_1: crate::FfiButtonValue__bindgen_ty_1 { + binary: value.into(), + }, }, - }, - ButtonValue::Scalar(value) => FfiButtonValue { - type_: crate::FfiButtonType_BUTTON_TYPE_SCALAR, - __bindgen_anon_1: crate::FfiButtonValue__bindgen_ty_1 { - scalar: value, + ButtonValue::Scalar(value) => FfiButtonValue { + type_: crate::FfiButtonType_BUTTON_TYPE_SCALAR, + __bindgen_anon_1: crate::FfiButtonValue__bindgen_ty_1 { + scalar: value, + }, }, - }, - }; + }; - unsafe { crate::SetButton(entry.path_id, value) }; + unsafe { crate::SetButton(entry.path_id, value) }; + } + } + Ok(ClientControlPacket::Log { level, message }) => { + info!("Client {client_hostname}: [{level:?}] {message}") + } + Ok(_) => (), + Err(e) => { + info!("Client disconnected. Cause: {e}"); + break; } - } - Ok(ClientControlPacket::Log { level, message }) => { - info!("Client {client_hostname}: [{level:?}] {message}") - } - Ok(_) => (), - Err(e) => { - info!("Client disconnected. Cause: {e}"); - break; } } - } - Ok(()) + Ok(()) + } }; let receive_loop = async move { stream_socket.receive_loop().await }; - tokio::select! { + { + let on_connect_script = settings.connection.on_connect_script; + + if !on_connect_script.is_empty() { + info!("Running on connect script (connect): {on_connect_script}"); + if let Err(e) = Command::new(&on_connect_script) + .env("ACTION", "connect") + .spawn() + { + warn!("Failed to run connect script: {e}"); + } + } + } + + if settings.capture.save_video_stream { + crate::create_recording_file(); + } + + unsafe { crate::InitializeStreaming() }; + + SERVER_DATA_MANAGER.write().update_client_list( + client_hostname.clone(), + ClientListAction::SetConnectionState(ConnectionState::Streaming), + ); + + // this is a bridge between sync and async, skips the needs for a notifier + let shutdown_detector = async { + while SHOULD_CONNECT_TO_CLIENTS.value() { + time::sleep(Duration::from_secs(1)).await; + } + }; + + let res = tokio::select! { // Spawn new tasks and let the runtime manage threading res = spawn_cancelable(receive_loop) => { if let Err(e) = res { @@ -1126,15 +1082,35 @@ async fn connection_pipeline( Ok(()) } - _ = SHUTDOWN_NOTIFIER.notified() => { - control_sender - .lock() - .await - .send(&ServerControlPacket::Restarting) - .await - .ok(); + _ = SHUTDOWN_NOTIFIER.notified() => Ok(()), + _ = DISCONNECT_CLIENT_NOTIFIER.notified() => Ok(()), + _ = shutdown_detector => Ok(()), + }; - Ok(()) + SERVER_DATA_MANAGER.write().update_client_list( + client_hostname.clone(), + ClientListAction::SetConnectionState(ConnectionState::Disconnected), + ); + + *VIDEO_RECORDING_FILE.lock() = None; + + unsafe { crate::DeinitializeStreaming() }; + + let on_disconnect_script = SERVER_DATA_MANAGER + .read() + .settings() + .connection + .on_disconnect_script + .clone(); + if !on_disconnect_script.is_empty() { + info!("Running on disconnect script (disconnect): {on_disconnect_script}"); + if let Err(e) = Command::new(&on_disconnect_script) + .env("ACTION", "disconnect") + .spawn() + { + warn!("Failed to run disconnect script: {e}"); } } + + res }