From f06eb904abc7e1223b27d808877ff752a2e8e06a Mon Sep 17 00:00:00 2001 From: mvlabat Date: Sat, 3 Apr 2021 23:21:57 +0300 Subject: [PATCH] WIP --- libs/client_lib/src/net.rs | 171 +++--- libs/server_lib/src/lib.rs | 1 + libs/server_lib/src/net.rs | 520 ++++++++++++------- libs/shared_lib/src/game/client_factories.rs | 7 + libs/shared_lib/src/game/commands.rs | 1 + libs/shared_lib/src/game/components.rs | 52 +- libs/shared_lib/src/game/spawn.rs | 96 +++- libs/shared_lib/src/lib.rs | 7 +- libs/shared_lib/src/messages.rs | 7 + libs/shared_lib/src/net.rs | 33 +- libs/shared_lib/src/player.rs | 10 - 11 files changed, 625 insertions(+), 280 deletions(-) diff --git a/libs/client_lib/src/net.rs b/libs/client_lib/src/net.rs index b7a454cf..5a78c825 100644 --- a/libs/client_lib/src/net.rs +++ b/libs/client_lib/src/net.rs @@ -10,12 +10,12 @@ use mr_shared_lib::{ components::PlayerDirection, }, messages::{ - ConnectedPlayer, DeltaUpdate, DisconnectedPlayer, PlayerInput, PlayerNetId, PlayerUpdate, - ReliableClientMessage, ReliableServerMessage, StartGame, UnreliableClientMessage, - UnreliableServerMessage, + ConnectedPlayer, DeltaUpdate, DisconnectedPlayer, Message, PlayerInput, PlayerNetId, + PlayerUpdate, ReliableClientMessage, ReliableServerMessage, StartGame, + UnreliableClientMessage, UnreliableServerMessage, }, - net::{AcknowledgeError, ConnectionState}, - player::{Player, PlayerConnectionState, PlayerDirectionUpdate, PlayerUpdates}, + net::{AcknowledgeError, ConnectionState, ConnectionStatus, SessionId}, + player::{Player, PlayerDirectionUpdate, PlayerUpdates}, registry::EntityRegistry, GameTime, SimulationTime, COMPONENT_FRAMEBUFFER_LIMIT, SIMULATIONS_PER_SECOND, }; @@ -45,6 +45,7 @@ pub struct NetworkReader { pub struct UpdateParams<'a> { simulation_time: ResMut<'a, SimulationTime>, game_time: ResMut<'a, GameTime>, + player_entities: Res<'a, EntityRegistry>, estimated_server_time: ResMut<'a, EstimatedServerTime>, target_frames_ahead: ResMut<'a, TargetFramesAhead>, player_delay: ResMut<'a, PlayerDelay>, @@ -74,10 +75,14 @@ pub fn process_network_events( match event { NetworkEvent::Connected(handle) => { log::info!("Connected: {}", handle); - if let Err(err) = network_params - .net - .send_message(*handle, ReliableClientMessage::Handshake) - { + if let Err(err) = network_params.net.send_message( + *handle, + Message { + // The server is expected to accept any session id for this message. + session_id: SessionId::new(0), + message: ReliableClientMessage::Handshake, + }, + ) { log::error!("Failed to send a Handshake message: {:?}", err); } update_params.initial_rtt.sent_at = Some(Instant::now()); @@ -92,12 +97,25 @@ pub fn process_network_events( for (handle, connection) in network_params.net.connections.iter_mut() { let channels = connection.channels().unwrap(); - while let Some(message) = channels.recv::() { + while let Some(message) = channels.recv::>() { log::trace!( "UnreliableServerMessage received on [{}]: {:?}", handle, message ); + let Message { + message, + session_id, + } = message; + + if session_id != network_params.connection_state.session_id { + log::warn!( + "Ignoring a server message: sent session id {} doesn't match {}", + session_id, + network_params.connection_state.session_id + ); + continue; + } match message { UnreliableServerMessage::DeltaUpdate(update) => { @@ -146,19 +164,35 @@ pub fn process_network_events( } } - while let Some(message) = channels.recv::() { + while let Some(message) = channels.recv::>() { log::trace!( "ReliableServerMessage received on [{}]: {:?}", handle, message ); + let Message { + message, + session_id, + } = message; + + let ignore_session_id_check = matches!(message, ReliableServerMessage::StartGame(_)); + if session_id != network_params.connection_state.session_id && !ignore_session_id_check + { + log::warn!( + "Ignoring a server message: sent session id {} doesn't match {}", + session_id, + network_params.connection_state.session_id + ); + continue; + } match message { ReliableServerMessage::StartGame(start_game) => { - if current_player_net_id.0 == Some(start_game.net_id) { - continue; - } - log::info!("Starting the game"); + network_params.connection_state.session_id = session_id; + log::info!( + "Starting the game (update frame: {})", + start_game.game_state.frame_number + ); process_start_game_message( start_game, &mut network_params.connection_state, @@ -171,11 +205,7 @@ pub fn process_network_events( process_connected_player_message(connected_player, &mut players); } ReliableServerMessage::DisconnectedPlayer(disconnected_player) => { - process_disconnected_player_message( - disconnected_player, - &mut players, - &mut update_params, - ); + process_disconnected_player_message(disconnected_player); } ReliableServerMessage::SpawnLevelObject(spawn_level_object) => { update_params @@ -196,8 +226,17 @@ pub fn process_network_events( } } - while channels.recv::().is_some() { - log::error!("Unexpected ClientMessage received on [{}]", handle); + while channels + .recv::>() + .is_some() + { + log::error!( + "Unexpected UnreliableClientMessage received on [{}]", + handle + ); + } + while channels.recv::>().is_some() { + log::error!("Unexpected ReliableClientMessage received on [{}]", handle); } } } @@ -255,13 +294,17 @@ pub fn send_network_updates( } } + let message = UnreliableClientMessage::PlayerUpdate(PlayerUpdate { + frame_number: time.frame_number, + acknowledgments: network_params.connection_state.incoming_acknowledgments(), + inputs, + }); let result = network_params.net.send_message( connection_handle, - UnreliableClientMessage::PlayerUpdate(PlayerUpdate { - frame_number: time.frame_number, - acknowledgments: network_params.connection_state.incoming_acknowledgments(), - inputs, - }), + Message { + session_id: network_params.connection_state.session_id, + message, + }, ); if let Err(err) = result { log::error!("Failed to send a message to {:?}: {:?}", address, err); @@ -325,19 +368,44 @@ fn process_delta_update_message( update_params.player_delay.frame_count = player_delay / 2; } + // Despawning players that aren't mentioned in the delta update. + let players_to_remove: Vec = players + .keys() + .copied() + .filter(|player_net_id| { + !delta_update + .players + .iter() + .any(|player| player.net_id == *player_net_id) + }) + .collect(); + + for player_net_id in players_to_remove { + players.remove(&player_net_id); + update_params.despawn_player_commands.push(DespawnPlayer { + net_id: player_net_id, + frame_number: delta_update.frame_number, + }); + } + for player_state in delta_update.players { - players.entry(player_state.net_id).or_insert_with(|| { + if update_params + .player_entities + .get_entity(player_state.net_id) + .is_none() + { log::info!("First update with the new player {}", player_state.net_id.0); update_params.spawn_player_commands.push(SpawnPlayer { net_id: player_state.net_id, start_position: player_state.position, is_player_frame_simulated: false, }); - Player { - nickname: "?".to_owned(), - state: PlayerConnectionState::Connecting, - } - }); + players + .entry(player_state.net_id) + .or_insert_with(|| Player { + nickname: "?".to_owned(), + }); + } let player_frames_ahead = if current_player_net_id == Some(player_state.net_id) { update_params.target_frames_ahead.frames_count @@ -397,8 +465,7 @@ fn process_delta_update_message( } // There's no need to rewind if we haven't started the game. - // TODO: deduce whether we started the game or not from some other state. - if current_player_net_id.is_some() { + if let ConnectionStatus::Connected = connection_state.status { update_params .simulation_time .rewind(rewind_to_simulation_frame); @@ -422,7 +489,6 @@ fn process_start_game_message( start_game.net_id, Player { nickname: start_game.nickname, - state: PlayerConnectionState::Playing, }, ); update_params.game_time.generation += 1; @@ -442,6 +508,10 @@ fn process_start_game_message( start_game.game_state.frame_number + half_rtt_frames; update_params.estimated_server_time.updated_at = update_params.game_time.frame_number; + log::debug!( + "Spawning the current player ({})", + current_player_net_id.0.unwrap().0 + ); update_params.spawn_player_commands.push(SpawnPlayer { net_id: start_game.net_id, start_position, @@ -452,13 +522,16 @@ fn process_start_game_message( } for player in start_game.players { + if player.net_id == current_player_net_id.0.unwrap() { + continue; + } + if let Some(start_position) = player_start_position(player.net_id, &start_game.game_state) { log::info!("Spawning player {}: {}", player.net_id.0, player.nickname); players.insert( player.net_id, Player { nickname: player.nickname, - state: PlayerConnectionState::Playing, }, ); update_params.spawn_player_commands.push(SpawnPlayer { @@ -494,31 +567,13 @@ fn process_connected_player_message( connected_player.net_id, Player { nickname: connected_player.nickname, - state: PlayerConnectionState::Playing, }, ); } -fn process_disconnected_player_message( - disconnected_player: DisconnectedPlayer, - players: &mut HashMap, - update_params: &mut UpdateParams, -) { - if let Some(player) = players.remove(&disconnected_player.net_id) { - log::info!( - "A player ({}) disconnected: {}", - disconnected_player.net_id.0, - player.nickname - ); - update_params.despawn_player_commands.push(DespawnPlayer { - net_id: disconnected_player.net_id, - }); - } else { - log::error!( - "Unknown player with net id {}", - disconnected_player.net_id.0 - ); - } +fn process_disconnected_player_message(disconnected_player: DisconnectedPlayer) { + // We actually remove players if there's no mention of them in a DeltaUpdate message. + log::info!("A player ({}) disconnected", disconnected_player.net_id.0); } fn player_start_position(player_net_id: PlayerNetId, delta_update: &DeltaUpdate) -> Option { diff --git a/libs/server_lib/src/lib.rs b/libs/server_lib/src/lib.rs index dbaeb1e4..b46a4bb2 100644 --- a/libs/server_lib/src/lib.rs +++ b/libs/server_lib/src/lib.rs @@ -64,6 +64,7 @@ impl Plugin for MuddleServerPlugin { resources.get_or_insert_with(PlayerNetId::default); resources.get_or_insert_with(NetworkReader::default); resources.get_or_insert_with(PlayerConnections::default); + resources.get_or_insert_with(Vec::<(PlayerNetId, u32)>::default); resources.get_or_insert_with(HashMap::::default); resources.get_or_insert_with(DeferredUpdates::::default); } diff --git a/libs/server_lib/src/net.rs b/libs/server_lib/src/net.rs index d8e17300..7c18a348 100644 --- a/libs/server_lib/src/net.rs +++ b/libs/server_lib/src/net.rs @@ -4,15 +4,16 @@ use bevy_networking_turbulence::{NetworkEvent, NetworkResource}; use mr_shared_lib::{ game::{ commands::{DespawnPlayer, GameCommands, SpawnLevelObject, SpawnPlayer}, - components::{PlayerDirection, Position}, + components::{PlayerDirection, Position, Spawned}, level::LevelState, }, messages::{ - ConnectedPlayer, DeltaUpdate, PlayerInput, PlayerNetId, PlayerState, ReliableServerMessage, - StartGame, UnreliableClientMessage, UnreliableServerMessage, + ConnectedPlayer, DeltaUpdate, Message, PlayerInput, PlayerNetId, PlayerState, + ReliableClientMessage, ReliableServerMessage, StartGame, UnreliableClientMessage, + UnreliableServerMessage, }, - net::ConnectionState, - player::{random_name, Player, PlayerConnectionState}, + net::{ConnectionState, ConnectionStatus, SessionId}, + player::{random_name, Player}, registry::{EntityRegistry, Registry}, GameTime, }; @@ -49,6 +50,7 @@ pub struct NetworkParams<'a> { net: ResMut<'a, NetworkResource>, connection_states: ResMut<'a, HashMap>, player_connections: ResMut<'a, PlayerConnections>, + new_player_connections: ResMut<'a, Vec<(PlayerNetId, u32)>>, } pub fn process_network_events( @@ -63,42 +65,45 @@ pub fn process_network_events( match event { NetworkEvent::Connected(handle) => { log::info!("New connection: {}", handle); - let player_net_id = network_params.player_connections.register(*handle); - network_params.connection_states.entry(*handle).or_default(); - let nickname = random_name(); - players.insert( - player_net_id, - Player { - nickname, - state: PlayerConnectionState::Connecting, - }, - ); - update_params.spawn_player_commands.push(SpawnPlayer { - net_id: player_net_id, - start_position: Vec2::zero(), - is_player_frame_simulated: false, - }); - // Add an initial update to have something to extrapolate from. - update_params.deferred_player_updates.push( - player_net_id, - PlayerInput { - frame_number: time.frame_number, - direction: Vec2::zero(), - }, - ); + if network_params.player_connections.get_id(*handle).is_none() { + network_params.player_connections.register(*handle); + } + let mut connection_state = + network_params.connection_states.entry(*handle).or_default(); + + if matches!( + connection_state.status, + ConnectionStatus::Connected | ConnectionStatus::Disconnecting + ) { + log::warn!("Received a Connected event from a connection that is already connected (or being disconnected). That probably means that the clean-up wasn't properly finished"); + } + match connection_state.status { + ConnectionStatus::Disconnecting | ConnectionStatus::Disconnected => { + connection_state.status = ConnectionStatus::Uninitialized; + connection_state.session_id += SessionId::new(1); + } + _ => {} + }; } NetworkEvent::Disconnected(handle) => { log::info!("Disconnected: {}", handle); - network_params.connection_states.remove(&handle); - if let Some(player_net_id) = - network_params.player_connections.remove_by_value(*handle) - { - update_params.despawn_player_commands.push(DespawnPlayer { - net_id: player_net_id, - }); - players.remove(&player_net_id); + let mut connection_state = network_params + .connection_states + .get_mut(&handle) + .expect("Expected a connection when receiving a Disconnect event"); + if matches!( + connection_state.status, + ConnectionStatus::Disconnecting | ConnectionStatus::Disconnected + ) { + log::info!("Received a Disconnected event for a player that's already disconnected, skipped"); + continue; + } + connection_state.status = ConnectionStatus::Disconnecting; + + if let Some(player_net_id) = network_params.player_connections.get_id(*handle) { update_params.despawn_player_commands.push(DespawnPlayer { net_id: player_net_id, + frame_number: time.frame_number, }); } else { log::error!("A disconnected player wasn't in the connections list"); @@ -110,9 +115,72 @@ pub fn process_network_events( for (handle, connection) in network_params.net.connections.iter_mut() { let channels = connection.channels().unwrap(); - while let Some(client_message) = channels.recv::() { + + while let Some(client_message) = channels.recv::>() { log::trace!( - "ClientMessage received on [{}]: {:?}", + "ReliableClientMessage received on [{}]: {:?}", + handle, + client_message + ); + + match client_message.message { + // NOTE: before adding new messages, make sure to ignore them if connection status + // is not `Connected`. + ReliableClientMessage::Handshake => { + log::info!("Player handshake: {}", handle); + let player_net_id = network_params + .player_connections + .get_id(*handle) + // At the moment of writing this we never removed player connections. + .expect("Expected a registered player id for a connection"); + + let connection_state = + network_params.connection_states.entry(*handle).or_default(); + + if matches!( + connection_state.status, + ConnectionStatus::Connected | ConnectionStatus::Disconnecting + ) { + log::warn!("Received a Connected event from a connection that is already connected (or being disconnected). That probably means that the clean-up wasn't properly finished"); + } + match connection_state.status { + ConnectionStatus::Disconnecting + | ConnectionStatus::Disconnected + | ConnectionStatus::Connected + | ConnectionStatus::Handshaking => { + connection_state.session_id += SessionId::new(1); + } + ConnectionStatus::Uninitialized => {} + }; + + connection_state.status = ConnectionStatus::Handshaking; + + network_params + .new_player_connections + .push((player_net_id, *handle)); + + let nickname = random_name(); + players.insert(player_net_id, Player { nickname }); + update_params.spawn_player_commands.push(SpawnPlayer { + net_id: player_net_id, + start_position: Vec2::zero(), + is_player_frame_simulated: false, + }); + // Add an initial update to have something to extrapolate from. + update_params.deferred_player_updates.push( + player_net_id, + PlayerInput { + frame_number: time.frame_number, + direction: Vec2::zero(), + }, + ); + } + } + } + + while let Some(client_message) = channels.recv::>() { + log::trace!( + "UnreliableClientMessage received on [{}]: {:?}", handle, client_message ); @@ -128,6 +196,26 @@ pub fn process_network_events( } }; + if !matches!(connection_state.status, ConnectionStatus::Connected) { + log::warn!( + "Ignoring a message for a player ({}): expected connection status is {:?}, but it's {:?}", + player_net_id.0, + ConnectionStatus::Connected, + connection_state.status + ); + } + + if client_message.session_id != connection_state.session_id { + log::warn!( + "Ignoring a message for a player ({}): sent session id {} doesn't match {}", + player_net_id.0, + client_message.session_id, + connection_state.session_id + ); + continue; + } + let client_message = client_message.message; + match client_message { UnreliableClientMessage::PlayerUpdate(update) => { if let Err(err) = connection_state.acknowledge_incoming(update.frame_number) { @@ -158,10 +246,13 @@ pub fn process_network_events( } } - while channels.recv::().is_some() { + while channels.recv::>().is_some() { log::error!("Unexpected ReliableServerMessage received on [{}]", handle); } - while channels.recv::().is_some() { + while channels + .recv::>() + .is_some() + { log::error!( "Unexpected UnreliableServerMessage received on [{}]", handle @@ -174,154 +265,236 @@ pub fn send_network_updates( mut network_params: NetworkParams, time: Res, level_state: Res, - mut players: ResMut>, - player_entities: Query<(Entity, &Position, &PlayerDirection)>, + players: Res>, + player_entities: Query<(Entity, &Position, &PlayerDirection, &Spawned)>, players_registry: Res>, ) { - for (&connection_player_net_id, &connection_handle) in network_params.player_connections.iter() + for (&_connection_player_net_id, &connection_handle) in network_params.player_connections.iter() { let connection_state = network_params .connection_states .get_mut(&connection_handle) .expect("Expected a connection state for a connected player"); - // Broadcasting delta updates. - let player = players.get(&connection_player_net_id).unwrap(); - // Checks that a player hasn't just connected. - if let PlayerConnectionState::Playing = player.state { - if let Err(err) = network_params.net.send_message( - connection_handle, - UnreliableServerMessage::DeltaUpdate(DeltaUpdate { - frame_number: time.frame_number, - acknowledgments: connection_state.incoming_acknowledgments(), - players: players - .iter() - .map(|(&player_net_id, _player)| { - let entity = - players_registry - .get_entity(player_net_id) - .unwrap_or_else(|| { - panic!( - "Player entity ({:?}) is not registered", - player_net_id - ) - }); - create_player_state( - player_net_id, - &time, - connection_state, - entity, - &player_entities, - ) - }) - .collect(), - confirmed_actions: vec![], - }), - ) { - log::error!("Failed to send a message: {:?}", err); - } + broadcast_delta_update_messages( + &mut network_params.net, + &time, + &players, + &player_entities, + &players_registry, + connection_handle, + connection_state, + ); + + broadcast_new_player_messages( + &mut network_params.net, + &network_params.new_player_connections, + &players, + connection_handle, + connection_state, + ) + } + + broadcast_start_game_messages( + &mut network_params, + &time, + &level_state, + &players, + &player_entities, + &players_registry, + ); +} + +fn broadcast_delta_update_messages( + net: &mut NetworkResource, + time: &GameTime, + players: &HashMap, + player_entities: &Query<(Entity, &Position, &PlayerDirection, &Spawned)>, + players_registry: &EntityRegistry, + connection_handle: u32, + connection_state: &mut ConnectionState, +) { + // Checks that a player that we broadcast the message to hasn't just connected. + if !matches!(connection_state.status, ConnectionStatus::Connected) { + return; + } + + let message = UnreliableServerMessage::DeltaUpdate(DeltaUpdate { + frame_number: time.frame_number, + acknowledgments: connection_state.incoming_acknowledgments(), + players: players + .iter() + .filter_map(|(&player_net_id, _player)| { + let entity = players_registry + .get_entity(player_net_id) + .unwrap_or_else(|| { + panic!("Player entity ({:?}) is not registered", player_net_id) + }); + create_player_state( + player_net_id, + &time, + connection_state, + entity, + &player_entities, + ) + }) + .collect(), + confirmed_actions: vec![], + }); + + if let Err(err) = net.send_message( + connection_handle, + Message { + session_id: connection_state.session_id, + message, + }, + ) { + log::error!("Failed to send a message: {:?}", err); + } - connection_state.add_outcoming_packet(time.frame_number, Instant::now()); + connection_state.add_outcoming_packet(time.frame_number, Instant::now()); +} + +fn broadcast_new_player_messages( + net: &mut NetworkResource, + new_player_connections: &[(PlayerNetId, u32)], + players: &HashMap, + connection_handle: u32, + connection_state: &mut ConnectionState, +) { + // Broadcasting updates about new connected players. + for (connected_player_net_id, _connection_handle) in new_player_connections.iter() { + let player = players + .get(&connected_player_net_id) + .expect("Expected a registered Player"); + let message = ReliableServerMessage::ConnectedPlayer(ConnectedPlayer { + net_id: *connected_player_net_id, + nickname: player.nickname.clone(), + }); + + if let Err(err) = net.send_message( + connection_handle, + Message { + session_id: connection_state.session_id, + message, + }, + ) { + log::error!("Failed to send a message: {:?}", err); } + } +} - // Broadcasting updates about new connected players. - for (&connected_player_net_id, player) in players.iter() { - // Checks that a player hasn't just connected. - if let PlayerConnectionState::Playing = player.state { - continue; - } +fn broadcast_start_game_messages( + network_params: &mut NetworkParams, + time: &GameTime, + level_state: &LevelState, + players: &HashMap, + player_entities: &Query<(Entity, &Position, &PlayerDirection, &Spawned)>, + players_registry: &EntityRegistry, +) { + // Broadcasting updates about new connected players. + for (connected_player_net_id, connected_player_connection_handle) in + network_params.new_player_connections.drain(..) + { + let connection_state = network_params + .connection_states + .get_mut(&connected_player_connection_handle) + .expect("Expected a ConnectionState for a new player"); + let connected_player = players + .get(&connected_player_net_id) + .expect("Expected a new Player to exist"); - // If a player has just connected, we need to send `StartGame` message to the connected - // player and broadcast `ConnectedPlayer` to everyone else. + assert!(matches!( + connection_state.status, + ConnectionStatus::Handshaking + )); - // TODO: prepare the update in another system. - let mut players_state = players + // TODO: prepare the update in another system. + let mut players_state: Vec = players + .iter() + .filter_map(|(&iter_player_net_id, _player)| { + let entity = players_registry + .get_entity(iter_player_net_id) + .unwrap_or_else(|| { + panic!("Player entity ({:?}) is not registered", iter_player_net_id) + }); + if connected_player_net_id == iter_player_net_id { + Some(PlayerState { + net_id: connected_player_net_id, + position: Vec2::zero(), + inputs: Vec::new(), + }) + } else { + create_player_state( + iter_player_net_id, + &time, + connection_state, + entity, + &player_entities, + ) + } + }) + .collect(); + players_state.push(PlayerState { + net_id: connected_player_net_id, + position: Vec2::zero(), + inputs: Vec::new(), + }); + + let message = ReliableServerMessage::StartGame(StartGame { + net_id: connected_player_net_id, + nickname: connected_player.nickname.clone(), + objects: level_state + .objects .iter() - .map(|(&iter_player_net_id, _player)| { - let entity = players_registry - .get_entity(iter_player_net_id) - .unwrap_or_else(|| { - panic!("Player entity ({:?}) is not registered", iter_player_net_id) - }); - if connected_player_net_id == iter_player_net_id { - PlayerState { - net_id: connection_player_net_id, - position: Vec2::zero(), - inputs: Vec::new(), - } - } else { - create_player_state( - iter_player_net_id, - &time, - connection_state, - entity, - &player_entities, - ) - } + .map(|level_object| SpawnLevelObject { + object: level_object.clone(), + frame_number: time.frame_number, }) - .collect::>(); - players_state.push(PlayerState { - net_id: connection_player_net_id, - position: Vec2::zero(), - inputs: Vec::new(), - }); - - let result = network_params.net.send_message( - connection_handle, - ReliableServerMessage::StartGame(StartGame { - net_id: connection_player_net_id, + .collect(), + players: players + .iter() + .map(|(&net_id, player)| ConnectedPlayer { + net_id, nickname: player.nickname.clone(), - objects: level_state - .objects - .iter() - .map(|level_object| SpawnLevelObject { - object: level_object.clone(), - frame_number: time.frame_number, - }) - .collect(), - players: players - .iter() - .map(|(&net_id, player)| ConnectedPlayer { - net_id, - nickname: player.nickname.clone(), - }) - .collect(), - game_state: DeltaUpdate { - frame_number: time.frame_number, - acknowledgments: connection_state.incoming_acknowledgments(), - players: players_state, - confirmed_actions: Vec::new(), - }, - }), - ); - if let Err(err) = result { - log::error!("Failed to send a message: {:?}", err); - } + }) + .collect(), + game_state: DeltaUpdate { + frame_number: time.frame_number, + acknowledgments: connection_state.incoming_acknowledgments(), + players: players_state, + confirmed_actions: Vec::new(), + }, + }); - broadcast_message_to_others( - &mut network_params.net, - &network_params.player_connections, - connection_handle, - &ReliableServerMessage::ConnectedPlayer(ConnectedPlayer { - net_id: connection_player_net_id, - nickname: player.nickname.clone(), - }), - ); + let result = network_params.net.send_message( + connected_player_connection_handle, + Message { + session_id: connection_state.session_id, + message, + }, + ); + if let Err(err) = result { + log::error!("Failed to send a message: {:?}", err); + } else { + connection_state.status = ConnectionStatus::Connected; } } - - for player in players.values_mut() { - player.state = PlayerConnectionState::Playing; - } } +/// Returns `None` if the entity is not spawned for the current frame. fn create_player_state( net_id: PlayerNetId, time: &GameTime, connection_state: &ConnectionState, entity: Entity, - player_entities: &Query<(Entity, &Position, &PlayerDirection)>, -) -> PlayerState { + player_entities: &Query<(Entity, &Position, &PlayerDirection, &Spawned)>, +) -> Option { + let (_, position, player_direction, spawned) = player_entities.get(entity).unwrap(); + if !spawned.is_spawned(time.frame_number) { + return None; + } + let updates_start_frame = if connection_state.packet_loss() > 0.0 { // TODO: avoid doing the same searches when gathering updates for every player? connection_state @@ -331,8 +504,6 @@ fn create_player_state( time.frame_number }; - let (_, position, player_direction) = player_entities.get(entity).unwrap(); - // TODO: deduplicate updates (the same code is written for client). let mut inputs: Vec = Vec::new(); for (frame_number, &direction) in player_direction @@ -354,7 +525,7 @@ fn create_player_state( |input| input.frame_number, ); - PlayerState { + Some(PlayerState { net_id, position: *position .buffer @@ -368,22 +539,5 @@ fn create_player_state( ) }), inputs, - } -} - -fn broadcast_message_to_others( - net: &mut NetworkResource, - player_connections: &PlayerConnections, - exluded_connection_handle: u32, - message: &ReliableServerMessage, -) { - for (_player_net_id, &connection_handle) in player_connections.iter() { - if connection_handle == exluded_connection_handle { - continue; - } - - if let Err(err) = net.send_message(connection_handle, message.clone()) { - log::error!("Failed to send a message: {:?}", err); - } - } + }) } diff --git a/libs/shared_lib/src/game/client_factories.rs b/libs/shared_lib/src/game/client_factories.rs index b4c0b0ee..0969c17b 100644 --- a/libs/shared_lib/src/game/client_factories.rs +++ b/libs/shared_lib/src/game/client_factories.rs @@ -15,6 +15,8 @@ pub trait ClientFactory<'a> { commands.spawn(()); commands.current_entity().unwrap() } + + fn remove_renderables(_commands: &mut Commands, _entity: Entity) {} } pub struct PlayerClientFactory; @@ -41,6 +43,11 @@ impl<'a> ClientFactory<'a> for PlayerClientFactory { } commands.current_entity().unwrap() } + + #[cfg(feature = "render")] + fn remove_renderables(commands: &mut Commands, entity: Entity) { + commands.remove::(entity); + } } pub struct PlaneClientFactory; diff --git a/libs/shared_lib/src/game/commands.rs b/libs/shared_lib/src/game/commands.rs index dcac4347..47f76724 100644 --- a/libs/shared_lib/src/game/commands.rs +++ b/libs/shared_lib/src/game/commands.rs @@ -38,6 +38,7 @@ pub struct SpawnPlayer { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct DespawnPlayer { pub net_id: PlayerNetId, + pub frame_number: FrameNumber, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] diff --git a/libs/shared_lib/src/game/components.rs b/libs/shared_lib/src/game/components.rs index cc7dc3e5..245aa36b 100644 --- a/libs/shared_lib/src/game/components.rs +++ b/libs/shared_lib/src/game/components.rs @@ -42,27 +42,65 @@ pub struct Spawned { /// If a component gets old enough, we set the timestamp to `None`, as we become sure that /// we won't try to simulate an entity that wasn't spawned for a given `GameTime::sumilation_frame`. /// See `mark_mature_entities` system. - fresh_timestamp: Option, + spawned_at: Option, + despawned_at: Option, + respawned_at: Option, } impl Spawned { pub fn new(frame_spawned: FrameNumber) -> Self { Self { - fresh_timestamp: Some(frame_spawned), + spawned_at: Some(frame_spawned), + despawned_at: None, + respawned_at: None, } } pub fn is_spawned(&self, frame_number: FrameNumber) -> bool { - self.fresh_timestamp - .map_or(true, |fresh_timestamp| fresh_timestamp <= frame_number) + let is_spawned = self + .spawned_at + .map_or(true, |spawned_at| spawned_at <= frame_number); + is_spawned && !self.is_despawned(frame_number) + } + + pub fn is_despawned(&self, frame_number: FrameNumber) -> bool { + match (self.despawned_at, self.respawned_at) { + (Some(despawned_at), None) => frame_number >= despawned_at, + (Some(despawned_at), Some(respawned_at)) => { + frame_number >= despawned_at && frame_number < respawned_at + } + (None, _) => false, + } } pub fn mark_if_mature(&mut self, frame_number: FrameNumber) { - if let Some(fresh_timestamp) = self.fresh_timestamp { - if frame_number > fresh_timestamp + FrameNumber::new(COMPONENT_FRAMEBUFFER_LIMIT) { - self.fresh_timestamp = None; + if let Some(spawned_at) = self.spawned_at { + if frame_number > spawned_at + FrameNumber::new(COMPONENT_FRAMEBUFFER_LIMIT) { + self.spawned_at = None; } } + let despawned_long_time_ago = self.despawned_at.map_or(false, |despawned_at| { + frame_number > despawned_at + FrameNumber::new(COMPONENT_FRAMEBUFFER_LIMIT) + }); + if !self.can_be_removed(frame_number) && despawned_long_time_ago { + self.despawned_at = None; + self.respawned_at = None; + } + } + + pub fn set_despawned_at(&mut self, frame_number: FrameNumber) { + self.despawned_at = Some(frame_number); + } + + pub fn set_respawned_at(&mut self, frame_number: FrameNumber) { + self.respawned_at = Some(frame_number); + } + + pub fn can_be_removed(&self, frame_number: FrameNumber) -> bool { + let can_be_despawned = self.despawned_at.map_or(false, |despawned_at| { + despawned_at + FrameNumber::new(COMPONENT_FRAMEBUFFER_LIMIT) >= frame_number + }); + can_be_despawned && self.respawned_at.is_none() } } diff --git a/libs/shared_lib/src/game/spawn.rs b/libs/shared_lib/src/game/spawn.rs index 88b5938c..e1e4ef9a 100644 --- a/libs/shared_lib/src/game/spawn.rs +++ b/libs/shared_lib/src/game/spawn.rs @@ -3,7 +3,7 @@ use crate::{ client_factories::{ ClientFactory, PbrClientParams, PlaneClientFactory, PlayerClientFactory, }, - commands::{GameCommands, SpawnLevelObject, SpawnPlayer}, + commands::{DespawnPlayer, GameCommands, SpawnLevelObject, SpawnPlayer}, components::{PlayerDirection, Position, Spawned}, level::{LevelObjectDesc, LevelState}, }, @@ -20,15 +20,40 @@ pub fn spawn_players( mut pbr_client_params: PbrClientParams, mut spawn_player_commands: ResMut>, mut player_entities: ResMut>, + mut players: Query<(Entity, &mut Spawned, &mut Position, &mut PlayerDirection)>, ) { - for command in spawn_player_commands.drain() { - if player_entities.get_entity(command.net_id).is_some() { - log::debug!( - "Player ({}) entity already exists, skipping", - command.net_id.0 + let mut spawn_player_commands = spawn_player_commands.drain(); + spawn_player_commands.dedup_by_key(|command| command.net_id); + + for command in spawn_player_commands { + let frames_ahead = if command.is_player_frame_simulated { + (time.player_frame - time.server_frame).value() + } else { + 0 + }; + + if let Some(entity) = player_entities.get_entity(command.net_id) { + // TODO! make sure that we send a respawn command indeed and it's correct. + log::info!( + "Respawning player ({}) entity (frame: {}): {:?}", + command.net_id.0, + time.server_frame, + entity ); + + let (_, mut spawned, mut position, mut player_direction) = + players.get_mut(entity).unwrap(); + position + .buffer + .insert(time.server_frame, command.start_position); + player_direction + .buffer + .insert(time.server_frame, Some(Vec2::zero())); + spawned.set_respawned_at(time.server_frame); + continue; } + // TODO! make sure that we do reset a player if it's respawned. log::info!( "Spawning a new player (frame {}): {}", @@ -40,11 +65,6 @@ pub fn spawn_players( &mut pbr_client_params, &command.is_player_frame_simulated, ); - let frames_ahead = if command.is_player_frame_simulated { - (time.player_frame - time.server_frame).value() - } else { - 0 - }; commands .with( RigidBodyBuilder::new_dynamic() @@ -71,6 +91,44 @@ pub fn spawn_players( } } +pub fn despawn_players( + commands: &mut Commands, + mut despawn_player_commands: ResMut>, + player_entities: Res>, + mut players: Query<(Entity, &mut Spawned, &PlayerDirection)>, +) { + for command in despawn_player_commands.drain() { + let entity = match player_entities.get_entity(command.net_id) { + Some(entity) => entity, + None => { + log::error!( + "Player ({}) entity doesn't exist, skipping (frame: {})", + command.net_id.0, + command.frame_number + ); + continue; + } + }; + let (_, mut spawned, _) = players.get_mut(entity).unwrap(); + if !spawned.is_spawned(command.frame_number) { + log::debug!( + "Player ({}) is not spawned at frame {}, skipping the despawn command", + command.net_id.0, + command.frame_number + ); + continue; + } + + log::info!( + "Despawning player {} (frame {})", + command.net_id.0, + command.frame_number + ); + PlayerClientFactory::remove_renderables(commands, entity); + spawned.set_despawned_at(command.frame_number); + } +} + pub fn spawn_level_objects( commands: &mut Commands, mut pbr_client_params: PbrClientParams, @@ -101,9 +159,21 @@ pub fn spawn_level_objects( } } -pub fn mark_mature_entities(game_time: Res, mut spawned_entities: Query<&mut Spawned>) { - for mut spawned in spawned_entities.iter_mut() { +pub fn process_spawned_entities( + commands: &mut Commands, + game_time: Res, + mut player_entities: ResMut>, + mut object_entities: ResMut>, + mut spawned_entities: Query<(Entity, &mut Spawned)>, +) { + for (entity, mut spawned) in spawned_entities.iter_mut() { spawned.mark_if_mature(game_time.frame_number); + if spawned.can_be_removed(game_time.frame_number) { + log::debug!("Despawning entity {:?}", entity); + commands.despawn(entity); + player_entities.remove_by_entity(entity); + object_entities.remove_by_entity(entity); + } } } diff --git a/libs/shared_lib/src/lib.rs b/libs/shared_lib/src/lib.rs index fb08e6a3..6d718236 100644 --- a/libs/shared_lib/src/lib.rs +++ b/libs/shared_lib/src/lib.rs @@ -11,7 +11,7 @@ use crate::{ components::PlayerFrameSimulated, level::LevelState, movement::{player_movement, read_movement_updates, sync_position}, - spawn::{mark_mature_entities, spawn_level_objects, spawn_players}, + spawn::{despawn_players, process_spawned_entities, spawn_level_objects, spawn_players}, }, net::network_setup, player::{Player, PlayerUpdates}, @@ -127,7 +127,8 @@ impl> Plugin for MuddleSharedPlugin { .with_run_criteria(SimulationTickRunCriteria::default()) .with_stage( stage::SPAWN, - SystemStage::parallel() + SystemStage::serial() + .with_system(despawn_players.system()) .with_system(spawn_players.system()) .with_system(spawn_level_objects.system()), ) @@ -175,7 +176,7 @@ impl> Plugin for MuddleSharedPlugin { stage::POST_SIMULATIONS, SystemStage::serial() .with_system(tick_game_frame.system()) - .with_system(mark_mature_entities.system()), + .with_system(process_spawned_entities.system()), ) .with_stage( stage::POST_TICK, diff --git a/libs/shared_lib/src/messages.rs b/libs/shared_lib/src/messages.rs index f941253f..dc68c70f 100644 --- a/libs/shared_lib/src/messages.rs +++ b/libs/shared_lib/src/messages.rs @@ -1,6 +1,7 @@ use crate::{ framebuffer::FrameNumber, game::commands::{DespawnLevelObject, SpawnLevelObject}, + net::SessionId, registry::IncrementId, }; use bevy::math::Vec2; @@ -39,6 +40,12 @@ impl IncrementId for ActionNetId { } } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct Message { + pub session_id: SessionId, + pub message: T, +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum UnreliableClientMessage { PlayerUpdate(PlayerUpdate), diff --git a/libs/shared_lib/src/net.rs b/libs/shared_lib/src/net.rs index 133e5844..1d1a06bc 100644 --- a/libs/shared_lib/src/net.rs +++ b/libs/shared_lib/src/net.rs @@ -1,7 +1,8 @@ use crate::{ framebuffer::FrameNumber, + looped_counter::WrappedCounter, messages::{ - ReliableClientMessage, ReliableServerMessage, UnreliableClientMessage, + Message, ReliableClientMessage, ReliableServerMessage, UnreliableClientMessage, UnreliableServerMessage, }, TICKS_PER_NETWORK_BROADCAST, @@ -20,6 +21,19 @@ use thiserror::Error; const RTT_UPDATE_FACTOR: f32 = 0.2; const JITTER_DECREASE_THRESHOLD_SECS: u64 = 1; +pub type SessionId = WrappedCounter; + +#[derive(Debug)] +pub enum ConnectionStatus { + Uninitialized, + Handshaking, + Connected, + /// We've received a `Disconnect` event or triggered the process manually. After we finish + /// the needed clean-up, we switch the status to `Disconnected`. + Disconnecting, + Disconnected, +} + #[derive(Debug, Error)] pub enum AcknowledgeError { /// Only actual for acknowledging incoming packets. @@ -40,6 +54,8 @@ pub enum AddOutcomingPacketError { // Note: We don't expect clients or server to re-send lost packets. If we detect packet loss, // we enable redundancy to include the lost updates in future packets. pub struct ConnectionState { + pub session_id: SessionId, + pub status: ConnectionStatus, newest_acknowledged_incoming_packet: Option, incoming_packets_acks: u64, outcoming_packets_acks: VecDeque, @@ -52,6 +68,8 @@ pub struct ConnectionState { impl Default for ConnectionState { fn default() -> Self { Self { + session_id: SessionId::new(0), + status: ConnectionStatus::Uninitialized, newest_acknowledged_incoming_packet: None, incoming_packets_acks: u64::MAX - 1, outcoming_packets_acks: VecDeque::new(), @@ -326,16 +344,16 @@ impl Acknowledgment { pub fn network_setup(mut net: ResMut) { net.set_channels_builder(|builder: &mut ConnectionChannelsBuilder| { builder - .register::(CLIENT_INPUT_MESSAGE_SETTINGS) + .register::>(CLIENT_INPUT_MESSAGE_SETTINGS) .unwrap(); builder - .register::(CLIENT_RELIABLE_MESSAGE_SETTINGS) + .register::>(CLIENT_RELIABLE_MESSAGE_SETTINGS) .unwrap(); builder - .register::(SERVER_RELIABLE_MESSAGE_SETTINGS) + .register::>(SERVER_RELIABLE_MESSAGE_SETTINGS) .unwrap(); builder - .register::(SERVER_DELTA_UPDATE_MESSAGE_SETTINGS) + .register::>(SERVER_DELTA_UPDATE_MESSAGE_SETTINGS) .unwrap(); }); } @@ -400,7 +418,7 @@ const SERVER_DELTA_UPDATE_MESSAGE_SETTINGS: MessageChannelSettings = MessageChan mod tests { use crate::{ framebuffer::FrameNumber, - net::{Acknowledgment, ConnectionState}, + net::{Acknowledgment, ConnectionState, ConnectionStatus, SessionId}, TICKS_PER_NETWORK_BROADCAST, }; use std::{collections::VecDeque, time::Instant}; @@ -434,11 +452,14 @@ mod tests { .collect::>(); ConnectionState { + session_id: SessionId::new(0), + status: ConnectionStatus::Uninitialized, newest_acknowledged_incoming_packet: None, incoming_packets_acks: 0, outcoming_packets_acks: VecDeque::from(acknowledgments), packet_loss: 0.0, jitter_millis: 0.0, + last_increased_jitter: Instant::now(), rtt_millis: 0.0, } } diff --git a/libs/shared_lib/src/player.rs b/libs/shared_lib/src/player.rs index 83381938..cec006cd 100644 --- a/libs/shared_lib/src/player.rs +++ b/libs/shared_lib/src/player.rs @@ -49,19 +49,9 @@ impl PlayerUpdates { } } -#[derive(Clone, Copy)] -pub enum PlayerConnectionState { - /// From the server's perspective it means that a StartGame message hasn't been sent yet. - /// From the client's one - a client received a DeltaUpdate about a player that it doesn't know - /// about. - Connecting, - Playing, -} - #[derive(Clone)] pub struct Player { pub nickname: String, - pub state: PlayerConnectionState, } pub fn random_name() -> String {