Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
sammy authored and sammy committed Dec 30, 2023
1 parent 62c6d2a commit 6579175
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/lib/server/sync-db/db.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import Database from 'better-sqlite3';
import { extensionPath } from '@vlcn.io/crsqlite';

export async function dbFrom(filename) {
export function dbFrom(filename) {
// TODO: should be an env-var so we can use Render's persistent disk as the path
const db = new Database(`./sync-dbs/${filename}`);
db.pragma('journal_mode = WAL');
db.loadExtension(extensionPath);

// TODO: import schema from same place as frontend
await db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete);
db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete);
SELECT crsql_as_crr('todos');
CREATE TABLE IF NOT EXISTS todonts (id PRIMARY KEY NOT NULL, content, complete);
SELECT crsql_as_crr('todonts');
Expand Down
12 changes: 7 additions & 5 deletions src/lib/server/websockets/features/offline/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ export class Sync {
clientSiteId: string;
clientVersion: number;
}) {
const db = await dbFrom(`${ws.session.user.userId}.db`);
const { siteId, version } = await db
const db = dbFrom(`${ws.session.user.userId}.db`);
const { siteId, version } = db
.prepare('SELECT hex(crsql_site_id()) as siteId, crsql_db_version() as version;')
.get();
const sync = new Sync({ ws, stream, db, version, siteId });
Expand Down Expand Up @@ -69,7 +69,7 @@ export class Sync {
await db.prepare(INSERT_CHANGES).run(...change);
});

const changeSiteVersions = latestVersions(changes);
const changeSiteVersions = latestVersions(changes).filter(([sId]) => sId !== siteId);

changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => {
await db
Expand Down Expand Up @@ -142,14 +142,16 @@ export class Sync {
.prepare(
`SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq"
FROM crsql_changes WHERE site_id != unhex(:clientSiteId)
AND db_version > :lastVersion`
AND db_version >= :lastVersion`
)
.all({
clientSiteId,
lastVersion
});

const changeSiteVersions = latestVersions(changes.map((change) => Object.values(change)));
const changeSiteVersions = latestVersions(
changes.map((change) => Object.values(change))
).filter(([siteId]) => siteId !== this.siteId);

changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => {
await this.db
Expand Down
6 changes: 5 additions & 1 deletion src/routes/app/offline-first/+page.server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import type { PageServerLoad } from './$types';
import { COMBINED_PATH } from '$lib/websockets/constants';
import { dbFrom } from '$lib/server/sync-db/db';

export const load: PageServerLoad = async ({ locals, url }) => {
const { username, userId } = locals.user;
const { protocol, host } = url;
const wsProtocol = protocol === 'https:' ? 'wss:' : 'ws:';
const dbName = `${userId}.db`;
const db = dbFrom(dbName);
const { serverSiteId } = db.prepare('SELECT hex(crsql_site_id()) as serverSiteId;').get();
const features = [{ type: 'offline', strategy: 'sync', stream: dbName }];
return {
url: `${wsProtocol}//${host}${COMBINED_PATH}?features=${JSON.stringify(features)}`,
username,
dbName
dbName,
serverSiteId
};
};
7 changes: 6 additions & 1 deletion src/routes/app/offline-first/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
`SELECT crsql_as_crr('todonts');`
];
const { store } = db({ schema, name: data.dbName, wsUrl: data.url });
const { store } = db({
schema,
name: data.dbName,
wsUrl: data.url,
serverSiteId: data.serverSiteId
});
const todos = store({
query: async (db) => await db.execO('SELECT * FROM todos'),
commands: {
Expand Down
54 changes: 29 additions & 25 deletions src/routes/app/offline-first/sdb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export function latestVersions(changes) {
async function pushOfflineChangesToServer(database, ws, version) {
const changes = await database.db.exec(
`SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq"
FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version > ?`,
FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version >= ?`,
[version]
);

Expand Down Expand Up @@ -61,10 +61,12 @@ async function pushOfflineChangesToServer(database, ws, version) {
function wsMessageHandler({
database,
update,
serverSiteId,
identifier
}: {
database: Database;
update: () => Promise<void>;
serverSiteId: string;
identifier?: string;
}) {
return async function (event: Event) {
Expand All @@ -79,17 +81,14 @@ function wsMessageHandler({
if ((type === 'update' && siteId !== clientSiteId) || type === 'pull') {
await database.merge(changes);

const changeSiteVersions = latestVersions(changes);

changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => {
await database.db.exec(
`INSERT INTO crsql_tracked_peers (site_id, version, tag, event)
VALUES (unhex(?), ?, 0, 0)
await database.db.exec(
`INSERT INTO crsql_tracked_peers (site_id, version, tag, event)
VALUES (unhex(?), crsql_db_version(), 0, 0)
ON CONFLICT([site_id], [tag], [event])
DO UPDATE SET version=excluded.version`,
[changeSiteId, changeDbVersion]
);
});
[serverSiteId]
);

await update();
}

Expand All @@ -113,22 +112,26 @@ async function setupWs({ url, database }: { url: string; database: Promise<Datab
return ws;
}

export function db({ schema, name, wsUrl }) {
export function db({ schema, name, wsUrl, serverSiteId, identifier }) {
if (!browser) {
// No SSR
return { store: () => readable([]) };
}
console.log({ serverSiteId });
const databasePromise = Database.load({ schema, name });
const wsPromise = setupWs({ url: wsUrl, database: databasePromise });
const store = ({ query, commands, identifier }) => {
const store = ({ query, commands }) => {
const q = writable([]);
databasePromise.then(async (database) => {
const ws = await wsPromise;
const update = async () => q.set(await query(database.db));
// Maybe this should register the listener in a store,
// we may be over subscribing since we add a listener with
// every `store`
ws.addEventListener('message', wsMessageHandler({ database, update, identifier }));
ws.addEventListener(
'message',
wsMessageHandler({ database, update, identifier, serverSiteId })
);
await update();
});

Expand All @@ -139,24 +142,25 @@ export function db({ schema, name, wsUrl }) {
const db = await databasePromise;
const results = await fn(db.db, args);
q.set(await query(db.db));
const serverSiteVersion = await db.db.execO(
`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`,
[serverSiteId]
);

const changes = await db.db.exec(
`SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq"
FROM crsql_changes WHERE site_id = crsql_site_id()
AND db_version >= crsql_db_version()`
FROM crsql_changes WHERE db_version >= ?`,
[serverSiteVersion[0].version]
);

const changeSiteVersions = latestVersions(changes);

changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => {
await db.db.exec(
`INSERT INTO crsql_tracked_peers (site_id, version, tag, event)
VALUES (unhex(?), ?, 0, 0)
await db.db.exec(
`INSERT INTO crsql_tracked_peers (site_id, version, tag, event)
VALUES (unhex(?), crsql_db_version(), 0, 0)
ON CONFLICT([site_id], [tag], [event])
DO UPDATE SET version=excluded.version`,
[changeSiteId, changeDbVersion]
);
});

[serverSiteId]
);
console.log({ changes });
const ws = await wsPromise;
ws.send(
encoder.encode(
Expand Down

0 comments on commit 6579175

Please sign in to comment.