From 65791756b9df7f4c4d5bd60ce27b64d2b7ad2618 Mon Sep 17 00:00:00 2001 From: sammy Date: Sat, 30 Dec 2023 15:46:16 -0500 Subject: [PATCH] x --- src/lib/server/sync-db/db.ts | 4 +- .../websockets/features/offline/sync.ts | 12 +++-- src/routes/app/offline-first/+page.server.ts | 6 ++- src/routes/app/offline-first/+page.svelte | 7 ++- src/routes/app/offline-first/sdb.ts | 54 ++++++++++--------- 5 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/lib/server/sync-db/db.ts b/src/lib/server/sync-db/db.ts index 8294db7..01a9832 100644 --- a/src/lib/server/sync-db/db.ts +++ b/src/lib/server/sync-db/db.ts @@ -1,14 +1,14 @@ import Database from 'better-sqlite3'; import { extensionPath } from '@vlcn.io/crsqlite'; -export async function dbFrom(filename) { +export function dbFrom(filename) { // TODO: should be an env-var so we can use Render's persistent disk as the path const db = new Database(`./sync-dbs/${filename}`); db.pragma('journal_mode = WAL'); db.loadExtension(extensionPath); // TODO: import schema from same place as frontend - await db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete); + db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete); SELECT crsql_as_crr('todos'); CREATE TABLE IF NOT EXISTS todonts (id PRIMARY KEY NOT NULL, content, complete); SELECT crsql_as_crr('todonts'); diff --git a/src/lib/server/websockets/features/offline/sync.ts b/src/lib/server/websockets/features/offline/sync.ts index e700858..60fbb70 100644 --- a/src/lib/server/websockets/features/offline/sync.ts +++ b/src/lib/server/websockets/features/offline/sync.ts @@ -37,8 +37,8 @@ export class Sync { clientSiteId: string; clientVersion: number; }) { - const db = await dbFrom(`${ws.session.user.userId}.db`); - const { siteId, version } = await db + const db = dbFrom(`${ws.session.user.userId}.db`); + const { siteId, version } = db .prepare('SELECT hex(crsql_site_id()) as siteId, crsql_db_version() as version;') .get(); const sync = new Sync({ ws, stream, db, version, siteId }); @@ -69,7 +69,7 @@ export class Sync { await db.prepare(INSERT_CHANGES).run(...change); }); - const changeSiteVersions = latestVersions(changes); + const changeSiteVersions = latestVersions(changes).filter(([sId]) => sId !== siteId); changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { await db @@ -142,14 +142,16 @@ export class Sync { .prepare( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" FROM crsql_changes WHERE site_id != unhex(:clientSiteId) - AND db_version > :lastVersion` + AND db_version >= :lastVersion` ) .all({ clientSiteId, lastVersion }); - const changeSiteVersions = latestVersions(changes.map((change) => Object.values(change))); + const changeSiteVersions = latestVersions( + changes.map((change) => Object.values(change)) + ).filter(([siteId]) => siteId !== this.siteId); changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { await this.db diff --git a/src/routes/app/offline-first/+page.server.ts b/src/routes/app/offline-first/+page.server.ts index 72f8077..1553dcc 100644 --- a/src/routes/app/offline-first/+page.server.ts +++ b/src/routes/app/offline-first/+page.server.ts @@ -1,15 +1,19 @@ import type { PageServerLoad } from './$types'; import { COMBINED_PATH } from '$lib/websockets/constants'; +import { dbFrom } from '$lib/server/sync-db/db'; export const load: PageServerLoad = async ({ locals, url }) => { const { username, userId } = locals.user; const { protocol, host } = url; const wsProtocol = protocol === 'https:' ? 'wss:' : 'ws:'; const dbName = `${userId}.db`; + const db = dbFrom(dbName); + const { serverSiteId } = db.prepare('SELECT hex(crsql_site_id()) as serverSiteId;').get(); const features = [{ type: 'offline', strategy: 'sync', stream: dbName }]; return { url: `${wsProtocol}//${host}${COMBINED_PATH}?features=${JSON.stringify(features)}`, username, - dbName + dbName, + serverSiteId }; }; diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index 0d28be7..c1078f8 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -15,7 +15,12 @@ `SELECT crsql_as_crr('todonts');` ]; - const { store } = db({ schema, name: data.dbName, wsUrl: data.url }); + const { store } = db({ + schema, + name: data.dbName, + wsUrl: data.url, + serverSiteId: data.serverSiteId + }); const todos = store({ query: async (db) => await db.execO('SELECT * FROM todos'), commands: { diff --git a/src/routes/app/offline-first/sdb.ts b/src/routes/app/offline-first/sdb.ts index 6eea974..6eb51ee 100644 --- a/src/routes/app/offline-first/sdb.ts +++ b/src/routes/app/offline-first/sdb.ts @@ -27,7 +27,7 @@ export function latestVersions(changes) { async function pushOfflineChangesToServer(database, ws, version) { const changes = await database.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version > ?`, + FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version >= ?`, [version] ); @@ -61,10 +61,12 @@ async function pushOfflineChangesToServer(database, ws, version) { function wsMessageHandler({ database, update, + serverSiteId, identifier }: { database: Database; update: () => Promise; + serverSiteId: string; identifier?: string; }) { return async function (event: Event) { @@ -79,17 +81,14 @@ function wsMessageHandler({ if ((type === 'update' && siteId !== clientSiteId) || type === 'pull') { await database.merge(changes); - const changeSiteVersions = latestVersions(changes); - - changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { - await database.db.exec( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), ?, 0, 0) + await database.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version`, - [changeSiteId, changeDbVersion] - ); - }); + [serverSiteId] + ); + await update(); } @@ -113,14 +112,15 @@ async function setupWs({ url, database }: { url: string; database: Promise readable([]) }; } + console.log({ serverSiteId }); const databasePromise = Database.load({ schema, name }); const wsPromise = setupWs({ url: wsUrl, database: databasePromise }); - const store = ({ query, commands, identifier }) => { + const store = ({ query, commands }) => { const q = writable([]); databasePromise.then(async (database) => { const ws = await wsPromise; @@ -128,7 +128,10 @@ export function db({ schema, name, wsUrl }) { // Maybe this should register the listener in a store, // we may be over subscribing since we add a listener with // every `store` - ws.addEventListener('message', wsMessageHandler({ database, update, identifier })); + ws.addEventListener( + 'message', + wsMessageHandler({ database, update, identifier, serverSiteId }) + ); await update(); }); @@ -139,24 +142,25 @@ export function db({ schema, name, wsUrl }) { const db = await databasePromise; const results = await fn(db.db, args); q.set(await query(db.db)); + const serverSiteVersion = await db.db.execO( + `SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`, + [serverSiteId] + ); + const changes = await db.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - FROM crsql_changes WHERE site_id = crsql_site_id() - AND db_version >= crsql_db_version()` + FROM crsql_changes WHERE db_version >= ?`, + [serverSiteVersion[0].version] ); - const changeSiteVersions = latestVersions(changes); - - changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { - await db.db.exec( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), ?, 0, 0) + await db.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version`, - [changeSiteId, changeDbVersion] - ); - }); - + [serverSiteId] + ); + console.log({ changes }); const ws = await wsPromise; ws.send( encoder.encode(