From ff69158fea6ae46ec088eaf958968b3bb7106531 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Thu, 17 Oct 2024 14:00:45 -0300 Subject: [PATCH] expose stateIsSyncronized flag to check if the crdt state is initialized --- packages/@dcl/sdk/src/network/index.ts | 9 ++-- .../@dcl/sdk/src/network/message-bus-sync.ts | 52 ++++++++++++++++--- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/packages/@dcl/sdk/src/network/index.ts b/packages/@dcl/sdk/src/network/index.ts index aa09f36ce..0376aaf12 100644 --- a/packages/@dcl/sdk/src/network/index.ts +++ b/packages/@dcl/sdk/src/network/index.ts @@ -4,10 +4,7 @@ import { addSyncTransport } from './message-bus-sync' import { getUserData } from '~system/UserIdentity' // initialize sync transport for sdk engine -const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild } = addSyncTransport( - engine, - sendBinary, - getUserData -) +const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild, stateIsSyncronized } = + addSyncTransport(engine, sendBinary, getUserData) -export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent } +export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, stateIsSyncronized } diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index 0a9faf970..cb8cdecfc 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -1,4 +1,4 @@ -import { IEngine, Transport, RealmInfo } from '@dcl/ecs' +import { IEngine, Transport, RealmInfo, PlayerIdentityData } from '@dcl/ecs' import { type SendBinaryRequest, type SendBinaryResponse } from '~system/CommunicationsController' import { syncFilter } from './filter' @@ -61,23 +61,58 @@ export function addSyncTransport( if (sender !== myProfile.userId) return DEBUG_NETWORK_MESSAGES() && console.log('[Processing CRDT State]', data.byteLength) transport.onmessage!(data) + stateIsSyncronized = true }) - binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, (message, userId) => { + binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, async (message, userId) => { + console.log(`Sending CRDT State to: ${userId}`) transport.onmessage!(message) binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine))) }) + async function sleep(ms: number) { + let timer = 0 + function system(dt: number) { + timer += dt + if (timer >= ms) { + engine.removeSystem(system) + return Promise.resolve() + } + } + engine.addSystem(system) + } + const players = definePlayerHelper(engine) + let stateIsSyncronized = false + let requestCrdtStateWhenConnected = false + async function requestState() { + requestCrdtStateWhenConnected = false + let players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) + DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) + binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + await sleep(3000) + players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) + + if (!stateIsSyncronized) { + if (players.length > 1) { + DEBUG_NETWORK_MESSAGES() && + console.log(`Requesting state again (no response). Players connected: ${players.length - 1}`) + void requestState() + } else { + DEBUG_NETWORK_MESSAGES() && console.log('No active players. State syncronized') + stateIsSyncronized = true + } + } + } + players.onEnterScene((player) => { DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId) if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) { if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { - DEBUG_NETWORK_MESSAGES() && console.log('Requesting state') - binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + void requestState() } else { DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted') requestCrdtStateWhenConnected = true @@ -87,15 +122,15 @@ export function addSyncTransport( RealmInfo.onChange(engine.RootEntity, (value) => { if (value?.isConnectedSceneRoom && requestCrdtStateWhenConnected) { - DEBUG_NETWORK_MESSAGES() && console.log('Requesting state.') - requestCrdtStateWhenConnected = false - binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + void requestState() } }) players.onLeaveScene((userId) => { DEBUG_NETWORK_MESSAGES() && console.log('[onLeaveScene]', userId) if (userId === myProfile.userId) { + DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms') + stateIsSyncronized = false requestCrdtStateWhenConnected = false } }) @@ -109,7 +144,8 @@ export function addSyncTransport( return { ...entityDefinitions, - myProfile + myProfile, + stateIsSyncronized } }