Skip to content

Commit

Permalink
server db is sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sammy authored and sammy committed Dec 28, 2023
1 parent 9fd49db commit 194b151
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/lib/server/sync-db/db.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import Database from 'better-sqlite3';
import { extensionPath } from '@vlcn.io/crsqlite';

export async function dbFrom(filename) {
export function dbFrom(filename) {
// TODO: should be an env-var so we can use Render's persistent disk as the path
const db = new Database(`./sync-dbs/${filename}`);
db.pragma('journal_mode = WAL');
db.loadExtension(extensionPath);

// TODO: import schema from same place as frontend
await db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete);
db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete);
SELECT crsql_as_crr('todos');
CREATE TABLE IF NOT EXISTS todonts (id PRIMARY KEY NOT NULL, content, complete);
SELECT crsql_as_crr('todonts');
Expand Down
34 changes: 16 additions & 18 deletions src/lib/server/websockets/features/offline/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ export class Sync {
clientSiteId: string;
clientVersion: number;
}) {
const db = await dbFrom(`${ws.session.user.userId}.db`);
const { siteId, version } = await db
const db = dbFrom(`${ws.session.user.userId}.db`);
const { siteId, version } = db
.prepare('SELECT hex(crsql_site_id()) as siteId, crsql_db_version() as version;')
.get();
const sync = new Sync({ ws, stream, db, version, siteId, clientSiteId });
Expand All @@ -66,21 +66,19 @@ export class Sync {
1. client inserts a new entry and sends an update
2. client receives a message of `type: 'connected'`, then it sends up all changes
*/
changes.forEach(async (change, i) => {
await db.prepare(INSERT_CHANGES).run(...change);
changes.forEach((change, i) => {
db.prepare(INSERT_CHANGES).run(...change);
});

const changeSiteVersions = latestVersions(changes);

changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => {
await db
.prepare(
`INSERT INTO crsql_tracked_peers (site_id, version, tag, event)
changeSiteVersions.forEach(([changeSiteId, changeDbVersion]) => {
db.prepare(
`INSERT INTO crsql_tracked_peers (site_id, version, tag, event)
VALUES (unhex(?), ?, 0, 0)
ON CONFLICT([site_id], [tag], [event])
DO UPDATE SET version=excluded.version`
)
.run(changeSiteId, changeDbVersion);
).run(changeSiteId, changeDbVersion);
});

await sync.receive(data);
Expand Down Expand Up @@ -122,27 +120,27 @@ export class Sync {
this.clientSiteId = clientSiteId;
}

async catchUpServer(clientSiteId) {
catchUpServer(clientSiteId) {
// Here we can send down the last seen id or something.
// that way we don't need the entire contents of the db

const result = await this.db
const result = this.db
.prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`)
.get(clientSiteId);
const version = result?.version ?? 0;
await this.notify(JSON.stringify({ type: 'connected', siteId: clientSiteId, version }));
this.notify(JSON.stringify({ type: 'connected', siteId: clientSiteId, version }));
}

async catchUpClient(clientSiteId: string) {
catchUpClient(clientSiteId: string) {
// Maybe we can do something to only send down what's needed.
// just updates after the last update by `${clientSiteId}

const result = await this.db
const result = this.db
.prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`)
.get(clientSiteId);
const lastVersion = result?.version ?? 0;

const changes = await this.db
const changes = this.db
.prepare(
`SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq"
FROM crsql_changes WHERE site_id != unhex(:clientSiteId)
Expand All @@ -155,8 +153,8 @@ export class Sync {

const changeSiteVersions = latestVersions(changes.map((change) => Object.values(change)));

changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => {
await this.db
changeSiteVersions.forEach(([changeSiteId, changeDbVersion]) => {
this.db
.prepare(
`INSERT INTO crsql_tracked_peers (site_id, version, tag, event)
VALUES (unhex(?), ?, 0, 0)
Expand Down

0 comments on commit 194b151

Please sign in to comment.