From 780dcab7f77aa2ee4a470521f3f959ed89942cb5 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Thu, 17 Oct 2024 14:00:45 -0300 Subject: [PATCH 01/10] expose stateIsSyncronized flag to check if the crdt state is initialized --- packages/@dcl/sdk/src/network/index.ts | 9 +-- .../@dcl/sdk/src/network/message-bus-sync.ts | 56 ++++++++++++++++--- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/packages/@dcl/sdk/src/network/index.ts b/packages/@dcl/sdk/src/network/index.ts index aa09f36ce..7acc82e34 100644 --- a/packages/@dcl/sdk/src/network/index.ts +++ b/packages/@dcl/sdk/src/network/index.ts @@ -4,10 +4,7 @@ import { addSyncTransport } from './message-bus-sync' import { getUserData } from '~system/UserIdentity' // initialize sync transport for sdk engine -const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild } = addSyncTransport( - engine, - sendBinary, - getUserData -) +const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild, isStateSyncronized } = + addSyncTransport(engine, sendBinary, getUserData) -export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent } +export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, isStateSyncronized } diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index 0a9faf970..757e23b6a 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -1,4 +1,4 @@ -import { IEngine, Transport, RealmInfo } from '@dcl/ecs' +import { IEngine, Transport, RealmInfo, PlayerIdentityData } from '@dcl/ecs' import { type SendBinaryRequest, type SendBinaryResponse } from '~system/CommunicationsController' import { syncFilter } from './filter' @@ -61,23 +61,58 @@ export function addSyncTransport( if (sender !== myProfile.userId) return DEBUG_NETWORK_MESSAGES() && console.log('[Processing CRDT State]', data.byteLength) transport.onmessage!(data) + stateIsSyncronized = true }) - binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, (message, userId) => { + binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, async (message, userId) => { + console.log(`Sending CRDT State to: ${userId}`) transport.onmessage!(message) binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine))) }) + async function sleep(ms: number) { + let timer = 0 + function system(dt: number) { + timer += dt + if (timer >= ms) { + engine.removeSystem(system) + return Promise.resolve() + } + } + engine.addSystem(system) + } + const players = definePlayerHelper(engine) + let stateIsSyncronized = false + let requestCrdtStateWhenConnected = false + async function requestState() { + requestCrdtStateWhenConnected = false + let players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) + DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) + binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + await sleep(3000) + players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) + + if (!stateIsSyncronized) { + if (players.length > 1) { + DEBUG_NETWORK_MESSAGES() && + console.log(`Requesting state again (no response). Players connected: ${players.length - 1}`) + void requestState() + } else { + DEBUG_NETWORK_MESSAGES() && console.log('No active players. State syncronized') + stateIsSyncronized = true + } + } + } + players.onEnterScene((player) => { DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId) if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) { if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { - DEBUG_NETWORK_MESSAGES() && console.log('Requesting state') - binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + void requestState() } else { DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted') requestCrdtStateWhenConnected = true @@ -87,15 +122,15 @@ export function addSyncTransport( RealmInfo.onChange(engine.RootEntity, (value) => { if (value?.isConnectedSceneRoom && requestCrdtStateWhenConnected) { - DEBUG_NETWORK_MESSAGES() && console.log('Requesting state.') - requestCrdtStateWhenConnected = false - binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + void requestState() } }) players.onLeaveScene((userId) => { DEBUG_NETWORK_MESSAGES() && console.log('[onLeaveScene]', userId) if (userId === myProfile.userId) { + DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms') + stateIsSyncronized = false requestCrdtStateWhenConnected = false } }) @@ -107,9 +142,14 @@ export function addSyncTransport( transport.onmessage!(value) }) + function isStateSyncronized() { + return stateIsSyncronized + } + return { ...entityDefinitions, - myProfile + myProfile, + isStateSyncronized } } From 1fc98eed99273860c31be7529d6802bfc47f2314 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Thu, 17 Oct 2024 15:16:21 -0300 Subject: [PATCH 02/10] fix system --- .../@dcl/sdk/src/network/message-bus-sync.ts | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index 757e23b6a..291d764f8 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -70,16 +70,18 @@ export function addSyncTransport( binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine))) }) - async function sleep(ms: number) { - let timer = 0 - function system(dt: number) { - timer += dt - if (timer >= ms) { - engine.removeSystem(system) - return Promise.resolve() + function sleep(ms: number) { + return new Promise((resolve) => { + let timer = 0 + function sleepSystem(dt: number) { + timer += dt + if (timer * 1000 >= ms) { + engine.removeSystem(sleepSystem) + resolve() + } } - } - engine.addSystem(system) + engine.addSystem(sleepSystem) + }) } const players = definePlayerHelper(engine) From 8ab1fbd39e5472281618a47c5bf73ecd9f8c55c0 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 10:54:12 -0300 Subject: [PATCH 03/10] move logic to comms flag instead of player enter/leave scnee --- .../@dcl/sdk/src/network/message-bus-sync.ts | 81 ++++++++++--------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index 291d764f8..eac434947 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -34,8 +34,11 @@ export function addSyncTransport( pendingMessageBusMessagesToSend.length = 0 return messages } + const players = definePlayerHelper(engine) + let stateIsSyncronized = false let transportInitialzed = false + // Add Sync Transport const transport: Transport = { filter: syncFilter(engine), @@ -55,7 +58,7 @@ export function addSyncTransport( engine.addTransport(transport) // End add sync transport - // If we dont have any state initialized, and recieve a state message. + // Receive & Process CRDT_STATE binaryMessageBus.on(CommsMessage.RES_CRDT_STATE, (value) => { const { sender, data } = decodeCRDTState(value) if (sender !== myProfile.userId) return @@ -64,34 +67,21 @@ export function addSyncTransport( stateIsSyncronized = true }) + // Answer to REQ_CRDT_STATE binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, async (message, userId) => { console.log(`Sending CRDT State to: ${userId}`) transport.onmessage!(message) binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine))) }) - function sleep(ms: number) { - return new Promise((resolve) => { - let timer = 0 - function sleepSystem(dt: number) { - timer += dt - if (timer * 1000 >= ms) { - engine.removeSystem(sleepSystem) - resolve() - } - } - engine.addSystem(sleepSystem) - }) - } - - const players = definePlayerHelper(engine) - - let stateIsSyncronized = false - - let requestCrdtStateWhenConnected = false + // Process CRDT messages here + binaryMessageBus.on(CommsMessage.CRDT, (value) => { + DEBUG_NETWORK_MESSAGES() && + console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine))) + transport.onmessage!(value) + }) async function requestState() { - requestCrdtStateWhenConnected = false let players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) @@ -112,18 +102,24 @@ export function addSyncTransport( players.onEnterScene((player) => { DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId) - if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) { - if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { - void requestState() - } else { - DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted') - requestCrdtStateWhenConnected = true - } - } + // if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) { + // if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { + // void requestState() + // } else { + // DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted') + // requestCrdtStateWhenConnected = true + // } + // } }) + // Asks for the REQ_CRDT_STATE when its connected to comms RealmInfo.onChange(engine.RootEntity, (value) => { - if (value?.isConnectedSceneRoom && requestCrdtStateWhenConnected) { + if (!value?.isConnectedSceneRoom) { + DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms') + stateIsSyncronized = false + } + + if (value?.isConnectedSceneRoom && !stateIsSyncronized) { void requestState() } }) @@ -131,23 +127,28 @@ export function addSyncTransport( players.onLeaveScene((userId) => { DEBUG_NETWORK_MESSAGES() && console.log('[onLeaveScene]', userId) if (userId === myProfile.userId) { - DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms') - stateIsSyncronized = false - requestCrdtStateWhenConnected = false + // stateIsSyncronized = false } }) - // Process CRDT messages here - binaryMessageBus.on(CommsMessage.CRDT, (value) => { - DEBUG_NETWORK_MESSAGES() && - console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine))) - transport.onmessage!(value) - }) - function isStateSyncronized() { return stateIsSyncronized } + function sleep(ms: number) { + return new Promise((resolve) => { + let timer = 0 + function sleepSystem(dt: number) { + timer += dt + if (timer * 1000 >= ms) { + engine.removeSystem(sleepSystem) + resolve() + } + } + engine.addSystem(sleepSystem) + }) + } + return { ...entityDefinitions, myProfile, From 46078281a95dcb02acd503fdd468804aa916986b Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 11:20:28 -0300 Subject: [PATCH 04/10] add a max amount of retries --- packages/@dcl/sdk/src/network/message-bus-sync.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index eac434947..f74a1c8da 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -81,7 +81,7 @@ export function addSyncTransport( transport.onmessage!(value) }) - async function requestState() { + async function requestState(retryCount: number = 0) { let players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) @@ -89,10 +89,10 @@ export function addSyncTransport( players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) if (!stateIsSyncronized) { - if (players.length > 1) { + if (players.length > 1 && retryCount <= 2) { DEBUG_NETWORK_MESSAGES() && - console.log(`Requesting state again (no response). Players connected: ${players.length - 1}`) - void requestState() + console.log(`Requesting state again ${retryCount} (no response). Players connected: ${players.length - 1}`) + void requestState(retryCount + 1) } else { DEBUG_NETWORK_MESSAGES() && console.log('No active players. State syncronized') stateIsSyncronized = true From 890c629363bf003d8186b0528c546bb9de421578 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 11:30:26 -0300 Subject: [PATCH 05/10] increment the delay to 5s --- packages/@dcl/sdk/src/network/message-bus-sync.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index f74a1c8da..3001d7fcd 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -81,11 +81,14 @@ export function addSyncTransport( transport.onmessage!(value) }) - async function requestState(retryCount: number = 0) { + async function requestState(retryCount: number = 1) { let players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) - await sleep(3000) + + // Wait ~5s for the response. + await sleep(5000) + players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) if (!stateIsSyncronized) { From 699512b716fe8d817569295ec0fb5d86cbd50616 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 11:39:53 -0300 Subject: [PATCH 06/10] prevent for reconnecting if its disconnected --- packages/@dcl/sdk/src/network/message-bus-sync.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index 3001d7fcd..b4ee9eeb4 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -84,6 +84,12 @@ export function addSyncTransport( async function requestState(retryCount: number = 1) { let players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) + + if (!RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { + DEBUG_NETWORK_MESSAGES() && console.log(`Aborting Requesting state. Disconnected`) + return + } + binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) // Wait ~5s for the response. From 83f89209de8be797edc27a01b1cd3edad747ea1a Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 11:59:08 -0300 Subject: [PATCH 07/10] avoid requesting state on changes that are not in the bool flag --- packages/@dcl/sdk/src/network/message-bus-sync.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index b4ee9eeb4..550d653c4 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -121,6 +121,7 @@ export function addSyncTransport( // } }) + let isConnectedSceneRoomCache = !!RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom // Asks for the REQ_CRDT_STATE when its connected to comms RealmInfo.onChange(engine.RootEntity, (value) => { if (!value?.isConnectedSceneRoom) { @@ -128,9 +129,13 @@ export function addSyncTransport( stateIsSyncronized = false } - if (value?.isConnectedSceneRoom && !stateIsSyncronized) { - void requestState() + if (value?.isConnectedSceneRoom !== isConnectedSceneRoomCache) { + if (value?.isConnectedSceneRoom && !stateIsSyncronized) { + void requestState() + } } + + isConnectedSceneRoomCache = !!value?.isConnectedSceneRoom }) players.onLeaveScene((userId) => { From 331f382fb50ed44c7b7f65da0d23dff5f7b18ca4 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 12:13:50 -0300 Subject: [PATCH 08/10] revert --- packages/@dcl/sdk/src/network/message-bus-sync.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index 550d653c4..b4ee9eeb4 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -121,7 +121,6 @@ export function addSyncTransport( // } }) - let isConnectedSceneRoomCache = !!RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom // Asks for the REQ_CRDT_STATE when its connected to comms RealmInfo.onChange(engine.RootEntity, (value) => { if (!value?.isConnectedSceneRoom) { @@ -129,13 +128,9 @@ export function addSyncTransport( stateIsSyncronized = false } - if (value?.isConnectedSceneRoom !== isConnectedSceneRoomCache) { - if (value?.isConnectedSceneRoom && !stateIsSyncronized) { - void requestState() - } + if (value?.isConnectedSceneRoom && !stateIsSyncronized) { + void requestState() } - - isConnectedSceneRoomCache = !!value?.isConnectedSceneRoom }) players.onLeaveScene((userId) => { From f83ebfaddb22be2c0856e41fad43699cf6d50bc4 Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 12:46:52 -0300 Subject: [PATCH 09/10] add logs --- packages/@dcl/sdk/src/network/message-bus-sync.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index b4ee9eeb4..be683721e 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -86,8 +86,8 @@ export function addSyncTransport( DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) if (!RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { - DEBUG_NETWORK_MESSAGES() && console.log(`Aborting Requesting state. Disconnected`) - return + DEBUG_NETWORK_MESSAGES() && console.log(`Aborting Requesting state?. Disconnected`) + // return } binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) @@ -128,6 +128,10 @@ export function addSyncTransport( stateIsSyncronized = false } + if (value?.isConnectedSceneRoom) { + DEBUG_NETWORK_MESSAGES() && console.log('Connected to comms') + } + if (value?.isConnectedSceneRoom && !stateIsSyncronized) { void requestState() } From d8a340eeade3f32edf3964b72e7e73b0031e787d Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Fri, 18 Oct 2024 14:57:50 -0300 Subject: [PATCH 10/10] remove commented code --- packages/@dcl/sdk/src/network/message-bus-sync.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index be683721e..f2ecc2a56 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -87,7 +87,7 @@ export function addSyncTransport( if (!RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { DEBUG_NETWORK_MESSAGES() && console.log(`Aborting Requesting state?. Disconnected`) - // return + return } binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) @@ -111,14 +111,6 @@ export function addSyncTransport( players.onEnterScene((player) => { DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId) - // if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) { - // if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { - // void requestState() - // } else { - // DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted') - // requestCrdtStateWhenConnected = true - // } - // } }) // Asks for the REQ_CRDT_STATE when its connected to comms @@ -139,9 +131,6 @@ export function addSyncTransport( players.onLeaveScene((userId) => { DEBUG_NETWORK_MESSAGES() && console.log('[onLeaveScene]', userId) - if (userId === myProfile.userId) { - // stateIsSyncronized = false - } }) function isStateSyncronized() {