Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
vladbat00 committed Apr 3, 2021
1 parent 4639e4e commit f06eb90
Show file tree
Hide file tree
Showing 11 changed files with 625 additions and 280 deletions.
171 changes: 113 additions & 58 deletions libs/client_lib/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use mr_shared_lib::{
components::PlayerDirection,
},
messages::{
ConnectedPlayer, DeltaUpdate, DisconnectedPlayer, PlayerInput, PlayerNetId, PlayerUpdate,
ReliableClientMessage, ReliableServerMessage, StartGame, UnreliableClientMessage,
UnreliableServerMessage,
ConnectedPlayer, DeltaUpdate, DisconnectedPlayer, Message, PlayerInput, PlayerNetId,
PlayerUpdate, ReliableClientMessage, ReliableServerMessage, StartGame,
UnreliableClientMessage, UnreliableServerMessage,
},
net::{AcknowledgeError, ConnectionState},
player::{Player, PlayerConnectionState, PlayerDirectionUpdate, PlayerUpdates},
net::{AcknowledgeError, ConnectionState, ConnectionStatus, SessionId},
player::{Player, PlayerDirectionUpdate, PlayerUpdates},
registry::EntityRegistry,
GameTime, SimulationTime, COMPONENT_FRAMEBUFFER_LIMIT, SIMULATIONS_PER_SECOND,
};
Expand Down Expand Up @@ -45,6 +45,7 @@ pub struct NetworkReader {
pub struct UpdateParams<'a> {
simulation_time: ResMut<'a, SimulationTime>,
game_time: ResMut<'a, GameTime>,
player_entities: Res<'a, EntityRegistry<PlayerNetId>>,
estimated_server_time: ResMut<'a, EstimatedServerTime>,
target_frames_ahead: ResMut<'a, TargetFramesAhead>,
player_delay: ResMut<'a, PlayerDelay>,
Expand Down Expand Up @@ -74,10 +75,14 @@ pub fn process_network_events(
match event {
NetworkEvent::Connected(handle) => {
log::info!("Connected: {}", handle);
if let Err(err) = network_params
.net
.send_message(*handle, ReliableClientMessage::Handshake)
{
if let Err(err) = network_params.net.send_message(
*handle,
Message {
// The server is expected to accept any session id for this message.
session_id: SessionId::new(0),
message: ReliableClientMessage::Handshake,
},
) {
log::error!("Failed to send a Handshake message: {:?}", err);
}
update_params.initial_rtt.sent_at = Some(Instant::now());
Expand All @@ -92,12 +97,25 @@ pub fn process_network_events(
for (handle, connection) in network_params.net.connections.iter_mut() {
let channels = connection.channels().unwrap();

while let Some(message) = channels.recv::<UnreliableServerMessage>() {
while let Some(message) = channels.recv::<Message<UnreliableServerMessage>>() {
log::trace!(
"UnreliableServerMessage received on [{}]: {:?}",
handle,
message
);
let Message {
message,
session_id,
} = message;

if session_id != network_params.connection_state.session_id {
log::warn!(
"Ignoring a server message: sent session id {} doesn't match {}",
session_id,
network_params.connection_state.session_id
);
continue;
}

match message {
UnreliableServerMessage::DeltaUpdate(update) => {
Expand Down Expand Up @@ -146,19 +164,35 @@ pub fn process_network_events(
}
}

while let Some(message) = channels.recv::<ReliableServerMessage>() {
while let Some(message) = channels.recv::<Message<ReliableServerMessage>>() {
log::trace!(
"ReliableServerMessage received on [{}]: {:?}",
handle,
message
);
let Message {
message,
session_id,
} = message;

let ignore_session_id_check = matches!(message, ReliableServerMessage::StartGame(_));
if session_id != network_params.connection_state.session_id && !ignore_session_id_check
{
log::warn!(
"Ignoring a server message: sent session id {} doesn't match {}",
session_id,
network_params.connection_state.session_id
);
continue;
}

match message {
ReliableServerMessage::StartGame(start_game) => {
if current_player_net_id.0 == Some(start_game.net_id) {
continue;
}
log::info!("Starting the game");
network_params.connection_state.session_id = session_id;
log::info!(
"Starting the game (update frame: {})",
start_game.game_state.frame_number
);
process_start_game_message(
start_game,
&mut network_params.connection_state,
Expand All @@ -171,11 +205,7 @@ pub fn process_network_events(
process_connected_player_message(connected_player, &mut players);
}
ReliableServerMessage::DisconnectedPlayer(disconnected_player) => {
process_disconnected_player_message(
disconnected_player,
&mut players,
&mut update_params,
);
process_disconnected_player_message(disconnected_player);
}
ReliableServerMessage::SpawnLevelObject(spawn_level_object) => {
update_params
Expand All @@ -196,8 +226,17 @@ pub fn process_network_events(
}
}

while channels.recv::<UnreliableClientMessage>().is_some() {
log::error!("Unexpected ClientMessage received on [{}]", handle);
while channels
.recv::<Message<UnreliableClientMessage>>()
.is_some()
{
log::error!(
"Unexpected UnreliableClientMessage received on [{}]",
handle
);
}
while channels.recv::<Message<ReliableClientMessage>>().is_some() {
log::error!("Unexpected ReliableClientMessage received on [{}]", handle);
}
}
}
Expand Down Expand Up @@ -255,13 +294,17 @@ pub fn send_network_updates(
}
}

let message = UnreliableClientMessage::PlayerUpdate(PlayerUpdate {
frame_number: time.frame_number,
acknowledgments: network_params.connection_state.incoming_acknowledgments(),
inputs,
});
let result = network_params.net.send_message(
connection_handle,
UnreliableClientMessage::PlayerUpdate(PlayerUpdate {
frame_number: time.frame_number,
acknowledgments: network_params.connection_state.incoming_acknowledgments(),
inputs,
}),
Message {
session_id: network_params.connection_state.session_id,
message,
},
);
if let Err(err) = result {
log::error!("Failed to send a message to {:?}: {:?}", address, err);
Expand Down Expand Up @@ -325,19 +368,44 @@ fn process_delta_update_message(
update_params.player_delay.frame_count = player_delay / 2;
}

// Despawning players that aren't mentioned in the delta update.
let players_to_remove: Vec<PlayerNetId> = players
.keys()
.copied()
.filter(|player_net_id| {
!delta_update
.players
.iter()
.any(|player| player.net_id == *player_net_id)
})
.collect();

for player_net_id in players_to_remove {
players.remove(&player_net_id);
update_params.despawn_player_commands.push(DespawnPlayer {
net_id: player_net_id,
frame_number: delta_update.frame_number,
});
}

for player_state in delta_update.players {
players.entry(player_state.net_id).or_insert_with(|| {
if update_params
.player_entities
.get_entity(player_state.net_id)
.is_none()
{
log::info!("First update with the new player {}", player_state.net_id.0);
update_params.spawn_player_commands.push(SpawnPlayer {
net_id: player_state.net_id,
start_position: player_state.position,
is_player_frame_simulated: false,
});
Player {
nickname: "?".to_owned(),
state: PlayerConnectionState::Connecting,
}
});
players
.entry(player_state.net_id)
.or_insert_with(|| Player {
nickname: "?".to_owned(),
});
}

let player_frames_ahead = if current_player_net_id == Some(player_state.net_id) {
update_params.target_frames_ahead.frames_count
Expand Down Expand Up @@ -397,8 +465,7 @@ fn process_delta_update_message(
}

// There's no need to rewind if we haven't started the game.
// TODO: deduce whether we started the game or not from some other state.
if current_player_net_id.is_some() {
if let ConnectionStatus::Connected = connection_state.status {
update_params
.simulation_time
.rewind(rewind_to_simulation_frame);
Expand All @@ -422,7 +489,6 @@ fn process_start_game_message(
start_game.net_id,
Player {
nickname: start_game.nickname,
state: PlayerConnectionState::Playing,
},
);
update_params.game_time.generation += 1;
Expand All @@ -442,6 +508,10 @@ fn process_start_game_message(
start_game.game_state.frame_number + half_rtt_frames;
update_params.estimated_server_time.updated_at = update_params.game_time.frame_number;

log::debug!(
"Spawning the current player ({})",
current_player_net_id.0.unwrap().0
);
update_params.spawn_player_commands.push(SpawnPlayer {
net_id: start_game.net_id,
start_position,
Expand All @@ -452,13 +522,16 @@ fn process_start_game_message(
}

for player in start_game.players {
if player.net_id == current_player_net_id.0.unwrap() {
continue;
}

if let Some(start_position) = player_start_position(player.net_id, &start_game.game_state) {
log::info!("Spawning player {}: {}", player.net_id.0, player.nickname);
players.insert(
player.net_id,
Player {
nickname: player.nickname,
state: PlayerConnectionState::Playing,
},
);
update_params.spawn_player_commands.push(SpawnPlayer {
Expand Down Expand Up @@ -494,31 +567,13 @@ fn process_connected_player_message(
connected_player.net_id,
Player {
nickname: connected_player.nickname,
state: PlayerConnectionState::Playing,
},
);
}

fn process_disconnected_player_message(
disconnected_player: DisconnectedPlayer,
players: &mut HashMap<PlayerNetId, Player>,
update_params: &mut UpdateParams,
) {
if let Some(player) = players.remove(&disconnected_player.net_id) {
log::info!(
"A player ({}) disconnected: {}",
disconnected_player.net_id.0,
player.nickname
);
update_params.despawn_player_commands.push(DespawnPlayer {
net_id: disconnected_player.net_id,
});
} else {
log::error!(
"Unknown player with net id {}",
disconnected_player.net_id.0
);
}
fn process_disconnected_player_message(disconnected_player: DisconnectedPlayer) {
// We actually remove players if there's no mention of them in a DeltaUpdate message.
log::info!("A player ({}) disconnected", disconnected_player.net_id.0);
}

fn player_start_position(player_net_id: PlayerNetId, delta_update: &DeltaUpdate) -> Option<Vec2> {
Expand Down
1 change: 1 addition & 0 deletions libs/server_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Plugin for MuddleServerPlugin {
resources.get_or_insert_with(PlayerNetId::default);
resources.get_or_insert_with(NetworkReader::default);
resources.get_or_insert_with(PlayerConnections::default);
resources.get_or_insert_with(Vec::<(PlayerNetId, u32)>::default);
resources.get_or_insert_with(HashMap::<u32, ConnectionState>::default);
resources.get_or_insert_with(DeferredUpdates::<PlayerInput>::default);
}
Expand Down
Loading

0 comments on commit f06eb90

Please sign in to comment.