Skip to content

Commit

Permalink
Finish ControlSocket sync conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
zmerp committed Aug 8, 2023
1 parent 25c5a9e commit 803ffb5
Show file tree
Hide file tree
Showing 7 changed files with 507 additions and 292 deletions.
124 changes: 54 additions & 70 deletions alvr/client_core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ fn connection_pipeline(
let (mut proto_control_socket, server_ip) = {
let config = Config::load();
let announcer_socket = AnnouncerSocket::new(&config.hostname).to_con()?;
let listener_socket = alvr_sockets::get_server_listener(&runtime).to_con()?;
let listener_socket = alvr_sockets::get_server_listener(Duration::from_secs(1)).to_con()?;

loop {
if !IS_ALIVE.value() {
Expand All @@ -145,7 +145,6 @@ fn connection_pipeline(
}

if let Ok(pair) = ProtoControlSocket::connect_to(
&runtime,
DISCOVERY_RETRY_PAUSE,
PeerType::Server(&listener_socket),
) {
Expand All @@ -171,22 +170,18 @@ fn connection_pipeline(
.unwrap();

proto_control_socket
.send(
&runtime,
&ClientConnectionResult::ConnectionAccepted {
client_protocol_id: alvr_common::protocol_id(),
display_name: platform::device_model(),
server_ip,
streaming_capabilities: Some(VideoStreamingCapabilities {
default_view_resolution: recommended_view_resolution,
supported_refresh_rates,
microphone_sample_rate,
}),
},
)
.send(&ClientConnectionResult::ConnectionAccepted {
client_protocol_id: alvr_common::protocol_id(),
display_name: platform::device_model(),
server_ip,
streaming_capabilities: Some(VideoStreamingCapabilities {
default_view_resolution: recommended_view_resolution,
supported_refresh_rates,
microphone_sample_rate,
}),
})
.to_con()?;
let config_packet =
proto_control_socket.recv::<StreamConfigPacket>(&runtime, Duration::from_secs(1))?;
let config_packet = proto_control_socket.recv::<StreamConfigPacket>()?;

let settings = {
let mut session_desc = SessionConfig::default();
Expand Down Expand Up @@ -228,9 +223,11 @@ fn connection_pipeline(
},
));

let (mut control_sender, mut control_receiver) = proto_control_socket.split();
let (mut control_sender, mut control_receiver) = proto_control_socket
.split(Duration::from_millis(500))
.to_con()?;

match control_receiver.recv(&runtime, Duration::from_secs(1)) {
match control_receiver.recv() {
Ok(ServerControlPacket::StartStream) => {
info!("Stream starting");
set_hud_message(STREAM_STARTING_MESSAGE);
Expand Down Expand Up @@ -261,7 +258,7 @@ fn connection_pipeline(
)
.to_con()?;

if let Err(e) = control_sender.send(&runtime, &ClientControlPacket::StreamReady) {
if let Err(e) = control_sender.send(&ClientControlPacket::StreamReady) {
info!("Server disconnected. Cause: {e}");
set_hud_message(SERVER_DISCONNECTED_MESSAGE);
return Ok(());
Expand Down Expand Up @@ -416,41 +413,30 @@ fn connection_pipeline(

while IS_STREAMING.value() && IS_RESUMED.value() && IS_ALIVE.value() {
if let Ok(packet) = control_channel_receiver.recv_timeout(Duration::from_millis(500)) {
if let Some(runtime) = &*CONNECTION_RUNTIME.read() {
if let Err(e) = control_sender.send(runtime, &packet) {
info!("Server disconnected. Cause: {e}");
set_hud_message(SERVER_DISCONNECTED_MESSAGE);
if let Err(e) = control_sender.send(&packet) {
info!("Server disconnected. Cause: {e}");
set_hud_message(SERVER_DISCONNECTED_MESSAGE);

break;
}
break;
}
}

if Instant::now() > keepalive_deadline {
if let Some(runtime) = &*CONNECTION_RUNTIME.read() {
control_sender
.send(runtime, &ClientControlPacket::KeepAlive)
.ok();
control_sender.send(&ClientControlPacket::KeepAlive).ok();

keepalive_deadline = Instant::now() + KEEPALIVE_INTERVAL;
}
keepalive_deadline = Instant::now() + KEEPALIVE_INTERVAL;
}

#[cfg(target_os = "android")]
if Instant::now() > battery_deadline {
if let Some(runtime) = &*CONNECTION_RUNTIME.read() {
let (gauge_value, is_plugged) = battery_manager.status();
control_sender
.send(
runtime,
&ClientControlPacket::Battery(crate::BatteryPacket {
device_id: *alvr_common::HEAD_ID,
gauge_value,
is_plugged,
}),
)
.ok();
}
let (gauge_value, is_plugged) = battery_manager.status();
control_sender
.send(&ClientControlPacket::Battery(crate::BatteryPacket {
device_id: *alvr_common::HEAD_ID,
gauge_value,
is_plugged,
}))
.ok();

battery_deadline = Instant::now() + Duration::from_secs(5);
}
Expand All @@ -461,36 +447,34 @@ fn connection_pipeline(
}
});

let control_receive_thread = thread::spawn(move || loop {
let maybe_packet = if let Some(runtime) = &*CONNECTION_RUNTIME.read() {
control_receiver.recv(runtime, Duration::from_millis(500))
} else {
return;
};
let control_receive_thread = thread::spawn(move || {
while IS_STREAMING.value() {
let maybe_packet = control_receiver.recv();

match maybe_packet {
Ok(ServerControlPacket::InitializeDecoder(config)) => {
decoder::create_decoder(config);
}
Ok(ServerControlPacket::Restarting) => {
info!("{SERVER_RESTART_MESSAGE}");
set_hud_message(SERVER_RESTART_MESSAGE);
if let Some(notifier) = &*DISCONNECT_SERVER_NOTIFIER.lock() {
notifier.send(()).ok();
match maybe_packet {
Ok(ServerControlPacket::InitializeDecoder(config)) => {
decoder::create_decoder(config);
}
Ok(ServerControlPacket::Restarting) => {
info!("{SERVER_RESTART_MESSAGE}");
set_hud_message(SERVER_RESTART_MESSAGE);
if let Some(notifier) = &*DISCONNECT_SERVER_NOTIFIER.lock() {
notifier.send(()).ok();
}

return;
}
Ok(_) => (),
Err(ConnectionError::TryAgain) => (),
Err(e) => {
info!("{SERVER_DISCONNECTED_MESSAGE} Cause: {e}");
set_hud_message(SERVER_DISCONNECTED_MESSAGE);
if let Some(notifier) = &*DISCONNECT_SERVER_NOTIFIER.lock() {
notifier.send(()).ok();
return;
}
Ok(_) => (),
Err(ConnectionError::TryAgain) => (),
Err(e) => {
info!("{SERVER_DISCONNECTED_MESSAGE} Cause: {e}");
set_hud_message(SERVER_DISCONNECTED_MESSAGE);
if let Some(notifier) = &*DISCONNECT_SERVER_NOTIFIER.lock() {
notifier.send(()).ok();
}

return;
return;
}
}
}
});
Expand Down
Loading

0 comments on commit 803ffb5

Please sign in to comment.